]> scm.dxcluster.org Git - spider.git/blob - perl/DXMsg.pm
get set/nobeeps to work?
[spider.git] / perl / DXMsg.pm
1 #!/usr/bin/perl
2 #
3 # This module impliments the message handling for a dx cluster
4 #
5 # Copyright (c) 1998 Dirk Koopman G1TLH
6 #
7 # $Id$
8 #
9 #
10 # Notes for implementors:-
11 #
12 # PC28 field 11 is the RR required flag
13 # PC28 field 12 is a VIA routing (ie it is a node call) 
14 #
15
16 package DXMsg;
17
18 @ISA = qw(DXProt DXChannel);
19
20 use DXUtil;
21 use DXChannel;
22 use DXUser;
23 use DXM;
24 use DXCluster;
25 use DXProtVars;
26 use DXProtout;
27 use DXDebug;
28 use DXLog;
29 use IO::File;
30 use Fcntl;
31 use Carp;
32
33 use strict;
34 use vars qw(%work @msg $msgdir %valid %busy $maxage $last_clean
35                         @badmsg $badmsgfn $forwardfn @forward $timeout $waittime
36                     $queueinterval $lastq);
37
38 %work = ();                                             # outstanding jobs
39 @msg = ();                                              # messages we have
40 %busy = ();                                             # station interlocks
41 $msgdir = "$main::root/msg";    # directory contain the msgs
42 $maxage = 30 * 86400;                   # the maximum age that a message shall live for if not marked 
43 $last_clean = 0;                                # last time we did a clean
44 @forward = ();                  # msg forward table
45 $timeout = 30*60;               # forwarding timeout
46 $waittime = 30*60;              # time an aborted outgoing message waits before trying again
47 $queueinterval = 1*60;          # run the queue every 1 minute
48 $lastq = 0;
49
50
51 $badmsgfn = "$msgdir/badmsg.pl";  # list of TO address we wont store
52 $forwardfn = "$msgdir/forward.pl";  # the forwarding table
53
54 %valid = (
55                   fromnode => '5,From Node',
56                   tonode => '5,To Node',
57                   to => '0,To',
58                   from => '0,From',
59                   t => '0,Msg Time,cldatetime',
60                   private => '5,Private',
61                   subject => '0,Subject',
62                   linesreq => '0,Lines per Gob',
63                   rrreq => '5,Read Confirm',
64                   origin => '0,Origin',
65                   lines => '5,Data',
66                   stream => '9,Stream No',
67                   count => '5,Gob Linecnt',
68                   file => '5,File?,yesno',
69                   gotit => '5,Got it Nodes,parray',
70                   lines => '5,Lines,parray',
71                   'read' => '5,Times read',
72                   size => '0,Size',
73                   msgno => '0,Msgno',
74                   keep => '0,Keep this?,yesno',
75                   lastt => '5,Last processed,cldatetime',
76                   waitt => '5,Wait until,cldatetime',
77                  );
78
79 sub DESTROY
80 {
81         my $self = shift;
82         undef $self->{lines};
83         undef $self->{gotit};
84 }
85
86 # allocate a new object
87 # called fromnode, tonode, from, to, datetime, private?, subject, nolinesper  
88 sub alloc                  
89 {
90         my $pkg = shift;
91         my $self = bless {}, $pkg;
92         $self->{msgno} = shift;
93         my $to = shift;
94         #  $to =~ s/-\d+$//o;
95         $self->{to} = ($to eq $main::mycall) ? $main::myalias : $to;
96         my $from = shift;
97         $from =~ s/-\d+$//o;
98         $self->{from} = uc $from;
99         $self->{t} = shift;
100         $self->{private} = shift;
101         $self->{subject} = shift;
102         $self->{origin} = shift;
103         $self->{'read'} = shift;
104         $self->{rrreq} = shift;
105         $self->{gotit} = [];
106         $self->{lastt} = $main::systime;
107     
108         return $self;
109 }
110
111 sub workclean
112 {
113         my $ref = shift;
114         delete $ref->{lines};
115         delete $ref->{linesreq};
116         delete $ref->{tonode};
117         delete $ref->{fromnode};
118         delete $ref->{stream};
119         delete $ref->{lines};
120         delete $ref->{file};
121         delete $ref->{count};
122         delete $ref->{lastt} if exists $ref->{lastt};
123         delete $ref->{waitt} if exists $ref->{waitt};
124 }
125
126 sub process
127 {
128         my ($self, $line) = @_;
129
130         # this is periodic processing
131         if (!$self || !$line) {
132
133                 if ($main::systime > $lastq + $queueinterval) {
134
135                         # wander down the work queue stopping any messages that have timed out
136                         for (keys %busy) {
137                                 my $node = $_;
138                                 my $ref = $busy{$_};
139                                 if (exists $ref->{lastt} && $main::systime >= $ref->{lastt} + $timeout) {
140                                         dbg('msg', "Timeout, stopping msgno: $ref->{msgno} -> $node");
141                                         $ref->stop_msg($node);
142                                         
143                                         # delay any outgoing messages that fail
144                                         $ref->{waitt} = $main::systime + $waittime + rand(120) if $node ne $main::mycall;
145                                 }
146                         }
147
148                         # queue some message if the interval timer has gone off
149                         queue_msg(0);
150                         $lastq = $main::systime;
151                 }
152
153                 # clean the message queue
154                 clean_old() if $main::systime - $last_clean > 3600 ;
155                 return;
156         }
157
158         my @f = split /\^/, $line;
159         my ($pcno) = $f[0] =~ /^PC(\d\d)/; # just get the number
160
161  SWITCH: {
162                 if ($pcno == 28) {              # incoming message
163
164                         # first look for any messages in the busy queue 
165                         # and cancel them this should both resolve timed out incoming messages
166                         # and crossing of message between nodes, incoming messages have priority
167                         if (exists $busy{$f[2]}) {
168                                 my $ref = $busy{$f[2]};
169                                 my $tonode = $ref->{tonode};
170                                 dbg('msg', "Busy, stopping msgno: $ref->{msgno} -> $f[2]");
171                                 $ref->stop_msg($self->call);
172                         }
173
174                         my $t = cltounix($f[5], $f[6]);
175                         my $stream = next_transno($f[2]);
176                         my $ref = DXMsg->alloc($stream, uc $f[3], $f[4], $t, $f[7], $f[8], $f[13], '0', $f[11]);
177                         
178                         # fill in various forwarding state variables
179                         $ref->{fromnode} = $f[2];
180                         $ref->{tonode} = $f[1];
181                         $ref->{rrreq} = $f[11];
182                         $ref->{linesreq} = $f[10];
183                         $ref->{stream} = $stream;
184                         $ref->{count} = 0;      # no of lines between PC31s
185                         dbg('msg', "new message from $f[4] to $f[3] '$f[8]' stream $stream\n");
186                         $work{"$f[2]$stream"} = $ref; # store in work
187                         $busy{$f[2]} = $ref; # set interlock
188                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack
189                         $ref->{lastt} = $main::systime;
190
191                         # look to see whether this is a non private message sent to a known callsign
192                         my $uref = DXUser->get_current($ref->{to});
193                         if (iscallsign($ref->{to}) && !$ref->{private} && $uref && $uref->homenode) {
194                                 $ref->{private} = 1;
195                                 dbg('msg', "set bull to $ref->{to} to private");
196                         }
197                         last SWITCH;
198                 }
199                 
200                 if ($pcno == 29) {              # incoming text
201                         my $ref = $work{"$f[2]$f[3]"};
202                         if ($ref) {
203                                 push @{$ref->{lines}}, $f[4];
204                                 $ref->{count}++;
205                                 if ($ref->{count} >= $ref->{linesreq}) {
206                                         $self->send(DXProt::pc31($f[2], $f[1], $f[3]));
207                                         dbg('msg', "stream $f[3]: $ref->{count} lines received\n");
208                                         $ref->{count} = 0;
209                                 }
210                                 $ref->{lastt} = $main::systime;
211                         } else {
212                                 dbg('msg', "PC29 from unknown stream $f[3] from $f[2]" );
213                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
214                         }
215                         last SWITCH;
216                 }
217                 
218                 if ($pcno == 30) {              # this is a incoming subject ack
219                         my $ref = $work{$f[2]}; # note no stream at this stage
220                         if ($ref) {
221                                 delete $work{$f[2]};
222                                 $ref->{stream} = $f[3];
223                                 $ref->{count} = 0;
224                                 $ref->{linesreq} = 5;
225                                 $work{"$f[2]$f[3]"} = $ref;     # new ref
226                                 dbg('msg', "incoming subject ack stream $f[3]\n");
227                                 $busy{$f[2]} = $ref; # interlock
228                                 $ref->{lines} = [];
229                                 push @{$ref->{lines}}, ($ref->read_msg_body);
230                                 $ref->send_tranche($self);
231                                 $ref->{lastt} = $main::systime;
232                         } else {
233                                 dbg('msg', "PC30 from unknown stream $f[3] from $f[2]" );
234                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
235                         } 
236                         last SWITCH;
237                 }
238                 
239                 if ($pcno == 31) {              # acknowledge a tranche of lines
240                         my $ref = $work{"$f[2]$f[3]"};
241                         if ($ref) {
242                                 dbg('msg', "tranche ack stream $f[3]\n");
243                                 $ref->send_tranche($self);
244                                 $ref->{lastt} = $main::systime;
245                         } else {
246                                 dbg('msg', "PC31 from unknown stream $f[3] from $f[2]" );
247                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
248                         } 
249                         last SWITCH;
250                 }
251                 
252                 if ($pcno == 32) {              # incoming EOM
253                         dbg('msg', "stream $f[3]: EOM received\n");
254                         my $ref = $work{"$f[2]$f[3]"};
255                         if ($ref) {
256                                 $self->send(DXProt::pc33($f[2], $f[1], $f[3])); # acknowledge it
257                                 
258                                 # get the next msg no - note that this has NOTHING to do with the stream number in PC protocol
259                                 # store the file or message
260                                 # remove extraneous rubbish from the hash
261                                 # remove it from the work in progress vector
262                                 # stuff it on the msg queue
263                                 if ($ref->{lines} && @{$ref->{lines}} > 0) { # ignore messages with 0 lines
264                                         if ($ref->{file}) {
265                                                 $ref->store($ref->{lines});
266                                         } else {
267
268                                                 # does an identical message already exist?
269                                                 my $m;
270                                                 for $m (@msg) {
271                                                         if ($ref->{subject} eq $m->{subject} && $ref->{t} == $m->{t} && $ref->{from} eq $m->{from} && $ref->{to} eq $m->{to}) {
272                                                                 $ref->stop_msg($self->call);
273                                                                 my $msgno = $m->{msgno};
274                                                                 dbg('msg', "duplicate message to $msgno\n");
275                                                                 Log('msg', "duplicate message to $msgno");
276                                                                 return;
277                                                         }
278                                                 }
279                                                         
280                                                 # look for 'bad' to addresses 
281                                                 if (grep $ref->{to} eq $_, @badmsg) {
282                                                         $ref->stop_msg($self->call);
283                                                         dbg('msg', "'Bad' TO address $ref->{to}");
284                                                         Log('msg', "'Bad' TO address $ref->{to}");
285                                                         return;
286                                                 }
287
288                                                 $ref->{msgno} = next_transno("Msgno");
289                                                 push @{$ref->{gotit}}, $f[2]; # mark this up as being received
290                                                 $ref->store($ref->{lines});
291                                                 add_dir($ref);
292                                                 my $dxchan = DXChannel->get($ref->{to});
293                                                 $dxchan->send($dxchan->msg('m9')) if $dxchan && $dxchan->is_user;
294                                                 Log('msg', "Message $ref->{msgno} from $ref->{from} received from $f[2] for $ref->{to}");
295                                         }
296                                 }
297                                 $ref->stop_msg($self->call);
298                         } else {
299                                 dbg('msg', "PC32 from unknown stream $f[3] from $f[2]" );
300                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
301                         }
302                         # queue_msg(0);
303                         last SWITCH;
304                 }
305                 
306                 if ($pcno == 33) {              # acknowledge the end of message
307                         my $ref = $work{"$f[2]$f[3]"};
308                         if ($ref) {
309                                 if ($ref->{private}) { # remove it if it private and gone off site#
310                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2] and deleted");
311                                         $ref->del_msg;
312                                 } else {
313                                         Log('msg', "Message $ref->{msgno} from $ref->{from} sent to $f[2]");
314                                         push @{$ref->{gotit}}, $f[2]; # mark this up as being received
315                                         $ref->store($ref->{lines});     # re- store the file
316                                 }
317                                 $ref->stop_msg($self->call);
318                         } else {
319                                 dbg('msg', "PC33 from unknown stream $f[3] from $f[2]" );
320                                 $self->send(DXProt::pc42($f[2], $f[1], $f[3])); # unknown stream
321                         } 
322
323                         # send next one if present
324                         queue_msg(0);
325                         last SWITCH;
326                 }
327                 
328                 if ($pcno == 40) {              # this is a file request
329                         $f[3] =~ s/\\/\//og; # change the slashes
330                         $f[3] =~ s/\.//og;      # remove dots
331                         $f[3] =~ s/^\///o;   # remove the leading /
332                         $f[3] = lc $f[3];       # to lower case;
333                         dbg('msg', "incoming file $f[3]\n");
334                         $f[3] = 'packclus/' . $f[3] unless $f[3] =~ /^packclus\//o;
335                         
336                         # create any directories
337                         my @part = split /\//, $f[3];
338                         my $part;
339                         my $fn = "$main::root";
340                         pop @part;                      # remove last part
341                         foreach $part (@part) {
342                                 $fn .= "/$part";
343                                 next if -e $fn;
344                                 last SWITCH if !mkdir $fn, 0777;
345                                 dbg('msg', "created directory $fn\n");
346                         }
347                         my $stream = next_transno($f[2]);
348                         my $ref = DXMsg->alloc($stream, "$main::root/$f[3]", $self->call, time, !$f[4], $f[3], ' ', '0', '0');
349                         
350                         # forwarding variables
351                         $ref->{fromnode} = $f[1];
352                         $ref->{tonode} = $f[2];
353                         $ref->{linesreq} = $f[5];
354                         $ref->{stream} = $stream;
355                         $ref->{count} = 0;      # no of lines between PC31s
356                         $ref->{file} = 1;
357                         $ref->{lastt} = $main::systime;
358                         $work{"$f[2]$stream"} = $ref; # store in work
359                         $self->send(DXProt::pc30($f[2], $f[1], $stream)); # send ack 
360                         
361                         last SWITCH;
362                 }
363                 
364                 if ($pcno == 42) {              # abort transfer
365                         dbg('msg', "stream $f[3]: abort received\n");
366                         my $ref = $work{"$f[2]$f[3]"};
367                         if ($ref) {
368                                 $ref->stop_msg($self->call);
369                                 $ref = undef;
370                         }
371                         last SWITCH;
372                 }
373
374                 if ($pcno == 49) {      # global delete on subject
375                         for (@msg) {
376                                 if ($_->{from} eq $f[1] && $_->{subject} eq $f[2]) {
377                                         $_->del_msg();
378                                         Log('msg', "Message $_->{msgno} from $_->{from} ($_->{subject}) fully deleted");
379                                         DXProt::broadcast_ak1a($line, $self);
380                                 }
381                         }
382                 }
383         }
384 }
385
386
387 # store a message away on disc or whatever
388 #
389 # NOTE the second arg is a REFERENCE not a list
390 sub store
391 {
392         my $ref = shift;
393         my $lines = shift;
394         
395         # we only proceed if there are actually any lines in the file
396 #       if (!$lines || @{$lines} == 0) {
397 #               return;
398 #       }
399         
400         if ($ref->{file}) {                     # a file
401                 dbg('msg', "To be stored in $ref->{to}\n");
402                 
403                 my $fh = new IO::File "$ref->{to}", "w";
404                 if (defined $fh) {
405                         my $line;
406                         foreach $line (@{$lines}) {
407                                 print $fh "$line\n";
408                         }
409                         $fh->close;
410                         dbg('msg', "file $ref->{to} stored\n");
411                         Log('msg', "file $ref->{to} from $ref->{from} stored" );
412                 } else {
413                         confess "can't open file $ref->{to} $!";  
414                 }
415         } else {                                        # a normal message
416
417                 # attempt to open the message file
418                 my $fn = filename($ref->{msgno});
419                 
420                 dbg('msg', "To be stored in $fn\n");
421                 
422                 # now save the file, overwriting what's there, YES I KNOW OK! (I will change it if it's a problem)
423                 my $fh = new IO::File "$fn", "w";
424                 if (defined $fh) {
425                         my $rr = $ref->{rrreq} ? '1' : '0';
426                         my $priv = $ref->{private} ? '1': '0';
427                         print $fh "=== $ref->{msgno}^$ref->{to}^$ref->{from}^$ref->{t}^$priv^$ref->{subject}^$ref->{origin}^$ref->{'read'}^$rr\n";
428                         print $fh "=== ", join('^', @{$ref->{gotit}}), "\n";
429                         my $line;
430                         $ref->{size} = 0;
431                         foreach $line (@{$lines}) {
432                                 $ref->{size} += (length $line) + 1;
433                                 print $fh "$line\n";
434                         }
435                         $fh->close;
436                         dbg('msg', "msg $ref->{msgno} stored\n");
437                         Log('msg', "msg $ref->{msgno} from $ref->{from} to $ref->{to} stored" );
438                 } else {
439                         confess "can't open msg file $fn $!";  
440                 }
441         }
442 }
443
444 # delete a message
445 sub del_msg
446 {
447         my $self = shift;
448         
449         # remove it from the active message list
450         @msg = map { $_ != $self ? $_ : () } @msg;
451         
452         # belt and braces (one day I will ask someone if this is REALLY necessary)
453         delete $self->{gotit};
454         delete $self->{list};
455         
456         # remove the file
457         unlink filename($self->{msgno});
458         dbg('msg', "deleting $self->{msgno}\n");
459 }
460
461 # clean out old messages from the message queue
462 sub clean_old
463 {
464         my $ref;
465         
466         # mark old messages for deletion
467         foreach $ref (@msg) {
468                 if (!$ref->{keep} && $ref->{t} < $main::systime - $maxage) {
469                         $ref->{deleteme} = 1;
470                         delete $ref->{gotit};
471                         delete $ref->{list};
472                         unlink filename($ref->{msgno});
473                         dbg('msg', "deleting old $ref->{msgno}\n");
474                 }
475         }
476         
477         # remove them all from the active message list
478         @msg = map { $_->{deleteme} ? () : $_ } @msg;
479         $last_clean = $main::systime;
480 }
481
482 # read in a message header
483 sub read_msg_header
484
485         my $fn = shift;
486         my $file;
487         my $line;
488         my $ref;
489         my @f;
490         my $size;
491         
492         $file = new IO::File;
493         if (!open($file, $fn)) {
494                 print "Error reading $fn $!\n";
495                 return undef;
496         }
497         $size = -s $fn;
498         $line = <$file>;                        # first line
499         chomp $line;
500         $size -= length $line;
501         if (! $line =~ /^===/o) {
502                 print "corrupt first line in $fn ($line)\n";
503                 return undef;
504         }
505         $line =~ s/^=== //o;
506         @f = split /\^/, $line;
507         $ref = DXMsg->alloc(@f);
508         
509         $line = <$file>;                        # second line
510         chomp $line;
511         $size -= length $line;
512         if (! $line =~ /^===/o) {
513                 print "corrupt second line in $fn ($line)\n";
514                 return undef;
515         }
516         $line =~ s/^=== //o;
517         $ref->{gotit} = [];
518         @f = split /\^/, $line;
519         push @{$ref->{gotit}}, @f;
520         $ref->{size} = $size;
521         
522         close($file);
523         
524         return $ref;
525 }
526
527 # read in a message header
528 sub read_msg_body
529 {
530         my $self = shift;
531         my $msgno = $self->{msgno};
532         my $file;
533         my $line;
534         my $fn = filename($msgno);
535         my @out;
536         
537         $file = new IO::File;
538         if (!open($file, $fn)) {
539                 print "Error reading $fn $!\n";
540                 return undef;
541         }
542         chomp (@out = <$file>);
543         close($file);
544         
545         shift @out if $out[0] =~ /^=== /;
546         shift @out if $out[0] =~ /^=== /;
547         return @out;
548 }
549
550 # send a tranche of lines to the other end
551 sub send_tranche
552 {
553         my ($self, $dxchan) = @_;
554         my @out;
555         my $to = $self->{tonode};
556         my $from = $self->{fromnode};
557         my $stream = $self->{stream};
558         my $lines = $self->{lines};
559         my ($c, $i);
560         
561         for ($i = 0, $c = $self->{count}; $i < $self->{linesreq} && $c < @$lines; $i++, $c++) {
562                 push @out, DXProt::pc29($to, $from, $stream, $lines->[$c]);
563     }
564     $self->{count} = $c;
565
566     push @out, DXProt::pc32($to, $from, $stream) if $i < $self->{linesreq};
567         $dxchan->send(@out);
568 }
569
570         
571 # find a message to send out and start the ball rolling
572 sub queue_msg
573 {
574         my $sort = shift;
575         my $call = shift;
576         my $ref;
577         my $clref;
578         my @nodelist = DXChannel::get_all_ak1a();
579         
580         # bat down the message list looking for one that needs to go off site and whose
581         # nearest node is not busy.
582
583         dbg('msg', "queue msg ($sort)\n");
584         foreach $ref (@msg) {
585                 # firstly, is it private and unread? if so can I find the recipient
586                 # in my cluster node list offsite?
587
588                 # ignore 'delayed' messages until their waiting time has expired
589                 if (exists $ref->{waitt}) {
590                         next if $ref->{waitt} > $main::systime;
591                         delete $ref->{waitt};
592                 } 
593
594                 # deal with routed private messages
595                 my $noderef;
596                 if ($ref->{private}) {
597                         $clref = DXCluster->get_exact($ref->{to});
598                         unless ($clref) {             # otherwise look for a homenode
599                                 my $uref = DXUser->get($ref->{to});
600                                 my $hnode =  $uref->homenode if $uref;
601                                 $clref = DXCluster->get_exact($hnode) if $hnode;
602                         }
603                         if ($clref && !grep { $clref->{dxchan} == $_ } DXCommandmode::get_all) {
604                                 next if $clref->call eq $main::mycall;  # i.e. it lives here
605                                 $noderef = $clref->{dxchan};
606                                 $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
607                         }
608                 }
609                 
610                 # otherwise we are dealing with a bulletin or forwarded private message
611                 # compare the gotit list with
612                 # the nodelist up above, if there are sites that haven't got it yet
613                 # then start sending it - what happens when we get loops is anyone's
614                 # guess, use (to, from, time, subject) tuple?
615                 foreach $noderef (@nodelist) {
616                         next if $noderef->call eq $main::mycall;
617                         next if grep { $_ eq $noderef->call } @{$ref->{gotit}};
618                         next unless $ref->forward_it($noderef->call);           # check the forwarding file
619
620                         # if we are here we have a node that doesn't have this message
621                         $ref->start_msg($noderef) if !get_busy($noderef->call)  && $noderef->state eq 'normal';
622                         last;
623                 }
624
625                 # if all the available nodes are busy then stop
626                 last if @nodelist == scalar grep { get_busy($_->call) } @nodelist;
627         }
628 }
629
630 # is there a message for me?
631 sub for_me
632 {
633         my $call = uc shift;
634         my $ref;
635         
636         foreach $ref (@msg) {
637                 # is it for me, private and unread? 
638                 if ($ref->{to} eq $call && $ref->{private}) {
639                         return 1 if !$ref->{'read'};
640                 }
641         }
642         return 0;
643 }
644
645 # start the message off on its travels with a PC28
646 sub start_msg
647 {
648         my ($self, $dxchan) = @_;
649         
650         dbg('msg', "start msg $self->{msgno}\n");
651         $self->{linesreq} = 5;
652         $self->{count} = 0;
653         $self->{tonode} = $dxchan->call;
654         $self->{fromnode} = $main::mycall;
655         $busy{$self->{tonode}} = $self;
656         $work{$self->{tonode}} = $self;
657         $self->{lastt} = $main::systime;
658         $dxchan->send(DXProt::pc28($self->{tonode}, $self->{fromnode}, $self->{to}, $self->{from}, $self->{t}, $self->{private}, $self->{subject}, $self->{origin}, $self->{rrreq}));
659 }
660
661 # get the ref of a busy node
662 sub get_busy
663 {
664         my $call = shift;
665         return $busy{$call};
666 }
667
668 # get the busy queue
669 sub get_all_busy
670 {
671         return values %busy;
672 }
673
674 # get the forwarding queue
675 sub get_fwq
676 {
677         return values %work;
678 }
679
680 # stop a message from continuing, clean it out, unlock interlocks etc
681 sub stop_msg
682 {
683         my $self = shift;
684         my $node = shift;
685         my $stream = $self->{stream} if exists $self->{stream};
686         
687         
688         dbg('msg', "stop msg $self->{msgno} -> node $node\n");
689         delete $work{$node};
690         delete $work{"$node$stream"} if $stream;
691         $self->workclean;
692         delete $busy{$node};
693 }
694
695 # get a new transaction number from the file specified
696 sub next_transno
697 {
698         my $name = shift;
699         $name =~ s/\W//og;                      # remove non-word characters
700         my $fn = "$msgdir/$name";
701         my $msgno;
702         
703         my $fh = new IO::File;
704         if (sysopen($fh, $fn, O_RDWR|O_CREAT, 0666)) {
705                 $fh->autoflush(1);
706                 $msgno = $fh->getline;
707                 chomp $msgno;
708                 $msgno++;
709                 seek $fh, 0, 0;
710                 $fh->print("$msgno\n");
711                 dbg('msg', "msgno $msgno allocated for $name\n");
712                 $fh->close;
713         } else {
714                 confess "can't open $fn $!";
715         }
716         return $msgno;
717 }
718
719 # initialise the message 'system', read in all the message headers
720 sub init
721 {
722         my $dir = new IO::File;
723         my @dir;
724         my $ref;
725
726         # load various control files
727         my @in = load_badmsg();
728         print "@in\n" if @in;
729         @in = load_forward();
730         print "@in\n" if @in;
731
732         # read in the directory
733         opendir($dir, $msgdir) or confess "can't open $msgdir $!";
734         @dir = readdir($dir);
735         closedir($dir);
736
737         @msg = ();
738         for (sort @dir) {
739                 next unless /^m\d+$/o;
740                 
741                 $ref = read_msg_header("$msgdir/$_");
742                 next unless $ref;
743                 
744                 # delete any messages to 'badmsg.pl' places
745                 if (grep $ref->{to} eq $_, @badmsg) {
746                         dbg('msg', "'Bad' TO address $ref->{to}");
747                         Log('msg', "'Bad' TO address $ref->{to}");
748                         $ref->del_msg;
749                         next;
750                 }
751
752                 # add the message to the available queue
753                 add_dir($ref); 
754         }
755 }
756
757 # add the message to the directory listing
758 sub add_dir
759 {
760         my $ref = shift;
761         confess "tried to add a non-ref to the msg directory" if !ref $ref;
762         push @msg, $ref;
763 }
764
765 # return all the current messages
766 sub get_all
767 {
768         return @msg;
769 }
770
771 # get a particular message
772 sub get
773 {
774         my $msgno = shift;
775         for (@msg) {
776                 return $_ if $_->{msgno} == $msgno;
777                 last if $_->{msgno} > $msgno;
778         }
779         return undef;
780 }
781
782 # return the official filename for a message no
783 sub filename
784 {
785         return sprintf "$msgdir/m%06d", shift;
786 }
787
788 #
789 # return a list of valid elements 
790
791
792 sub fields
793 {
794         return keys(%valid);
795 }
796
797 #
798 # return a prompt for a field
799 #
800
801 sub field_prompt
802
803         my ($self, $ele) = @_;
804         return $valid{$ele};
805 }
806
807 #
808 # send a message state machine
809 sub do_send_stuff
810 {
811         my $self = shift;
812         my $line = shift;
813         my @out;
814         
815         if ($self->state eq 'send1') {
816                 #  $DB::single = 1;
817                 confess "local var gone missing" if !ref $self->{loc};
818                 my $loc = $self->{loc};
819                 $loc->{subject} = $line;
820                 $loc->{lines} = [];
821                 $self->state('sendbody');
822                 #push @out, $self->msg('sendbody');
823                 push @out, $self->msg('m8');
824         } elsif ($self->state eq 'sendbody') {
825                 confess "local var gone missing" if !ref $self->{loc};
826                 my $loc = $self->{loc};
827                 if ($line eq "\032" || $line eq '%1A' || uc $line eq "/EX") {
828                         my $to;
829                         
830                         if (@{$loc->{lines}} > 0) {
831                                 foreach $to (@{$loc->{to}}) {
832                                         my $ref;
833                                         my $systime = $main::systime;
834                                         my $mycall = $main::mycall;
835                                         $ref = DXMsg->alloc(DXMsg::next_transno('Msgno'),
836                                                                                 uc $to,
837                                                                                 $self->call, 
838                                                                                 $systime,
839                                                                                 $loc->{private}, 
840                                                                                 $loc->{subject}, 
841                                                                                 $mycall,
842                                                                                 '0',
843                                                                                 $loc->{rrreq});
844                                         $ref->store($loc->{lines});
845                                         $ref->add_dir();
846                                         push @out, $self->msg('m11', $ref->{msgno}, $to);
847                                         #push @out, "msgno $ref->{msgno} sent to $to";
848                                         my $dxchan = DXChannel->get(uc $to);
849                                         if ($dxchan) {
850                                                 if ($dxchan->is_user()) {
851                                                         $dxchan->send($dxchan->msg('m9'));
852                                                 }
853                                         }
854                                 }
855                         }
856                         delete $loc->{lines};
857                         delete $loc->{to};
858                         delete $self->{loc};
859                         $self->func(undef);
860                         
861                         $self->state('prompt');
862                 } elsif ($line eq "\031" || uc $line eq "/ABORT" || uc $line eq "/QUIT") {
863                         #push @out, $self->msg('sendabort');
864                         push @out, $self->msg('m10');
865                         delete $loc->{lines};
866                         delete $loc->{to};
867                         delete $self->{loc};
868                         $self->func(undef);
869                         $self->state('prompt');
870                 } else {
871                         
872                         # i.e. it ain't and end or abort, therefore store the line
873                         push @{$loc->{lines}}, length($line) > 0 ? $line : " ";
874                 }
875         }
876         return (1, @out);
877 }
878
879 # return the standard directory line for this ref 
880 sub dir
881 {
882         my $ref = shift;
883         return sprintf "%6d%s%s%5d %8.8s %8.8s %-6.6s %5.5s %-30.30s", 
884                 $ref->msgno, $ref->read ? '-' : ' ', $ref->private ? 'p' : ' ', $ref->size,
885                         $ref->to, $ref->from, cldate($ref->t), ztime($ref->t), $ref->subject;
886 }
887
888 # load the forward table
889 sub load_forward
890 {
891         my @out;
892         my $s = readfilestr($forwardfn);
893         if ($s) {
894                 eval $s;
895                 push @out, $@ if $@;
896         }
897         return @out;
898 }
899
900 # load the bad message table
901 sub load_badmsg
902 {
903         my @out;
904         my $s = readfilestr($badmsgfn);
905         if ($s) {
906                 eval $s;
907                 push @out, $@ if $@;
908         }
909         return @out;
910 }
911
912 #
913 # forward that message or not according to the forwarding table
914 # returns 1 for forward, 0 - to ignore
915 #
916
917 sub forward_it
918 {
919         my $ref = shift;
920         my $call = shift;
921         my $i;
922         
923         for ($i = 0; $i < @forward; $i += 5) {
924                 my ($sort, $field, $pattern, $action, $bbs) = @forward[$i..($i+4)]; 
925                 my $tested;
926                 
927                 # are we interested?
928                 next if $ref->{private} && $sort ne 'P';
929                 next if !$ref->{private} && $sort ne 'B';
930                 
931                 # select field
932                 $tested = $ref->{to} if $field eq 'T';
933                 my $at = $ref->{to} =~ /\@\s*(\S+)/;
934                 $tested = $at if $field eq '\@';
935                 $tested = $ref->{from} if $field eq 'F';
936                 $tested = $ref->{origin} if $field eq 'O';
937                 $tested = $ref->{subject} if $field eq 'S';
938
939                 if (!$pattern || $tested =~ m{$pattern}i) {
940                         return 0 if $action eq 'I';
941                         return 1 if !$bbs || grep $_ eq $call, @{$bbs};
942                 }
943         }
944         return 0;
945 }
946
947 no strict;
948 sub AUTOLOAD
949 {
950         my $self = shift;
951         my $name = $AUTOLOAD;
952         return if $name =~ /::DESTROY$/;
953         $name =~ s/.*:://o;
954         
955         confess "Non-existant field '$AUTOLOAD'" if !$valid{$name};
956         @_ ? $self->{$name} = shift : $self->{$name} ;
957 }
958
959 1;
960
961 __END__