Cover V12, I07

Article
Listing 1

jul2003.tar

Listing 1 clusterpunchserver

#!/usr/local/bin/perl
use strict;
use Data::Dumper;
use Fcntl ':flock';
use FindBin;
use Getopt::Std;
use IO::Socket qw(:DEFAULT :crlf);
use POSIX qw(strftime setsid);
use POSIX 'WNOHANG';
use Time::HiRes qw(tv_interval gettimeofday);
use Config::General;

use lib "$FindBin::RealBin/../lib";
use clusterpunch; # accessory functions

# Command-line switches; getopts() stores each one in the matching
# $opt_X package variable:
#   -f <arg>  handed to ConfigFiles() to pick the configuration file(s)
#   -l <dir>  log directory          -p <port> port to listen on
#   -v        verbose                -d        run as a daemon
#   -s        enable logging         -D        debug
#   -h        accepted, but not acted upon anywhere in this file
our ($opt_f, $opt_l, $opt_p, $opt_v, $opt_d, $opt_s, $opt_h, $opt_D);
getopts("f:l:p:vdshD");

my $VERSION = "0.1";

# Replies are serialised with Data::Dumper: variables are named $STATn
# and the output is self-contained Perl (Purity).
($Data::Dumper::Varname, $Data::Dumper::Purity) = ("STAT", 1);

# Upper bound, in bytes, on a single incoming message.
use constant MAX_MSG_LEN => 1000;

################################################################
# Load parameters from configuration files into %CONFIG.
# Parameters can be redefined if multiple files are read in
################################################################

# Short host name of this machine, as reported by `hostname -s`.
my $SERVERNAME = `hostname -s`;
chomp($SERVERNAME);

my %CONFIG = LoadConfig(ConfigFiles($opt_f));

# Effective settings: a command-line switch wins over the configuration
# file, which in turn wins over the built-in default given here.
my @settings = (
    #  key         switch   default
    [ "verbose",   $opt_v,  0      ],
    [ "debug",     $opt_D,  0      ],
    [ "logging",   $opt_s,  0      ],
    [ "port",      $opt_p,  8095   ],
    [ "logdir",    $opt_l,  "/tmp" ],
    [ "daemon",    $opt_d,  0      ],
);
foreach my $setting (@settings) {
    my ($key,$switch,$default) = @{$setting};
    my @candidates = grep(defined $_, ($switch,$CONFIG{$key},$default));
    $CONFIG{$key} = $candidates[0];
}
$clusterpunch::debug = $CONFIG{"debug"};

# Verbose output goes to the terminal, so it rules out daemon mode.
if($CONFIG{"verbose"}) {
    $CONFIG{"daemon"} = 0;
}

# Forget the log directory unless it exists and is writable by us.
if(defined $CONFIG{logdir}) {
    my $usable = (-d $CONFIG{logdir}) && (-w _);
    $CONFIG{logdir} = undef unless $usable;
}

# HUP and INT both lead to a clean exit through Finish(); CHLD collects
# children that have finished.
$SIG{INT}  = \&Finish;
$SIG{HUP}  = \&Finish;
$SIG{CHLD} = \&Reaper;

# Record the effective configuration. A list-valued parameter is logged
# as its number of elements.
for my $key (keys %CONFIG) {
    my $value = $CONFIG{$key};
    $value = int(@{$value}) if ref($value) eq "ARRAY";
    Log("$key ",$value);
}

# State of the request currently being handled; $clienthost is also read
# by Log() for the prefix of every log line.
my $msg_in;
my $clienthost = "-";
my $clientport;

my $sock = InitiateSocket(port=>$CONFIG{port});
unless($sock) {
    Log("Server already running? Cannot create socket - aborting start request.");
    exit 0;
}

# Go to the background only when daemon mode is requested and verbose
# output is not.
if($CONFIG{"daemon"} && ! $CONFIG{"verbose"}) {
    Log("Backgrounding");
    BecomeDaemon();
} else {
    Log("Running in foreground");
}
Log("Servicing incoming requests - parent PID $$");

