[padb-devel] [padb commit] r63 - add an inner-callback mode which doesn't rely on stdout

codesite-noreply at google.com codesite-noreply at google.com
Wed Jun 24 13:05:21 BST 2009


Author: apittman
Date: Wed Jun 24 01:04:19 2009
New Revision: 63

Modified:
    branches/full-duplex/src/padb

Log:
add an inner-callback mode which doesn't rely on stdout
forwarding from the parallel processes to the mpirun
command.  This should fix the problems with mpd/mpich2.


Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb	(original)
+++ branches/full-duplex/src/padb	Wed Jun 24 01:04:19 2009
@@ -243,6 +243,8 @@

  my %conf;

+my $secret;
+
  # Config options the inner knows about, only forward options if they are in this list.
  my @inner_conf = qw(edb edbopt minfo rmgr scripts slurm-job-step verbose);

@@ -280,11 +282,12 @@
  };

  $rmgr{"mpd"} = {
-    'is_installed'    => \&mpd_is_installed,
-    'get_active_jobs' => \&mpd_get_jobs,
-    'setup_pcmd'      => \&mpd_setup_pcmd,
-    'cleanup_pcmd'    => \&mpd_cleanup_pcmd,
-    'find_pids'       => \&mpd_find_pids,
+    'is_installed'           => \&mpd_is_installed,
+    'get_active_jobs'        => \&mpd_get_jobs,
+    'setup_pcmd'             => \&mpd_setup_pcmd,
+    'cleanup_pcmd'           => \&mpd_cleanup_pcmd,
+    'find_pids'              => \&mpd_find_pids,
+    'require_inner_callback' => 1,
  };

  $rmgr{"orte"} = {
@@ -379,7 +382,8 @@
  $conf{"local-fd-name"}       = "/dev/null";
  $conf{"stack-strip-above"} =
    "elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait";
-$conf{"full-duplex"} = 0;
+$conf{"full-duplex"}    = 0;
+$conf{"inner-callback"} = 0;

  # $conf{stack-format}        = undef;

@@ -3161,6 +3165,7 @@
      $cdata->{active}  = 1;
      $cdata->{str}     = "";
      $cdata->{line_cb} = \&command_from_inner;
+    $cdata->{eof_cb}  = \&eof_from_inner;

      $comm_data->{sockets}{ $cdata->{socket} } = $cdata;
      $comm_data->{sel}->add( $cdata->{socket} );
@@ -3272,6 +3277,47 @@

  }

