# Copyright (c) 2003 Simcon GmbH (support@simcon-mt.de)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

package NetHandling;

=head1 NAME

NetHandling.pm - Library that handles various connections to the network

=head1 SYNOPSIS

    use NetHandling;

    my %cnx;
    $cnx{connect_ok}  = \&init_conversation;
    $cnx{sock_error}  = \&report_error;
    $cnx{read_ready}  = \&get_msg;
    $cnx{write_ready} = \&send_msg;
    my $msg = NetHandling->register_cfg(\%cnx);
    die $msg if $msg;

    my %tmr;
    $tmr{handler} = \&wait_tout;
    $tmr{arg}     = \%cnx;
    $tmr{period}  = 120;
    $msg = NetHandling->register_tmr(\%tmr);
    die $msg if $msg;

    NetHandling->dispatcher();

    NetHandling->drop_tmr(\%tmr);
    NetHandling->drop_cfg(\%cnx);

=cut

use strict;
use IO::Socket::INET;
use IO::Socket::UNIX;
use Errno;
use Socket;
use POSIX;
use Fcntl;

# Indexes into the per-descriptor record kept in @cnx:
# the configuration hash, the read handler and the write handler.
use constant CFG_SLOT => 2;
use constant RH_SLOT => 1;
use constant WH_SLOT => 0;

# Set to a non-zero value to ask the dispatcher to run termination handling.
our $Terminate = 0;

# Bit vectors indexed by file descriptor number: $wr/$rd hold the descriptors
# we want select() to watch, $wrout/$rdout are the copies handed to select().
our $wr = '';
our $rd = '';
our $wrout = '';
our $rdout = '';

# Clock ticks per second; timer periods (seconds) are converted with it.
our $ticks_per_sec = POSIX::sysconf( &POSIX::_SC_CLK_TCK );
# First value returned by POSIX::times(); the "current time" used for timers.
our ($TimeNow) = POSIX::times();

# here we shall store information about each connection
# (indexed by file descriptor number, see the *_SLOT constants)
my @cnx;
# and here about each timer
# (array of [expiration time, timer hash], ordered by expiration time)
my @touts;

=head1 DESCRIPTION

The module keeps a table of registered socket configurations and a queue of
timers, and calls the handlers registered with them from a select() based
loop (see B<dispatcher>).

=head1 FUNCTIONS

=over 8

=item B<init>

This function may be used to reinitiate the timer handling in cases when the
time between "use NetHandling" and the actual call to "dispatcher" is large.
=cut

# Refresh the module's notion of "now" (the first value of POSIX::times()),
# which register_tmr() uses as the base for new expiration times.
sub init
{
    ($TimeNow) = POSIX::times();
}

=item B<register_cfg>

This function registers with the system some configuration for socket. The
function expects reference to hash with parameters.

Mandatory parameter is I<proto>. I<proto> defines protocol to be used with
the socket ('tcp' or 'udp'). For details check L<IO::Socket::INET> or
L<IO::Socket::UNIX> documentations.

To specify domain for the socket use I<domain> parameter. Valid values are
either UNIX or INET. Domain defaults to INET.

If I<local_addr> is given among parameters then socket shall be bound to that
address. Note, the I<local_port> should accompany the host name in
I<local_addr>. If I<listen> is present then socket shall be listener and
accept connections.

If I<peer_addr> is given then connection shall be initiated. Note, the
I<peer_port> must accompany the host name in I<peer_addr>.

Ports may be given as numbers or as service names; service names are looked
up for the protocol given in I<proto>.

Besides these parameters configuration shall define some handlers for call
back. Please see description for B<dispatcher> for details.

If something is wrong the function returns string description of problem. If
everything is fine then returned value is undefined. Also parameter 'Sock'
shall be added to configuration and shall point to the socket.

Please note that if you define I<peer_addr> then you must also define either
I<read_ready> or I<connect_ok> handlers. Connection is done asynchronously,
which means that if you want to know if connection has failed then you need
to provide I<sock_error> parameter.

