[padb-devel] [padb commit] r59 - The tree code is now present and working in at least one case (startup through clean shutdown)

codesite-noreply at google.com codesite-noreply at google.com
Fri Jun 19 15:40:05 BST 2009


Author: apittman
Date: Fri Jun 19 07:39:30 2009
New Revision: 59

Modified:
    branches/full-duplex/src/padb

Log:
The tree code is now present and working in at least one case: startup,
connecting, signon, reply, command processing and clean shutdown all
work correctly across multiple hosts.


Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb	(original)
+++ branches/full-duplex/src/padb	Fri Jun 19 07:39:30 2009
@@ -626,7 +626,7 @@
      }

      # Put the args in a hash so that they can be referenced by name.
-    if ( defined $allfns{$mode}{secondary} ) {
+    if ( defined $mode and defined $allfns{$mode}{secondary} ) {
          foreach my $sec ( @{ $allfns{$mode}{secondary} } ) {
              $secondary_args{ $sec->{arg_long} } = $sec->{value};
          }
@@ -2321,9 +2321,11 @@

      $mpd_dfile = $fn;

-    my $cmd = "mpirun -machinefile $fn -np $i";
+    my $cmd = "mpdrun -machinefile $fn -np $i";
+
+    my $hosts = $#hosts + 1;

-    return ( $cmd, undef );
+    return ( $cmd, undef, $hosts );
  }

  sub mpd_cleanup_pcmd {
@@ -2413,9 +2415,9 @@

      }

-    if ( $conf{"verbose"} ) {
-        print Dumper \%open_jobs;
-    }
+    #if ( $conf{"verbose"} ) {
+    #    print Dumper \%open_jobs;
+    #}
  }

  sub open_get_jobs {
@@ -2457,9 +2459,10 @@
      $open_dfile = $fn;

      my $prefix = find_ompi_prefix();
-    my $cmd    = "mpirun -machinefile $fn -np $i $prefix";
+    my $cmd    = "orterun -machinefile $fn -np $i $prefix";
+    my $hosts  = $#hosts + 1;

-    return ( $cmd, undef );
+    return ( $cmd, undef, $hosts );
  }

  sub open_cleanup_pcmd {
@@ -2782,8 +2785,9 @@

      #     print Dumper $lines;
      my $s = "";
-    foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{from_vpid} } ) )  
{
-        $s .= $lines->{from_vpid}{$l}{state};
+    foreach my $l ( sort { $a <=> $b } ( keys %{ $lines->{target_responce}  
} ) )
+    {
+        $s .= $lines->{target_responce}{$l}{state};
      }
      print("$s\n");
  }
@@ -3029,7 +3033,6 @@

      print $socket "hello $word\n";

-    #printf("Connecting to $host $port $word\n");
      return $socket;
  }

@@ -3059,13 +3062,58 @@
          $sd->{line_cb}( $handle, $sd, $1 );
          $sd->{str} = $2;
      } else {
-        printf("No match\n");
+
+        # Likely just truncated input, wait for more to arrive.
+        # printf("No match ()\n");
      }

      return;

  }