# Main service loop:
#
#  parent: wait for a message on the socket
#  parent: parse it; reload the configuration if asked to
#  parent: fork                 child: identify the client, log the request
#                               child: relay a shutdown request to the parent
#                               child: run every requested punch
#                               child: send the collected %STAT to the client
#                               child: exit

while (1) {
  # Wait for the next message; a false return from recv() means nothing
  # usable was received, so just wait again.
  next unless $sock->recv($msg_in,MAX_MSG_LEN);

  # Each command is the hash {command=>SCALAR,args=>LISTREF}
  my @commands = ProcessMessage($msg_in);
  my @commandnames = map {$_->{command}} @commands;

  # Re-read the punches from file when a command name contains "reload".
  # This happens in the parent so that later children see the new
  # configuration. The settings worked out at startup (command-line
  # overrides and defaults) are carried over: replacing %CONFIG wholesale
  # would silently drop them, e.g. turning logging off.
  if(grep (/reload/, @commandnames)) {
      my %fresh = LoadConfig(ConfigFiles($opt_f));
      $fresh{$_} = $CONFIG{$_}
          foreach qw(verbose debug logging port logdir daemon);
      %CONFIG = %fresh;
  }

  # Fork off a child which handles the request. Be careful - the request
  # may be a shutdown, which the child must pass on to the parent.
  my $parentpid = $$;
  my $childpid = fork;
  if(! defined $childpid) {
      Log("problem forking - exiting");
      exit;
  }
  if($childpid) {
      # we're in the parent - wait for the next message
      Log("Parent $$ has forked process $childpid");
      next;
  }

  # we're in the child ... process the request and quit
  Log("Child $$ from parent $parentpid - processing connection");

  $clienthost = IdentifyClient($sock);
  $clientport = $sock->peerport;
  my $msglength  = length($msg_in);
  Log("RCV from $clienthost command $msg_in $msglength bytes");

  my %STAT;

  # First check if we need to shut down. Any command name containing
  # "shutdown", "halt" or "stop" counts (substring match).
  if(grep (/shutdown|halt|stop/, @commandnames)) {
      # the parent exits via its HUP handler
      Log("sending HUP to parent $parentpid");
      my $cnt = kill "HUP", $parentpid;
      Log("kill returned $cnt");
      # finish the child; Finish() does not return
      Finish();
  }

  # Check if we need debugging
  $clusterpunch::debug = 1 if grep (/debug/, @commandnames);

  # The punch definitions are expected as a list of hashes. Cope with a
  # missing entry (no punches) and with a lone hash - presumably what the
  # configuration parser returns for a single punch block; TODO confirm -
  # instead of dying before the client gets an answer.
  my $punchconfig = $CONFIG{punch};
  my @punches = ref($punchconfig) eq "ARRAY" ? @{$punchconfig}
              : ref($punchconfig) eq "HASH"  ? ($punchconfig)
              :                                ();

 PUNCH: foreach my $command (@commands) {
    my $commandtext = $command->{command};
    my ($punch) = grep($_->{name} eq $commandtext, @punches);
    next PUNCH unless $punch;
    # found a punch for the current command text
    my @args = @{$command->{args}};
    my ($statistic,$cumulative,$valuemap,$valuetype,$function,$system) =
      @{$punch}{qw(statistic cumulative valuemap valuetype function system)};
    Log("PUNCH($commandtext) found valuetype $valuetype");

    # A punch reports either what its call returned (valuetype "return")
    # or how long the call took (every other valuetype). Start the clock
    # in exactly the cases where it is read back below.
    my $want_return = (defined $valuetype && $valuetype eq "return");
    my ($timer,$call_value,$punch_value);
    $timer = [gettimeofday] unless $want_return;

    if($function) {
      # NOTE(review): string eval of code taken from the configuration;
      # the client-supplied arguments are only passed in as data via @_.
      @_ = @args;
      $call_value = eval $function;
      if($@) {
        Log("PUNCH($commandtext) could not parse function");
        next PUNCH;
      }
    } elsif ($system) {
      Log("PUNCH($commandtext) system [$system]");
      my $proc;
      if(! open($proc,"-|",$system)) {
        Log("PUNCH($commandtext) could not run system command: $!");
        next PUNCH;
      }
      # collect the command's output, minus leading whitespace per line
      while(defined(my $line = <$proc>)) {
        $line =~ s/^\s+//;
        $call_value .= $line;
      }
      close($proc);
      chomp $call_value if defined $call_value;
    }

    if($want_return) {
      $punch_value = $call_value;
    } else {
      $punch_value = tv_interval($timer);
    }

    # Optional post-processing: the valuemap code sees the raw value in @_
    if($valuemap) {
      @_ = ($punch_value);
      $punch_value = eval $valuemap;
      if($@) {
        Log("PUNCH($commandtext) could not parse valuemap");
        next PUNCH;
      }
    }
    Log("PUNCH($commandtext) returned $punch_value");

    $STAT{$statistic}   = $punch_value;
    $STAT{$cumulative} += $punch_value if defined $cumulative;
  }

  $STAT{live} = 1;
  $STAT{host} = $SERVERNAME;
  my $dump = Dumper(\%STAT);
  Log($dump);
  $sock->send(pack("a*",$dump)) or die "send(): $!\n";
  exit;
}