If I<peer_addr> is not present then I<local_addr> is expected and
I<read_ready> handler which shall be called when read on the socket is
available.

When you want to provide packed address already then use keys I<here> and
I<peer>. These keys shall be set by the function if you don't provide them.

This function can be used to add configuration for existing socket. In this
case parameter Sock must be defined. The socket is switched to non-blocking
mode; "Socket is not valid" is returned when the handle has no file
descriptor, and the function dies when fcntl fails. Please note that no other
actions beside registration shall be performed. So you shall use explicitly
need_read or need_write functions.

=cut

# Translate a port specification into a port number.
#   $name  - decimal port number or service name
#   $proto - protocol used for the service lookup; defaults to 'tcp'
# Returns the port number, or undef when the service name is unknown.
sub resolve_port_name
{
    my ($name, $proto) = @_;
    $proto = 'tcp' unless defined $proto;
    return undef unless defined $name;
    return $name if $name =~ /^\d+$/;
    my @serv = getservbyname($name, $proto);
    return @serv ? $serv[2] : undef;
}

sub register_cfg
{
    shift unless ref $_[0];    # allow NetHandling->register_cfg(...)
    my $cfg = shift;

    # Registration of an already existing socket: only make it non-blocking
    # and remember the configuration.
    if (exists $cfg->{Sock}) {
        my $fno = fileno($cfg->{Sock});
        # A closed handle has no descriptor; using undef as index would
        # silently occupy slot 0.
        return "Socket is not valid" unless defined $fno;

        # Add O_NONBLOCK to the current flags instead of replacing them.
        my $flags = fcntl($cfg->{Sock}, F_GETFL, 0);
        die("Can't use fcntl: $!\n") unless defined $flags;
        fcntl($cfg->{Sock}, F_SETFL, $flags | O_NONBLOCK)
            or die("Can't use fcntl: $!\n");

        $cnx[$fno][CFG_SLOT] = $cfg;
        #warn("DBG $cfg->{ID} socket is added\n");
        return;
    }

    return "Need proto definition" unless exists $cfg->{proto};
    return "Support only 'tcp' or 'udp'\n"
        unless $cfg->{proto} eq 'tcp' || $cfg->{proto} eq 'udp';
    return "Need either peer_addr or local_addr"
        unless exists $cfg->{peer_addr} || exists $cfg->{local_addr}
            || exists $cfg->{peer}      || exists $cfg->{here};

    my $is_unix = defined $cfg->{domain} && $cfg->{domain} eq 'UNIX';

    # Build the packed addresses ('here' and 'peer') from the readable ones.
    if ($is_unix) {
        $cfg->{here} = pack_sockaddr_un($cfg->{local_addr})
            if exists $cfg->{local_addr};
        $cfg->{peer} = pack_sockaddr_un($cfg->{peer_addr})
            if exists $cfg->{peer_addr};
    }
    else {
        if (exists $cfg->{local_addr}) {
            my $host = inet_aton($cfg->{local_addr});
            return "Can't resolve local host name" if !defined $host;
            my $p = resolve_port_name($cfg->{local_port}, $cfg->{proto});
            return "Can't get the port name '$cfg->{local_port}'"
                unless defined $p;
            $cfg->{here} = pack_sockaddr_in($p, $host);
        }
        if (exists $cfg->{peer_addr}) {
            my $host = inet_aton($cfg->{peer_addr});
            return "Can't resolve peer host name" if !defined $host;
            my $p = resolve_port_name($cfg->{peer_port}, $cfg->{proto});
            return "Can't get the port name '$cfg->{peer_port}'"
                unless defined $p;
            $cfg->{peer} = pack_sockaddr_in($p, $host);
        }
    }

    # Outgoing connections need a way to continue after connect; everything
    # else needs a local address and a read handler.
    if (exists $cfg->{peer}) {
        return "Please define either read_ready or connect_ok handler"
            if !_def_callback($cfg, 'read_ready')
            && !_def_callback($cfg, 'connect_ok');
    }
    elsif (!exists $cfg->{here} || !_def_callback($cfg, 'read_ready')) {
        return "Please define either peer address or read_ready handler and local address";
    }

    my $sock;
    if ($is_unix) {
        $sock = IO::Socket::UNIX->new(
            Type  => ($cfg->{proto} eq 'tcp' ? SOCK_STREAM : SOCK_DGRAM),
            Proto => $cfg->{proto});
    }
    else {
        $sock = IO::Socket::INET->new(Proto => $cfg->{proto});
    }
    return "Can't obtain socket: $!" unless $sock;

    if (exists $cfg->{here}) {
        $sock->sockopt(SO_REUSEADDR, 1)
            if exists $cfg->{'listen'} && !exists $cfg->{peer};
        if (!bind($sock, $cfg->{here})) {
            # close() may overwrite $!, so keep the bind error around
            my $errno = $! + 0;
            close($sock);
            $! = $errno;
            return "Can't bind socket to local address: $!";
        }
    }

    $sock->blocking(0);
    $cfg->{Sock} = $sock;
    my $fno = fileno($sock);
    $cnx[$fno][CFG_SLOT] = $cfg;
    #warn("DBG $cfg->{ID} socket is added\n");

    if (exists $cfg->{peer}) {
        _init_connect($cfg, $cfg->{peer});
    }
    else {
        if (exists $cfg->{'listen'} && !listen($sock, $cfg->{'listen'})) {
            # drop_cfg() closes the socket and may run user code; keep the
            # listen error for the message
            my $errno = $! + 0;
            drop_cfg($cfg);
            $! = $errno;
            return "Can't start listner: $!";
        }
        need_read($cfg);
    }
    return;
}

