[padb-devel] [padb commit] r63 - add an inner-callback mode which doesn't rely on stdout
codesite-noreply at google.com
codesite-noreply at google.com
Wed Jun 24 13:05:21 BST 2009
Author: apittman
Date: Wed Jun 24 01:04:19 2009
New Revision: 63
Modified:
branches/full-duplex/src/padb
Log:
add an inner-callback mode which doesn't rely on stdout
forwarding from the parallel processes to the mpirun
command. This should fix the problems seen with mpd/mpich2.
Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb (original)
+++ branches/full-duplex/src/padb Wed Jun 24 01:04:19 2009
@@ -243,6 +243,8 @@
my %conf;
+# Shared secret read from ~/.padb-secret by find_padb_secret().  It is
+# compared against the key that inner processes send when they connect
+# back to the outer in inner-callback mode.
+my $secret;
+
# Config options the inner knows about, only forward options if they are in this list.
my @inner_conf = qw(edb edbopt minfo rmgr scripts slurm-job-step verbose);
@@ -280,11 +282,12 @@
};
$rmgr{"mpd"} = {
- 'is_installed' => \&mpd_is_installed,
- 'get_active_jobs' => \&mpd_get_jobs,
- 'setup_pcmd' => \&mpd_setup_pcmd,
- 'cleanup_pcmd' => \&mpd_cleanup_pcmd,
- 'find_pids' => \&mpd_find_pids,
+ 'is_installed' => \&mpd_is_installed,
+ 'get_active_jobs' => \&mpd_get_jobs,
+ 'setup_pcmd' => \&mpd_setup_pcmd,
+ 'cleanup_pcmd' => \&mpd_cleanup_pcmd,
+ 'find_pids' => \&mpd_find_pids,
+ # Signing on via stdout forwarding is unreliable under mpd/mpich2, so
+ # go_job() forces $conf{"inner-callback"} on for this resource manager.
+ 'require_inner_callback' => 1,
};
$rmgr{"orte"} = {
@@ -379,7 +382,8 @@
$conf{"local-fd-name"} = "/dev/null";
$conf{"stack-strip-above"} =
"elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait";
-$conf{"full-duplex"} = 0;
+$conf{"full-duplex"} = 0;
+$conf{"inner-callback"} = 0;
# $conf{stack-format} = undef;
@@ -3161,6 +3165,7 @@
$cdata->{active} = 1;
$cdata->{str} = "";
$cdata->{line_cb} = \&command_from_inner;
+ $cdata->{eof_cb} = \&eof_from_inner;
$comm_data->{sockets}{ $cdata->{socket} } = $cdata;
$comm_data->{sel}->add( $cdata->{socket} );
@@ -3272,6 +3277,47 @@
}
+# Record the listening port and key that the inner process on $host
+# reported.  The signon that completes the expected set of hosts triggers
+# connect_to_children().
+sub handle_signon {
+    my ( $comm_data, $host, $port, $key ) = @_;
+
+    @{ $comm_data->{remote}{$host} }{qw(port key)} = ( $port, $key );
+    $comm_data->{signons} += 1;
+
+    # Nothing more to do until every host has signed on.
+    return if $comm_data->{signons} != $comm_data->{hosts};
+
+    connect_to_children($comm_data);
+}
+
+# Line callback for sockets accepted on the outer's listening port.
+# Children connect back and send one line:
+#   Hello $outerkey $hostname $port $innerkey
+# where $outerkey must equal the shared padb secret.  A valid signon is
+# passed to handle_signon().  Once every host has signed on, the listening
+# socket is removed from the select set and closed.
+sub hello_from_inner {
+    my ( $comm_data, $cdata, $line ) = @_;
+
+    my @words = split( " ", $line );
+    if ( @words != 5 or $words[0] ne "Hello" or $words[1] ne $secret ) {
+
+        # $line is untrusted network input.  Use print, not printf, so any
+        # '%' in it is not taken as a format directive.  Also don't echo
+        # the line back, since it may carry the secret.
+        print("Bad signon from inner process\n");
+        return 0;
+    }
+
+    handle_signon( $comm_data, $words[2], $words[3], $words[4] );
+
+    if ( $comm_data->{signons} == $comm_data->{hosts} ) {
+
+        # Don't listen on this port any more.
+        $comm_data->{sel}->remove( $comm_data->{listen} );
+        $comm_data->{listen}->close();
+    }
+}
+
+# EOF callback, installed alongside command_from_inner on the sockets that
+# talk to the inner processes.  An EOF is expected only once the outer is
+# in the "shutdown" state; otherwise it is reported along with the
+# current state.
+sub eof_from_inner {
+    my ( $comm_data, $cdata ) = @_;
+
+    if ( $comm_data->{state} ne "shutdown" ) {
+
+        # print rather than printf: the message carries no format of its own.
+        print("Unexpected EOF from child socket ($comm_data->{state})\n");
+    }
+}
+
sub go_parallel {
my $jobid = shift;
my $cmd = shift;
@@ -3282,6 +3328,23 @@
my $h = shift;
my $hosts = shift;
+ my $comm_data;
+
+ my $sel = IO::Select->new();
+ if ( $conf{"inner-callback"} ) {
+ my $sl = IO::Socket::INET->new(
+ Reuse => 1,
+ Proto => 'tcp',
+ Listen => 2,
+ ) or die("Failed to create local port");
+
+ $comm_data->{listen} = $sl;
+ my $port = $sl->sockport();
+ my $hostname = hostname();
+ $cmd .= " --outer=$hostname:$port";
+ $sel->add($sl);
+ }
+
my $errors = 0;
my $report_errors = 1;
@@ -3298,17 +3361,15 @@
close $pcmd->{in};
- my $comm_data;
-
- $comm_data->{mode} = $mode;
- $comm_data->{hosts} = $hosts;
- $comm_data->{cmd} = $cmd;
- $comm_data->{jobid} = $jobid;
+ $comm_data->{mode} = $mode;
+ $comm_data->{hosts} = $hosts;
+ $comm_data->{cmd} = $cmd;
+ $comm_data->{jobid} = $jobid;
+ $comm_data->{signons} = 0;
# State, one of "connecting" "live" and "shutdown";
$comm_data->{state} = "connecting";
- my $sel = IO::Select->new();
$sel->add( $pcmd->{out} );
$sel->add( $pcmd->{err} );
@@ -3331,18 +3392,10 @@
my @words = split( " ", $line );
if ( $#words == 3 and $words[0] eq "connect" ) {
- my $host = $words[1];
-
- $comm_data->{remote}{$host}{port} = $words[2];
- $comm_data->{remote}{$host}{key} = $words[3];
- $comm_data->{signons}++;
-
- if ( $comm_data->{signons} == $comm_data->{hosts}
) {
- connect_to_children($comm_data);
- }
+ handle_signon( $comm_data, $words[1], $words[2],
+ $words[3] );
next;
- }
- if ( $words[0] eq "debug" ) {
+ } elsif ( $words[0] eq "debug" ) {
my $count = $sel->count();
print("There are $count sockets\n");
next;
@@ -3366,23 +3419,34 @@
my $data;
my $nb = sysread( $h, $data, 1024 );
- if ( not defined $data or $nb == 0 ) {
- if ( not $comm_data->{state} eq "shutdown" ) {
- printf("EOF from child socket ($nb)\n");
+ if ( $nb == 0 ) {
+ if ( defined( $cdata->{eof_cb} ) ) {
+ $cdata->{eof_cb}( $comm_data, $cdata );
}
$sel->remove($h);
$h->close();
- next;
+ } else {
+ $cdata->{str} .= $data;
+ extract_line( $comm_data, $cdata );
}
+ } elsif ( exists( $comm_data->{listen} )
+ and $h eq $comm_data->{listen} )
+ {
+
+ # It's a new socket on our listening port.
+ my $new = $h->accept();
+ $sel->add($new);
+ my %cdata;
+ $cdata{str} = "";
+ $cdata{line_cb} = \&hello_from_inner;
- $cdata->{str} .= $data;
- extract_line( $comm_data, $cdata );
+ $comm_data->{sockets}{$new} = \%cdata;
} else {
printf("Responce from unknown fd $h\n");
exit(1);
}
- }
+ } #for...
}
my $t2 = time() - $start;
my $count = $sel->count();
@@ -3537,6 +3601,34 @@
}
+# Read the shared padb secret from $HOME/.padb-secret.
+# The file must be a regular file with no group or other permission bits,
+# and must contain exactly one line of the form "secret=WORD".
+# Returns the secret on success, or an empty return (undef in scalar
+# context) on any failure.  Callers treat undef as fatal.
+sub find_padb_secret {
+    my $file = "$ENV{HOME}/.padb-secret";
+    if ( !-f $file ) {
+        print("No secret file ($file)\n");
+        return;
+    }
+
+    # The secret authenticates inner processes, so refuse a file other
+    # users could read.  Only the permission bits are tested, so stricter
+    # modes such as 0400 are accepted too.
+    my $mode = ( stat($file) )[2];
+    if ( not defined $mode or ( $mode & 077 ) != 0 ) {
+        print("Wrong permissions on secret file, should be 0600 ($file)\n");
+        return;
+    }
+
+    open( my $sfd, '<', $file ) or return;
+    my @lines = <$sfd>;
+    close($sfd);
+    return if @lines != 1;
+
+    if ( $lines[0] =~ /^secret=(\w+)$/ ) {
+        return $1;
+    }
+
+    # Explicit return: falling off the end would return the failed match's
+    # "" value, which is defined and slips past "not defined $secret".
+    return;
+}
+
sub go_job {
my $jobid = shift;
my $mode = shift;
@@ -3556,6 +3648,22 @@
my $stats;
+ if ( defined $rmgr{ $conf{rmgr} }{require_inner_callback}
+ and $rmgr{ $conf{rmgr} }{require_inner_callback} )
+ {
+ $conf{"inner-callback"} = 1;
+ }
+
+ if ( $conf{"inner-callback"} ) {
+ $secret = find_padb_secret();
+
+ if ( not defined $secret ) {
+ printf("Error: No secret\n");
+ exit(1);
+ }
+
+ }
+
foreach my $rank (@ranks) {
$rops .= " --rank=$rank";
}
@@ -6553,6 +6661,7 @@
# Loop forever in the inner process.
sub inner_loop_for_comms {
+ my ($outerloc) = @_;
my $server = IO::Socket::INET->new(
Reuse => 1,
@@ -6560,13 +6669,27 @@
Listen => 2,
) or die("Failed to create local port");
- my $lport = $server->sockport();
- my $hostname = hostname();
- my $key = rand();
- my $signon_text = "connect $hostname $lport $key\n";
+ my $lport = $server->sockport();
+ my $hostname = hostname();
+ my $key = rand();
+
+ if ( defined $outerloc ) {
+ my ( $ohost, $oport ) = split( ":", $outerloc );
+ my $os = IO::Socket::INET->new(
+ PeerAddr => $ohost,
+ PeerPort => $oport,
+ Proto => 'tcp',
+ ) or die("Failed to connect to outer");
+ my $secret = find_padb_secret();
+ die("No secret") if not defined $secret;
+ $os->print("Hello $secret $hostname $lport $key\n");
+ $os->close();
+ } else {
# For now just print the signon code to stdout and let the outer pick it up.
- print($signon_text);
+ my $signon_text = "connect $hostname $lport $key\n";
+ print($signon_text);
+ }
my $netdata;
$netdata->{sel} = IO::Select->new();
@@ -6650,6 +6773,7 @@
my $line_formatted;
my $jobid;
my $full_duplex;
+ my $outerloc;
my %optionhash = (
"config-option|O=s" => \@config_options,
@@ -6659,6 +6783,7 @@
"stats-full" => \$stats,
"verbose|v+" => \$confInner{"verbose"},
"full-duplex" => \$full_duplex,
+ "outer=s" => \$outerloc,
);
my %config_hash;
@@ -6684,7 +6809,7 @@
# If this works then nothing below here is needed as all
# requests can be sent over the socket.
if ($full_duplex) {
- inner_loop_for_comms();
+ inner_loop_for_comms($outerloc);
}
my $mode;
More information about the padb-devel
mailing list