[padb-devel] [padb commit] r56 - First code to implement bi-directional communication

codesite-noreply at google.com codesite-noreply at google.com
Wed Jun 17 12:21:52 BST 2009


Author: apittman
Date: Wed Jun 17 02:47:07 2009
New Revision: 56

Modified:
    branches/full-duplex/src/padb

Log:
First code to implement bi-directional communication
between the outer and the inner processes.  Currently
it only works on a single node and only for
--mpi-watch.


Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb	(original)
+++ branches/full-duplex/src/padb	Wed Jun 17 02:47:07 2009
@@ -205,6 +205,8 @@
  use File::Temp qw(tempfile);
  use MIME::Base64;
  use Config;
+use IO::Socket;
+use IO::Select;

   
###############################################################################
  #
@@ -377,6 +379,7 @@
  $conf{"local-fd-name"}       = "/dev/null";
  $conf{"stack-strip-above"} =
    "elan_waitWord,elan_pollWord,elan_deviceCheck,opal_condition_wait";
+# Experimental bi-directional (socket based) communication with the inner
+# processes: 0 disables it, any true value enables it and the value
+# "debug" additionally dumps every reply received from an inner process.
+$conf{"full-duplex"} = 0;

  # $conf{stack-format}        = undef;

@@ -2779,8 +2782,8 @@

      #     print Dumper $lines;
      my $s = "";
-    foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{raw} } ) ) {
-        $s .= $lines->{raw}{$l}{state};
+    foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{from_vpid} } ) )  
{
+        $s .= $lines->{from_vpid}{$l}{state};
      }
      print("$s\n");
  }
@@ -3016,6 +3019,228 @@
      }
  }

