]> scm.dxcluster.org Git - spider.git/blob - perl/cluster.pl
typo
[spider.git] / perl / cluster.pl
1 #!/usr/bin/perl -w
2 #
3 # This is the DX cluster 'daemon'. It sits in the middle of its little
4 # web of client routines sucking and blowing data where it may.
5 #
6 # Hence the name of 'spider' (although it may become 'dxspider')
7 #
8 # Copyright (c) 1998 Dirk Koopman G1TLH
9 #
10 # $Id$
11
12
13 require 5.004;
14
15 # make sure that modules are searched in the order local then perl
16 BEGIN {
17         umask 002;
18         
19         # root of directory tree for this system
20         $root = "/spider"; 
21         $root = $ENV{'DXSPIDER_ROOT'} if $ENV{'DXSPIDER_ROOT'};
22         
23         unshift @INC, "$root/perl";     # this IS the right way round!
24         unshift @INC, "$root/local";
25
26         # try to create and lock a lockfile (this isn't atomic but 
27         # should do for now
28         $lockfn = "$root/perl/cluster.lck";       # lock file name
29         if (-e $lockfn) {
30                 open(CLLOCK, "$lockfn") or die "Can't open Lockfile ($lockfn) $!";
31                 my $pid = <CLLOCK>;
32                 chomp $pid;
33                 die "Lockfile ($lockfn) and process $pid exist, another cluster running?" if kill 0, $pid;
34                 close CLLOCK;
35         }
36         open(CLLOCK, ">$lockfn") or die "Can't open Lockfile ($lockfn) $!";
37         print CLLOCK "$$\n";
38         close CLLOCK;
39
40         $is_win = ($^O =~ /^MS/ || $^O =~ /^OS-2/) ? 1 : 0; # is it Windows?
41         $systime = time;
42 }
43
44 use DXVars;
45 use Msg;
46 use IntMsg;
47 use Internet;
48 use Listeners;
49 use ExtMsg;
50 use AGWConnect;
51 use AGWMsg;
52 use DXDebug;
53 use DXLog;
54 use DXLogPrint;
55 use DXUtil;
56 use DXChannel;
57 use DXUser;
58 use DXM;
59 use DXCommandmode;
60 use DXProtVars;
61 use DXProtout;
62 use DXProt;
63 use DXMsg;
64 use DXCron;
65 use DXConnect;
66 use DXBearing;
67 use DXDb;
68 use DXHash;
69 use DXDupe;
70 use Script;
71 use Prefix;
72 use Spot;
73 use Bands;
74 use Keps;
75 use Minimuf;
76 use Sun;
77 use Geomag;
78 use CmdAlias;
79 use Filter;
80 use AnnTalk;
81 use BBS;
82 use WCY;
83 use BadWords;
84 use Timer;
85 use Route;
86 use Route::Node;
87 use Route::User;
88
89 use Data::Dumper;
90 use IO::File;
91 use Fcntl ':flock'; 
92 use POSIX ":sys_wait_h";
93
94 use Local;
95
96 package main;
97
98 use strict;
99 use vars qw(@inqueue $systime $version $starttime $lockfn @outstanding_connects 
100                         $zombies $root @listeners $lang $myalias @debug $userfn $clusteraddr 
101                         $clusterport $mycall $decease $is_win $routeroot 
102                    );
103
104 @inqueue = ();                                  # the main input queue, an array of hashes
105 $systime = 0;                                   # the time now (in seconds)
106 $version = "1.48";                              # the version no of the software
107 $starttime = 0;                 # the starting time of the cluster   
108 #@outstanding_connects = ();     # list of outstanding connects
109 @listeners = ();                                # list of listeners
110
111 use vars qw($VERSION $BRANCH $build $branch);
112 $VERSION = sprintf( "%d.%03d", q$Revision$ =~ /(\d+)\.(\d+)/ );
113 $BRANCH = sprintf( "%d.%03d", q$Revision$ =~ /\d+\.\d+\.(\d+)\.(\d+)/ ) || 0;
114 $main::build += 14;                             # add an offset to make it bigger than last system
115 $main::build += $VERSION;
116 $main::branch += $BRANCH;
117
118       
119 # send a message to call on conn and disconnect
120 sub already_conn
121 {
122         my ($conn, $call, $mess) = @_;
123
124         $conn->disable_read(1);
125         dbg("-> D $call $mess\n") if isdbg('chan'); 
126         $conn->send_now("D$call|$mess");
127         sleep(2);
128         $conn->disconnect;
129 }
130
131 sub error_handler
132 {
133         my $dxchan = shift;
134         $dxchan->{conn}->set_error(undef) if exists $dxchan->{conn};
135         $dxchan->disconnect(1);
136 }
137
138 # handle incoming messages
139 sub new_channel
140 {
141         my ($conn, $msg) = @_;
142         my ($sort, $call, $line) = DXChannel::decode_input(0, $msg);
143         return unless defined $sort;
144
145         unless (is_callsign($call)) {
146                 already_conn($conn, $call, DXM::msg($lang, "illcall", $call));
147                 return;
148         }
149
150         # set up the basic channel info
151         # is there one already connected to me - locally? 
152         my $user = DXUser->get($call);
153         my $dxchan = DXChannel->get($call);
154         if ($dxchan) {
155                 my $mess = DXM::msg($lang, ($user && $user->is_node) ? 'concluster' : 'conother', $call, $main::mycall);
156                 already_conn($conn, $call, $mess);
157                 return;
158         }
159
160         # is he locked out ?
161         my $basecall = $call;
162         $basecall =~ s/-\d+$//;
163         my $baseuser = DXUser->get($basecall);
164         if ($baseuser && $baseuser->lockout) {
165                 my $lock = $user->lockout if $user;
166                 if (!$user || !defined $lock || $lock) {
167                         my $host = $conn->{peerhost} || "unknown";
168                         Log('DXCommand', "$call on $host is locked out, disconnected");
169                         $conn->disconnect;
170                         return;
171                 }
172         }
173         
174         if ($user) {
175                 $user->{lang} = $main::lang if !$user->{lang}; # to autoupdate old systems
176         } else {
177                 $user = DXUser->new($call);
178         }
179         
180
181         # create the channel
182         $dxchan = DXCommandmode->new($call, $conn, $user) if $user->is_user;
183         $dxchan = DXProt->new($call, $conn, $user) if $user->is_node;
184         $dxchan = BBS->new($call, $conn, $user) if $user->is_bbs;
185         die "Invalid sort of user on $call = $sort" if !$dxchan;
186
187         # check that the conn has a callsign
188         $conn->conns($call) if $conn->isa('IntMsg');
189
190         # set callbacks
191         $conn->set_error(sub {error_handler($dxchan)});
192         $conn->set_rproc(sub {my ($conn,$msg) = @_; rec($dxchan, $conn, $msg);});
193         rec($dxchan, $conn, $msg);
194 }
195
196 sub rec 
197 {
198         my ($dxchan, $conn, $msg) = @_;
199         
200         # queue the message and the channel object for later processing
201         if (defined $msg) {
202                 my $self = bless {}, "inqueue";
203                 $self->{dxchan} = $dxchan;
204                 $self->{data} = $msg;
205                 push @inqueue, $self;
206         }
207 }
208
209 sub login
210 {
211         return \&new_channel;
212 }
213
214 # cease running this program, close down all the connections nicely
215 sub cease
216 {
217         my $dxchan;
218
219         unless ($is_win) {
220                 $SIG{'TERM'} = 'IGNORE';
221                 $SIG{'INT'} = 'IGNORE';
222         }
223         
224         DXUser::sync;
225
226         eval {
227                 Local::finish();   # end local processing
228         };
229         dbg("Local::finish error $@") if $@;
230
231         # disconnect nodes
232         foreach $dxchan (DXChannel->get_all_nodes) {
233             $dxchan->disconnect(2) unless $dxchan == $DXProt::me;
234         }
235         Msg->event_loop(100, 0.01);
236
237         # disconnect users
238         foreach $dxchan (DXChannel->get_all_users) {
239                 $dxchan->disconnect;
240         }
241
242         # disconnect AGW
243         AGWMsg::finish();
244
245         # end everything else
246         Msg->event_loop(100, 0.01);
247         DXUser::finish();
248         DXDupe::finish();
249
250         # close all databases
251         DXDb::closeall;
252
253         # close all listeners
254         foreach my $l (@listeners) {
255                 $l->close_server;
256         }
257
258         dbg("DXSpider version $version, build $build ended") if isdbg('chan');
259         Log('cluster', "DXSpider V$version, build $build ended");
260         dbgclose();
261         Logclose();
262         unlink $lockfn;
263 #       $SIG{__WARN__} = $SIG{__DIE__} =  sub {my $a = shift; cluck($a); };
264         exit(0);
265 }
266
267 # the reaper of children
268 sub reap
269 {
270         my $cpid;
271         while (($cpid = waitpid(-1, WNOHANG)) > 0) {
272                 dbg("cpid: $cpid") if isdbg('reap');
273 #               Msg->pid_gone($cpid);
274                 $zombies-- if $zombies > 0;
275         }
276         dbg("cpid: $cpid") if isdbg('reap');
277 }
278
279 # this is where the input queue is dealt with and things are dispatched off to other parts of
280 # the cluster
281 sub process_inqueue
282 {
283         while (@inqueue) {
284                 my $self = shift @inqueue;
285                 return if !$self;
286         
287                 my $data = $self->{data};
288                 my $dxchan = $self->{dxchan};
289                 my $error;
290                 my ($sort, $call, $line) = DXChannel::decode_input($dxchan, $data);
291                 return unless defined $sort;
292         
293                 # do the really sexy console interface bit! (Who is going to do the TK interface then?)
294                 dbg("<- $sort $call $line\n") if $sort ne 'D' && isdbg('chan');
295
296                 # handle A records
297                 my $user = $dxchan->user;
298                 if ($sort eq 'A' || $sort eq 'O') {
299                         $dxchan->start($line, $sort);  
300                 } elsif ($sort eq 'I') {
301                         die "\$user not defined for $call" if !defined $user;
302                         # normal input
303                         $dxchan->normal($line);
304                         $dxchan->disconnect if ($dxchan->{state} eq 'bye');
305                 } elsif ($sort eq 'Z') {
306                         $dxchan->disconnect;
307                 } elsif ($sort eq 'D') {
308                         ;                                       # ignored (an echo)
309                 } elsif ($sort eq 'G') {
310                         $dxchan->enhanced($line);
311                 } else {
312                         print STDERR atime, " Unknown command letter ($sort) received from $call\n";
313                 }
314         }
315 }
316
317 sub uptime
318 {
319         my $t = $systime - $starttime;
320         my $days = int $t / 86400;
321         $t -= $days * 86400;
322         my $hours = int $t / 3600;
323         $t -= $hours * 3600;
324         my $mins = int $t / 60;
325         return sprintf "%d %02d:%02d", $days, $hours, $mins;
326 }
327
328 sub AGWrestart
329 {
330         AGWMsg::init(\&new_channel);
331 }
332
333 #############################################################
334 #
335 # The start of the main line of code 
336 #
337 #############################################################
338
339 $starttime = $systime = time;
340 $lang = 'en' unless $lang;
341
342 # open the debug file, set various FHs to be unbuffered
343 dbginit(\&DXCommandmode::broadcast_debug);
344 foreach (@debug) {
345         dbgadd($_);
346 }
347 STDOUT->autoflush(1);
348
349 # calculate build number
350 $build += $main::version;
351 $build = "$build.$branch" if $branch;
352
353 Log('cluster', "DXSpider V$version, build $build started");
354
355 # banner
356 dbg("Copyright (c) 1998-2001 Dirk Koopman G1TLH");
357 dbg("DXSpider Version $version, build $build started");
358
359 # load Prefixes
360 dbg("loading prefixes ...");
361 Prefix::load();
362
363 # load band data
364 dbg("loading band data ...");
365 Bands::load();
366
367 # initialise User file system
368 dbg("loading user file system ..."); 
369 DXUser->init($userfn, 1);
370
371 # start listening for incoming messages/connects
372 dbg("starting listeners ...");
373 my $conn = IntMsg->new_server($clusteraddr, $clusterport, \&login);
374 $conn->conns("Server $clusteraddr/$clusterport");
375 push @listeners, $conn;
376 dbg("Internal port: $clusteraddr $clusterport");
377 foreach my $l (@main::listen) {
378         $conn = ExtMsg->new_server($l->[0], $l->[1], \&login);
379         $conn->conns("Server $l->[0]/$l->[1]");
380         push @listeners, $conn;
381         dbg("External Port: $l->[0] $l->[1]");
382 }
383 AGWrestart();
384
385 # load bad words
386 dbg("load badwords: " . (BadWords::load or "Ok"));
387
388 # prime some signals
389 unless ($DB::VERSION) {
390         $SIG{INT} = $SIG{TERM} = sub { $decease = 1 };
391 }
392
393 unless ($is_win) {
394         $SIG{HUP} = 'IGNORE';
395         $SIG{CHLD} = sub { $zombies++ };
396         
397         $SIG{PIPE} = sub {      dbg("Broken PIPE signal received"); };
398         $SIG{IO} = sub {        dbg("SIGIO received"); };
399         $SIG{WINCH} = $SIG{STOP} = $SIG{CONT} = 'IGNORE';
400         $SIG{KILL} = 'DEFAULT';     # as if it matters....
401
402         # catch the rest with a hopeful message
403         for (keys %SIG) {
404                 if (!$SIG{$_}) {
405                         #               dbg("Catching SIG $_") if isdbg('chan');
406                         $SIG{$_} = sub { my $sig = shift;       DXDebug::confess("Caught signal $sig");  }; 
407                 }
408         }
409 }
410
411 # start dupe system
412 DXDupe::init();
413
414 # read in system messages
415 DXM->init();
416
417 # read in command aliases
418 CmdAlias->init();
419
420 # initialise the Geomagnetic data engine
421 Geomag->init();
422 WCY->init();
423
424 # initial the Spot stuff
425 Spot->init();
426
427 # initialise the protocol engine
428 dbg("reading in duplicate spot and WWV info ...");
429 DXProt->init();
430
431 # put in a DXCluster node for us here so we can add users and take them away
432 $routeroot = Route::Node->new($mycall, $version*100+5300, Route::here($DXProt::me->here)|Route::conf($DXProt::me->conf));
433
434 # make sure that there is a routing OUTPUT node default file
435 #unless (Filter::read_in('route', 'node_default', 0)) {
436 #       my $dxcc = $DXProt::me->dxcc;
437 #       $Route::filterdef->cmd($DXProt::me, 'route', 'accept', "node_default call $mycall" );
438 #}
439
440 # read in any existing message headers and clean out old crap
441 dbg("reading existing message headers ...");
442 DXMsg->init();
443 DXMsg::clean_old();
444
445 # read in any cron jobs
446 dbg("reading cron jobs ...");
447 DXCron->init();
448
449 # read in database descriptors
450 dbg("reading database descriptors ...");
451 DXDb::load();
452
453 # starting local stuff
454 dbg("doing local initialisation ...");
455 eval {
456         Local::init();
457 };
458 dbg("Local::init error $@") if $@;
459
460 dbg("cleaning out old debug files");
461 DXDebug::dbgclean();
462
463 # print various flags
464 #dbg("seful info - \$^D: $^D \$^W: $^W \$^S: $^S \$^P: $^P");
465
466 # this, such as it is, is the main loop!
467 dbg("orft we jolly well go ...");
468 my $script = new Script "startup";
469 $script->run($DXProt::me) if $script;
470
471 #open(DB::OUT, "|tee /tmp/aa");
472
473 for (;;) {
474 #       $DB::trace = 1;
475         
476         Msg->event_loop(10, 0.010);
477         my $timenow = time;
478         process_inqueue();                      # read in lines from the input queue and despatch them
479 #       $DB::trace = 0;
480         
481         # do timed stuff, ongoing processing happens one a second
482         if ($timenow != $systime) {
483                 reap if $zombies;
484                 $systime = $timenow;
485                 DXCron::process();      # do cron jobs
486                 DXCommandmode::process(); # process ongoing command mode stuff
487                 DXProt::process();              # process ongoing ak1a pcxx stuff
488                 DXConnect::process();
489                 DXMsg::process();
490                 DXDb::process();
491                 DXUser::process();
492                 DXDupe::process();
493                 AGWMsg::process();
494                                 
495                 eval { 
496                         Local::process();       # do any localised processing
497                 };
498                 dbg("Local::process error $@") if $@;
499         }
500         if ($decease) {
501                 last if --$decease <= 0;
502         }
503 }
504 cease(0);
505 exit(0);
506
507