projects
/
spider.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
only put somthing in lastt if it is actively being moved
[spider.git]
/
perl
/
Msg.pm
diff --git
a/perl/Msg.pm
b/perl/Msg.pm
index 1ed5377c20f890238aba810e798f4c54ec5bfeef..8be2fc1bf071d39dd464907de8397da7fdd27891 100644
(file)
--- a/
perl/Msg.pm
+++ b/
perl/Msg.pm
@@
-16,7
+16,7
@@
use IO::Socket;
use DXDebug;
use Timer;
use DXDebug;
use Timer;
-use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns);
+use vars qw(%rd_callbacks %wt_callbacks %er_callbacks $rd_handles $wt_handles $er_handles $now %conns $noconns
$blocking_supported $cnum
);
%rd_callbacks = ();
%wt_callbacks = ();
%rd_callbacks = ();
%wt_callbacks = ();
@@
-26,14
+26,20
@@
$wt_handles = IO::Select->new();
$er_handles = IO::Select->new();
$now = time;
$er_handles = IO::Select->new();
$now = time;
-my $blocking_supported = 0;
BEGIN {
# Checks if blocking is supported
eval {
BEGIN {
# Checks if blocking is supported
eval {
- require POSIX; POSIX->import(qw
(F_SETFL F_GETFL O_NONBLOCK));
+ require POSIX; POSIX->import(qw
(O_NONBLOCK F_SETFL F_GETFL))
};
};
- $blocking_supported = 1 unless $@;
+ if ($@ || $main::is_win) {
+# print STDERR "POSIX Blocking *** NOT *** supported $@\n";
+ $blocking_supported = 0;
+ } else {
+ $blocking_supported = 1;
+# print STDERR "POSIX Blocking enabled\n";
+ }
+
# import as many of these errno values as are available
eval {
# import as many of these errno values as are available
eval {
@@
-47,6
+53,8
@@
my $eagain = eval {EAGAIN()};
my $einprogress = eval {EINPROGRESS()};
my $ewouldblock = eval {EWOULDBLOCK()};
$^W = $w;
my $einprogress = eval {EINPROGRESS()};
my $ewouldblock = eval {EWOULDBLOCK()};
$^W = $w;
+$cnum = 0;
+
#
#-----------------------------------------------------------------
#
#-----------------------------------------------------------------
@@
-67,9
+75,11
@@
sub new
csort => 'telnet',
timeval => 60,
blocking => 0,
csort => 'telnet',
timeval => 60,
blocking => 0,
+ cnum => (($cnum < 999) ? (++$cnum) : ($cnum = 1)),
};
$noconns++;
};
$noconns++;
+
dbg('connll', "Connection created ($noconns)");
return bless $conn, $class;
}
dbg('connll', "Connection created ($noconns)");
return bless $conn, $class;
}
@@
-112,10
+122,11
@@
sub conns
if (ref $pkg) {
$call = $pkg->{call} unless $call;
return undef unless $call;
if (ref $pkg) {
$call = $pkg->{call} unless $call;
return undef unless $call;
- confess "changing $pkg->{call} to $call" if exists $pkg->{call} && $call ne $pkg->{call};
+ dbg('connll', "changing $pkg->{call} to $call") if exists $pkg->{call} && $call ne $pkg->{call};
+ delete $conns{$pkg->{call}} if exists $pkg->{call} && exists $conns{$pkg->{call}} && $pkg->{call} ne $call;
$pkg->{call} = $call;
$ref = $conns{$call} = $pkg;
$pkg->{call} = $call;
$ref = $conns{$call} = $pkg;
- dbg('connll', "Connection $call stored");
+ dbg('connll', "Connection $
pkg->{cnum} $
call stored");
} else {
$ref = $conns{$call};
}
} else {
$ref = $conns{$call};
}
@@
-128,9
+139,9
@@
sub pid_gone
my ($pkg, $pid) = @_;
my @pid = grep {$_->{pid} == $pid} values %conns;
my ($pkg, $pid) = @_;
my @pid = grep {$_->{pid} == $pid} values %conns;
- for (@pid) {
- &{$
_->{eproc}}($_, "$pid has gorn") if exists $_
->{eproc};
- $
_
->disconnect;
+ for
each my $p
(@pid) {
+ &{$
p->{eproc}}($p, "$pid has gorn") if exists $p
->{eproc};
+ $
p
->disconnect;
}
}
}
}
@@
-155,10
+166,8
@@
sub connect {
my $proto = getprotobyname('tcp');
$sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
my $proto = getprotobyname('tcp');
$sock->socket(AF_INET, SOCK_STREAM, $proto) or return undef;
- if ($conn->{blocking}) {
- blocking($sock, 0);
- $conn->{blocking} = 0;
- }
+ blocking($sock, 0);
+ $conn->{blocking} = 0;
my $ip = gethostbyname($to_host);
# my $r = $sock->connect($to_port, $ip);
my $ip = gethostbyname($to_host);
# my $r = $sock->connect($to_port, $ip);
@@
-190,9
+199,9
@@
sub disconnect {
delete $conns{$call} if $ref && $ref == $conn;
}
$call ||= 'unallocated';
delete $conns{$call} if $ref && $ref == $conn;
}
$call ||= 'unallocated';
- dbg('connll', "Connection $call disconnected");
+ dbg('connll', "Connection $c
onn->{cnum} $c
all disconnected");
- unless ($
^O =~ /^MS/i
) {
+ unless ($
main::is_win
) {
kill 'TERM', $conn->{pid} if exists $conn->{pid};
}
kill 'TERM', $conn->{pid} if exists $conn->{pid};
}
@@
-268,20
+277,21
@@
sub _send {
$conn->disconnect;
return 0; # fail. Message remains in queue ..
}
$conn->disconnect;
return 0; # fail. Message remains in queue ..
}
- }
+ } elsif (isdbg('raw')) {
+ my $call = $conn->{call} || 'none';
+ dbgdump('raw', "$call send $bytes_written: ", $msg);
+ }
$offset += $bytes_written;
$bytes_to_write -= $bytes_written;
}
delete $conn->{send_offset};
$offset = 0;
shift @$rq;
$offset += $bytes_written;
$bytes_to_write -= $bytes_written;
}
delete $conn->{send_offset};
$offset = 0;
shift @$rq;
- last unless $flush; # Go back to select and wait
+
#
last unless $flush; # Go back to select and wait
# for it to fire again.
}
# Call me back if queue has not been drained.
# for it to fire again.
}
# Call me back if queue has not been drained.
- if (@$rq) {
- set_event_handler ($sock, write => sub {$conn->_send(0)});
- } else {
+ unless (@$rq) {
set_event_handler ($sock, write => undef);
if (exists $conn->{close_on_empty}) {
&{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
set_event_handler ($sock, write => undef);
if (exists $conn->{close_on_empty}) {
&{$conn->{eproc}}($conn, undef) if exists $conn->{eproc};
@@
-370,6
+380,10
@@
sub _rcv { # Complement to _send
if (defined ($bytes_read)) {
if ($bytes_read > 0) {
$conn->{msg} .= $msg;
if (defined ($bytes_read)) {
if ($bytes_read > 0) {
$conn->{msg} .= $msg;
+ if (isdbg('raw')) {
+ my $call = $conn->{call} || 'none';
+ dbgdump('raw', "$call read $bytes_read: ", $msg);
+ }
}
} else {
if (_err_will_block($!)) {
}
} else {
if (_err_will_block($!)) {
@@
-384,7
+398,9
@@
FINISH:
&{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
$conn->disconnect;
} else {
&{$conn->{eproc}}($conn, $!) if exists $conn->{eproc};
$conn->disconnect;
} else {
- $conn->dequeue if exists $conn->{msg};
+ unless ($conn->{disable_read}) {
+ $conn->dequeue if exists $conn->{msg};
+ }
}
}
}
}
@@
-394,6
+410,8
@@
sub new_client {
if ($sock) {
my $conn = $server_conn->new($server_conn->{rproc});
$conn->{sock} = $sock;
if ($sock) {
my $conn = $server_conn->new($server_conn->{rproc});
$conn->{sock} = $sock;
+ blocking($sock, 0);
+ $conn->{blocking} = 0;
my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
$conn->{sort} = 'Incoming';
if ($eproc) {
my ($rproc, $eproc) = &{$server_conn->{rproc}} ($conn, $conn->{peerhost} = $sock->peerhost(), $conn->{peerport} = $sock->peerport());
$conn->{sort} = 'Incoming';
if ($eproc) {
@@
-423,11
+441,18
@@
sub close_server
# close all clients (this is for forking really)
sub close_all_clients
{
# close all clients (this is for forking really)
sub close_all_clients
{
- for (values %conns) {
- $
_
->disconnect;
+ for
each my $conn
(values %conns) {
+ $
conn
->disconnect;
}
}
}
}
+sub disable_read
+{
+ my $conn = shift;
+ set_event_handler ($conn->{sock}, read => undef);
+ return $_[0] ? $conn->{disable_read} = $_[0] : $_[0];
+}
+
#
#----------------------------------------------------
# Event loop routines used by both client and server
#
#----------------------------------------------------
# Event loop routines used by both client and server
@@
-476,7
+501,7
@@
sub event_loop {
# Quit the loop if no handles left to process
last unless ($rd_handles->count() || $wt_handles->count());
# Quit the loop if no handles left to process
last unless ($rd_handles->count() || $wt_handles->count());
- ($rset, $wset) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
+ ($rset, $wset
, $eset
) = IO::Select->select($rd_handles, $wt_handles, $er_handles, $timeout);
foreach $e (@$eset) {
&{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
foreach $e (@$eset) {
&{$er_callbacks{$e}}($e) if exists $er_callbacks{$e};
@@
-509,7
+534,9
@@
sub DESTROY
{
my $conn = shift;
my $call = $conn->{call} || 'unallocated';
{
my $conn = shift;
my $call = $conn->{call} || 'unallocated';
- dbg('connll', "Connection $call being destroyed ($noconns)");
+ my $host = $conn->{peerhost} || '';
+ my $port = $conn->{peerport} || '';
+ dbg('connll', "Connection $conn->{cnum} $call [$host $port] being destroyed");
$noconns--;
}
$noconns--;
}