=item B<dispatcher>

This function does the main work of watching all sockets and timers. It
attempts to call back handlers registered with those sockets and timers.
There is only one handler for each timer. Sockets might have the following
handlers registered:

=over 4

=item connect_ok

Is called when connection was established. As argument this function shall
receive reference to configuration.

=item sock_error

Is called when establishment of connection failed for any reason. As argument
this function shall receive reference to configuration and error string. To
find out error number check $!. You are responsible for calling B<drop_cfg>!

=item read_ready

Is called when socket has some data in incoming buffer and recv will succeed.
As argument it receives reference to configuration (the socket is its I<Sock>
element). Also is called when accept is ready.

=cut
=item write_ready

Is called when writing to socket will succeed. As argument it receives
reference to configuration (the socket is its I<Sock> element).

=back

There is special variable B<$Terminate>. If you set this variable to non-zero
value then B<term_action> handlers shall be called and passed configuration
and the value of this variable. If this variable is set to 1 then also for
those configurations where B<term_action> is not registered drop_cfg shall be
called and all timers marked as B<fragile> shall be dropped. (In other words
when B<$Terminate> is set to 1 then everything is done to finish operations)

The function returns when there are neither timers nor registered sockets
left. It dies when select fails with anything but EINTR.

=cut

sub dispatcher
{
    while (@touts || @cnx) {
        # Sleep until the earliest timer expires; without timers $tout stays
        # undefined and select() waits for socket activity only.
        my $tout;
        if (@touts) {
            $tout = ($touts[0][0] - $TimeNow) / $ticks_per_sec;
        }
        # Just in case the main routine uses signals we check terminate flag
        # right before entering select and if it is set then we do not wait
        $tout = 0 if $Terminate;

        my $count = select($rdout = $rd, $wrout = $wr, undef, $tout);
        my $select_errno = $! + 0;    # keep it safe from later system calls
        ($TimeNow) = POSIX::times();
        if ($count < 0) {             # some error in select
            $! = $select_errno;
            die "Select with timeout $tout failed: $!\n" unless $!{EINTR};
            $count = 0;
        }

        if ($Terminate) {
            # tell each socket that termination was requested
            for (my $i = 0; $i <= $#cnx; $i++) {
                next unless defined $cnx[$i];
                my $cfg = $cnx[$i][CFG_SLOT];
                if (_def_callback($cfg, 'term_action')) {
                    $cfg->{term_action}($cfg, $Terminate);
                }
                elsif ($Terminate == 1) {
                    drop_cfg($cfg);
                }
            }
            if ($Terminate == 1) {
                # and also drop timeouts that want it
                for (my $i = 0; $i < @touts; $i++) {
                    next unless exists $touts[$i][1]->{fragile};
                    splice(@touts, $i, 1);
                    $i--;
                }
            }
            last unless (@touts || @cnx);
            $Terminate = 0;
        }

        # Fire every timer that is due; a fired timer leaves the queue.
        while (@touts && $touts[0][0] <= $TimeNow) {
            my $tmr = shift @touts;
            if (_def_callback($tmr->[1], 'handler')) {
                $tmr->[1]{handler}($tmr->[1]);
            }
        }

        next if !$count;    # no need to check readiness if nothing was ready

        for (my $i = 0; $i <= $#cnx; $i++) {
            if (vec($rdout, $i, 1)) {
                # A handler called earlier in this pass may have blocked or
                # dropped this descriptor; never call a missing handler.
                my $slot = $cnx[$i];
                $slot->[RH_SLOT]->($slot->[CFG_SLOT])
                    if defined $slot && ref($slot->[RH_SLOT]) eq 'CODE';
            }
            if (vec($wrout, $i, 1)) {
                # re-read the slot, the read handler may have changed it
                my $slot = $cnx[$i];
                $slot->[WH_SLOT]->($slot->[CFG_SLOT])
                    if defined $slot && ref($slot->[WH_SLOT]) eq 'CODE';
            }
        }
    }
}