+# Open a TCP connection to an inner padb process listening on
+# $host:$port and sign on by sending "hello <key>" with key $word.
+# Dies (reporting the target and the reason) if the connection or the
+# sign-on write fails.  Returns the connected IO::Socket::INET handle.
+sub connect_to_child {
+    my ( $host, $port, $word ) = @_;
+
+    # On failure IO::Socket::INET->new returns undef and leaves the
+    # reason in $@.
+    my $socket = IO::Socket::INET->new(
+        PeerAddr => $host,
+        PeerPort => $port,
+        Proto    => 'tcp',
+    ) or die("Failed to connect to child $host:$port: $@");
+
+    # The sign-on line must be the first thing the inner reads.
+    $socket->print("hello $word\n")
+      or die("Failed to sign on to child $host:$port: $!");
+
+    #printf("Connecting to $host $port $word\n");
+    return $socket;
+}
+
+# Serialise a Perl data structure into one line of text: a network-order
+# Storable image (nfreeze), base64 encoded with an empty end-of-line
+# string so the result contains no newlines and can be sent as a single
+# line over a socket.
+sub my_encode {
+    my ($data) = @_;
+    my $frozen = nfreeze($data);
+    return encode_base64( $frozen, "" );
+}
+
+# Inverse of my_encode(): base64-decode $text and thaw the Storable image
+# it contains, returning the reconstructed data structure.
+# NOTE(review): this is applied to lines read from sockets; Storable does
+# not consider thaw() safe on data from an untrusted peer.
+sub my_decode {
+    my ($text) = @_;
+    my $frozen = decode_base64($text);
+    return thaw($frozen);
+}
+
+# We have read data on a socket: $sd->{str} holds everything received so
+# far that has not yet been processed.  Call $sd->{line_cb} once for every
+# complete (newline-terminated) line in the buffer, passing
+# ( $handle, $sd, $line ) with the newline removed.  A trailing partial
+# line is left in $sd->{str} until more data arrives.
+#
+# Carriage returns are stripped so telnet sessions work, empty lines are
+# ignored, and processing stops early once a callback sets $sd->{dead}
+# (i.e. it has closed the connection).
+sub extract_line {
+    my ( $handle, $sd ) = @_;
+
+    # Do this to allow telnet sessions to work.
+    $sd->{str} =~ s/\r//g;
+
+    # Remove each complete line from the front of the buffer before
+    # calling back, so the callback sees (and may reset) only what is left.
+    while ( not $sd->{dead} and $sd->{str} =~ s/\A([^\n]*)\n// ) {
+        my $line = $1;
+        next if ( $line eq "" );
+        $sd->{line_cb}( $handle, $sd, $line );
+    }
+
+    return;
+}
+
+# Send request $cmd to the inner process behind $cdata: the request is
+# encoded with my_encode() and written to $cdata->{socket} as a single
+# newline-terminated line.  Returns the result of the print.
+sub issue_command_to_inner {
+    my ( $cdata, $cmd ) = @_;
+    my $sock    = $cdata->{socket};
+    my $payload = my_encode($cmd);
+    return $sock->print( $payload . "\n" );
+}
+
+# Line callback for a socket connected to an inner process (installed by
+# go_parallel()); $handle is go_parallel's shared state, $cdata the
+# per-socket state and $line one line received from the inner.
+#
+# "Welcome" means the inner has accepted our sign-on, so reply with the
+# request (mode, command, job id and the %cinner settings).  Any other
+# line is an encoded reply: pass it to the mode's out_handler, then mark
+# shutdown as in progress and tell the inner to quit.
+sub command_from_inner {
+    my ( $handle, $cdata, $line ) = @_;
+
+    if ( $line eq "Welcome" ) {
+
+        # Send this inner the work it should do.
+        my %request = (
+            mode   => $handle->{mode},
+            cmd    => $handle->{cmd},
+            jobid  => $handle->{jobid},
+            cinner => \%cinner,
+        );
+        issue_command_to_inner( $cdata, \%request );
+        return;
+    }
+
+    # Anything else is a reply from the inner.
+    my $reply = my_decode($line);
+    $allfns{ $handle->{mode} }{out_handler}( undef, $reply );
+
+    # Only a single reply is expected, so ask the inner to exit.
+    my %quit = ( quit => 1 );
+    $handle->{shutdown_in_progress} = 1;
+    issue_command_to_inner( $cdata, \%quit );
+
+    print Dumper $reply if ( $conf{"full-duplex"} eq "debug" );
+}
+
+# Full-duplex counterpart of go_job_once(): start the inner command $cmd
+# (the caller has already appended --full-duplex) and service its stdout,
+# its stderr and any sockets it asks us to connect to until every handle
+# has been closed.
+#
+# stdout lines of the form "connect <host> <port> <key>" open a socket to
+# that inner process via connect_to_child(); complete lines arriving on
+# those sockets are dispatched to command_from_inner() by extract_line().
+#
+# $ncpus, $raw, $stats and $h are accepted so the argument list matches
+# the go_job_once() call but are not used here.
+#
+# Returns the number of errors seen, or 1 if the job is no longer running.
+# The caller is responsible for calling cleanup_pcmd().
+sub go_parallel {
+    my ( $jobid, $cmd, $ncpus, $raw, $stats, $mode, $h ) = @_;
+
+    my $errors = 0;
+
+    my $pcmd = {
+        pid => -1,
+        in  => "",
+        out => *OUT,
+        err => *ERR,
+    };
+
+    $pcmd->{pid} = open3( $pcmd->{in}, *OUT, *ERR, $cmd )
+      or die "Unable to open3() pcmd: $!\n";
+
+    # Requests reach the inner over the socket, never over its stdin.
+    close $pcmd->{in};
+
+    # State shared with command_from_inner() through extract_line().
+    my $comm_data = {
+        mode                 => $mode,
+        cmd                  => $cmd,
+        jobid                => $jobid,
+        shutdown_in_progress => 0,
+    };
+
+    my $sel = IO::Select->new();
+    $sel->add( $pcmd->{out} );
+    $sel->add( $pcmd->{err} );
+
+    # NOTE(review): getline() on OUT/ERR is buffered, so if several lines
+    # arrive at once select() may not report the handle readable again
+    # while lines are still waiting in the PerlIO buffer.
+    while ( my @live = $sel->can_read() ) {
+        foreach my $fh (@live) {
+            if ( $fh eq $pcmd->{out} ) {
+                my $line = $fh->getline();
+                if ( not defined $line ) {
+                    if ( not $comm_data->{shutdown_in_progress} ) {
+                        printf("Warning, EOF from ofd\n");
+                    }
+                    $sel->remove($fh);
+                    $fh->close();
+                    next;
+                }
+                my @words = split( " ", $line );
+                if ( @words == 4 and $words[0] eq "connect" ) {
+                    my $socket =
+                      connect_to_child( $words[1], $words[2], $words[3] );
+                    $comm_data->{sockets}{$socket} = {
+                        active  => 1,
+                        str     => "",
+                        socket  => $socket,
+                        line_cb => \&command_from_inner,
+                    };
+                    $sel->add($socket);
+                    next;
+                }
+                if ( @words and $words[0] eq "debug" ) {
+                    my $count = $sel->count();
+                    print("There are $count sockets\n");
+                    next;
+                }
+                print("inner: $line");
+            } elsif ( $fh eq $pcmd->{err} ) {
+                my $line = $fh->getline();
+
+                if ( not defined $line ) {
+                    if ( not $comm_data->{shutdown_in_progress} ) {
+                        printf("Warning, EOF from efd\n");
+                    }
+                    $sel->remove($fh);
+                    $fh->close();
+                    next;
+                }
+
+                # print, not printf: the line may contain '%'.
+                print("einner:$line");
+            } elsif ( defined $comm_data->{sockets}{$fh} ) {
+                my $cdata = $comm_data->{sockets}{$fh};
+
+                my $data;
+                my $nb = sysread( $fh, $data, 1024 );
+
+                # sysread returns undef on error and 0 at EOF; either way
+                # this socket is finished.
+                if ( not $nb ) {
+                    if ( not $comm_data->{shutdown_in_progress} ) {
+                        my $why = defined $nb ? $nb : "error: $!";
+                        print("EOF from child socket ($why)\n");
+                    }
+                    $sel->remove($fh);
+                    delete $comm_data->{sockets}{$fh};
+                    $fh->close();
+                    next;
+                }
+
+                $cdata->{str} .= $data;
+                extract_line( $comm_data, $cdata );
+            } else {
+                printf("Responce from unknown fd $fh\n");
+                exit(1);
+            }
+        }
+        my $count = $sel->count();
+        if ( $count == 1 ) {
+            printf("All sockets closed?\n");
+        }
+    }
+
+    waitpid( $pcmd->{pid}, 0 );
+    my $res = $?;
+
+    printf("result from parallel command was $res\n")
+      if ( $conf{"verbose"} );
+
+    if ( $res != 0 ) {
+        my %status = rc_status($res);
+        if ( not job_is_running($jobid) ) {
+            printf("Job $jobid is no longer active\n");
+            return 1;
+        }
+        printf("Failed to run parallel command (rc = $status{rc})\n");
+        $errors++;
+    }
+
+    # Return rather than exit so the error count reaches the caller,
+    # which also runs cleanup_pcmd() exactly once on every path.
+    return $errors;
+}
+
  sub go_job_once {
      my $jobid = shift;
      my $cmd   = shift;
@@ -3226,7 +3451,13 @@
              sleep( $conf{"interval"} );
          }
      }