# should never get here ... but if we do
$sock->close();
exit 0;

################################################################
#
# Parse socket messages
#
# The message $msg_in is of the format
# cmd(arg1,arg2,...,argn);cmd(...);...;cmd(...)
#
# These are parsed into the @commands array which has the format
# $commands[n] = {command=>commandtext,args=>[arg1,arg2,...,argn]};
#
################################################################

# ProcessMessage($msg_in)
#
# Returns one hash reference per command found in $msg_in:
#   { command => TEXT, args => [ ARG, ... ] }
# Commands are separated by ';'. A block of the form "name(a,b,c)" gives
# the command "name" with the arguments a, b and c; any other block is
# taken as a command without arguments.
#
# Whitespace around commands and arguments (including the newline that
# ends a message typed by hand) is removed, since command names are later
# compared with 'eq'. Blocks without a command name are skipped. An
# undefined message gives an empty list.
sub ProcessMessage {
  my $msg_in = shift;
  my @commands;
  return @commands unless defined $msg_in;
  foreach my $commandblock (split(/;/,$msg_in)) {
    my ($command,@args);
    if($commandblock =~ /(.*?)\((.*)\)/) {
      $command = $1;
      @args = split(/,/,$2);
    } else {
      $command = $commandblock;
    }
    # the loop variable aliases each element, so this trims in place
    foreach my $text ($command,@args) {
      $text =~ s/^\s+//;
      $text =~ s/\s+$//;
    }
    next unless length $command;
    push(@commands,{command=>$command,args=>[@args]});
  }
  return @commands;
}

################################################################
# 
# Daemonize - run the script in the background
#
################################################################

# BecomeDaemon()
#
# Detach from the terminal: fork, let the parent exit, and in the child
# start a new session, point the standard handles at /dev/null, move to
# the root directory, clear the umask and set a fixed PATH.
#
# Returns the PID of the daemonised (child) process. Dies if any step of
# the detachment fails, rather than carrying on half-attached.
sub BecomeDaemon {
  my $child = fork;
  if(! defined $child) {
    die "cannot fork: $!";
  }
  if($child) {
      # parent: the child carries on as the server
      Log("$SERVERNAME going to daemon mode");
      exit 0;
  }
  # POSIX::setsid returns undef on failure
  defined(setsid()) or die "cannot start a new session: $!";
  open(STDIN,"<","/dev/null") or die "cannot read /dev/null: $!";
  open(STDOUT,">","/dev/null") or die "cannot write /dev/null: $!";
  open(STDERR,">&STDOUT") or die "cannot dup STDOUT: $!";
  chdir "/" or die "cannot chdir to /: $!";
  umask(0);
  $ENV{PATH} = "/usr/local/bin:/usr/local/sbin:/bin/:/sbin:/usr/bin/:/usr/sbin";
  return $$;
}

