[padb-devel] [padb commit] r56 - First code to implement bi-directional communication
codesite-noreply at google.com
codesite-noreply at google.com
Wed Jun 17 12:21:52 BST 2009
Author: apittman
Date: Wed Jun 17 02:47:07 2009
New Revision: 56
Modified:
branches/full-duplex/src/padb
Log:
First code to implement bi-directional communication
between the outer and the inner processes. Currently
it only works for one node, and only with
--mpi-watch.
Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb (original)
+++ branches/full-duplex/src/padb Wed Jun 17 02:47:07 2009
@@ -205,6 +205,8 @@
use File::Temp qw(tempfile);
use MIME::Base64;
use Config;
+use IO::Socket;
+use IO::Select;
###############################################################################
#
@@ -377,6 +379,7 @@
$conf{"local-fd-name"} = "/dev/null";
$conf{"stack-strip-above"} =
"elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait";
+$conf{"full-duplex"} = 0;    # true: run the inner with --full-duplex via go_parallel; "debug" also dumps each decoded reply
# $conf{stack-format} = undef;
@@ -2779,8 +2782,8 @@
# print Dumper $lines;
my $s = "";
- foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{raw} } ) ) {
- $s .= $lines->{raw}{$l}{state};
+ foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{from_vpid} } ) )
{
+ $s .= $lines->{from_vpid}{$l}{state};
}
print("$s\n");
}
@@ -3016,6 +3019,228 @@
}
}
+sub connect_to_child {    # Connect to an inner's listening socket, send "hello <word>" and return the socket.
+ my ( $host, $port, $word ) = @_;    # go_parallel passes the fields of an inner's "connect <host> <port> <key>" line
+
+ my $socket = IO::Socket::INET->new(
+ PeerAddr => $host,
+ PeerPort => $port,
+ Proto => 'tcp',
+ ) or die("Failed to connect to child");    # NOTE(review): the failure reason ($! / $@) is not included in the message
+
+ print $socket "hello $word\n";    # sign-on line; the inner compares it with its key before trusting the connection
+
+ #printf("Connecting to $host $port $word\n");
+ return $socket;
+}
+
+sub my_encode {    # Storable nfreeze + base64 with an empty line terminator, so the result fits on one protocol line.
+ return encode_base64( nfreeze(shift), "" );
+}
+
+sub my_decode {    # Inverse of my_encode. NOTE(review): thaw() on data read from a socket fully trusts the peer.
+ return thaw( decode_base64(shift) );
+}
+
+# Called after data has been appended to $sd->{str}: pass the first
+# complete line to $sd->{line_cb} and keep the rest in the buffer.
+sub extract_line {
+ my ( $handle, $sd ) = @_;    # $handle is passed through to the callback; $sd holds the {str} buffer and {line_cb}
+
+ my $str = $sd->{str};
+
+ # Do this to allow telnet sessions to work.
+ $str =~ s/\r//g;
+
+ # printf("Testing $str\n");
+
+ if ( $str =~ /^([^\n]+)\n(.*)$/ ) {    # NOTE(review): no /s, so (.*)$ cannot cross "\n": 2 buffered lines lose the 2nd "\n", 3+ fail to match; one line per call
+
+ # printf("Calling callback for \"$1\"\n");
+ $sd->{line_cb}( $handle, $sd, $1 );
+ $sd->{str} = $2;    # NOTE(review): overwrites any change the callback made to {str}
+ } else {
+ printf("No match\n");    # NOTE(review): also printed for an ordinary partial (newline-less) read
+ }
+
+ return;
+
+}
+
+sub issue_command_to_inner {    # Encode $cmd with my_encode and send it to the inner as a single line.
+ my ( $cdata, $cmd ) = @_;
+ my $str = my_encode($cmd);
+ $cdata->{socket}->print("$str\n");
+}
+
+sub command_from_inner {    # Outer-side line_cb: answer "Welcome" with the job request; any other line is an encoded result.
+ my ( $handle, $cdata, $line ) = @_;    # from go_parallel: $handle is $comm_data, $cdata the per-socket state
+
+ if ( $line eq "Welcome" ) {    # the inner accepted our "hello" sign-on
+
+ # printf("Sending data to all childen\n");
+ # Tell all hosts to go.
+
+ my $req;
+ $req->{mode} = $handle->{mode};
+ $req->{cmd} = $handle->{cmd};
+ $req->{jobid} = $handle->{jobid};
+ $req->{cinner} = \%cinner;    # %cinner is defined elsewhere; the inner copies these keys into %confInner
+
+ # print Dumper $req;
+ issue_command_to_inner( $cdata, $req );
+
+ return;
+ }
+
+ # A reply from inner.
+ my $d = my_decode($line);
+
+ $allfns{ $handle->{mode} }{out_handler}( undef, $d );    # NOTE(review): first argument is undef here; confirm out_handler accepts that
+
+ my $req;
+ $req->{quit} = 1;    # only one reply is expected (single node), so tell the inner to exit
+ $handle->{shutdown_in_progress} = 1;    # suppresses the EOF warnings in go_parallel
+ issue_command_to_inner( $cdata, $req );
+
+ print Dumper $d if ( $conf{"full-duplex"} eq "debug" );
+
+}
+
+sub go_parallel {    # Start the inner command, relay its stdout/stderr, and talk to it over the sockets it advertises.
+ my $jobid = shift;
+ my $cmd = shift;
+ my $ncpus = shift;
+ my $raw = shift;
+ my $stats = shift;
+ my $mode = shift;
+ my $h = shift;    # NOTE(review): $ncpus, $raw, $stats and $h are unused; $h is also shadowed by the loop variable below
+
+ my $errors = 0;    # NOTE(review): never updated or returned
+
+ my $report_errors = 1;
+
+ my $pcmd = {
+ pid => -1,
+ in => "",
+ out => *OUT,
+ err => *ERR,
+ };
+
+ $pcmd->{pid} = open3( $pcmd->{in}, *OUT, *ERR, $cmd )
+ or die "Unable to open3() pcmd: $!\n";    # NOTE(review): IPC::Open3 raises an exception on failure, so this die is likely unreachable
+
+ close $pcmd->{in};    # the inner gets nothing on stdin; requests go over the socket
+
+ my $comm_data;
+
+ $comm_data->{mode} = $mode;
+ $comm_data->{cmd} = $cmd;
+ $comm_data->{jobid} = $jobid;
+ $comm_data->{shutdown_in_progress} = 0;
+
+ my $sel = IO::Select->new();
+ $sel->add( $pcmd->{out} );
+ $sel->add( $pcmd->{err} );
+
+ while ( my @live = $sel->can_read() ) {    # no timeout; exits when can_read returns nothing (presumably once all handles are removed)
+ foreach my $h (@live) {
+ if ( $h eq $pcmd->{out} ) {
+ my $line = $h->getline();    # NOTE(review): buffered getline mixed with select() can leave further lines unseen in the buffer
+ if ( not defined $line ) {
+ if ( not $comm_data->{shutdown_in_progress} ) {
+ printf("Warning, EOF from ofd\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
+ }
+ my @words = split( " ", $line );
+ if ( $#words == 3 and $words[0] eq "connect" ) {    # "connect <host> <port> <key>" as printed by the inner's connect_to_outer
+
+ my $socket =
+ connect_to_child( $words[1], $words[2], $words[3] );
+ my $cdata;
+ $cdata->{active} = 1;
+ $cdata->{str} = "";
+ $cdata->{socket} = $socket;
+ $cdata->{line_cb} = \&command_from_inner;
+ $comm_data->{sockets}{$socket} = $cdata;    # keyed by the stringified socket so can_read results can be looked up below
+ $sel->add($socket);
+ next;
+ }
+ if ( $words[0] eq "debug" ) {    # NOTE(review): $words[0] is undef for a blank line
+ my $count = $sel->count();
+ print("There are $count sockets\n");
+ next;
+ }
+ print("inner: $line");
+ } elsif ( $h eq $pcmd->{err} ) {
+ my $line = $h->getline();
+
+ if ( not defined $line ) {
+ if ( not $comm_data->{shutdown_in_progress} ) {
+ printf("Warning, EOF from efd\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
+ }
+ printf("einner:$line");    # NOTE(review): $line becomes part of the printf format, so a "%" in inner stderr is misinterpreted
+ } elsif ( defined $comm_data->{sockets}{$h} ) {
+ my $cdata = $comm_data->{sockets}{$h};
+
+ my $data;
+ my $nb = sysread( $h, $data, 1024 );
+
+ #printf("read $data ($nb) from fd\n");
+
+ if ( not defined $data or $nb == 0 ) {    # NOTE(review): on a read error $nb is undef and is interpolated into the message below
+ if ( not $comm_data->{shutdown_in_progress} ) {
+ printf("EOF from child socket ($nb)\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
+ }
+
+ $cdata->{str} .= $data;
+ extract_line( $comm_data, $cdata );    # NOTE(review): dispatches at most one line per read
+
+ } else {
+ printf("Responce from unknown fd $h\n");
+ exit(1);
+ }
+ }
+ my $count = $sel->count();
+ if ( $count == 1 ) {
+ printf("All sockets closed?\n");
+ }
+ }
+
+ waitpid( $pcmd->{pid}, 0 );
+ my $res = $?;
+
+ printf("result from parallel command was $res\n")
+ if ( $conf{"verbose"} );
+
+ if ( $res != 0 ) {
+ my %status = rc_status($res);
+ if ( job_is_running($jobid) ) {
+ if ($report_errors) {
+ printf("Failed to run parallel command (rc =
$status{rc})\n");    # NOTE(review): this literal appears split by mail line-wrapping and, as written, contains a newline
+ }
+ } else {
+ printf("Job $jobid is no longer active\n");
+ return 1;
+ }
+ }
+
+ cleanup_pcmd();
+
+ exit(0);    # NOTE(review): exits instead of returning; the caller's "$errors = go_parallel(...)" and its cleanup_pcmd() are never reached
+}
+
sub go_job_once {
my $jobid = shift;
my $cmd = shift;
@@ -3226,7 +3451,13 @@
sleep( $conf{"interval"} );
}
}
- my $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,
$h );
+ my $errors;
+ if ( $conf{"full-duplex"} ) {
+ $errors = go_parallel( $jobid, "$cmd --full-duplex",
+ $ncpus, $raw, $stats, $mode, $h );
+ } else {
+ $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,
$h );
+ }
cleanup_pcmd();
return $errors;
}
@@ -5505,6 +5736,17 @@
return \%res;
}
+sub mpi_watch_all {    # mpi_watch's handler_all: run mpi_watch for each {vp, pid} in @$list; returns a hashref keyed by vp.
+ my ($list) = @_;
+ my %res;
+ foreach my $proc ( @{$list} ) {
+ my $vp = $proc->{vp};
+ my $pid = $proc->{pid};
+ $res{$vp} = mpi_watch( $vp, $pid );
+ }
+ return \%res;
+}
+
sub show_pid {
my ( $vp, $pid ) = @_;
@@ -5854,6 +6096,154 @@
);
}
+sub command_from_parent {    # Inner side: apply a decoded request from the outer and return a hashref holding the mode's results.
+ my $cmd = shift;
+ my $res;
+
+ # This is only for debugging.
+ if ( $confInner{verbose} ) {
+ $res->{request} = $cmd;
+ }
+
+ if ( $cmd->{quit} == 1 ) {    # NOTE(review): requests without a quit key compare undef here (warns under warnings)
+ exit(0);
+ }
+
+ # Setup the environment.
+ foreach my $key ( keys( %{ $cmd->{cinner} } ) ) {
+ $confInner{$key} = $cmd->{cinner}{$key};
+ }
+
+ $confInner{mode} = $cmd->{mode};
+
+ # Find the pids and register them all.
+ $rmgr{ $confInner{rmgr} }{find_pids}( $cmd->{jobid} );    # presumably fills $confInner{"all-pids"} used below; confirm
+
+ # Now do the work.
+ $res->{from_vpid} =
+ $allfns{ $cmd->{mode} }{handler_all}( $confInner{"all-pids"} );    # NOTE(review): in this change only mpi_watch defines handler_all
+
+ return $res;
+}
+
+sub command_from_outer {    # Inner-side line_cb: authenticate with "hello <key>" (or answer "debug"), then run the parent's requests.
+ my ( $netdata, $cdata, $line ) = @_;
+
+ my $s = $cdata->{socket};
+ if ( not $cdata->{trusted} ) {
+ if ( $line eq "hello $netdata->{key}" ) {
+
+ #printf("Trusting connection from $cdata->{desc}\n");
+ $cdata->{trusted} = 1;
+ $cdata->{str} = "";    # NOTE(review): extract_line overwrites {str} with the remaining buffer after this callback returns
+ $s->printf("Welcome\n");
+ $netdata->{parent} = $cdata;    # the most recently trusted connection becomes the parent
+ } elsif ( $line eq "debug" ) {
+ my $r = Dumper($netdata);
+ $s->printf($r);    # NOTE(review): an unauthenticated peer receives a dump including {key}; printf also treats "%" in it as format codes
+ $s->flush();
+ $netdata->{sel}->remove($s);
+ $s->close();
+ $cdata->{dead} = 1;
+ print("debug\n");
+ } else {
+
+ #printf("Closing connection from $cdata->{desc} (Bad signon)\n");
+ $netdata->{sel}->remove($s);
+ $s->close();
+ $cdata->{dead} = 1;
+ }
+ return;
+ }
+
+ $cdata->{last_cmd} = my_decode($line);
+ if ( $netdata->{parent} eq $cdata ) {    # only requests from the current parent are executed
+ my $res = command_from_parent( my_decode($line) );    # NOTE(review): decodes $line a second time; $cdata->{last_cmd} already holds it
+ my $reply = my_encode($res);
+ $cdata->{socket}->printf("$reply\n");
+ }
+}
+
+sub connect_to_outer {    # Inner side: listen, advertise "connect <host> <port> <key>" on stdout, then serve connections in a select loop.
+
+ my $server = IO::Socket::INET->new(
+
+ Reuse => 1,
+ Proto => 'tcp',
+ LocalPort => 37132,    # NOTE(review): fixed port, so a second inner on the same host presumably cannot bind
+ Listen => 2,
+ ) or die("not the best start");    # NOTE(review): the failure reason is not reported
+
+ my $lport = $server->sockport();
+ my $hostname = hostname();
+ my $key = "boris";    # NOTE(review): hard-coded key, so the "hello" check gives no real authentication
+ my $signon_text = "connect $hostname $lport $key\n";
+
+ # For now just print the signon code to stdout and let the outer pick it up.
+ print($signon_text);    # NOTE(review): unless $| is set elsewhere, stdout to a pipe is block-buffered and the outer may not see this line
+
+ my $netdata;
+ $netdata->{sel} = IO::Select->new();
+ $netdata->{sel}->add($server);
+ $netdata->{server} = $server;
+ $netdata->{key} = $key;
+
+ my $sel = $netdata->{sel};
+ my $server = $netdata->{server};    # NOTE(review): redeclares $server in the same scope (same value; "masks earlier declaration" warning)
+
+ while ( my @data = $sel->can_read() ) {
+ foreach my $s (@data) {
+ if ( $s == $server ) {    # the listening socket is readable: accept a new connection
+ my $new = $server->accept() or die("Failed accept");
+ $sel->add($new);
+ my $peer = getpeername($new);
+ my ( $port, $addr ) = unpack_sockaddr_in($peer);
+ my $ip = inet_ntoa($addr);
+ my $hostname = gethostbyaddr( $addr, AF_INET );
+
+ # printf "New connection from $hostname ($ip) $port\n";
+ my %sinfo;
+ $sinfo{hostname} = $hostname;
+ $sinfo{trusted} = 0;
+ $sinfo{port} = $port;
+ $sinfo{desc} = "$hostname:$port";
+ $sinfo{socket} = $new;
+ $sinfo{line_cb} = \&command_from_outer;
+ $netdata->{connections}{$new} = \%sinfo;
+
+ # $new->printf("Hello from padb\n");
+ #$new->autoflush();
+ next;
+ }
+
+ my $sinfo = $netdata->{connections}{$s};
+ my $d;
+ sysread( $s, $d, 1024 );    # NOTE(review): return value ignored; at EOF sysread returns 0 and leaves $d as "" (defined)
+
+ # Dead connection.
+ if ( not defined $d ) {    # NOTE(review): only true on a read error; a cleanly closed peer stays in $sel and the loop spins on it
+ printf("null read from $sinfo->{desc}\n");
+ if ( eof($s) ) {
+ $sel->remove($s);
+ $s->close();
+ $sinfo->{trusted} = 0;
+ $sinfo->{dead} = 1;
+ my $count = $sel->count();
+ printf("EOF from $sinfo->{desc} $count sockets
left\n");    # NOTE(review): this literal appears split by mail line-wrapping and, as written, contains a newline
+ }
+ next;
+ }
+
+ $sinfo->{str} .= $d;
+ extract_line( $netdata, $sinfo );
+
+ }
+ }
+ my $count = $sel->count();
+ printf("Thats not supposed to happen count=($count)\n");
+
+}
+
sub inner_main {
$confInner{"slurm-job-step"} = "0";
@@ -5872,6 +6262,7 @@
my @config_options;
my $line_formatted;
my $jobid;
+ my $full_duplex;
my %optionhash = (
"config-option|O=s" => \@config_options,
@@ -5879,7 +6270,8 @@
"line-formatted" => \$line_formatted,
"rank=i" => \@ranks,
"stats-full" => \$stats,
- "verbose|v+" => \$confInner{"verbose"}
+ "verbose|v+" => \$confInner{"verbose"},
+ "full-duplex" => \$full_duplex,
);
my %config_hash;
@@ -5902,6 +6294,12 @@
GetOptions(%optionhash) or die("could not parse options\n");
+ # If this works then nothing below here is needed as all
+ # requests can be sent over the socket.
+ if ($full_duplex) {
+ connect_to_outer();
+ }
+
my $mode;
foreach my $arg ( keys %config_hash ) {
@@ -6113,7 +6511,7 @@
};
$allfns{mpi_watch} = {
- 'handler' => \&mpi_watch,
+ 'handler_all' => \&mpi_watch_all,
'arg_long' => 'mpi-watch',
'help' => "Trace MPI programs",
'pre_out_handler' => \&pre_mpi_watch,
More information about the padb-devel
mailing list