+# Build the communication tree over the remote hosts: every host gets
+# a parent ("root" for the first) and a list of children.  Currently a
+# simple "ladder" (each host has at most one child); should probably
+# become an f-nomial tree.  Takes an array ref of host ids.
+sub generate_comm_tree {
+    my ($host_ids) = @_;
+    my $last       = "root";
+    my %comm_tree;
+
+    foreach my $host ( @{$host_ids} ) {
+        $comm_tree{$host}{parent} = $last;
+        push( @{ $comm_tree{$last}{children} }, $host );
+        $last = $host;
+    }
+
+    return \%comm_tree;
+}
+
+# Called once when we have the socket details of the last child.
+sub connect_to_children {
+    my $comm_data = shift;
+    @{ $comm_data->{host_ids} } = sort( keys( %{ $comm_data->{remote} } )  
);
+    $comm_data->{connection_tree} =
+      generate_comm_tree( $comm_data->{host_ids} );
+    my $td = $comm_data->{connection_tree}->{root}{children}[0];
+
+    #printf("I'm connecting to $td\n");
+    my $cdata;
+    $cdata->{socket} = connect_to_child(
+        $td,
+        $comm_data->{remote}{$td}{port},
+        $comm_data->{remote}{$td}{key}
+    );
+    $cdata->{active}  = 1;
+    $cdata->{str}     = "";
+    $cdata->{line_cb} = \&command_from_inner;
+
+    $comm_data->{sockets}{ $cdata->{socket} } = $cdata;
+    $comm_data->{sel}->add( $cdata->{socket} );
+
+    #print Dumper $comm_data;
+}
+
  sub issue_command_to_inner {
      my ( $cdata, $cmd ) = @_;
      my $str = my_encode($cmd);
@@ -3076,33 +3124,46 @@
      my ( $handle, $cdata, $line ) = @_;

      if ( $line eq "Welcome" ) {
+        my $req;
+        $req->{mode}            = "signon";
+        $req->{connection_tree} = $handle->{connection_tree};
+        $req->{remote}          = $handle->{remote};
+	issue_command_to_inner( $cdata, $req );
+        return;
+    }

-        # printf("Sending data to all childen\n");
-        # Tell all hosts to go.
+    # A reply from inner.
+    my $d = my_decode($line);

+    if ( $handle->{state} eq "connecting" ) {
+        $handle->{state} = "live";
          my $req;
          $req->{mode}   = $handle->{mode};
-        $req->{cmd}    = $handle->{cmd};
-        $req->{jobid}  = $handle->{jobid};
+	$req->{jobid}  = $handle->{jobid};
          $req->{cinner} = \%cinner;
-
-        # print Dumper $req;
          issue_command_to_inner( $cdata, $req );
-
          return;
      }

-    # A reply from inner.
-    my $d = my_decode($line);
+    if ( $handle->{state} eq "live" ) {
+        $handle->{state} = "shutdown";
+        my $req;
+        $req->{mode} = "exit";
+        issue_command_to_inner( $cdata, $req );
+	
+
+        $allfns{ $handle->{mode} }{out_handler}( undef, $d );
+        return;
+    }

-    $allfns{ $handle->{mode} }{out_handler}( undef, $d );
+    if ( $handle->{state} eq "shutdown" ) {

-    my $req;
-    $req->{quit}                    = 1;
-    $handle->{shutdown_in_progress} = 1;
-    issue_command_to_inner( $cdata, $req );
+        # Nothing to do here.
+        return;
+    }

-    print Dumper $d if ( $conf{"full-duplex"} eq "debug" );
+    print("Hmm, unknown state! $handle->{state}\n");
+    return;

  }

@@ -3114,6 +3175,7 @@
      my $stats = shift;
      my $mode  = shift;
      my $h     = shift;
+    my $hosts = shift;

      my $errors = 0;

@@ -3133,87 +3195,102 @@

      my $comm_data;

-    $comm_data->{mode}                 = $mode;
-    $comm_data->{cmd}                  = $cmd;
-    $comm_data->{jobid}                = $jobid;
-    $comm_data->{shutdown_in_progress} = 0;
+    $comm_data->{mode}  = $mode;
+    $comm_data->{hosts} = $hosts;
+    $comm_data->{cmd}   = $cmd;
+    $comm_data->{jobid} = $jobid;
+
+    # State, one of "connecting" "live" and "shutdown";
+    $comm_data->{state} = "connecting";

      my $sel = IO::Select->new();
      $sel->add( $pcmd->{out} );
      $sel->add( $pcmd->{err} );

