[padb-devel] [padb commit] r59 - The tree code is now present and working in at least one case, startup,
codesite-noreply at google.com
codesite-noreply at google.com
Fri Jun 19 15:40:05 BST 2009
Author: apittman
Date: Fri Jun 19 07:39:30 2009
New Revision: 59
Modified:
branches/full-duplex/src/padb
Log:
The tree code is now present and working in at least one case: startup,
connecting, signon, reply, command processing and clean shutdown now
work correctly across multiple hosts.
Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb (original)
+++ branches/full-duplex/src/padb Fri Jun 19 07:39:30 2009
@@ -626,7 +626,7 @@
}
# Put the args in a hash so that they can be referenced by name.
- if ( defined $allfns{$mode}{secondary} ) {
+ if ( defined $mode and defined $allfns{$mode}{secondary} ) {
foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
$secondary_args{ $sec->{arg_long} } = $sec->{value};
}
@@ -2321,9 +2321,11 @@
$mpd_dfile = $fn;
- my $cmd = "mpirun -machinefile $fn -np $i";
+ my $cmd = "mpdrun -machinefile $fn -np $i";
+
+ my $hosts = $#hosts + 1;
- return ( $cmd, undef );
+ return ( $cmd, undef, $hosts );
}
sub mpd_cleanup_pcmd {
@@ -2413,9 +2415,9 @@
}
- if ( $conf{"verbose"} ) {
- print Dumper \%open_jobs;
- }
+ #if ( $conf{"verbose"} ) {
+ # print Dumper \%open_jobs;
+ #}
}
sub open_get_jobs {
@@ -2457,9 +2459,10 @@
$open_dfile = $fn;
my $prefix = find_ompi_prefix();
- my $cmd = "mpirun -machinefile $fn -np $i $prefix";
+ my $cmd = "orterun -machinefile $fn -np $i $prefix";
+ my $hosts = $#hosts + 1;
- return ( $cmd, undef );
+ return ( $cmd, undef, $hosts );
}
sub open_cleanup_pcmd {
@@ -2782,8 +2785,9 @@
# print Dumper $lines;
my $s = "";
- foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{from_vpid} } ) ) {
- $s .= $lines->{from_vpid}{$l}{state};
+ foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{target_responce} } ) )
+ {
+ $s .= $lines->{target_responce}{$l}{state};
}
print("$s\n");
}
@@ -3029,7 +3033,6 @@
print $socket "hello $word\n";
- #printf("Connecting to $host $port $word\n");
return $socket;
}
@@ -3059,13 +3062,58 @@
$sd->{line_cb}( $handle, $sd, $1 );
$sd->{str} = $2;
} else {
- printf("No match\n");
+
+ # Likely just truncated input, wait for more to arrive.
+ # printf("No match ()\n");
}
return;
}
+# For each remote process generate a tree, giving each
+# process a parent and a number of children.
+# Currently just make this a simple "ladder" but should
+# probably be a f-nomial tree.
+sub generate_comm_tree {
+ my ($a) = @_;
+ my @b = @{$a};
+ my $last = "root";
+ my %comm_tree;
+ foreach my $c (@b) {
+ $comm_tree{$c}{parent} = $last;
+ push( @{ $comm_tree{$last}{children} }, $c );
+ $last = $c;
+ }
+
+ return \%comm_tree;
+}
+
+# Called once when we have the socket details of the last child.
+sub connect_to_children {
+ my $comm_data = shift;
+ @{ $comm_data->{host_ids} } = sort( keys( %{ $comm_data->{remote} } ) );
+ $comm_data->{connection_tree} =
+ generate_comm_tree( $comm_data->{host_ids} );
+ my $td = $comm_data->{connection_tree}->{root}{children}[0];
+
+ #printf("I'm connecting to $td\n");
+ my $cdata;
+ $cdata->{socket} = connect_to_child(
+ $td,
+ $comm_data->{remote}{$td}{port},
+ $comm_data->{remote}{$td}{key}
+ );
+ $cdata->{active} = 1;
+ $cdata->{str} = "";
+ $cdata->{line_cb} = \&command_from_inner;
+
+ $comm_data->{sockets}{ $cdata->{socket} } = $cdata;
+ $comm_data->{sel}->add( $cdata->{socket} );
+
+ #print Dumper $comm_data;
+}
+
sub issue_command_to_inner {
my ( $cdata, $cmd ) = @_;
my $str = my_encode($cmd);
@@ -3076,33 +3124,46 @@
my ( $handle, $cdata, $line ) = @_;
if ( $line eq "Welcome" ) {
+ my $req;
+ $req->{mode} = "signon";
+ $req->{connection_tree} = $handle->{connection_tree};
+ $req->{remote} = $handle->{remote};
+ issue_command_to_inner( $cdata, $req );
+ return;
+ }
- # printf("Sending data to all childen\n");
- # Tell all hosts to go.
+ # A reply from inner.
+ my $d = my_decode($line);
+ if ( $handle->{state} eq "connecting" ) {
+ $handle->{state} = "live";
my $req;
$req->{mode} = $handle->{mode};
- $req->{cmd} = $handle->{cmd};
- $req->{jobid} = $handle->{jobid};
+ $req->{jobid} = $handle->{jobid};
$req->{cinner} = \%cinner;
-
- # print Dumper $req;
issue_command_to_inner( $cdata, $req );
-
return;
}
- # A reply from inner.
- my $d = my_decode($line);
+ if ( $handle->{state} eq "live" ) {
+ $handle->{state} = "shutdown";
+ my $req;
+ $req->{mode} = "exit";
+ issue_command_to_inner( $cdata, $req );
+
+
+ $allfns{ $handle->{mode} }{out_handler}( undef, $d );
+ return;
+ }
- $allfns{ $handle->{mode} }{out_handler}( undef, $d );
+ if ( $handle->{state} eq "shutdown" ) {
- my $req;
- $req->{quit} = 1;
- $handle->{shutdown_in_progress} = 1;
- issue_command_to_inner( $cdata, $req );
+ # Nothing to do here.
+ return;
+ }
- print Dumper $d if ( $conf{"full-duplex"} eq "debug" );
+ print("Hmm, unknown state! $handle->{state}\n");
+ return;
}
@@ -3114,6 +3175,7 @@
my $stats = shift;
my $mode = shift;
my $h = shift;
+ my $hosts = shift;
my $errors = 0;
@@ -3133,87 +3195,102 @@
my $comm_data;
- $comm_data->{mode} = $mode;
- $comm_data->{cmd} = $cmd;
- $comm_data->{jobid} = $jobid;
- $comm_data->{shutdown_in_progress} = 0;
+ $comm_data->{mode} = $mode;
+ $comm_data->{hosts} = $hosts;
+ $comm_data->{cmd} = $cmd;
+ $comm_data->{jobid} = $jobid;
+
+ # State, one of "connecting" "live" and "shutdown";
+ $comm_data->{state} = "connecting";
my $sel = IO::Select->new();
$sel->add( $pcmd->{out} );
$sel->add( $pcmd->{err} );
- while ( my @live = $sel->can_read() ) {
- foreach my $h (@live) {
- if ( $h eq $pcmd->{out} ) {
- my $line = $h->getline();
- if ( not defined $line ) {
- if ( not $comm_data->{shutdown_in_progress} ) {
- printf("Warning, EOF from ofd\n");
+ $comm_data->{sel} = $sel;
+ my $start = time();
+
+ while ( $sel->count() > 1 ) {
+ while ( my @live = $sel->can_read(5) ) {
+ foreach my $h (@live) {
+ if ( $h eq $pcmd->{out} ) {
+ my $line = $h->getline();
+ if ( not defined $line ) {
+ if ( not $comm_data->{state} eq "shutdown" ) {
+ printf("Warning, EOF from ofd\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
}
- $sel->remove($h);
- $h->close();
- next;
- }
- my @words = split( " ", $line );
- if ( $#words == 3 and $words[0] eq "connect" ) {
-
- my $socket =
- connect_to_child( $words[1], $words[2], $words[3] );
- my $cdata;
- $cdata->{active} = 1;
- $cdata->{str} = "";
- $cdata->{socket} = $socket;
- $cdata->{line_cb} = \&command_from_inner;
- $comm_data->{sockets}{$socket} = $cdata;
- $sel->add($socket);
- next;
- }
- if ( $words[0] eq "debug" ) {
- my $count = $sel->count();
- print("There are $count sockets\n");
- next;
- }
- print("inner: $line");
- } elsif ( $h eq $pcmd->{err} ) {
- my $line = $h->getline();
-
- if ( not defined $line ) {
- if ( not $comm_data->{shutdown_in_progress} ) {
- printf("Warning, EOF from efd\n");
+ my @words = split( " ", $line );
+ if ( $#words == 3 and $words[0] eq "connect" ) {
+
+ my $host = $words[1];
+
+ $comm_data->{remote}{$host}{port} = $words[2];
+ $comm_data->{remote}{$host}{key} = $words[3];
+ $comm_data->{signons}++;
+
+ if ( $comm_data->{signons} == $comm_data->{hosts} ) {
+ connect_to_children($comm_data);
+ }
+ next;
}
- $sel->remove($h);
- $h->close();
- next;
- }
- printf("einner:$line");
- } elsif ( defined $comm_data->{sockets}{$h} ) {
- my $cdata = $comm_data->{sockets}{$h};
-
- my $data;
- my $nb = sysread( $h, $data, 1024 );
-
- #printf("read $data ($nb) from fd\n");
-
- if ( not defined $data or $nb == 0 ) {
- if ( not $comm_data->{shutdown_in_progress} ) {
- printf("EOF from child socket ($nb)\n");
+ if ( $words[0] eq "debug" ) {
+ my $count = $sel->count();
+ print("There are $count sockets\n");
+ next;
+ }
+ print("inner: $line");
+ } elsif ( $h eq $pcmd->{err} ) {
+ my $line = $h->getline();
+
+ if ( not defined $line ) {
+ if ( not $comm_data->{state} eq "shutdown" ) {
+ printf("Warning, EOF from efd\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
+ }
+ printf("einner:$line");
+ } elsif ( defined $comm_data->{sockets}{$h} ) {
+ my $cdata = $comm_data->{sockets}{$h};
+
+ my $data;
+ my $nb = sysread( $h, $data, 1024 );
+
+ if ( not defined $data or $nb == 0 ) {
+ if ( not $comm_data->{state} eq "shutdown" ) {
+ printf("EOF from child socket ($nb)\n");
+ }
+ $sel->remove($h);
+ $h->close();
+ next;
}
- $sel->remove($h);
- $h->close();
- next;
- }
- $cdata->{str} .= $data;
- extract_line( $comm_data, $cdata );
+ $cdata->{str} .= $data;
+ extract_line( $comm_data, $cdata );
- } else {
- printf("Responce from unknown fd $h\n");
- exit(1);
+ } else {
+ printf("Responce from unknown fd $h\n");
+ exit(1);
+ }
+ }
+ my $count = $sel->count();
+ if ( $count == 1 ) {
+ printf("All sockets closed?\n");
}
}
+ my $t2 = time() - $start;
my $count = $sel->count();
- if ( $count == 1 ) {
- printf("All sockets closed?\n");
+ if ( $count > 0 ) {
+ printf("Still here, time:$t2 comm_count:$count\n");
+ if ( $comm_data->{signons} != $comm_data->{hosts} ) {
+ my $missing = $comm_data->{hosts} - $comm_data->{signons};
+ print("Waiting for signon from $missing hosts.\n");
+ }
}
}
@@ -3375,9 +3452,6 @@
my $key = job_to_key($jobid);
- my $cmd;
- my $ncpus;
-
my $stats;
foreach my $rank (@ranks) {
@@ -3394,10 +3468,14 @@
return 1 unless (@res);
- $cmd = $res[0];
- $ncpus = $res[1];
+ my $cmd = $res[0];
+
+ # These two are only defined by some resource managers.
+ my $ncpus = $res[1];
+ my $hosts = $res[2];
$conf{"verbose"} && defined $ncpus && print "Job has $ncpus cpus\n";
+ $conf{"verbose"} && defined $hosts && print "Job has $hosts hosts\n";
# Some versions of perl like to have a space after the O and report that
# -ormgr isn't a valid option if it's not there, perhaps this is a bug
@@ -3452,8 +3530,13 @@
}
my $errors;
if ( $conf{"full-duplex"} ) {
+ if ( not defined $hosts ) {
+ printf("Full duplex mode needs to know the host count\n");
+ printf("Which is doesn't for this resource manager: $conf{rmgr}\n");
+ return 1;
+ }
$errors = go_parallel( $jobid, "$cmd --full-duplex",
- $ncpus, $raw, $stats, $mode, $h );
+ $ncpus, $raw, $stats, $mode, $h, $hosts );
} else {
$errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,
$h );
}
@@ -6095,17 +6178,114 @@
);
}
-sub command_from_parent {
- my $cmd = shift;
- my $res;
+# Receive a reply from a child.
+# If it's the last reply then combine
+# with others and forward to parent.
+sub reply_from_child {
+ my ( $handle, $sd, $req ) = @_;
+
+ # If it's the first connection over this socket simply
+ # foreward on the signon command.
+ if ( $req eq "Welcome" ) {
+ $sd->{socket}->printf("$handle->{signon_cmd}\n");
+ return;
+ }
+
+ my $r = my_decode($req);
- # This is only for debugging.
- if ( $confInner{verbose} ) {
- $res->{request} = $cmd;
+ # Merge this reply into the local one.
+ $handle->{child_replys}++;
+
+ # $handle->{all_replys}{raw}{ $sd->{hostname} } = $r;
+
+ # Combine the host responces.
+ foreach my $status ( keys( %{ $r->{host_responce} } ) ) {
+ foreach my $host ( keys( %{ $r->{host_responce}{$status} } ) ) {
+ $handle->{all_replys}->{host_responce}{$status}{$host} =
+ $r->{host_responce}{$status}{$host};
+ }
}
- if ( defined( $cmd->{quit} ) and ( $cmd->{quit} == 1 ) ) {
- exit(0);
+ # Combine the target process responces.
+ if ( exists $r->{target_responce} ) {
+ foreach my $tp ( keys( %{ $r->{target_responce} } ) ) {
+ $handle->{all_replys}->{target_responce}{$tp} =
+ $r->{target_responce}{$tp};
+ }
+ }
+
+ # If this is the last reply from a child then report upstream.
+ # print Dumper $handle;
+ if ( $handle->{child_replys} != $handle->{children} ) {
+ my $missing = $handle->{children} - $handle->{child_replys};
+ return;
+ }
+
+ # Send the data upstream.
+ my $reply = $handle->{all_replys};
+
+ reply_to_parent( $handle, $reply );
+ if ($handle->{shutdown} ) {
+ inner_cleanup_and_exit($handle);
+ }
+
+ # Reset local data.
+ $handle->{all_replys} = undef;
+ $handle->{child_replys} = 0;
+ $handle->{target_responce} = undef;
+}
+
+# Receive a command (perl reference) from our parent.
+#
+# When we receive a command:
+# 1) Send it on to our children.
+# 2) Execute it.
+# 3) If we have no children send reply.
+sub command_from_parent {
+ my ( $netdata, $cmd ) = @_;
+
+ if ( $cmd->{mode} eq "signon" ) {
+ $netdata->{signon_cmd} = my_encode($cmd);
+
+ if ( not exists $cmd->{connection_tree}{$confInner{hostname}}{children} ) {
+ $netdata->{children} = 0;
+ return;
+ }
+
+ my @children = @{ $cmd->{connection_tree}{$confInner{hostname}}{children} };
+ $netdata->{children} = $#children + 1;
+
+ # Only one child is tested so far.
+ foreach my $chostname (@children) {
+ my $socket = connect_to_child(
+ $chostname,
+ $cmd->{remote}{$chostname}{port},
+ $cmd->{remote}{$chostname}{key}
+ );
+ my %cdata;
+ $cdata{socket} = $socket;
+ $cdata{hostname} = $chostname;
+ $cdata{line_cb} = \&reply_from_child;
+ $cdata{state} = "init";
+ $netdata->{sel}->add($socket);
+ $netdata->{connections}{$socket} = \%cdata;
+ push @{ $netdata->{child_sockets} }, $socket;
+ }
+ return;
+ }
+
+ # Forward on to our children before doing any more processing.
+ if ( $netdata->{children} ) {
+ my $req = my_encode($cmd) . "\n";
+ foreach my $child ( @{ $netdata->{child_sockets} } ) {
+ $child->printf($req);
+ $child->flush();
+ }
+ }
+
+ if ( $cmd->{mode} eq "exit" ) {
+ $netdata->{shutdown} = 1;
+ return;
}
# Setup the environment.
@@ -6119,12 +6299,39 @@
$rmgr{ $confInner{rmgr} }{find_pids}( $cmd->{jobid} );
# Now do the work.
- $res->{from_vpid} =
- $allfns{ $cmd->{mode} }{handler_all}( $confInner{"all-pids"} );
+ my $z = $allfns{ $cmd->{mode} }{handler_all}( $confInner{"all-pids"} );
- return $res;
+ $netdata->{target_responce} = $z;
+ $netdata->{all_replys}{target_responce} = $z;
+
+ return;
+}
+
+# Time for the inner process to exit, cleanup all sockets and
+# quit.
+sub inner_cleanup_and_exit {
+ my $netdata = shift;
+ foreach my $h ( $netdata->{sel}->handles() ) {
+ $h->flush();
+ $h->close();
+ }
+ exit(0);
}
+# Send a reply to our parent, put a status of "ok" on for this
+# host.
+sub reply_to_parent {
+ my ( $netdata, $cmd ) = @_;
+
+ $cmd->{host_responce}{ok}{ hostname() } = 1;
+
+ my $reply = my_encode($cmd);
+ $netdata->{parent}->{socket}->printf("$reply\n");
+}
+
+# Process a single line of input onto a socket we are
+# listening on. This is probably our parent (who may
+# be the outer process) but it needs to be authenticated.
sub command_from_outer {
my ( $netdata, $cdata, $line ) = @_;
@@ -6146,36 +6353,40 @@
$cdata->{dead} = 1;
print("debug\n");
} else {
-
- #printf("Closing connection from $cdata->{desc} (Bad signon)\n");
+ printf("Closing connection from $cdata->{desc} (Bad signon)\n");
$netdata->{sel}->remove($s);
$s->close();
$cdata->{dead} = 1;
}
return;
}
-
- $cdata->{last_cmd} = my_decode($line);
- if ( $netdata->{parent} eq $cdata ) {
- my $res = command_from_parent( my_decode($line) );
- my $reply = my_encode($res);
- $cdata->{socket}->printf("$reply\n");
+
+ command_from_parent( $netdata, my_decode($line) );
+
+ if ( $netdata->{children} == 0 ) {
+ my $res;
+ $res->{target_responce} = $netdata->{target_responce};
+ reply_to_parent( $netdata, $res );
+ $netdata->{target_responce} = undef;
+
+ if ($netdata->{shutdown} ) {
+ inner_cleanup_and_exit($netdata);
+ }
}
}
-sub connect_to_outer {
+# Loop forever in the inner process.
+sub inner_loop_for_comms {
my $server = IO::Socket::INET->new(
-
- Reuse => 1,
- Proto => 'tcp',
- LocalPort => 37132,
- Listen => 2,
- ) or die("not the best start");
+ Reuse => 1,
+ Proto => 'tcp',
+ Listen => 2,
+ ) or die("Failed to create local port");
my $lport = $server->sockport();
my $hostname = hostname();
- my $key = "boris";
+ my $key = rand();
my $signon_text = "connect $hostname $lport $key\n";
# For now just print the signon code to stdout and let the outer pick it up.
@@ -6186,6 +6397,7 @@
$netdata->{sel}->add($server);
$netdata->{server} = $server;
$netdata->{key} = $key;
+ $netdata->{shutdown} = 0;
my $sel = $netdata->{sel};
@@ -6199,7 +6411,7 @@
my $ip = inet_ntoa($addr);
my $hostname = gethostbyaddr( $addr, AF_INET );
- # printf "New connection from $hostname ($ip) $port\n";
+ #printf "New connection from $hostname ($ip) $port\n";
my %sinfo;
$sinfo{hostname} = $hostname;
$sinfo{trusted} = 0;
@@ -6216,18 +6428,18 @@
my $sinfo = $netdata->{connections}{$s};
my $d;
- sysread( $s, $d, 1024 );
+ my $count = sysread( $s, $d, 1024 );
# Dead connection.
- if ( not defined $d ) {
- printf("null read from $sinfo->{desc}\n");
+ if ( not defined $d or $count eq 0 ) {
+
+ # printf("null read from $sinfo->{desc}\n");
if ( eof($s) ) {
$sel->remove($s);
$s->close();
$sinfo->{trusted} = 0;
$sinfo->{dead} = 1;
- my $count = $sel->count();
- printf("EOF from $sinfo->{desc} $count sockets left\n");
+ my $scount = $sel->count();
}
next;
}
@@ -6251,6 +6463,7 @@
$confInner{"edb"} = find_edb();
$confInner{"minfo"} = find_minfo();
$confInner{"open-ps"} = "";
+ $confInner{"hostname"} = hostname();
# The different options this script can perform. One (and only one) of
# these must be set.
@@ -6295,7 +6508,7 @@
# If this works then nothing below here is needed as all
# requests can be sent over the socket.
if ($full_duplex) {
- connect_to_outer();
+ inner_loop_for_comms();
}
my $mode;
@@ -6331,7 +6544,6 @@
# Load some non user-modifiable data into conf now
$confInner{"lineformatted"} = $line_formatted;
- $confInner{"hostname"} = hostname();
$confInner{"myld"} = $ENV{"LD_LIBRARY_PATH"};
More information about the padb-devel
mailing list