=item B<register_tmr>

This function adds information about timer to the system. It expects
reference to HASH as argument. This reference must define 2 parameters
I<period> and I<handler>. After specified in I<period> number of seconds
I<handler> shall be called. As parameter it shall receive the reference to
this HASH.

The expiration time is counted from the moment the module last updated its
clock (module load, B<init>, or the last wake up of B<dispatcher>).

=cut

sub register_tmr
{
    shift if !ref $_[0];    # allow NetHandling->register_tmr(...)
    my $cfg = shift;

    my $expires = $cfg->{period} * $ticks_per_sec + $TimeNow;

    # keep the queue ordered by expiration time; timers with equal
    # expiration keep their registration order
    my $pos = 0;
    $pos++ while ($pos < @touts && $touts[$pos][0] <= $expires);
    splice(@touts, $pos, 0, [$expires, $cfg]);
    #my $tmr_id = exists $cfg->{ID} ? "$cfg $cfg->{ID}" : "Unknown $cfg";
    #warn("DBG added timer $tmr_id\n");
    return;
}

=item B<reset_tmr>

This function sets new expiration time for the timer. The timer must be
registered at the moment of the call, otherwise "Timer not registered" is
returned and nothing is changed.

=cut

sub reset_tmr
{
    shift unless ref $_[0];
    my $cfg = shift;

    my $msg = drop_tmr($cfg);
    return $msg if $msg;
    return register_tmr($cfg);
}

=item B<drop_cfg>

This function shall be used when you are done with the socket. It
unregisters your configuration. As parameter it expects reference to
configuration. If you define callback I<cleanup> then this call back shall be
called. If this call back is not defined then this function closes socket and
deletes I<Sock> element from the hash.

=cut