################################################################
# Initiate and return the socket
################################################################
# InitiateSocket(port => N)
#
# Creates a UDP socket bound to local port N and returns it. Returns
# undef when no (or a false) port is given; otherwise returns whatever
# IO::Socket::INET->new produced.
sub InitiateSocket {
    my %input = @_;
    unless($input{port}) {
        return undef;
    }
    my $listener = IO::Socket::INET->new(
        LocalPort => $input{port},
        Proto     => "udp",
    );
    return $listener;
}

################################################################
# Identify the client connecting to us
################################################################
# IdentifyClient($socket)
#
# Returns a short name for the peer of $socket: the host name found by
# reverse lookup of the peer address, or else the socket's peerhost
# string. A name containing letters is cut at its first dot
# ("node7.example.org" becomes "node7"); a purely numeric address is
# returned unchanged. The result is undefined if the socket has no peer.
sub IdentifyClient {
    my $socket = shift;
    # Query the socket that was passed in, and only attempt the reverse
    # lookup when there is an address to look up.
    my $peeraddr = $socket->peeraddr;
    my $clienthost;
    $clienthost = gethostbyaddr($peeraddr,AF_INET) if defined $peeraddr;
    $clienthost ||= $socket->peerhost;
    $clienthost =~ s/(.*?)\..*/$1/
        if defined $clienthost && $clienthost =~ /[A-Za-z]/;
    return $clienthost;
}

################################################################
# Reap the children to avoid zombies
################################################################
# Reaper()
#
# CHLD signal handler: collects every child that has already exited,
# without blocking.
sub Reaper {
    # waitpid() overwrites $! and $?; restore them on the way out so the
    # code that was interrupted by the signal still sees its own values.
    local ($!, $?);
    while (waitpid(-1,WNOHANG)>0) {}
}

################################################################
# Cleanly exit - close the socket and log 
################################################################
# Finish()
#
# Handler for HUP/INT, also called directly by a child that relays a
# shutdown request: closes the socket, logs the shutdown and exits with
# status 0. Does not return.
sub Finish {
  # The signal handlers are installed before the socket is created, so
  # $sock may still be undefined when a signal arrives early.
  $sock->close() if $sock;
  Log("pid $$ shutting down - Finish()");
  exit 0;
}

################################################################
# Log to file using a semaphore file as a lock
################################################################
# Log(LIST)
#
# Writes one line made of the server name, the current client host, a
# timestamp and the concatenated LIST:
#   - to STDERR when $CONFIG{verbose} is set;
#   - appended to <logdir>/<servername>.log when $CONFIG{logging} and
#     $CONFIG{logdir} are both set.
# Writers to the log file are serialised through an exclusive lock on
# <logdir>/<servername>.sem. Logging is best effort: if either file
# cannot be opened the message is silently dropped.
sub Log {
  my $timestamp = strftime "%Y-%m-%d %H:%M:%S",localtime;
  $clienthost ||= "-";
  my $prefix = "$SERVERNAME | $clienthost [$timestamp] ";
  if($CONFIG{verbose}) {
    warn $prefix,@_,"\n";
  }
  return unless $CONFIG{logging} && $CONFIG{logdir};

  # Three-argument open with lexical handles: the path is never parsed
  # for mode characters, and the handles are closed on every exit path.
  my $base = "$CONFIG{logdir}/$SERVERNAME";
  open(my $sem,">","$base.sem") or return;
  flock($sem,LOCK_EX);
  if(open(my $log,">>","$base.log")) {
    print {$log} $prefix,@_,"\n";
    close($log);
  }
  flock($sem,LOCK_UN);
  close($sem);
  return;
}