projects
/
spider.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
fixed lack of stream?
[spider.git]
/
perl
/
DXMsg.pm
diff --git
a/perl/DXMsg.pm
b/perl/DXMsg.pm
index e3ea16ad4d7fb672ddce392ad6b2f3d5a40b8b07..92c113304726f82aaa8efaf75e4841b328e9ac2d 100644
(file)
--- a/
perl/DXMsg.pm
+++ b/
perl/DXMsg.pm
@@
-86,6
+86,7
@@
$importfn = "$msgdir/import"; # import directory
keep => '0,Keep this?,yesno',
lastt => '5,Last processed,cldatetime',
waitt => '5,Wait until,cldatetime',
keep => '0,Keep this?,yesno',
lastt => '5,Last processed,cldatetime',
waitt => '5,Wait until,cldatetime',
+ delete => '5,Awaiting Delete,yesno',
);
# allocate a new object
);
# allocate a new object
@@
-115,19
+116,6
@@
sub alloc
return $self;
}
return $self;
}
-sub workclean
-{
- my $ref = shift;
- delete $ref->{lines};
- delete $ref->{linesreq};
- delete $ref->{tonode};
- delete $ref->{fromnode};
- delete $ref->{stream};
- delete $ref->{file};
- delete $ref->{count};
- delete $ref->{lastt} if exists $ref->{lastt};
- delete $ref->{waitt} if exists $ref->{waitt};
-}
sub process
{
sub process
{
@@
-155,7
+143,7
@@
sub process
my @f = split /\^/, $line;
my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
my ($tonode, $fromnode) = @f[1, 2];
my @f = split /\^/, $line;
my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
my ($tonode, $fromnode) = @f[1, 2];
- my $stream = $f[3] if
$pcno > 29 && $pcno <= 33
;
+ my $stream = $f[3] if
($pcno >= 29 && $pcno <= 33) || $pcno == 42
;
SWITCH: {
if ($pcno == 28) { # incoming message
SWITCH: {
if ($pcno == 28) { # incoming message
@@
-168,8
+156,8
@@
sub process
# and cancel them this should both resolve timed out incoming messages
# and crossing of message between nodes, incoming messages have priority
# and cancel them this should both resolve timed out incoming messages
# and crossing of message between nodes, incoming messages have priority
- if (exists $busy{$fromnode}) {
- my $ref = $busy{$fromnode};
+ my $ref = get_busy($fromnode);
+ if ($ref) {
my $otonode = $ref->{tonode} || "unknown";
dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg');
$ref->stop_msg($fromnode);
my $otonode = $ref->{tonode} || "unknown";
dbg("Busy, stopping msgno: $ref->{msgno} $fromnode->$otonode") if isdbg('msg');
$ref->stop_msg($fromnode);
@@
-177,7
+165,7
@@
sub process
my $t = cltounix($f[5], $f[6]);
$stream = next_transno($fromnode);
my $t = cltounix($f[5], $f[6]);
$stream = next_transno($fromnode);
-
my
$ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]);
+ $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $origin, '0', $f[11]);
# fill in various forwarding state variables
$ref->{fromnode} = $fromnode;
# fill in various forwarding state variables
$ref->{fromnode} = $fromnode;
@@
-188,8
+176,8
@@
sub process
$ref->{count} = 0; # no of lines between PC31s
dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg');
Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" );
$ref->{count} = 0; # no of lines between PC31s
dbg("new message from $f[4] to $f[3] '$f[8]' stream $fromnode/$stream\n") if isdbg('msg');
Log('msg', "Incoming message $f[4] to $f[3] '$f[8]' origin: $origin" );
-
$work{"$fromnode,$stream"} = $ref
; # store in work
-
$busy{$fromnode} = $ref
; # set interlock
+
set_fwq($fromnode, $stream, $ref)
; # store in work
+
set_busy($fromnode, $ref)
; # set interlock
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
$ref->{lastt} = $main::systime;
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
$ref->{lastt} = $main::systime;
@@
-204,7
+192,7
@@
sub process
}
if ($pcno == 29) { # incoming text
}
if ($pcno == 29) { # incoming text
- my $ref =
$work{"$fromnode,$stream"}
;
+ my $ref =
get_fwq($fromnode, $stream)
;
if ($ref) {
$f[4] =~ s/\%5E/^/g;
push @{$ref->{lines}}, $f[4];
if ($ref) {
$f[4] =~ s/\%5E/^/g;
push @{$ref->{lines}}, $f[4];
@@
-223,16
+211,16
@@
sub process
}
if ($pcno == 30) { # this is a incoming subject ack
}
if ($pcno == 30) { # this is a incoming subject ack
- my $ref =
$work{"$fromnode,"};
# note no stream at this stage
+ my $ref =
get_fwq($fromnode);
# note no stream at this stage
if ($ref) {
if ($ref) {
- del
ete $work{"$fromnode,"}
;
+ del
_fwq($fromnode)
;
$ref->{stream} = $stream;
$ref->{count} = 0;
$ref->{linesreq} = 5;
$ref->{stream} = $stream;
$ref->{count} = 0;
$ref->{linesreq} = 5;
- $work{"$fromnode,$stream"} = $ref; # new ref
+ set_fwq($fromnode, $stream, $ref); # new ref
+ set_busy($fromnode, $ref); # interlock
dbg("incoming subject ack stream $stream\n") if isdbg('msg');
dbg("incoming subject ack stream $stream\n") if isdbg('msg');
- $busy{$fromnode} = $ref; # interlock
- push @{$ref->{lines}}, ($ref->read_msg_body);
+ $ref->{lines} = [ $ref->read_msg_body ];
$ref->send_tranche($self);
$ref->{lastt} = $main::systime;
} else {
$ref->send_tranche($self);
$ref->{lastt} = $main::systime;
} else {
@@
-243,7
+231,7
@@
sub process
}
if ($pcno == 31) { # acknowledge a tranche of lines
}
if ($pcno == 31) { # acknowledge a tranche of lines
- my $ref =
$work{"$fromnode,$stream"}
;
+ my $ref =
get_fwq($fromnode, $stream)
;
if ($ref) {
dbg("tranche ack stream $stream\n") if isdbg('msg');
$ref->send_tranche($self);
if ($ref) {
dbg("tranche ack stream $stream\n") if isdbg('msg');
$ref->send_tranche($self);
@@
-257,7
+245,7
@@
sub process
if ($pcno == 32) { # incoming EOM
dbg("stream $stream: EOM received\n") if isdbg('msg');
if ($pcno == 32) { # incoming EOM
dbg("stream $stream: EOM received\n") if isdbg('msg');
- my $ref =
$work{"$fromnode,$stream"}
;
+ my $ref =
get_fwq($fromnode, $stream)
;
if ($ref) {
$self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it
if ($ref) {
$self->send(DXProt::pc33($fromnode, $tonode, $stream)); # acknowledge it
@@
-330,11
+318,11
@@
sub process
}
if ($pcno == 33) { # acknowledge the end of message
}
if ($pcno == 33) { # acknowledge the end of message
- my $ref =
$work{"$fromnode,$stream"}
;
+ my $ref =
get_fwq($fromnode, $stream)
;
if ($ref) {
if ($ref->{private}) { # remove it if it private and gone off site#
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted");
if ($ref) {
if ($ref->{private}) { # remove it if it private and gone off site#
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode and deleted");
- $ref->
del_msg
;
+ $ref->
{delete}++
;
} else {
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode");
push @{$ref->{gotit}}, $fromnode; # mark this up as being received
} else {
Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $fromnode");
push @{$ref->{gotit}}, $fromnode; # mark this up as being received
@@
-381,7
+369,7
@@
sub process
$ref->{count} = 0; # no of lines between PC31s
$ref->{file} = 1;
$ref->{lastt} = $main::systime;
$ref->{count} = 0; # no of lines between PC31s
$ref->{file} = 1;
$ref->{lastt} = $main::systime;
-
$work{"$fromnode,$stream"} = $ref
; # store in work
+
set_fwq($fromnode, $stream, $ref)
; # store in work
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
last SWITCH;
$self->send(DXProt::pc30($fromnode, $tonode, $stream)); # send ack
last SWITCH;
@@
-389,7
+377,7
@@
sub process
if ($pcno == 42) { # abort transfer
dbg("stream $stream: abort received\n") if isdbg('msg');
if ($pcno == 42) { # abort transfer
dbg("stream $stream: abort received\n") if isdbg('msg');
- my $ref =
$work{"$fromnode,$stream"}
;
+ my $ref =
get_fwq($fromnode, $stream)
;
if ($ref) {
$ref->stop_msg($fromnode);
$ref = undef;
if ($ref) {
$ref->stop_msg($fromnode);
$ref = undef;
@@
-400,7
+388,7
@@
sub process
if ($pcno == 49) { # global delete on subject
for (@msg) {
if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
if ($pcno == 49) { # global delete on subject
for (@msg) {
if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
- $_->
del_msg()
;
+ $_->
{delete}++
;
Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
DXChannel::broadcast_nodes($line, $self);
}
Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
DXChannel::broadcast_nodes($line, $self);
}
@@
-466,15
+454,24
@@
sub store
sub del_msg
{
my $self = shift;
sub del_msg
{
my $self = shift;
+ my $dxchan = shift;
- # remove it from the active message list
- dbg("\@msg = " . scalar @msg . " before delete") if isdbg('msg');
- @msg = grep { $_ != $self } @msg;
-
- # remove the file
- unlink filename($self->{msgno});
- dbg("deleting $self->{msgno}\n") if isdbg('msg');
- dbg("\@msg = " . scalar @msg . " after delete") if isdbg('msg');
+ if ($self->{tonode}) {
+ $self->{delete}++;
+ } else {
+ my $call;
+ if ($dxchan) {
+ $call = " by " . $dxchan->call;
+ } else {
+ $call = '';
+ }
+
+ # remove it from the active message list
+ @msg = grep { $_ != $self } @msg;
+
+ # remove the file
+ unlink filename($self->{msgno});
+ }
}
# clean out old messages from the message queue
}
# clean out old messages from the message queue
@@
-565,7
+562,7
@@
sub read_msg_body
if (!open($file, $fn)) {
dbg("Error reading $fn $!");
Log('err' ,"Error reading $fn $!");
if (!open($file, $fn)) {
dbg("Error reading $fn $!");
Log('err' ,"Error reading $fn $!");
- return
undef
;
+ return
()
;
}
@out = map {chomp; $_} <$file>;
close($file);
}
@out = map {chomp; $_} <$file>;
close($file);
@@
-629,6
+626,15
@@
sub queue_msg
next;
}
next;
}
+ # is it being sent anywhere currently?
+ next if $ref->{tonode}; # ignore it if it already being processed
+
+ # is it awaiting deletion?
+ if ($ref->{delete}) {
+ $ref->del_msg;
+ next;
+ }
+
# firstly, is it private and unread? if so can I find the recipient
# in my cluster node list offsite?
# firstly, is it private and unread? if so can I find the recipient
# in my cluster node list offsite?
@@
-636,7
+642,6
@@
sub queue_msg
my $dxchan;
if ($ref->{private}) {
next if $ref->{'read'}; # if it is read, it is stuck here
my $dxchan;
if ($ref->{private}) {
next if $ref->{'read'}; # if it is read, it is stuck here
- next if $ref->{tonode}; # ignore it if it already being processed
$clref = Route::get($ref->{to});
if ($clref) {
$dxchan = $clref->dxchan;
$clref = Route::get($ref->{to});
if ($clref) {
$dxchan = $clref->dxchan;
@@
-703,8
+708,8
@@
sub start_msg
$self->{count} = 0;
$self->{tonode} = $dxchan->call;
$self->{fromnode} = $main::mycall;
$self->{count} = 0;
$self->{tonode} = $dxchan->call;
$self->{fromnode} = $main::mycall;
-
$busy{$self->{tonode}} = $self
;
-
$work{"$self->{tonode},"} = $self
;
+
set_busy($self->{tonode}, $self)
;
+
set_fwq($self->{tonode}, undef, $self)
;
$self->{lastt} = $main::systime;
my ($fromnode, $origin);
$fromnode = $self->{fromnode};
$self->{lastt} = $main::systime;
my ($fromnode, $origin);
$fromnode = $self->{fromnode};
@@
-719,7
+724,19
@@
sub get_busy
return $busy{$call};
}
return $busy{$call};
}
-# get the busy queue
+sub set_busy
+{
+ my $call = shift;
+ return $busy{$call} = shift;
+}
+
+sub del_busy
+{
+ my $call = shift;
+ return delete $busy{$call};
+}
+
+# get the whole busy queue
sub get_all_busy
{
return keys %busy;
sub get_all_busy
{
return keys %busy;
@@
-760,14
+777,27
@@
sub stop_msg
{
my $self = shift;
my $node = shift;
{
my $self = shift;
my $node = shift;
- my $stream = $self->{stream}
if exists $self->{stream}
;
+ my $stream = $self->{stream};
dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg');
dbg("stop msg $self->{msgno} -> node $node\n") if isdbg('msg');
- delete $work{"$node,"};
- delete $work{"$node,$stream"} if $stream;
+ del_fwq($node, $stream);
$self->workclean;
$self->workclean;
- delete $busy{$node};
+ del_busy($node);
+}
+
+sub workclean
+{
+ my $ref = shift;
+ delete $ref->{lines};
+ delete $ref->{linesreq};
+ delete $ref->{tonode};
+ delete $ref->{fromnode};
+ delete $ref->{stream};
+ delete $ref->{file};
+ delete $ref->{count};
+ delete $ref->{lastt} if exists $ref->{lastt};
+ delete $ref->{waitt} if exists $ref->{waitt};
}
# get a new transaction number from the file specified
}
# get a new transaction number from the file specified
@@
-973,8
+1003,10
@@
sub do_send_stuff
sub dir
{
my $ref = shift;
sub dir
{
my $ref = shift;
+ my $flag = $ref->read ? '-' : ' ';
+ $flag = 'D' if $ref->delete;
return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s",
return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s",
- $ref->msgno, $
ref->read ? '-' : ' '
, $ref->private ? 'p' : ' ', $ref->size,
+ $ref->msgno, $
flag
, $ref->private ? 'p' : ' ', $ref->size,
$ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
}
$ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
}
@@
-1155,6
+1187,7
@@
sub import_msgs
my @out = import_one($main::me, \@msg, $splitit);
Log('msg', @out);
}
my @out = import_one($main::me, \@msg, $splitit);
Log('msg', @out);
}
+ queue_msg(0);
}
# import one message as a list in bbs (as extended) mode
}
# import one message as a list in bbs (as extended) mode
@@
-1174,7
+1207,9
@@
sub import_one
# first line;
my $line = shift @$ref;
# first line;
my $line = shift @$ref;
- my @f = split /\s+/, $line;
+ my @f = split /\b/, $line;
+ @f = map {s/\s+//g; length $_ ? $_ : ()} @f;
+
unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
my $m = "invalid first line in import '$line'";
dbg($m) if isdbg('msg');
unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
my $m = "invalid first line in import '$line'";
dbg($m) if isdbg('msg');
@@
-1191,16
+1226,16
@@
sub import_one
;
} elsif ($notincalls && ($f eq 'RR')) {
$rr = '1';
;
} elsif ($notincalls && ($f eq 'RR')) {
$rr = '1';
- } elsif (
$f eq '@' && @f) { # this is bbs syntax, for origin
-
$origin = uc
shift @f;
+ } elsif (
($f =~ /^[\@\.\#\$]$/ || $f eq '.#') && @f) { # this is bbs syntax, for AT
+ shift @f;
} elsif ($f eq '<' && @f) { # this is bbs syntax for from call
$from = uc shift @f;
} elsif ($f =~ /^\$/) { # this is bbs syntax for a bid
next;
} elsif ($f eq '<' && @f) { # this is bbs syntax for from call
$from = uc shift @f;
} elsif ($f =~ /^\$/) { # this is bbs syntax for a bid
next;
- } elsif ($f =~ /^<
\S+
/) { # this is bbs syntax for from call
-
($from) = $f =~ /^<(\S+)$/
;
- } elsif ($f =~ /^\
@\S+/) { # this is bbs syntax for origin
-
($origin) = $f =~ /^\@(\S+)$/
;
+ } elsif ($f =~ /^<
(\S+)
/) { # this is bbs syntax for from call
+
$from = $1
;
+ } elsif ($f =~ /^\
$\S+/) { # this is bbs syntax for bid
+ ;
} else {
# callsign ?
} else {
# callsign ?