sub drop_cfg
{
    shift unless ref $_[0];
    my $cfg = shift;

    return unless exists $cfg->{Sock};
    my $sock = $cfg->{Sock};
    my $fno  = fileno($sock);
    return unless defined $fno && $#cnx >= $fno;

    if ($#cnx == $fno) {
        # last slot: shrink the table past any unused slots below it
        do { pop @cnx; } while (@cnx && !defined $cnx[-1]);
    }
    else {
        $cnx[$fno] = undef;
    }

    # stop watching the descriptor and forget readiness already reported
    # by select, so that the dispatcher does not call handlers for it
    vec($wr, $fno, 1)    = 0;
    vec($rd, $fno, 1)    = 0;
    vec($wrout, $fno, 1) = 0;
    vec($rdout, $fno, 1) = 0;
    #warn("DBG $cfg->{ID} socket is removed\n");

    if (_def_callback($cfg, 'cleanup')) {
        $cfg->{cleanup}($cfg);
    }
    else {
        delete $cfg->{Sock};
        close($sock);
    }
}

=item B<drop_tmr>

This function shall be used when you don't want to use some timer any more.
As parameter it expects reference to timer. Timers that have expired are
removed from the queue by the dispatcher itself, so this function is needed
only for timers that have not fired yet. If the timer is not in the queue the
string "Timer not registered" is returned.
=cut

sub drop_tmr
{
    shift unless ref $_[0];
    my $cfg = shift;

    # timers are identified by the address of their hash
    my $i = 0;
    $i++ while ($i < @touts && $touts[$i][1] != $cfg);
    return "Timer not registered" if $i == @touts;
    splice(@touts, $i, 1);
    return;
}

=for __PRIVATE__ this function simply returns true if specified parameter
exists and it is reference to CODE.

=cut

sub _def_callback
{
    my $cfg = shift;
    return exists $cfg->{$_[0]} && ref $cfg->{$_[0]} eq 'CODE';
}

=for __PRIVATE__ this function starts connecting on given socket to given
address. If connection can't be done immediately then it puts socket into
waiting for write readiness. Otherwise it calls connect_ok, or on failure
calls sock_error (or drops the configuration when sock_error is not defined).

=cut

sub _init_connect
{
    my $cfg  = shift;
    my $peer = shift;
    my $sock = $cfg->{Sock};

    if (!connect($sock, $peer)) {
        if ($!{EINPROGRESS}) {
            # The outcome shall be known when the socket becomes writable.
            # Borrow write_ready for that and keep the user's handler aside.
            $cfg->{_save_write_ready} = $cfg->{write_ready}
                if exists $cfg->{write_ready};
            $cfg->{write_ready} = \&_check_connect_outcome;
            need_write($cfg);
        }
        elsif (_def_callback($cfg, 'sock_error')) {
            # hand over a copy: $! itself changes with every system call
            my $err = $!;
            $cfg->{sock_error}($cfg, $err);
        }
        else {
            drop_cfg($cfg);
        }
        return;
    }

    # if connection was successful then we either call handler of this event
    # or by default start waiting for incoming data
    if (_def_callback($cfg, 'connect_ok')) {
        $cfg->{connect_ok}($cfg);
    }
    else {
        need_read($cfg);
    }
    return;
}

=for __PRIVATE__ this is connection outcome checker. It is called when write
readiness is here. We call connect_ok, sock_error, or simply switch it to
waiting for read readiness. On failure the configuration is dropped before
sock_error is called.

=cut

sub _check_connect_outcome
{
    my $cfg  = shift;
    my $sock = $cfg->{Sock};

    # give write_ready back to the user
    block_write($cfg);
    delete $cfg->{write_ready};
    $cfg->{write_ready} = delete $cfg->{_save_write_ready}
        if exists $cfg->{_save_write_ready};

    # SO_ERROR carries the result of the asynchronous connect. If it cannot
    # even be read the connection is treated as failed, never as established.
    my $packed = getsockopt($sock, SOL_SOCKET, SO_ERROR);
    my $errno  = defined $packed ? unpack('i', $packed)
                                 : ($! + 0 || POSIX::EIO());
    $! = $errno;
    if ($errno != 0) {
        my $err = $!;
        drop_cfg($cfg);
        if (_def_callback($cfg, 'sock_error')) {
            # drop_cfg may have changed $!, but the handler is told to
            # look at it for the error number
            $! = $errno;
            $cfg->{sock_error}->($cfg, $err);
        }
        return;
    }

    # if connection was successful then we either call handler of this event
    # or by default start waiting for incoming data
    if (_def_callback($cfg, 'connect_ok')) {
        $cfg->{connect_ok}->($cfg);
    }
    else {
        need_read($cfg);
    }
}