-    my $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,  
$h );
+    my $errors;
+    if ( $conf{"full-duplex"} ) {
+        $errors = go_parallel( $jobid, "$cmd --full-duplex",
+            $ncpus, $raw, $stats, $mode, $h );
+    } else {
+        $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,  
$h );
+    }
      cleanup_pcmd();
      return $errors;
  }
@@ -5505,6 +5736,17 @@
      return \%res;
  }

+# Run mpi_watch() for every process in @$list, where each element
+# provides {vp} and {pid}.  Returns a hashref mapping each vp to the
+# (scalar) result of mpi_watch() for that process.
+sub mpi_watch_all {
+    my ($list) = @_;
+    my %by_vp;
+    for my $proc ( @{$list} ) {
+        my ( $vp, $pid ) = @{$proc}{qw(vp pid)};
+        $by_vp{$vp} = mpi_watch( $vp, $pid );
+    }
+    return \%by_vp;
+}
+
  sub show_pid {
      my ( $vp, $pid ) = @_;

@@ -5854,6 +6096,154 @@
      );
  }

+# Execute one decoded request received from the outer padb.
+#
+# A request with a true {quit} makes this inner process exit(0).
+# Otherwise the {cinner} settings are copied into %confInner, {mode} is
+# recorded, the job's processes are located through the resource
+# manager's find_pids hook and the mode's handler_all is run over
+# $confInner{"all-pids"}.
+#
+# Returns a hashref whose {from_vpid} holds the handler_all result; when
+# verbose, {request} echoes the request for debugging.  Dies if the mode
+# has no handler_all entry.
+sub command_from_parent {
+    my ($cmd) = @_;
+    my $res = {};
+
+    # This is only for debugging.
+    if ( $confInner{verbose} ) {
+        $res->{request} = $cmd;
+    }
+
+    # Test for truth rather than "== 1" so requests without a quit key
+    # do not raise uninitialised-value warnings.
+    if ( $cmd->{quit} ) {
+        exit(0);
+    }
+
+    # Only some modes provide a handler_all; fail clearly instead of with
+    # a "not a CODE reference" error.
+    my $mode    = $cmd->{mode};
+    my $handler = $allfns{$mode}{handler_all};
+    if ( not defined $handler ) {
+        die("Mode $mode does not support full-duplex operation\n");
+    }
+
+    # Setup the environment.
+    foreach my $key ( keys( %{ $cmd->{cinner} } ) ) {
+        $confInner{$key} = $cmd->{cinner}{$key};
+    }
+
+    $confInner{mode} = $mode;
+
+    # Find the pids and register them all.
+    $rmgr{ $confInner{rmgr} }{find_pids}( $cmd->{jobid} );
+
+    # Now do the work.
+    $res->{from_vpid} = $handler->( $confInner{"all-pids"} );
+
+    return $res;
+}
+
+# Line callback for connections accepted by connect_to_outer().
+# $netdata is the listener state, $cdata the per-connection state and
+# $line one received line.
+#
+# An untrusted connection must first send "hello <key>"; it is then
+# trusted, answered with "Welcome" and recorded as the parent.  An
+# untrusted "debug" line dumps $netdata back to the peer and closes the
+# connection; any other untrusted line closes it.  Lines on a trusted
+# connection are encoded requests; those from the parent are executed by
+# command_from_parent() and the encoded result is sent back as one line.
+sub command_from_outer {
+    my ( $netdata, $cdata, $line ) = @_;
+
+    my $s = $cdata->{socket};
+    if ( not $cdata->{trusted} ) {
+        if ( $line eq "hello $netdata->{key}" ) {
+
+            #printf("Trusting connection from $cdata->{desc}\n");
+            $cdata->{trusted} = 1;
+            $cdata->{str}     = "";
+            $s->print("Welcome\n");
+            $netdata->{parent} = $cdata;
+        } elsif ( $line eq "debug" ) {
+
+            # print, not printf: the dump may contain '%' characters.
+            $s->print( Dumper($netdata) );
+            $s->flush();
+            $netdata->{sel}->remove($s);
+            $s->close();
+            $cdata->{dead} = 1;
+            print("debug\n");
+        } else {
+
+            #printf("Closing connection from $cdata->{desc} (Bad signon)\n");
+            $netdata->{sel}->remove($s);
+            $s->close();
+            $cdata->{dead} = 1;
+        }
+        return;
+    }
+
+    # Decode once; keep the most recent request for debugging.
+    my $request = my_decode($line);
+    $cdata->{last_cmd} = $request;
+    if ( $netdata->{parent} eq $cdata ) {
+        my $res = command_from_parent($request);
+        $s->print( my_encode($res) . "\n" );
+    }
+    return;
+}
+
+# Inner side of full-duplex mode.  Listen for the outer padb, announce the
+# listener on stdout as "connect <host> <port> <key>", then service every
+# accepted connection, feeding received data to command_from_outer()
+# through extract_line().  A quit request from the parent exits the
+# process inside command_from_parent(); this loop only ends if select()
+# returns no ready handles.
+#
+# $port (optional): TCP port to listen on, default 37132.  The port that
+# is actually bound is the one announced, so 0 (any free port) also works.
+sub connect_to_outer {
+    my ($port) = @_;
+    $port = 37132 if ( not defined $port );
+
+    # On failure IO::Socket::INET->new returns undef and sets $@.
+    my $server = IO::Socket::INET->new(
+        Reuse     => 1,
+        Proto     => 'tcp',
+        LocalPort => $port,
+        Listen    => 2,
+    ) or die("not the best start: cannot listen on port $port: $@");
+
+    my $lport    = $server->sockport();
+    my $hostname = hostname();
+
+    # NOTE(review): a fixed key is not real authentication; anyone who can
+    # reach this port and knows it can send requests, which are thawed
+    # with Storable.  Consider a per-run random key.
+    my $key         = "boris";
+    my $signon_text = "connect $hostname $lport $key\n";
+
+    # For now just print the signon code to stdout and let the outer pick
+    # it up.
+    print($signon_text);
+
+    my $sel     = IO::Select->new($server);
+    my $netdata = {
+        sel    => $sel,
+        server => $server,
+        key    => $key,
+    };
+
+    while ( my @ready = $sel->can_read() ) {
+        foreach my $s (@ready) {
+            if ( $s == $server ) {
+                my $new = $server->accept() or die("Failed accept");
+                $sel->add($new);
+                my ( $peer_port, $addr ) =
+                  unpack_sockaddr_in( getpeername($new) );
+                my $ip = inet_ntoa($addr);
+
+                # Fall back to the address if reverse lookup fails.
+                my $peer_host = gethostbyaddr( $addr, AF_INET );
+                $peer_host = $ip if ( not defined $peer_host );
+
+                # printf "New connection from $peer_host ($ip) $peer_port\n";
+                $netdata->{connections}{$new} = {
+                    hostname => $peer_host,
+                    trusted  => 0,
+                    port     => $peer_port,
+                    desc     => "$peer_host:$peer_port",
+                    socket   => $new,
+                    str      => "",
+                    line_cb  => \&command_from_outer,
+                };
+                next;
+            }
+
+            my $sinfo = $netdata->{connections}{$s};
+            my $d;
+            my $nb = sysread( $s, $d, 1024 );
+
+            if ( not defined $nb ) {
+
+                # Transient conditions: retry on the next select().
+                next if ( $!{EINTR} or $!{EAGAIN} );
+                print("null read from $sinfo->{desc}\n");
+            }
+
+            # sysread returns 0 at EOF (leaving $d as ""), undef on error;
+            # either way the connection is dead.
+            if ( not $nb ) {
+                $sel->remove($s);
+                delete $netdata->{connections}{$s};
+                $s->close();
+                $sinfo->{trusted} = 0;
+                $sinfo->{dead}    = 1;
+                my $count = $sel->count();
+                print("EOF from $sinfo->{desc} $count sockets left\n");
+                next;
+            }
+
+            $sinfo->{str} .= $d;
+            extract_line( $netdata, $sinfo );
+        }
+    }
+    my $count = $sel->count();
+    printf("Thats not supposed to happen count=($count)\n");
+    return;
+}
+
  sub inner_main {

      $confInner{"slurm-job-step"} = "0";
@@ -5872,6 +6262,7 @@
      my @config_options;
      my $line_formatted;
      my $jobid;
+    my $full_duplex;

      my %optionhash = (
          "config-option|O=s" => \@config_options,
@@ -5879,7 +6270,8 @@
          "line-formatted"    => \$line_formatted,
          "rank=i"            => \@ranks,
          "stats-full"        => \$stats,
-        "verbose|v+"        => \$confInner{"verbose"}
+        "verbose|v+"        => \$confInner{"verbose"},
+        "full-duplex"       => \$full_duplex,
      );

      my %config_hash;
@@ -5902,6 +6294,12 @@

      GetOptions(%optionhash) or die("could not parse options\n");

+    # If this works then nothing below here is needed as all
+    # requests can be sent over the socket.
+    if ($full_duplex) {
+        connect_to_outer();
+    }
+
      my $mode;

      foreach my $arg ( keys %config_hash ) {
@@ -6113,7 +6511,7 @@
      };

      $allfns{mpi_watch} = {
-        'handler'         => \&mpi_watch,
+        'handler_all'     => \&mpi_watch_all,
          'arg_long'        => 'mpi-watch',
          'help'            => "Trace MPI programs",
          'pre_out_handler' => \&pre_mpi_watch,




More information about the padb-devel mailing list