-    while ( my @live = $sel->can_read() ) {
-        foreach my $h (@live) {
-            if ( $h eq $pcmd->{out} ) {
-                my $line = $h->getline();
-                if ( not defined $line ) {
-                    if ( not $comm_data->{shutdown_in_progress} ) {
-                        printf("Warning, EOF from ofd\n");
+    $comm_data->{sel} = $sel;
+    my $start = time();
+
+    while ( $sel->count() > 1 ) {
+        while ( my @live = $sel->can_read(5) ) {
+            foreach my $h (@live) {
+                if ( $h eq $pcmd->{out} ) {
+                    my $line = $h->getline();
+                    if ( not defined $line ) {
+                        if ( not $comm_data->{state} eq "shutdown" ) {
+                            printf("Warning, EOF from ofd\n");
+                        }
+                        $sel->remove($h);
+                        $h->close();
+                        next;
                      }
-                    $sel->remove($h);
-                    $h->close();
-                    next;
-                }
-                my @words = split( " ", $line );
-                if ( $#words == 3 and $words[0] eq "connect" ) {
-
-                    my $socket =
-                      connect_to_child( $words[1], $words[2], $words[3] );
-                    my $cdata;
-                    $cdata->{active}               = 1;
-                    $cdata->{str}                  = "";
-                    $cdata->{socket}               = $socket;
-                    $cdata->{line_cb}              = \&command_from_inner;
-                    $comm_data->{sockets}{$socket} = $cdata;
-                    $sel->add($socket);
-                    next;
-                }
-                if ( $words[0] eq "debug" ) {
-                    my $count = $sel->count();
-                    print("There are $count sockets\n");
-                    next;
-                }
-                print("inner: $line");
-            } elsif ( $h eq $pcmd->{err} ) {
-                my $line = $h->getline();
-
-                if ( not defined $line ) {
-                    if ( not $comm_data->{shutdown_in_progress} ) {
-                        printf("Warning, EOF from efd\n");
+                    my @words = split( " ", $line );
+                    if ( $#words == 3 and $words[0] eq "connect" ) {
+
+                        my $host = $words[1];
+
+			$comm_data->{remote}{$host}{port} = $words[2];
+                        $comm_data->{remote}{$host}{key}  = $words[3];
+                        $comm_data->{signons}++;
+
+                        if ( $comm_data->{signons} == $comm_data->{hosts}  
) {
+                            connect_to_children($comm_data);
+                        }
+                        next;
                      }
-                    $sel->remove($h);
-                    $h->close();
-                    next;
-                }
-                printf("einner:$line");
-            } elsif ( defined $comm_data->{sockets}{$h} ) {
-                my $cdata = $comm_data->{sockets}{$h};
-
-                my $data;
-                my $nb = sysread( $h, $data, 1024 );
-
-                #printf("read $data ($nb) from fd\n");
-
-                if ( not defined $data or $nb == 0 ) {
-                    if ( not $comm_data->{shutdown_in_progress} ) {
-                        printf("EOF from child socket ($nb)\n");
+                    if ( $words[0] eq "debug" ) {
+                        my $count = $sel->count();
+                        print("There are $count sockets\n");
+                        next;
+                    }
+                    print("inner: $line");
+                } elsif ( $h eq $pcmd->{err} ) {
+                    my $line = $h->getline();
+
+                    if ( not defined $line ) {
+                        if ( not $comm_data->{state} eq "shutdown" ) {
+                            printf("Warning, EOF from efd\n");
+                        }
+                        $sel->remove($h);
+                        $h->close();
+                        next;
+                    }
+                    printf("einner:$line");
+                } elsif ( defined $comm_data->{sockets}{$h} ) {
+                    my $cdata = $comm_data->{sockets}{$h};
+
+                    my $data;
+                    my $nb = sysread( $h, $data, 1024 );
+
+		    if ( not defined $data or $nb == 0 ) {
+                        if ( not $comm_data->{state} eq "shutdown" ) {
+                            printf("EOF from child socket ($nb)\n");
+                        }
+                        $sel->remove($h);
+                        $h->close();
+                        next;
                      }
-                    $sel->remove($h);
-                    $h->close();
-                    next;
-                }

-                $cdata->{str} .= $data;
-                extract_line( $comm_data, $cdata );
+                    $cdata->{str} .= $data;
+                    extract_line( $comm_data, $cdata );

-            } else {
-                printf("Responce from unknown fd $h\n");
-                exit(1);
+                } else {
+                    printf("Responce from unknown fd $h\n");
+                    exit(1);
+                }
+            }
+            my $count = $sel->count();
+            if ( $count == 1 ) {
+                printf("All sockets closed?\n");
              }
          }
+        my $t2    = time() - $start;
          my $count = $sel->count();
-        if ( $count == 1 ) {
-            printf("All sockets closed?\n");
+        if ( $count > 0 ) {
+            printf("Still here, time:$t2 comm_count:$count\n");
+            if ( $comm_data->{signons} != $comm_data->{hosts} ) {
+                my $missing = $comm_data->{hosts} - $comm_data->{signons};
+                print("Waiting for signon from $missing hosts.\n");
+            }
          }
      }

@@ -3375,9 +3452,6 @@

      my $key = job_to_key($jobid);

-    my $cmd;
-    my $ncpus;
-
      my $stats;

      foreach my $rank (@ranks) {
@@ -3394,10 +3468,14 @@

      return 1 unless (@res);

-    $cmd   = $res[0];
-    $ncpus = $res[1];
+    my $cmd = $res[0];
+
+    # These two are only defined by some resource managers.
+    my $ncpus = $res[1];
+    my $hosts = $res[2];

      $conf{"verbose"} && defined $ncpus && print "Job has $ncpus cpus\n";
+    $conf{"verbose"} && defined $hosts && print "Job has $hosts hosts\n";

      # Some versions of perl like to have a space after the O and report  
that
      # -ormgr isn't a valid option if it's not there, perhaps this is a bug
@@ -3452,8 +3530,13 @@
      }
      my $errors;
      if ( $conf{"full-duplex"} ) {
+        if ( not defined $hosts ) {
+            printf("Full duplex mode needs to know the host count\n");
+            printf("Which is doesn't for this resource manager:  
$conf{rmgr}\n");
+            return 1;
+        }
          $errors = go_parallel( $jobid, "$cmd --full-duplex",
-            $ncpus, $raw, $stats, $mode, $h );
+            $ncpus, $raw, $stats, $mode, $h, $hosts );
      } else {
          $errors = go_job_once( $jobid, $cmd, $ncpus, $raw, $stats, $mode,  
$h );
      }
@@ -6095,17 +6178,114 @@
      );
  }

-sub command_from_parent {
-    my $cmd = shift;
-    my $res;
+# Receive a reply from a child.
+# If it's the last reply then combine
+# with others and forward to parent.
+sub reply_from_child {
+    my ( $handle, $sd, $req ) = @_;
+
+    # If it's the first connection over this socket simply
+    # foreward on the signon command.
+    if ( $req eq "Welcome" ) {
+        $sd->{socket}->printf("$handle->{signon_cmd}\n");
+        return;
+    }
+
+    my $r = my_decode($req);

-    # This is only for debugging.
-    if ( $confInner{verbose} ) {
-        $res->{request} = $cmd;
+    # Merge this reply into the local one.
+    $handle->{child_replys}++;
+
+    # $handle->{all_replys}{raw}{ $sd->{hostname} } = $r;
+
+    # Combine the host responces.
+    foreach my $status ( keys( %{ $r->{host_responce} } ) ) {
+        foreach my $host ( keys( %{ $r->{host_responce}{$status} } ) ) {
+            $handle->{all_replys}->{host_responce}{$status}{$host} =
+              $r->{host_responce}{$status}{$host};
+        }
      }

-    if ( defined( $cmd->{quit} ) and ( $cmd->{quit} == 1 ) ) {
-        exit(0);
+    # Combine the target process responces.
+    if ( exists $r->{target_responce} ) {
+        foreach my $tp ( keys( %{ $r->{target_responce} } ) ) {
+            $handle->{all_replys}->{target_responce}{$tp} =
+              $r->{target_responce}{$tp};
+        }
+    }
+
+    # If this is the last reply from a child then report upstream.
+    # print Dumper $handle;
+    if ( $handle->{child_replys} != $handle->{children} ) {
+        my $missing = $handle->{children} - $handle->{child_replys};
+        return;
+    }
+
+    # Send the data upstream.
+    my $reply = $handle->{all_replys};
+
+    reply_to_parent( $handle, $reply );
+    if ($handle->{shutdown} ) {
+        inner_cleanup_and_exit($handle);
+    }
+
+    # Reset local data.
+    $handle->{all_replys}      = undef;
+    $handle->{child_replys}    = 0;
+    $handle->{target_responce} = undef;
+}
+
+# Receive a command (perl reference) from our parent.
+#
+# When we receive a command:
+# 1) Send it on to our children.
+# 2) Execute it.
+# 3) If we have no children send reply.
+sub command_from_parent {
+    my ( $netdata, $cmd ) = @_;
+
+    if ( $cmd->{mode} eq "signon" ) {
+        $netdata->{signon_cmd} = my_encode($cmd);
+	
+	if ( not exists $cmd->{connection_tree}{$confInner{hostname}}{children} )  
{
+	    $netdata->{children} = 0;
+	    return;
+	}
+	
+	my @children = @{ $cmd->{connection_tree}{$confInner{hostname}}{children}  
};
+	$netdata->{children} = $#children + 1;
+	
+	# Only one child is tested so far.
+	foreach my $chostname (@children) {
+	    my $socket = connect_to_child(
+		$chostname,
+		$cmd->{remote}{$chostname}{port},
+		$cmd->{remote}{$chostname}{key}
+		);
+	    my %cdata;
+	    $cdata{socket}   = $socket;
+	    $cdata{hostname} = $chostname;
+	    $cdata{line_cb}  = \&reply_from_child;
+	    $cdata{state}    = "init";
+	    $netdata->{sel}->add($socket);
+	    $netdata->{connections}{$socket} = \%cdata;
+	    push @{ $netdata->{child_sockets} }, $socket;
+	}
+	return;
+    }
+
+    # Forward on to our children before doing any more processing.
+    if ( $netdata->{children} ) {
+	my $req = my_encode($cmd) . "\n";
+        foreach my $child ( @{ $netdata->{child_sockets} } ) {
+	    $child->printf($req);
+            $child->flush();
+        }
+    }
+
+    if ( $cmd->{mode} eq "exit" ) {
+        $netdata->{shutdown} = 1;
+        return;
      }

      # Setup the environment.
@@ -6119,12 +6299,39 @@
      $rmgr{ $confInner{rmgr} }{find_pids}( $cmd->{jobid} );

      # Now do the work.
-    $res->{from_vpid} =
-      $allfns{ $cmd->{mode} }{handler_all}( $confInner{"all-pids"} );
+    my $z = $allfns{ $cmd->{mode} }{handler_all}( $confInner{"all-pids"} );

-    return $res;
+    $netdata->{target_responce} = $z;
+    $netdata->{all_replys}{target_responce} = $z;
+
+    return;
+}
+
+# Time for the inner process to exit, cleanup all sockets and
+# quit.
+sub inner_cleanup_and_exit {
+    my $netdata = shift;
+    foreach my $h ( $netdata->{sel}->handles() ) {
+        $h->flush();
+        $h->close();
+    }
+    exit(0);
  }

+# Send $cmd (a hash ref) to our parent after marking this host "ok" in
+# $cmd->{host_responce}; print, not printf: the payload is not a format.
+sub reply_to_parent {
+    my ( $netdata, $cmd ) = @_;
+
+    $cmd->{host_responce}{ok}{ hostname() } = 1;
+
+    my $reply = my_encode($cmd);
+    $netdata->{parent}->{socket}->print("$reply\n");
+}
+
+# Process a single line of input onto a socket we are
+# listening on.  This is probably our parent (who may
+# be the outer process) but it needs to be authenticated.
  sub command_from_outer {
      my ( $netdata, $cdata, $line ) = @_;

@@ -6146,36 +6353,40 @@
              $cdata->{dead} = 1;
              print("debug\n");
          } else {
-
-            #printf("Closing connection from $cdata->{desc} (Bad  
signon)\n");
+            printf("Closing connection from $cdata->{desc} (Bad  
signon)\n");
              $netdata->{sel}->remove($s);
              $s->close();
              $cdata->{dead} = 1;
          }
          return;
      }
-
-    $cdata->{last_cmd} = my_decode($line);
-    if ( $netdata->{parent} eq $cdata ) {
-        my $res   = command_from_parent( my_decode($line) );
-        my $reply = my_encode($res);
-        $cdata->{socket}->printf("$reply\n");
+
+    command_from_parent( $netdata, my_decode($line) );
+
+    if ( $netdata->{children} == 0 ) {
+	my $res;
+	$res->{target_responce} = $netdata->{target_responce};
+	reply_to_parent( $netdata, $res );
+	$netdata->{target_responce} = undef;
+	
+	if ($netdata->{shutdown} ) {
+	    inner_cleanup_and_exit($netdata);
+	}
      }
  }

-sub connect_to_outer {
+# Loop forever in the inner process.
+sub inner_loop_for_comms {

      my $server = IO::Socket::INET->new(
-
-        Reuse     => 1,
-        Proto     => 'tcp',
-        LocalPort => 37132,
-        Listen    => 2,
-    ) or die("not the best start");
+        Reuse => 1,
+        Proto => 'tcp',
+        Listen => 2,
+    ) or die("Failed to create local port");

      my $lport       = $server->sockport();
      my $hostname    = hostname();
-    my $key         = "boris";
+    my $key         = rand();
      my $signon_text = "connect $hostname $lport $key\n";

      # For now just print the signon code to stdout and let the outer pick  
it up.
@@ -6186,6 +6397,7 @@
      $netdata->{sel}->add($server);
      $netdata->{server} = $server;
      $netdata->{key}    = $key;
+    $netdata->{shutdown} = 0;

      my $sel = $netdata->{sel};

@@ -6199,7 +6411,7 @@
                  my $ip = inet_ntoa($addr);
                  my $hostname = gethostbyaddr( $addr, AF_INET );

-                # printf "New connection from $hostname ($ip) $port\n";
+                #printf "New connection from $hostname ($ip) $port\n";
                  my %sinfo;
                  $sinfo{hostname}              = $hostname;
                  $sinfo{trusted}               = 0;
@@ -6216,18 +6428,18 @@

              my $sinfo = $netdata->{connections}{$s};
              my $d;
-            sysread( $s, $d, 1024 );
+            my $count = sysread( $s, $d, 1024 );

              # Dead connection.
-            if ( not defined $d ) {
-                printf("null read from $sinfo->{desc}\n");
+            if ( not defined $d or $count eq 0 ) {
+
+                # printf("null read from $sinfo->{desc}\n");
                  if ( eof($s) ) {
                      $sel->remove($s);
                      $s->close();
                      $sinfo->{trusted} = 0;
                      $sinfo->{dead}    = 1;
-                    my $count = $sel->count();
-                    printf("EOF from $sinfo->{desc} $count sockets  
left\n");
+                    my $scount = $sel->count();
                  }
                  next;
              }
@@ -6251,6 +6463,7 @@
      $confInner{"edb"}            = find_edb();
      $confInner{"minfo"}          = find_minfo();
      $confInner{"open-ps"}        = "";
+    $confInner{"hostname"}       = hostname();

      # The different options this script can perform.  One (and only one) of
      # these must be set.
@@ -6295,7 +6508,7 @@
      # If this works then nothing below here is needed as all
      # requests can be sent over the socket.
      if ($full_duplex) {
-        connect_to_outer();
+        inner_loop_for_comms();
      }

      my $mode;
@@ -6331,7 +6544,6 @@

      # Load some non user-modifiable data into conf now
      $confInner{"lineformatted"} = $line_formatted;
-    $confInner{"hostname"}      = hostname();

      $confInner{"myld"} = $ENV{"LD_LIBRARY_PATH"};





More information about the padb-devel mailing list