+# Record one child's sign-on: the port it is listening on and its key,
+# stored under its host name in $comm_data->{remote}, and bump the
+# sign-on count.  When the count reaches the number of expected hosts,
+# connect_to_children() is called to connect back to all of them.
+#
+# Called both for "connect" lines read from the parallel command's output
+# and for "Hello" lines received on the inner-callback listening socket.
+sub handle_signon {
+    my ( $comm_data, $hostname, $lport, $ikey ) = @_;
+
+    # Set both fields at once; the per-host hash is autovivified.
+    @{ $comm_data->{remote}{$hostname} }{qw(port key)} = ( $lport, $ikey );
+    $comm_data->{signons} += 1;
+
+    connect_to_children($comm_data)
+      if $comm_data->{signons} == $comm_data->{hosts};
+}
+
+# Line callback for a socket accepted on the outer's listening port
+# (inner-callback mode).  Each child announces itself with one line:
+#
+#     Hello $outerkey $hostname $port $innerkey
+#
+# $outerkey must match the outer's $secret.  A valid announcement is
+# recorded via handle_signon().  Once every expected host has signed on,
+# the listening socket is removed from the select set and closed.
+#
+# Returns 0 if the line is malformed or carries the wrong secret.
+sub hello_from_inner {
+    my ( $comm_data, $cdata, $line ) = @_;
+
+    my @words = split( " ", $line );
+    if ( @words != 5 or $words[0] ne "Hello" or $words[1] ne $secret ) {
+
+        # $line is untrusted network input, so never use it as a printf
+        # format string; a stray '%' would be interpreted as a directive.
+        print("Bad signon $line\n");
+        return 0;
+    }
+
+    handle_signon( $comm_data, $words[2], $words[3], $words[4] );
+
+    if ( $comm_data->{signons} == $comm_data->{hosts} ) {
+
+        # Every host has signed on; don't listen on this port any more.
+        $comm_data->{sel}->remove( $comm_data->{listen} );
+        $comm_data->{listen}->close();
+    }
+}
+
+# EOF callback for a socket connected to an inner process, invoked when a
+# read on that socket returns no data.  EOF is expected once the session
+# state is "shutdown"; in any other state it is reported.
+sub eof_from_inner {
+    my ( $comm_data, $cdata ) = @_;
+
+    return if $comm_data->{state} eq "shutdown";
+
+    # Plain print: the message is interpolated, not a printf format.
+    print("Unexpected EOF from child socket ($comm_data->{state})\n");
+}
+
  sub go_parallel {
      my $jobid = shift;
      my $cmd   = shift;
@@ -3282,6 +3328,23 @@
      my $h     = shift;
      my $hosts = shift;

+    my $comm_data;
+
+    my $sel = IO::Select->new();
+    if ( $conf{"inner-callback"} ) {
+        my $sl = IO::Socket::INET->new(
+            Reuse  => 1,
+            Proto  => 'tcp',
+            Listen => 2,
+        ) or die("Failed to create local port");
+
+        $comm_data->{listen} = $sl;
+        my $port     = $sl->sockport();
+        my $hostname = hostname();
+        $cmd .= " --outer=$hostname:$port";
+        $sel->add($sl);
+    }
+
      my $errors = 0;

      my $report_errors = 1;
@@ -3298,17 +3361,15 @@

      close $pcmd->{in};

-    my $comm_data;
-
-    $comm_data->{mode}  = $mode;
-    $comm_data->{hosts} = $hosts;
-    $comm_data->{cmd}   = $cmd;
-    $comm_data->{jobid} = $jobid;
+    $comm_data->{mode}    = $mode;
+    $comm_data->{hosts}   = $hosts;
+    $comm_data->{cmd}     = $cmd;
+    $comm_data->{jobid}   = $jobid;
+    $comm_data->{signons} = 0;

      # State, one of "connecting" "live" and "shutdown";
      $comm_data->{state} = "connecting";

-    my $sel = IO::Select->new();
      $sel->add( $pcmd->{out} );
      $sel->add( $pcmd->{err} );

@@ -3331,18 +3392,10 @@
                      my @words = split( " ", $line );
                      if ( $#words == 3 and $words[0] eq "connect" ) {

-                        my $host = $words[1];
-
-                        $comm_data->{remote}{$host}{port} = $words[2];
-                        $comm_data->{remote}{$host}{key}  = $words[3];
-                        $comm_data->{signons}++;
-
-                        if ( $comm_data->{signons} == $comm_data->{hosts}  
) {
-                            connect_to_children($comm_data);
-                        }
+                        handle_signon( $comm_data, $words[1], $words[2],
+                            $words[3] );
                          next;
-                    }
-                    if ( $words[0] eq "debug" ) {
+                    } elsif ( $words[0] eq "debug" ) {
                          my $count = $sel->count();
                          print("There are $count sockets\n");
                          next;
@@ -3366,23 +3419,34 @@
                      my $data;
                      my $nb = sysread( $h, $data, 1024 );

-                    if ( not defined $data or $nb == 0 ) {
-                        if ( not $comm_data->{state} eq "shutdown" ) {
-                            printf("EOF from child socket ($nb)\n");
+                    if ( $nb == 0 ) {
+                        if ( defined( $cdata->{eof_cb} ) ) {
+                            $cdata->{eof_cb}( $comm_data, $cdata );
                          }
                          $sel->remove($h);
                          $h->close();
-                        next;
+                    } else {
+                        $cdata->{str} .= $data;
+                        extract_line( $comm_data, $cdata );
                      }
+                } elsif ( exists( $comm_data->{listen} )
+                    and $h eq $comm_data->{listen} )
+                {
+
+                    # It's a new socket on our listening port.
+                    my $new = $h->accept();
+                    $sel->add($new);
+                    my %cdata;
+                    $cdata{str}     = "";
+                    $cdata{line_cb} = \&hello_from_inner;

-                    $cdata->{str} .= $data;
-                    extract_line( $comm_data, $cdata );
+                    $comm_data->{sockets}{$new} = \%cdata;

                  } else {
                      printf("Responce from unknown fd $h\n");
                      exit(1);
                  }
-            }
+            }    #for...
          }
          my $t2    = time() - $start;
          my $count = $sel->count();