=item B<need_read>

This is called when you want to wait till read becomes available. Pass your
configuration as parameter. The I<read_ready> handler found in the
configuration at the moment of the call is the one that shall be called.
Returns string description of problem or undefined value on success.

=cut

sub need_read
{
    shift unless ref $_[0];
    my $cfg = shift;

    return "read_ready is not defined"
        unless _def_callback($cfg, 'read_ready');
    return "This configuration is not registered"
        unless exists $cfg->{Sock};
    my $fno = fileno($cfg->{Sock});
    return "Socket is not valid" unless defined $fno;
    return "This configuration is not registered"
        unless $#cnx >= $fno && defined $cnx[$fno];

    $cnx[$fno][RH_SLOT] = $cfg->{read_ready};
    vec($rd, $fno, 1) = 1;
    return;
}

=item B<need_write>

This is called when you want to wait till write becomes available. Pass your
configuration as parameter. The I<write_ready> handler found in the
configuration at the moment of the call is the one that shall be called.
Returns string description of problem or undefined value on success.

=cut

sub need_write
{
    shift unless ref $_[0];
    my $cfg = shift;

    return "write_ready is not defined"
        unless _def_callback($cfg, 'write_ready');
    return "This configuration is not registered"
        unless exists $cfg->{Sock};
    my $fno = fileno($cfg->{Sock});
    return "Socket is not valid" unless defined $fno;
    return "This configuration is not registered"
        unless $#cnx >= $fno && defined $cnx[$fno];

    $cnx[$fno][WH_SLOT] = $cfg->{write_ready};
    vec($wr, $fno, 1) = 1;
    return;
}

=item B<block_read>

This is called when you decided that you don't want to check read
availability. Pass your configuration as parameter.
=cut

# Find the descriptor number under which the configuration is registered.
# Returns ($fno, undef) on success and (undef, $problem) otherwise.
sub _registered_fileno
{
    my $cfg = shift;

    return (undef, "This configuration is not registered")
        unless exists $cfg->{Sock};
    my $fno = fileno($cfg->{Sock});
    return (undef, "Socket is not valid") unless defined $fno;
    return (undef, "This configuration is not registered")
        unless $#cnx >= $fno && defined $cnx[$fno];
    return ($fno, undef);
}

sub block_read
{
    shift unless ref $_[0];    # allow NetHandling->block_read(...)
    my ($fno, $problem) = _registered_fileno($_[0]);
    return $problem if defined $problem;

    $cnx[$fno][RH_SLOT] = undef;
    vec($rd, $fno, 1) = 0;
    # Also forget read readiness that select has already reported, so the
    # handler that was just removed is not looked up during the same pass
    # of the dispatcher.
    vec($rdout, $fno, 1) = 0;
    return;
}

=item B<block_write>

This is called when you decided that you don't want to check write
availability. Pass your configuration as parameter. Returns string
description of problem or undefined value on success.

=cut

sub block_write
{
    shift unless ref $_[0];    # allow NetHandling->block_write(...)
    my ($fno, $problem) = _registered_fileno($_[0]);
    return $problem if defined $problem;

    $cnx[$fno][WH_SLOT] = undef;
    vec($wr, $fno, 1) = 0;
    # Also forget write readiness that select has already reported, so the
    # handler that was just removed is not looked up during the same pass
    # of the dispatcher.
    vec($wrout, $fno, 1) = 0;
    return;
}

=back

=head1 AUTHOR

Andrei A. Voropaev

=cut

1;