]> scm.dxcluster.org Git - spider.git/blob - perl/DXMsg.pm
made the communications between clients and server completely ascii (no
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8 #
9 #
10 # Notes for implementors:-
11 #
12 # PC28 field 11 is the RR required flag
13 # PC28 field 12 is a VIA routing (ie it is a node call) 
14 #
15
16 package DXMsg;
17
18 @ISA = qw(DXProt DXChannel);
19
20 use DXUtil;
21 use DXChannel;
22 use DXUser;
23 use DXM;
24 use DXCluster;
25 use DXProtVars;
26 use DXProtout;
27 use DXDebug;
28 use DXLog;
29 use IO::File;
30 use Fcntl;
31 use Carp;
32
33 use strict;
34 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
35                         @badmsg @swop $swopfn $badmsgfn $forwardfn @forward $timeout $waittime
36                     $queueinterval $lastq $importfn $minchunk $maxchunk);
37
38 %work = ();                                             # outstanding jobs
39 @msg = ();                                              # messages we have
40 %busy = ();                                             # station interlocks
41 $msgdir = "$main::root/msg";    # directory contain the msgs
42 $maxage = 30 * 86400;                   # the maximum age that a message shall live for if not marked 
43 $last_clean = 0;                                # last time we did a clean
44 @forward = ();                  # msg forward table
45 @badmsg = ();                                   # bad message table
46 @swop = ();                                             # swop table
47 $timeout = 30*60;               # forwarding timeout
48 $waittime = 30*60;              # time an aborted outgoing message waits before trying again
49 $queueinterval = 1*60;          # run the queue every 1 minute
50 $lastq = 0;
51
52 $minchunk = 4800;               # minimum chunk size for a split message
53 $maxchunk = 6000;               # maximum chunk size
54
55 $badmsgfn = "$msgdir/badmsg.pl";    # list of TO address we wont store
56 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
57 $swopfn = "$msgdir/swop.pl";        # the swopping table
58 $importfn = "$msgdir/import";       # import directory
59
60
61 %valid = (
62                   fromnode => '5,From Node',
63                   tonode => '5,To Node',
64                   to => '0,To',
65                   from => '0,From',
66                   t => '0,Msg Time,cldatetime',
67                   private => '5,Private',
68                   subject => '0,Subject',
69                   linesreq => '0,Lines per Gob',
70                   rrreq => '5,Read Confirm',
71                   origin => '0,Origin',
72                   lines => '5,Data',
73                   stream => '9,Stream No',
74                   count => '5,Gob Linecnt',
75                   file => '5,File?,yesno',
76                   gotit => '5,Got it Nodes,parray',
77                   lines => '5,Lines,parray',
78                   'read' => '5,Times read',
79                   size => '0,Size',
80                   msgno => '0,Msgno',
81                   keep => '0,Keep this?,yesno',
82                   lastt => '5,Last processed,cldatetime',
83                   waitt => '5,Wait until,cldatetime',
84                  );
85
86 sub DESTROY
87 {
88         my $self = shift;
89         undef $self->{lines};
90         undef $self->{gotit};
91 }
92
93 # allocate a new object
94 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
95 sub alloc                  
96 {
97         my $pkg = shift;
98         my $self = bless {}, $pkg;
99         $self->{msgno} = shift;
100         my $to = shift;
101         #  $to =~ s/-\d+$//o;
102         $self->{to} = ($to eq $main::mycall) ? $main::myalias : $to;
103         my $from = shift;
104         $from =~ s/-\d+$//o;
105         $self->{from} = uc $from;
106         $self->{t} = shift;
107         $self->{private} = shift;
108         $self->{subject} = shift;
109         $self->{origin} = shift;
110         $self->{'read'} = shift;
111         $self->{rrreq} = shift;
112         $self->{gotit} = [];
113         $self->{lastt} = $main::systime;
114     
115         return $self;
116 }
117
118 sub workclean
119 {
120         my $ref = shift;
121         delete $ref->{lines};
122         delete $ref->{linesreq};
123         delete $ref->{tonode};
124         delete $ref->{fromnode};
125         delete $ref->{stream};
126         delete $ref->{lines};
127         delete $ref->{file};
128         delete $ref->{count};
129         delete $ref->{lastt} if exists $ref->{lastt};
130         delete $ref->{waitt} if exists $ref->{waitt};
131 }
132
133 sub process
134 {
135         my ($self, $line) = @_;
136
137         # this is periodic processing
138         if (!$self || !$line) {
139
140                 if ($main::systime > $lastq + $queueinterval) {
141
142                         # wander down the work queue stopping any messages that have timed out
143                         for (keys %busy) {
144                                 my $node = $_;
145                                 my $ref = $busy{$_};
146                                 if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) {
147                                         dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
148                                         $ref->stop_msg($node);
149                                         
150                                         # delay any outgoing messages that fail
151                                         $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
152                                 }
153                         }
154
155                         # queue some message if the interval timer has gone off
156                         queue_msg(0);
157
158                         # import any messages in the import directory
159                         import_msgs();
160                         
161                         $lastq = $main::systime;
162                 }
163
164                 # clean the message queue
165                 clean_old() if $main::systime - $last_clean > 3600 ;
166                 return;
167         }
168
169         my @f = split /\^/, $line;
170         my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
171
172  SWITCH: {
173                 if ($pcno == 28) {              # incoming message
174
175                         # first look for any messages in the busy queue 
176                         # and cancel them this should both resolve timed out incoming messages
177                         # and crossing of message between nodes, incoming messages have priority
178                         if (exists $busy{$f[2]}) {
179                                 my $ref = $busy{$f[2]};
180                                 my $tonode = $ref->{tonode};
181                                 dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
182                                 $ref->stop_msg($self->call);
183                         }
184
185                         my $t = cltounix($f[5], $f[6]);
186                         my $stream = next_transno($f[2]);
187                         my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
188                         
189                         # fill in various forwarding state variables
190                         $ref->{fromnode} = $f[2];
191                         $ref->{tonode} = $f[1];
192                         $ref->{rrreq} = $f[11];
193                         $ref->{linesreq} = $f[10];
194                         $ref->{stream} = $stream;
195                         $ref->{count} = 0;      # no of lines between PC31s
196                         dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
197                         $work{"$f[2]$stream"} = $ref; # store in work
198                         $busy{$f[2]} = $ref; # set interlock
199                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
200                         $ref->{lastt} = $main::systime;
201
202                         # look to see whether this is a non private message sent to a known callsign
203                         my $uref = DXUser->get_current($ref->{to});
204                         if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
205                                 $ref->{private} = 1;
206                                 dbg('msg', "set bull to $ref->{to} to private");
207                         }
208                         last SWITCH;
209                 }
210                 
211                 if ($pcno == 29) {              # incoming text
212                         my $ref = $work{"$f[2]$f[3]"};
213                         if ($ref) {
214                                 push @{$ref->{lines}}, $f[4];
215                                 $ref->{count}++;
216                                 if ($ref->{count} >= $ref->{linesreq}) {
217                                         $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
218                                         dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
219                                         $ref->{count} = 0;
220                                 }
221                                 $ref->{lastt} = $main::systime;
222                         } else {
223                                 dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
224                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
225                         }
226                         last SWITCH;
227                 }
228                 
229                 if ($pcno == 30) {              # this is a incoming subject ack
230                         my $ref = $work{$f[2]}; # note no stream at this stage
231                         if ($ref) {
232                                 delete $work{$f[2]};
233                                 $ref->{stream} = $f[3];
234                                 $ref->{count} = 0;
235                                 $ref->{linesreq} = 5;
236                                 $work{"$f[2]$f[3]"} = $ref;     # new ref
237                                 dbg('msg', "incoming subject ack stream $f[3]\n");
238                                 $busy{$f[2]} = $ref; # interlock
239                                 $ref->{lines} = [];
240                                 push @{$ref->{lines}}, ($ref->read_msg_body);
241                                 $ref->send_tranche($self);
242                                 $ref->{lastt} = $main::systime;
243                         } else {
244                                 dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
245                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
246                         } 
247                         last SWITCH;
248                 }
249                 
250                 if ($pcno == 31) {              # acknowledge a tranche of lines
251                         my $ref = $work{"$f[2]$f[3]"};
252                         if ($ref) {
253                                 dbg('msg', "tranche ack stream $f[3]\n");
254                                 $ref->send_tranche($self);
255                                 $ref->{lastt} = $main::systime;
256                         } else {
257                                 dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
258                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
259                         } 
260                         last SWITCH;
261                 }
262                 
263                 if ($pcno == 32) {              # incoming EOM
264                         dbg('msg', "stream $f[3]: EOM received\n");
265                         my $ref = $work{"$f[2]$f[3]"};
266                         if ($ref) {
267                                 $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it
268                                 
269                                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
270                                 # store the file or message
271                                 # remove extraneous rubbish from the hash
272                                 # remove it from the work in progress vector
273                                 # stuff it on the msg queue
274                                 if ($ref->{lines}) {
275                                         if ($ref->{file}) {
276                                                 $ref->store($ref->{lines});
277                                         } else {
278
279                                                 # does an identical message already exist?
280                                                 my $m;
281                                                 for $m (@msg) {
282                                                         if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
283                                                                 $ref->stop_msg($self->call);
284                                                                 my $msgno = $m->{msgno};
285                                                                 dbg('msg', "duplicate message to $msgno\n");
286                                                                 Log('msg', "duplicate message to $msgno");
287                                                                 return;
288                                                         }
289                                                 }
290
291                                                 # swop addresses
292                                                 $ref->swop_it($self->call);
293                                                 
294                                                 # look for 'bad' to addresses 
295 #                                               if (grep $ref->{to} eq $_, @badmsg) {
296                                                 if ($ref->dump_it($self->call)) {
297                                                         $ref->stop_msg($self->call);
298                                                         dbg('msg', "'Bad' message $ref->{to}");
299                                                         Log('msg', "'Bad' message $ref->{to}");
300                                                         return;
301                                                 }
302
303                                                 $ref->{msgno} = next_transno("Msgno");
304                                                 push @{$ref->{gotit}}, $f[2]; # mark this up as being received
305                                                 $ref->store($ref->{lines});
306                                                 add_dir($ref);
307                                                 my $dxchan = DXChannel->get($ref->{to});
308                                                 $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
309                                                 Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
310                                         }
311                                 }
312                                 $ref->stop_msg($self->call);
313                         } else {
314                                 dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
315                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
316                         }
317                         # queue_msg(0);
318                         last SWITCH;
319                 }
320                 
321                 if ($pcno == 33) {              # acknowledge the end of message
322                         my $ref = $work{"$f[2]$f[3]"};
323                         if ($ref) {
324                                 if ($ref->{private}) { # remove it if it private and gone off site#
325                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
326                                         $ref->del_msg;
327                                 } else {
328                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
329                                         push @{$ref->{gotit}}, $f[2]; # mark this up as being received
330                                         $ref->store($ref->{lines});     # re- store the file
331                                 }
332                                 $ref->stop_msg($self->call);
333                         } else {
334                                 dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
335                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
336                         } 
337
338                         # send next one if present
339                         queue_msg(0);
340                         last SWITCH;
341                 }
342                 
343                 if ($pcno == 40) {              # this is a file request
344                         $f[3] =~ s/\\/\//og; # change the slashes
345                         $f[3] =~ s/\.//og;      # remove dots
346                         $f[3] =~ s/^\///o;   # remove the leading /
347                         $f[3] = lc $f[3];       # to lower case;
348                         dbg('msg', "incoming file $f[3]\n");
349                         $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;
350                         
351                         # create any directories
352                         my @part = split /\//, $f[3];
353                         my $part;
354                         my $fn = "$main::root";
355                         pop @part;                      # remove last part
356                         foreach $part (@part) {
357                                 $fn .= "/$part";
358                                 next if -e $fn;
359                                 last SWITCH if !mkdir $fn, 0777;
360                                 dbg('msg', "created directory $fn\n");
361                         }
362                         my $stream = next_transno($f[2]);
363                         my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
364                         
365                         # forwarding variables
366                         $ref->{fromnode} = $f[1];
367                         $ref->{tonode} = $f[2];
368                         $ref->{linesreq} = $f[5];
369                         $ref->{stream} = $stream;
370                         $ref->{count} = 0;      # no of lines between PC31s
371                         $ref->{file} = 1;
372                         $ref->{lastt} = $main::systime;
373                         $work{"$f[2]$stream"} = $ref; # store in work
374                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack 
375                         
376                         last SWITCH;
377                 }
378                 
379                 if ($pcno == 42) {              # abort transfer
380                         dbg('msg', "stream $f[3]: abort received\n");
381                         my $ref = $work{"$f[2]$f[3]"};
382                         if ($ref) {
383                                 $ref->stop_msg($self->call);
384                                 $ref = undef;
385                         }
386                         last SWITCH;
387                 }
388
389                 if ($pcno == 49) {      # global delete on subject
390                         for (@msg) {
391                                 if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
392                                         $_->del_msg();
393                                         Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
394                                         DXProt::broadcast_ak1a($line, $self);
395                                 }
396                         }
397                 }
398         }
399 }
400
401
402 # store a message away on disc or whatever
403 #
404 # NOTE the second arg is a REFERENCE not a list
405 sub store
406 {
407         my $ref = shift;
408         my $lines = shift;
409         
410         # we only proceed if there are actually any lines in the file
411 #       if (!$lines || @{$lines} == 0) {
412 #               return;
413 #       }
414         
415         if ($ref->{file}) {                     # a file
416                 dbg('msg', "To be stored in $ref->{to}\n");
417                 
418                 my $fh = new IO::File "$ref->{to}", "w";
419                 if (defined $fh) {
420                         my $line;
421                         foreach $line (@{$lines}) {
422                                 print $fh "$line\n";
423                         }
424                         $fh->close;
425                         dbg('msg', "file $ref->{to} stored\n");
426                         Log('msg', "file $ref->{to} from $ref->{from} stored" );
427                 } else {
428                         confess "can't open file $ref->{to} $!";  
429                 }
430         } else {                                        # a normal message
431
432                 # attempt to open the message file
433                 my $fn = filename($ref->{msgno});
434                 
435                 dbg('msg', "To be stored in $fn\n");
436                 
437                 # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
438                 my $fh = new IO::File "$fn", "w";
439                 if (defined $fh) {
440                         my $rr = $ref->{rrreq} ? '1' : '0';
441                         my $priv = $ref->{private} ? '1': '0';
442                         print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n";
443                         print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
444                         my $line;
445                         $ref->{size} = 0;
446                         foreach $line (@{$lines}) {
447                                 $ref->{size} += (length $line) + 1;
448                                 print $fh "$line\n";
449                         }
450                         $fh->close;
451                         dbg('msg', "msg $ref->{msgno} stored\n");
452                         Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" );
453                 } else {
454                         confess "can't open msg file $fn $!";  
455                 }
456         }
457 }
458
459 # delete a message
460 sub del_msg
461 {
462         my $self = shift;
463         
464         # remove it from the active message list
465         @msg = map { $_ != $self ? $_ : () } @msg;
466         
467         # belt and braces (one day I will ask someone if this is REALLY necessary)
468         delete $self->{gotit};
469         delete $self->{list};
470         
471         # remove the file
472         unlink filename($self->{msgno});
473         dbg('msg', "deleting $self->{msgno}\n");
474 }
475
476 # clean out old messages from the message queue
477 sub clean_old
478 {
479         my $ref;
480         
481         # mark old messages for deletion
482         foreach $ref (@msg) {
483                 if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
484                         $ref->{deleteme} = 1;
485                         delete $ref->{gotit};
486                         delete $ref->{list};
487                         unlink filename($ref->{msgno});
488                         dbg('msg', "deleting old $ref->{msgno}\n");
489                 }
490         }
491         
492         # remove them all from the active message list
493         @msg = map { $_->{deleteme} ? () : $_ } @msg;
494         $last_clean = $main::systime;
495 }
496
497 # read in a message header
498 sub read_msg_header
499
500         my $fn = shift;
501         my $file;
502         my $line;
503         my $ref;
504         my @f;
505         my $size;
506         
507         $file = new IO::File;
508         if (!open($file, $fn)) {
509                 print "Error reading $fn $!\n";
510                 return undef;
511         }
512         $size = -s $fn;
513         $line = <$file>;                        # first line
514         chomp $line;
515         $size -= length $line;
516         if (! $line =~ /^===/o) {
517                 print "corrupt first line in $fn ($line)\n";
518                 return undef;
519         }
520         $line =~ s/^=== //o;
521         @f = split /\^/, $line;
522         $ref = DXMsg->alloc(@f);
523         
524         $line = <$file>;                        # second line
525         chomp $line;
526         $size -= length $line;
527         if (! $line =~ /^===/o) {
528                 print "corrupt second line in $fn ($line)\n";
529                 return undef;
530         }
531         $line =~ s/^=== //o;
532         $ref->{gotit} = [];
533         @f = split /\^/, $line;
534         push @{$ref->{gotit}}, @f;
535         $ref->{size} = $size;
536         
537         close($file);
538         
539         return $ref;
540 }
541
542 # read in a message header
543 sub read_msg_body
544 {
545         my $self = shift;
546         my $msgno = $self->{msgno};
547         my $file;
548         my $line;
549         my $fn = filename($msgno);
550         my @out;
551         
552         $file = new IO::File;
553         if (!open($file, $fn)) {
554                 print "Error reading $fn $!\n";
555                 return undef;
556         }
557         @out = map {chomp; $_} <$file>;
558         close($file);
559         
560         shift @out if $out[0] =~ /^=== /;
561         shift @out if $out[0] =~ /^=== /;
562         return @out;
563 }
564
565 # send a tranche of lines to the other end
566 sub send_tranche
567 {
568         my ($self, $dxchan) = @_;
569         my @out;
570         my $to = $self->{tonode};
571         my $from = $self->{fromnode};
572         my $stream = $self->{stream};
573         my $lines = $self->{lines};
574         my ($c, $i);
575         
576         for ($i = 0, $c = $self->{count}; $i < $self->{linesreq} && $c < @$lines; $i++, $c++) {
577                 push @out, DXProt::pc29($to, $from, $stream, $lines->[$c]);
578     }
579     $self->{count} = $c;
580
581     push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
582         $dxchan->send(@out);
583 }
584
585         
586 # find a message to send out and start the ball rolling
587 sub queue_msg
588 {
589         my $sort = shift;
590         my $call = shift;
591         my $ref;
592         my $clref;
593         my @nodelist = DXChannel::get_all_ak1a();
594         
595         # bat down the message list looking for one that needs to go off site and whose
596         # nearest node is not busy.
597
598         dbg('msg', "queue msg ($sort)\n");
599         foreach $ref (@msg) {
600                 # firstly, is it private and unread? if so can I find the recipient
601                 # in my cluster node list offsite?
602
603                 # ignore 'delayed' messages until their waiting time has expired
604                 if (exists $ref->{waitt}) {
605                         next if $ref->{waitt} > $main::systime;
606                         delete $ref->{waitt};
607                 } 
608
609                 # deal with routed private messages
610                 my $noderef;
611                 if ($ref->{private}) {
612                         next if $ref->{'read'};           # if it is read, it is stuck here
613                         $clref = DXCluster->get_exact($ref->{to});
614                         unless ($clref) {             # otherwise look for a homenode
615                                 my $uref = DXUser->get($ref->{to});
616                                 my $hnode =  $uref->homenode if $uref;
617                                 $clref = DXCluster->get_exact($hnode) if $hnode;
618                         }
619                         if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all()) {
620                                 next if $clref->call eq $main::mycall;  # i.e. it lives here
621                                 $noderef = $clref->{dxchan};
622                                 $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
623                         }
624                 }
625                 
626                 # otherwise we are dealing with a bulletin or forwarded private message
627                 # compare the gotit list with
628                 # the nodelist up above, if there are sites that haven't got it yet
629                 # then start sending it - what happens when we get loops is anyone's
630                 # guess, use (to, from, time, subject) tuple?
631                 foreach $noderef (@nodelist) {
632                         next if $noderef->call eq $main::mycall;
633                         next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
634                         next unless $ref->forward_it($noderef->call);           # check the forwarding file
635
636                         # if we are here we have a node that doesn't have this message
637                         $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
638                         last;
639                 }
640
641                 # if all the available nodes are busy then stop
642                 last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
643         }
644 }
645
646 # is there a message for me?
647 sub for_me
648 {
649         my $call = uc shift;
650         my $ref;
651         
652         foreach $ref (@msg) {
653                 # is it for me, private and unread? 
654                 if ($ref->{to} eq $call && $ref->{private}) {
655                         return 1 if !$ref->{'read'};
656                 }
657         }
658         return 0;
659 }
660
661 # start the message off on its travels with a PC28
662 sub start_msg
663 {
664         my ($self, $dxchan) = @_;
665         
666         dbg('msg', "start msg $self->{msgno}\n");
667         $self->{linesreq} = 5;
668         $self->{count} = 0;
669         $self->{tonode} = $dxchan->call;
670         $self->{fromnode} = $main::mycall;
671         $busy{$self->{tonode}} = $self;
672         $work{$self->{tonode}} = $self;
673         $self->{lastt} = $main::systime;
674         $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
675 }
676
677 # get the ref of a busy node
678 sub get_busy
679 {
680         my $call = shift;
681         return $busy{$call};
682 }
683
684 # get the busy queue
685 sub get_all_busy
686 {
687         return values %busy;
688 }
689
690 # get the forwarding queue
691 sub get_fwq
692 {
693         return values %work;
694 }
695
696 # stop a message from continuing, clean it out, unlock interlocks etc
697 sub stop_msg
698 {
699         my $self = shift;
700         my $node = shift;
701         my $stream = $self->{stream} if exists $self->{stream};
702         
703         
704         dbg('msg', "stop msg $self->{msgno} -> node $node\n");
705         delete $work{$node};
706         delete $work{"$node$stream"} if $stream;
707         $self->workclean;
708         delete $busy{$node};
709 }
710
711 # get a new transaction number from the file specified
712 sub next_transno
713 {
714         my $name = shift;
715         $name =~ s/\W//og;                      # remove non-word characters
716         my $fn = "$msgdir/$name";
717         my $msgno;
718         
719         my $fh = new IO::File;
720         if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
721                 $fh->autoflush(1);
722                 $msgno = $fh->getline;
723                 chomp $msgno;
724                 $msgno++;
725                 seek $fh, 0, 0;
726                 $fh->print("$msgno\n");
727                 dbg('msg', "msgno $msgno allocated for $name\n");
728                 $fh->close;
729         } else {
730                 confess "can't open $fn $!";
731         }
732         return $msgno;
733 }
734
735 # initialise the message 'system', read in all the message headers
736 sub init
737 {
738         my $dir = new IO::File;
739         my @dir;
740         my $ref;
741                 
742         # load various control files
743         print "load badmsg: ", (load_badmsg() or "Ok"), "\n";
744         print "load forward: ", (load_forward() or "Ok"), "\n";
745         print "load swop: ", (load_swop() or "Ok"), "\n";
746
747         # read in the directory
748         opendir($dir, $msgdir) or confess "can't open $msgdir $!";
749         @dir = readdir($dir);
750         closedir($dir);
751
752         @msg = ();
753         for (sort @dir) {
754                 next unless /^m\d+$/o;
755                 
756                 $ref = read_msg_header("$msgdir/$_");
757                 next unless $ref;
758                 
759                 # delete any messages to 'badmsg.pl' places
760                 if (grep $ref->{to} eq $_, @badmsg) {
761                         dbg('msg', "'Bad' TO address $ref->{to}");
762                         Log('msg', "'Bad' TO address $ref->{to}");
763                         $ref->del_msg;
764                         next;
765                 }
766
767                 # add the message to the available queue
768                 add_dir($ref); 
769         }
770 }
771
772 # add the message to the directory listing
773 sub add_dir
774 {
775         my $ref = shift;
776         confess "tried to add a non-ref to the msg directory" if !ref $ref;
777         push @msg, $ref;
778 }
779
780 # return all the current messages
781 sub get_all
782 {
783         return @msg;
784 }
785
786 # get a particular message
787 sub get
788 {
789         my $msgno = shift;
790         for (@msg) {
791                 return $_ if $_->{msgno} == $msgno;
792                 last if $_->{msgno} > $msgno;
793         }
794         return undef;
795 }
796
797 # return the official filename for a message no
798 sub filename
799 {
800         return sprintf "$msgdir/m%06d", shift;
801 }
802
803 #
804 # return a list of valid elements 
805
806
807 sub fields
808 {
809         return keys(%valid);
810 }
811
812 #
813 # return a prompt for a field
814 #
815
816 sub field_prompt
817
818         my ($self, $ele) = @_;
819         return $valid{$ele};
820 }
821
822 #
823 # send a message state machine
824 sub do_send_stuff
825 {
826         my $self = shift;
827         my $line = shift;
828         my @out;
829         
830         if ($self->state eq 'send1') {
831                 #  $DB::single = 1;
832                 confess "local var gone missing" if !ref $self->{loc};
833                 my $loc = $self->{loc};
834                 $loc->{subject} = $line;
835                 $loc->{lines} = [];
836                 $self->state('sendbody');
837                 #push @out, $self->msg('sendbody');
838                 push @out, $self->msg('m8');
839         } elsif ($self->state eq 'sendbody') {
840                 confess "local var gone missing" if !ref $self->{loc};
841                 my $loc = $self->{loc};
842                 if ($line eq "\032" || $line eq '%1A' || uc $line eq "/EX") {
843                         my $to;
844                         
845                         foreach $to (@{$loc->{to}}) {
846                                 my $ref;
847                                 my $systime = $main::systime;
848                                 my $mycall = $main::mycall;
849                                 $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
850                                                                         uc $to,
851                                                                         exists $loc->{from} ? $loc->{from} : $self->call, 
852                                                                         $systime,
853                                                                         $loc->{private}, 
854                                                                         $loc->{subject}, 
855                                                                         exists $loc->{origin} ? $loc->{origin} : $mycall,
856                                                                         '0',
857                                                                         $loc->{rrreq});
858                                 $ref->swop_it($self->call);
859                                 $ref->store($loc->{lines});
860                                 $ref->add_dir();
861                                 push @out, $self->msg('m11', $ref->{msgno}, $to);
862                                 #push @out, "msgno $ref->{msgno} sent to $to";
863                                 my $dxchan = DXChannel->get(uc $to);
864                                 if ($dxchan) {
865                                         if ($dxchan->is_user()) {
866                                                 $dxchan->send($dxchan->msg('m9'));
867                                         }
868                                 }
869                         }
870
871                         delete $loc->{lines};
872                         delete $loc->{to};
873                         delete $self->{loc};
874                         $self->func(undef);
875                         
876                         $self->state('prompt');
877                 } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
878                         #push @out, $self->msg('sendabort');
879                         push @out, $self->msg('m10');
880                         delete $loc->{lines};
881                         delete $loc->{to};
882                         delete $self->{loc};
883                         $self->func(undef);
884                         $self->state('prompt');
885                 } else {
886                         
887                         # i.e. it ain't and end or abort, therefore store the line
888                         push @{$loc->{lines}}, length($line) > 0 ? $line : " ";
889                 }
890         }
891         return (1, @out);
892 }
893
894 # return the standard directory line for this ref 
895 sub dir
896 {
897         my $ref = shift;
898         return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", 
899                 $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
900                         $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
901 }
902
903 # load the forward table
904 sub load_forward
905 {
906         my @out;
907         my $s = readfilestr($forwardfn);
908         if ($s) {
909                 eval $s;
910                 push @out, $@ if $@;
911         }
912         return @out;
913 }
914
915 # load the bad message table
916 sub load_badmsg
917 {
918         my @out;
919         my $s = readfilestr($badmsgfn);
920         if ($s) {
921                 eval $s;
922                 push @out, $@ if $@;
923         }
924         return @out;
925 }
926
927 # load the swop message table
928 sub load_swop
929 {
930         my @out;
931         my $s = readfilestr($swopfn);
932         if ($s) {
933                 eval $s;
934                 push @out, $@ if $@;
935         }
936         return @out;
937 }
938
939 #
940 # forward that message or not according to the forwarding table
941 # returns 1 for forward, 0 - to ignore
942 #
943
944 sub forward_it
945 {
946         my $ref = shift;
947         my $call = shift;
948         my $i;
949         
950         for ($i = 0; $i < @forward; $i += 5) {
951                 my ($sort, $field, $pattern, $action, $bbs) = @forward[$i..($i+4)]; 
952                 my $tested;
953                 
954                 # are we interested?
955                 next if $ref->{private} && $sort ne 'P';
956                 next if !$ref->{private} && $sort ne 'B';
957                 
958                 # select field
959                 $tested = $ref->{to} if $field eq 'T';
960                 $tested = $ref->{from} if $field eq 'F';
961                 $tested = $ref->{origin} if $field eq 'O';
962                 $tested = $ref->{subject} if $field eq 'S';
963
964                 if (!$pattern || $tested =~ m{$pattern}i) {
965                         return 0 if $action eq 'I';
966                         return 1 if !$bbs || grep $_ eq $call, @{$bbs};
967                 }
968         }
969         return 0;
970 }
971
972 sub dump_it
973 {
974         my $ref = shift;
975         my $call = shift;
976         my $i;
977         
978         for ($i = 0; $i < @badmsg; $i += 3) {
979                 my ($sort, $field, $pattern) = @badmsg[$i..($i+2)]; 
980                 my $tested;
981                 
982                 # are we interested?
983                 next if $ref->{private} && $sort ne 'P';
984                 next if !$ref->{private} && $sort ne 'B';
985                 
986                 # select field
987                 $tested = $ref->{to} if $field eq 'T';
988                 $tested = $ref->{from} if $field eq 'F';
989                 $tested = $ref->{origin} if $field eq 'O';
990                 $tested = $ref->{subject} if $field eq 'S';
991
992                 if (!$pattern || $tested =~ m{$pattern}i) {
993                         return 1;
994                 }
995         }
996         return 0;
997 }
998
999 sub swop_it
1000 {
1001         my $ref = shift;
1002         my $call = shift;
1003         my $i;
1004         my $count = 0;
1005         
1006         for ($i = 0; $i < @swop; $i += 5) {
1007                 my ($sort, $field, $pattern, $tfield, $topattern) = @swop[$i..($i+4)]; 
1008                 my $tested;
1009                 my $swop;
1010                 my $old;
1011                 
1012                 # are we interested?
1013                 next if $ref->{private} && $sort ne 'P';
1014                 next if !$ref->{private} && $sort ne 'B';
1015                 
1016                 # select field
1017                 $tested = $ref->{to} if $field eq 'T';
1018                 $tested = $ref->{from} if $field eq 'F';
1019                 $tested = $ref->{origin} if $field eq 'O';
1020                 $tested = $ref->{subject} if $field eq 'S';
1021
1022                 # select swop field
1023                 $old = $swop = $ref->{to} if $tfield eq 'T';
1024                 $old = $swop = $ref->{from} if $tfield eq 'F';
1025                 $old = $swop = $ref->{origin} if $tfield eq 'O';
1026                 $old = $swop = $ref->{subject} if $tfield eq 'S';
1027
1028                 if ($tested =~ m{$pattern}i) {
1029                         if ($tested eq $swop) {
1030                                 $swop =~ s{$pattern}{$topattern}i;
1031                         } else {
1032                                 $swop = $topattern;
1033                         }
1034                         Log('msg', "Msg $ref->{msgno}: $tfield $old -> $swop");
1035                         Log('dbg', "Msg $ref->{msgno}: $tfield $old -> $swop");
1036                         $ref->{to} = $swop if $tfield eq 'T';
1037                         $ref->{from} = $swop if $tfield eq 'F';
1038                         $ref->{origin} = $swop if $tfield eq 'O';
1039                         $ref->{subject} = $swop if $tfield eq 'S';
1040                         ++$count;
1041                 }
1042         }
1043         return $count;
1044 }
1045
1046 # import any msgs in the import directory
1047 # the messages are in BBS format (but may have cluster extentions
1048 # so SB UK < GB7TLH is legal
1049 sub import_msgs
1050 {
1051         # are there any to do in this directory?
1052         return unless -d $importfn;
1053         unless (opendir(DIR, $importfn)) {
1054                 dbg('msg', "can\'t open $importfn $!");
1055                 Log('msg', "can\'t open $importfn $!");
1056                 return;
1057         } 
1058
1059         my @names = readdir(DIR);
1060         closedir(DIR);
1061         my $name;
1062         foreach $name (@names) {
1063                 next if $name =~ /^\./;
1064                 my $splitit = $name =~ /^split/;
1065                 my $fn = "$importfn/$name";
1066                 next unless -f $fn;
1067                 unless (open(MSG, $fn)) {
1068                         dbg('msg', "can\'t open import file $fn $!");
1069                         Log('msg', "can\'t open import file $fn $!");
1070                         unlink($fn);
1071                         next;
1072                 }
1073                 my @msg = map { chomp; $_ } <MSG>;
1074                 close(MSG);
1075                 unlink($fn);
1076                 my @out = import_one($DXProt::me, \@msg, $splitit);
1077                 Log('msg', @out);
1078         }
1079 }
1080
1081 # import one message as a list in bbs (as extended) mode
1082 # takes a reference to an array containing the whole message
1083 sub import_one
1084 {
1085         my $dxchan = shift;
1086         my $ref = shift;
1087         my $splitit = shift;
1088         my $private = '1';
1089         my $rr = '0';
1090         my $notincalls = 1;
1091         my $from = $dxchan->call;
1092         my $origin = $main::mycall;
1093         my @to;
1094         my @out;
1095                                 
1096         # first line;
1097         my $line = shift @$ref;
1098         my @f = split /\s+/, $line;
1099         unless (@f && $f[0] =~ /^(:?S|SP|SB|SEND)$/ ) {
1100                 my $m = "invalid first line in import '$line'";
1101                 dbg('MSG', $m );
1102                 return (1, $m);
1103         }
1104         while (@f) {
1105                 my $f = uc shift @f;
1106                 next if $f eq 'SEND';
1107
1108                 # private / noprivate / rr
1109                 if ($notincalls && ($f eq 'B' || $f eq 'SB' || $f =~ /^NOP/oi)) {
1110                         $private = '0';
1111                 } elsif ($notincalls && ($f eq 'P' || $f eq 'SP' || $f =~ /^PRI/oi)) {
1112                         ;
1113                 } elsif ($notincalls && ($f eq 'RR')) {
1114                         $rr = '1';
1115                 } elsif ($f eq '@' && @f) {       # this is bbs syntax, for origin
1116                         $origin = uc shift @f;
1117                 } elsif ($f eq '<' && @f) {     # this is bbs syntax  for from call
1118                         $from = uc shift @f;
1119                 } elsif ($f =~ /^\$/) {     # this is bbs syntax  for a bid
1120                         next;
1121                 } elsif ($f =~ /^<\S+/) {     # this is bbs syntax  for from call
1122                         ($from) = $f =~ /^<(\S+)$/;
1123                 } elsif ($f =~ /^\@\S+/) {     # this is bbs syntax for origin
1124                         ($origin) = $f =~ /^\@(\S+)$/;
1125                 } else {
1126
1127                         # callsign ?
1128                         $notincalls = 0;
1129
1130                         # is this callsign a distro?
1131                         my $fn = "$msgdir/distro/$f.pl";
1132                         if (-e $fn) {
1133                                 my $fh = new IO::File $fn;
1134                                 if ($fh) {
1135                                         local $/ = undef;
1136                                         my $s = <$fh>;
1137                                         $fh->close;
1138                                         my @call;
1139                                         @call = eval $s;
1140                                         return (1, "Error in Distro $f.pl:", $@) if $@;
1141                                         if (@call > 0) {
1142                                                 push @f, @call;
1143                                                 next;
1144                                         }
1145                                 }
1146                         }
1147                         
1148                         if (grep $_ eq $f, @DXMsg::badmsg) {
1149                                 push @out, $dxchan->msg('m3', $f);
1150                         } else {
1151                                 push @to, $f;
1152                         }
1153                 }
1154         }
1155         
1156         # subject is the next line
1157         my $subject = shift @$ref;
1158         
1159         # strip off trailing lines 
1160         pop @$ref while (@$ref && $$ref[-1] =~ /^\s*$/);
1161         
1162         # strip off /EX or /ABORT
1163         return ("aborted") if @$ref && $$ref[-1] =~ m{^/ABORT$}i; 
1164         pop @$ref if (@$ref && $$ref[-1] =~ m{^/EX$}i);                                                                  
1165
1166         # sort out any splitting that needs to be done
1167         my @chunk;
1168         if ($splitit) {
1169                 my $lth = 0;
1170                 my $lines = [];
1171                 for (@$ref) {
1172                         if ($lth >= $maxchunk || ($lth > $minchunk && /^\s*$/)) {
1173                                 push @chunk, $lines;
1174                                 $lines = [];
1175                                 $lth = 0;
1176                         } 
1177                         push @$lines, $_;
1178                         $lth += length; 
1179                 }
1180                 push @chunk, $lines if @$lines;
1181         } else {
1182                 push @chunk, $ref;
1183         }
1184                                   
1185     # write all the messages away
1186         my $i;
1187         for ( $i = 0;  $i < @chunk; $i++) {
1188                 my $chunk = $chunk[$i];
1189                 my $ch_subject;
1190                 if (@chunk > 1) {
1191                         my $num = " [" . ($i+1) . "/" . scalar @chunk . "]";
1192                         $ch_subject = substr($subject, 0, 27 - length $num) .  $num;
1193                 } else {
1194                         $ch_subject = $subject;
1195                 }
1196                 my $to;
1197                 foreach $to (@to) {
1198                         my $systime = $main::systime;
1199                         my $mycall = $main::mycall;
1200                         my $mref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
1201                                                                         $to,
1202                                                                         $from, 
1203                                                                         $systime,
1204                                                                         $private, 
1205                                                                         $ch_subject, 
1206                                                                         $origin,
1207                                                                         '0',
1208                                                                         $rr);
1209                         $mref->swop_it($main::mycall);
1210                         $mref->store($chunk);
1211                         $mref->add_dir();
1212                         push @out, $dxchan->msg('m11', $mref->{msgno}, $to);
1213                         #push @out, "msgno $ref->{msgno} sent to $to";
1214                         my $todxchan = DXChannel->get(uc $to);
1215                         if ($todxchan) {
1216                                 if ($todxchan->is_user()) {
1217                                         $todxchan->send($todxchan->msg('m9'));
1218                                 }
1219                         }
1220                 }
1221         }
1222         return @out;
1223 }
1224
1225 no strict;
1226 sub AUTOLOAD
1227 {
1228         my $self = shift;
1229         my $name = $AUTOLOAD;
1230         return if $name =~ /::DESTROY$/;
1231         $name =~ s/.*:://o;
1232         
1233         confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
1234         @_ ? $self->{$name} = shift : $self->{$name} ;
1235 }
1236
1237 1;
1238
1239 __END__