@@ -3537,6 +3601,34 @@

  }

+# Read the secret shared between the outer and inner processes from
+# $HOME/.padb-secret.
+#
+# The file must consist of exactly one line of the form "secret=WORD",
+# where WORD matches \w+.  If group or other have any permission bits on
+# the file a warning is printed, but the secret is still used.
+#
+# Returns the secret, or a bare return (undef in scalar context) if $HOME
+# is unset or the file is missing, unreadable or malformed.
+sub find_padb_secret {
+
+    if ( not defined $ENV{HOME} ) {
+        print("No secret file (HOME not set)\n");
+        return;
+    }
+
+    my $file = "$ENV{HOME}/.padb-secret";
+    if ( !-f $file ) {
+        print("No secret file ($file)\n");
+        return;
+    }
+
+    # Reuse the stat buffer filled in by -f above.  Only the group/other
+    # bits matter: 0400 or 0700 are just as private as 0600.
+    my $mode = ( stat(_) )[2];
+    if ( $mode & 077 ) {
+        print("Wrong permissions on secret file, should be 0600 ($file)\n");
+    }
+
+    open( my $sfd, '<', $file ) or return;
+    my @lines = <$sfd>;
+    close($sfd);
+
+    return if @lines != 1;
+    return $1 if $lines[0] =~ /^secret=(\w+)$/;
+
+    # Explicit bare return: falling off the end after a failed match would
+    # hand back the match's false value (a defined empty string), and the
+    # callers test "defined" to decide whether a secret was found.
+    return;
+}
+
  sub go_job {
      my $jobid = shift;
      my $mode  = shift;
@@ -3556,6 +3648,22 @@

      my $stats;

+    if ( defined $rmgr{ $conf{rmgr} }{require_inner_callback}
+        and $rmgr{ $conf{rmgr} }{require_inner_callback} )
+    {
+        $conf{"inner-callback"} = 1;
+    }
+
+    if ( $conf{"inner-callback"} ) {
+        $secret = find_padb_secret();
+
+        if ( not defined $secret ) {
+            printf("Error: No secret\n");
+            exit(1);
+        }
+
+    }
+
      foreach my $rank (@ranks) {
          $rops .= " --rank=$rank";
      }
@@ -6553,6 +6661,7 @@

  # Loop forever in the inner process.
  sub inner_loop_for_comms {
+    my ($outerloc) = @_;

      my $server = IO::Socket::INET->new(
          Reuse  => 1,
@@ -6560,13 +6669,27 @@
          Listen => 2,
      ) or die("Failed to create local port");

-    my $lport       = $server->sockport();
-    my $hostname    = hostname();
-    my $key         = rand();
-    my $signon_text = "connect $hostname $lport $key\n";
+    my $lport    = $server->sockport();
+    my $hostname = hostname();
+    my $key      = rand();
+
+    if ( defined $outerloc ) {
+        my ( $ohost, $oport ) = split( ":", $outerloc );
+        my $os = IO::Socket::INET->new(
+            PeerAddr => $ohost,
+            PeerPort => $oport,
+            Proto    => 'tcp',
+        ) or die("Failed to connect to outer");
+        my $secret = find_padb_secret();
+        die("No secret") if not defined $secret;
+        $os->print("Hello $secret $hostname $lport $key\n");
+        $os->close();
+    } else {

      # For now just print the signon code to stdout and let the outer pick it up.
-    print($signon_text);
+        my $signon_text = "connect $hostname $lport $key\n";
+        print($signon_text);
+    }

      my $netdata;
      $netdata->{sel} = IO::Select->new();
@@ -6650,6 +6773,7 @@
      my $line_formatted;
      my $jobid;
      my $full_duplex;
+    my $outerloc;

      my %optionhash = (
          "config-option|O=s" => \@config_options,
@@ -6659,6 +6783,7 @@
          "stats-full"        => \$stats,
          "verbose|v+"        => \$confInner{"verbose"},
          "full-duplex"       => \$full_duplex,
+        "outer=s"           => \$outerloc,
      );

      my %config_hash;
@@ -6684,7 +6809,7 @@
      # If this works then nothing below here is needed as all
      # requests can be sent over the socket.
      if ($full_duplex) {
-        inner_loop_for_comms();
+        inner_loop_for_comms($outerloc);
      }

      my $mode;




More information about the padb-devel mailing list