]> scm.dxcluster.org Git - spider.git/blob - perl/Msg.pm
move ::process things back into persec
[spider.git] / perl / Msg.pm
1 #
2 # This has been taken from the 'Advanced Perl Programming' book by Sriram Srinivasan 
3 #
4 # I am presuming that the code is distributed on the same basis as perl itself.
5 #
6 # I have modified it to suit my devious purposes (Dirk Koopman G1TLH)
7 #
8 #
9 #
10
11 package Msg;
12
13 use strict;
14
15 use DXUtil;
16
17 use Mojo::IOLoop;
18 use Mojo::IOLoop::Stream;
19
20 use DXDebug;
21 use Timer;
22
23 use vars qw($now %conns $noconns $cnum $total_in $total_out $connect_timeout $disc_waittime);
24
25 $total_in = $total_out = 0;
26
27 $now = time;
28
29 $cnum = 0;
30 $connect_timeout = 5;
31 $disc_waittime = 3;
32
33 our %delqueue;
34
35 #
36 #-----------------------------------------------------------------
37 # Generalised initializer
38
39 sub new
40 {
41     my ($pkg, $rproc) = @_;
42         my $obj = ref($pkg);
43         my $class = $obj || $pkg;
44
45     my $conn = {
46         rproc => $rproc,
47                 inqueue => [],
48                 outqueue => [],
49                 state => 0,
50                 lineend => "\r\n",
51                 csort => 'telnet',
52                 timeval => 60,
53                 blocking => 0,
54                 cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
55     };
56
57         $noconns++;
58         
59         dbg("$class Connection created (total $noconns)") if isdbg('connll');
60         return bless $conn, $class;
61 }
62
63 sub set_error
64 {
65         my $conn = shift;
66         my $callback = shift;
67         $conn->{sock}->on(error => sub {$callback->($_[1]);});
68 }
69
70 sub set_on_eof
71 {
72         my $conn = shift;
73         my $callback = shift;
74         $conn->{sock}->on(close => sub {$callback->()});
75 }
76
77 sub set_rproc
78 {
79         my $conn = shift;
80         my $callback = shift;
81         $conn->{rproc} = $callback;
82 }
83
84 # save it
85 sub conns
86 {
87         my $pkg = shift;
88         my $call = shift;
89         my $ref;
90         
91         if (ref $pkg) {
92                 $call = $pkg->{call} unless $call;
93                 return undef unless $call;
94                 dbg((ref $pkg) . " changing $pkg->{call} to $call") if isdbg('connll') && exists $pkg->{call} && $call ne $pkg->{call};
95                 delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call; 
96                 $pkg->{call} = $call;
97                 $ref = $conns{$call} = $pkg;
98                 dbg((ref $pkg) . " Connection $pkg->{cnum} $call stored") if isdbg('connll');
99         } else {
100                 $ref = $conns{$call};
101         }
102         return $ref;
103 }
104
105 # this is only called by any dependent processes going away unexpectedly
106 sub pid_gone
107 {
108         my ($pkg, $pid) = @_;
109         
110         my @pid = grep {$_->{pid} == $pid} values %conns;
111         foreach my $p (@pid) {
112                 &{$p->{eproc}}($p, "$pid has gorn") if exists $p->{eproc};
113                 $p->disconnect;
114         }
115 }
116
117 sub ax25
118 {
119         my $conn = shift;
120         return $conn->{csort} eq 'ax25';
121 }
122
123 sub peerhost
124 {
125         my $conn = shift;
126         unless ($conn->{peerhost}) {
127                 $conn->{peerhost} ||= 'ax25' if $conn->ax25;
128                 $conn->{peerhost} ||= $conn->{sock}->handle->peerhost if $conn->{sock};
129                 $conn->{peerhost} ||= 'UNKNOWN';
130         }
131         return $conn->{peerhost};
132 }
133
134 #-----------------------------------------------------------------
135 # Send side routines
136
137 sub _on_connect
138 {
139         my $conn = shift;
140         my $handle = shift;
141         undef $conn->{sock};
142         my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
143         $sock->on(read => sub {$conn->_rcv($_[1]);} );
144         $sock->on(error => sub {delete $conn->{sock}; $conn->disconnect;});
145         $sock->on(close => sub {delete $conn->{sock}; $conn->disconnect;});
146         $sock->timeout(0);
147         $sock->start;
148         $conn->{peerhost} = eval { $handle->peerhost; };
149         dbg((ref $conn) . " connected $conn->{cnum} to $conn->{peerhost}:$conn->{peerport}") if isdbg('connll');
150         if ($conn->{on_connect}) {
151                 &{$conn->{on_connect}}($conn, $handle);
152         }
153 }
154
155 sub is_connected
156 {
157         my $conn = shift;
158         my $sock = $conn->{sock};
159         return ref $sock && $sock->isa('Mojo::IOLoop::Stream');
160 }
161
162 sub connect {
163     my ($pkg, $to_host, $to_port, %args) = @_;
164         my $timeout = delete $args{timeout} || $connect_timeout;
165         
166     # Create a connection end-point object
167     my $conn = $pkg;
168         unless (ref $pkg) {
169                 my $rproc = delete $args{rproc}; 
170                 $conn = $pkg->new($rproc);
171         }
172         $conn->{peerhost} = $to_host;
173         $conn->{peerport} = $to_port;
174         $conn->{sort} = 'Outgoing';
175
176         dbg((ref $conn) . " connecting $conn->{cnum} to $to_host:$to_port") if isdbg('connll');
177         
178         my $sock;
179         $conn->{sock} = $sock = Mojo::IOLoop::Client->new;
180         $sock->on(connect => sub {$conn->_on_connect($_[1])} );
181         $sock->on(error => sub {&{$conn->{eproc}}($conn, $_[1]) if exists $conn->{eproc}; $conn->disconnect});
182         $sock->on(close => sub {$conn->disconnect});
183
184         # copy any args like on_connect, on_disconnect etc
185         while (my ($k, $v) = each %args) {
186                 $conn->{$k} = $v;
187         }
188         
189         $sock->connect(address => $to_host, port => $to_port, timeout => $timeout);
190         
191     return $conn;
192 }
193
194 sub start_program
195 {
196         my ($conn, $line, $sort) = @_;
197         my $pid;
198         
199 #       local $^F = 10000;              # make sure it ain't closed on exec
200 #       my ($a, $b) = $io_socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
201 #       if ($a && $b) {
202 #               $a->autoflush(1);
203 #               $b->autoflush(1);
204 #               $pid = fork;
205 #               if (defined $pid) {
206 #                       if ($pid) {
207 #                               close $b;
208 #                               $conn->{sock} = $a;
209 #                               $conn->{csort} = $sort;
210 #                               $conn->{lineend} = "\cM" if $sort eq 'ax25';
211 #                               $conn->{pid} = $pid;
212 #                               if ($conn->{rproc}) {
213 #                                       my $callback = sub {$conn->_rcv};
214 #                                       Msg::set_event_handler ($a, read => $callback);
215 #                               }
216 #                               dbg("connect $conn->{cnum}: started pid: $conn->{pid} as $line") if isdbg('connect');
217 #                       } else {
218 #                               $^W = 0;
219 #                               dbgclose();
220 #                               STDIN->close;
221 #                               STDOUT->close;
222 #                               STDOUT->close;
223 #                               *STDIN = IO::File->new_from_fd($b, 'r') or die;
224 #                               *STDOUT = IO::File->new_from_fd($b, 'w') or die;
225 #                               *STDERR = IO::File->new_from_fd($b, 'w') or die;
226 #                               close $a;
227 #                               unless ($main::is_win) {
228 #                                       #                                               $SIG{HUP} = 'IGNORE';
229 #                                       $SIG{HUP} = $SIG{CHLD} = $SIG{TERM} = $SIG{INT} = 'DEFAULT';
230 #                                       alarm(0);
231 #                               }
232 #                               exec "$line" or dbg("exec '$line' failed $!");
233 #                       } 
234 #               } else {
235 #                       dbg("cannot fork for $line");
236 #               }
237 #       } else {
238 #               dbg("no socket pair $! for $line");
239 #       }
240         return $pid;
241 }
242
243 sub disconnect
244 {
245         my $conn = shift;
246         my $count = $conn->{disconnecting}++;
247         if (isdbg('connll')) {
248                 my ($pkg, $fn, $line) = caller;
249                 dbg((ref $conn) . "::disconnect on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
250         }
251         return if $count;
252
253         
254         my $sock = $conn->{sock};
255         if ($sock) {
256
257                 # remove me from the active list
258                 my $call;
259                 if ($call = $conn->{call}) {
260                         my $ref = $conns{$call};
261                         delete $conns{$call} if $ref && $ref == $conn;
262                 }
263                 $conn->{delay} = Mojo::IOLoop->delay (
264 #                                Mojo::IOLoop->delay (
265                                                                                           sub {
266                                                                                                   my $delay = shift;
267                                                                                                   dbg("before drain $call");
268                                                                                                   $sock->on(drain => $delay->begin);
269                                                                                                   1;
270                                                                                           },
271                                                                                           sub {
272                                                                                                   my $delay = shift;
273                                                                                                   _close_it($conn);
274                                                                                                   1;
275                                                                                           }
276                                                                                          );
277                 $conn->{delay}->wait;
278                 
279                 $delqueue{$conn} = $conn; # save this connection until everything is finished
280         } else {
281                 dbg((ref $conn) . " socket missing on $conn->{call}") if isdbg('connll');
282                 _close_it($conn);
283         }
284 }
285
286 sub _close_it
287 {
288     my $conn = shift;
289     my $sock = delete $conn->{sock};
290         $conn->{state} = 'E';
291         $conn->{timeout}->del if $conn->{timeout};
292
293         if (isdbg('connll')) {
294                 my ($pkg, $fn, $line) = caller;
295                 dbg((ref $conn) . "::_close_it on call $conn->{call} attempt $conn->{disconnecting} called from ${pkg}::${fn} line $line ");
296         }
297
298         # be careful to delete the correct one
299         my $call;
300         if ($call = $conn->{call}) {
301                 my $ref = $conns{$call};
302                 delete $conns{$call} if $ref && $ref == $conn;
303         }
304         $call ||= 'unallocated';
305
306         dbg((ref $conn) . " Connection $conn->{cnum} $call starting to close") if isdbg('connll');
307         
308         if ($conn->{on_disconnect}) {
309                 &{$conn->{on_disconnect}}($conn);
310         }
311
312         if ($sock) {
313                 dbg((ref $conn) . " Connection $conn->{cnum} $call closing gracefully") if isdbg('connll');
314                 $sock->close_gracefully;
315         }
316         
317         # get rid of any references
318         for (keys %$conn) {
319                 if (ref($conn->{$_})) {
320                         delete $conn->{$_};
321                 }
322         }
323
324         delete $delqueue{$conn};        # finally remove the $conn
325         
326         unless ($main::is_win) {
327                 kill 'TERM', $conn->{pid} if exists $conn->{pid};
328         }
329 }
330
331 sub _send_stuff
332 {
333         my $conn = shift;
334         my $rq = $conn->{outqueue};
335     my $sock = $conn->{sock};
336         return unless defined $sock;
337         return if $conn->{disconnecting};
338         
339         while (@$rq) {
340                 my $data = shift @$rq;
341                 my $lth = length $data;
342                 my $call = $conn->{call} || 'none';
343                 if (isdbg('raw')) {
344                         if (isdbg('raw')) {
345                                 dbgdump('raw', "$call send $lth: ", $lth);
346                         }
347                 }
348                 if (defined $sock) {
349                         $sock->write($data);
350                         $total_out += $lth;
351                 } else {
352                         dbg("_send_stuff $call ending data ignored: $data");
353                 }
354         }
355 }
356
357 sub send_now {
358     my ($conn, $msg) = @_;
359     $conn->enqueue($msg);
360     _send_stuff($conn);
361 }
362
363 sub send_later {
364         goto &send_now;
365 }
366
367 sub send_raw
368 {
369     my ($conn, $msg) = @_;
370         push @{$conn->{outqueue}}, $msg;
371         _send_stuff($conn);
372 }
373
374 sub enqueue {
375     my $conn = shift;
376     push @{$conn->{outqueue}}, defined $_[0] ? $_[0] : '';
377 }
378
379 sub _err_will_block 
380 {
381         return 0;
382 }
383
384 sub close_on_empty
385 {
386         my $conn = shift;
387         $conn->{sock}->on(drain => sub {$conn->disconnect;});
388 }
389
390 #-----------------------------------------------------------------
391 # Receive side routines
392
393 sub new_server 
394 {
395 #    @_ == 4 || die "Msg->new_server (myhost, myport, login_proc)\n";
396         my ($pkg, $my_host, $my_port, $login_proc) = @_;
397         my $conn = $pkg->new($login_proc);
398         
399     my $sock = $conn->{sock} = Mojo::IOLoop::Server->new;
400         $sock->on(accept=>sub{$conn->new_client($_[1]);});
401         $sock->listen(address=>$my_host, port=>$my_port);
402         $sock->start;
403         
404     die "Could not create socket: $! \n" unless $conn->{sock};
405         return $conn;
406 }
407
408
409 sub nolinger
410 {
411         my $conn = shift;
412 }
413
414 sub dequeue
415 {
416         my $conn = shift;
417         return if $conn->{disconnecting};
418         
419         if ($conn->{msg} =~ /\cJ/) {
420                 my @lines = split /\cM?\cJ/, $conn->{msg};
421                 if ($conn->{msg} =~ /\cM?\cJ$/) {
422                         delete $conn->{msg};
423                 } else {
424                         $conn->{msg} = pop @lines;
425                 }
426                 for (@lines) {
427                         last if $conn->{disconnecting};
428                         &{$conn->{rproc}}($conn, defined $_ ? $_ : '');
429                 }
430         }
431 }
432
433 sub _rcv {                     # Complement to _send
434     my $conn = shift; # $rcv_now complement of $flush
435         my $msg = shift;
436     my $sock = $conn->{sock};
437     return unless defined($sock);
438         return if $conn->{disconnecting};
439
440         $total_in += length $msg;
441
442         my @lines;
443         if (isdbg('raw')) {
444                 my $call = $conn->{call} || 'none';
445                 my $lth = length $msg;
446                 dbgdump('raw', "$call read $lth: ", $msg);
447         }
448         if ($conn->{echo}) {
449                 my @ch = split //, $msg;
450                         my $out;
451                         for (@ch) {
452                                 if (/[\cH\x7f]/) {
453                                         $out .= "\cH \cH";
454                                         $conn->{msg} =~ s/.$//;
455                                 } else {
456                                         $out .= $_;
457                                         $conn->{msg} .= $_;
458                                 }
459                         }
460                         if (defined $out) {
461                                 $conn->send_raw($out);
462                         }
463         } else {
464                 $conn->{msg} .= $msg;
465         }
466
467         unless ($conn->{disable_read}) {
468                 $conn->dequeue if exists $conn->{msg};
469         }
470 }
471
472 sub new_client {
473         my $server_conn = shift;
474         my $handle = shift;
475         
476         my $conn = $server_conn->new($server_conn->{rproc});
477         my $sock = $conn->{sock} = Mojo::IOLoop::Stream->new($handle);
478         $sock->on(read => sub {$conn->_rcv($_[1])});
479         $sock->timeout(0);
480         $sock->start;
481         dbg((ref $conn) . "accept $conn->{cnum} from $conn->{peerhost} $conn->{peerport}") if isdbg('connll');
482
483         my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $handle->peerhost, $conn->{peerport} = $handle->peerport);
484         $conn->{sort} = 'Incoming';
485         if ($eproc) {
486                 $conn->{eproc} = $eproc;
487         }
488         if ($rproc) {
489                 $conn->{rproc} = $rproc;
490         } else {  # Login failed
491                 &{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
492                 $conn->disconnect();
493         }
494         return $conn;
495 }
496
497 sub close_server
498 {
499         my $conn = shift;
500         delete $conn->{sock};
501 }
502
503 # close all clients (this is for forking really)
504 sub close_all_clients
505 {
506         foreach my $conn (values %conns) {
507                 $conn->disconnect;
508         }
509 }
510
511 sub disable_read
512 {
513         my $conn = shift;
514         return defined $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
515 }
516
517
518 #
519 #----------------------------------------------------
520 # Event loop routines used by both client and server
521
522 sub set_event_handler {
523         my $sock = shift;
524         my %args = @_;
525         my ($pkg, $fn, $line) = caller;
526         my $s;
527         foreach (my ($k,$v) = each %args) {
528                 $s .= "$k => $v, ";
529         }
530         $s =~ s/[\s,]$//;
531         dbg("Msg::set_event_handler called from ${pkg}::${fn} line $line doing $s");
532 }
533
534 sub sleep
535 {
536         my ($pkg, $interval) = @_;
537         my $now = time;
538         while (time - $now < $interval) {
539                 sleep 1;
540         }
541 }
542
543 sub DESTROY
544 {
545         my $conn = shift;
546         my $call = $conn->{call} || 'unallocated';
547
548         if (isdbg('connll')) {
549                 my ($pkg, $fn, $line) = caller;
550                 dbg((ref $conn) . "::DESTROY on call $call called from ${pkg}::${fn} line $line ");
551                 
552         }
553
554         my $call = $conn->{call} || 'unallocated';
555         my $host = $conn->{peerhost} || '';
556         my $port = $conn->{peerport} || '';
557         my $sock = $conn->{sock};
558
559         if ($sock) {
560                 $sock->close_gracefully;
561         }
562         
563         $noconns--;
564         dbg((ref $conn) . " Connection $conn->{cnum} $call [$host $port] being destroyed (total $noconns)") if isdbg('connll');
565 }
566
567 1;
568
569 __END__
570