[padb-devel] [padb commit] r87 - Clean up the comms loop on the outer, give every fd a callback to

codesite-noreply at google.com codesite-noreply at google.com
Wed Jul 1 16:56:07 BST 2009


Author: apittman
Date: Wed Jul  1 06:34:20 2009
New Revision: 87

Modified:
    branches/full-duplex/src/padb

Log:
Clean up the comms loop on the outer, give every fd a callback to
handle output/eof or new connections.  This cleans up the code and
also means that sysread() is called instead of getline() on fd's
from the inner process which prevents output going astray.


Modified: branches/full-duplex/src/padb
==============================================================================
--- branches/full-duplex/src/padb	(original)
+++ branches/full-duplex/src/padb	Wed Jul  1 06:34:20 2009
@@ -3111,17 +3111,11 @@
      # Do this to allow telnet sessions to work.
      $str =~ s/\r//g;

-    # printf("Testing $str\n");
-
-    if ( $str =~ /^([^\n]+)\n(.*)$/ ) {
-
-        #  printf("Calling callback for \"$1\"\n");
+    # Allow multi-line output here, making sure we process each line.
+    while ( $str =~ /^([^\n]+)\n(.*)$/ ) {
          $sd->{line_cb}( $handle, $sd, $1 );
          $sd->{str} = $2;
-    } else {
-
-        # Likely just truncated input, wait for more to arrive.
-        # printf("No match ()\n");
+        $str = $2;
      }

      return;
@@ -3161,15 +3155,15 @@
          $comm_data->{remote}{$td}{port},
          $comm_data->{remote}{$td}{key}
      );
-    $cdata->{active}  = 1;
-    $cdata->{str}     = "";
-    $cdata->{line_cb} = \&command_from_inner;
-    $cdata->{eof_cb}  = \&eof_from_inner;
+    $cdata->{active}   = 1;
+    $cdata->{str}      = "";
+    $cdata->{fd_desc}  = "child socket";
+    $cdata->{line_cb}  = \&command_from_inner;
+    $cdata->{eof_cb}   = \&eof_from_fd;
+    $cdata->{event_cb} = \&handle_event_from_socket;

      $comm_data->{sockets}{ $cdata->{socket} } = $cdata;
      $comm_data->{sel}->add( $cdata->{socket} );
-
-    #print Dumper $comm_data;
  }

  sub issue_command_to_inner {
@@ -3374,14 +3368,65 @@
      }
  }

-sub eof_from_inner {
+sub inner_stdout_cb {
+    my ( $comm_data, $cdata, $line ) = @_;
+    my @words = split( " ", $line );
+    if ( $#words == 3 and $words[0] eq "connect" ) {
+
+        handle_signon( $comm_data, $words[1], $words[2], $words[3] );
+        return;
+    } elsif ( $words[0] eq "debug" ) {
+        my $count = $comm_data->{sel}->count();
+        print("There are $count sockets\n");
+        return;
+    }
+    print("inner: $line");
+}
+
+sub inner_sterr_cb {
+    my ( $comm_data, $cdata, $line ) = @_;
+    print("einner: $line");
+}
+
+sub eof_from_fd {
      my ( $comm_data, $cdata ) = @_;

      if ( $comm_data->{state} ne "shutdown" ) {
-        printf("Unexpected EOF from child socket ($comm_data->{state})\n");
+        printf("Unexpected EOF from $cdata->{fd_desc} ($comm_data->{state})\n");
      }
  }

+sub handle_event_from_socket {
+    my ( $comm_data, $h ) = @_;
+    my $cdata = $comm_data->{sockets}{$h};
+
+    my $data;
+    my $nb = sysread( $h, $data, 1024 );
+
+    if ( $nb == 0 ) {
+        if ( defined( $cdata->{eof_cb} ) ) {
+            $cdata->{eof_cb}( $comm_data, $cdata );
+        }
+        $comm_data->{sel}->remove($h);
+        $h->close();
+    } else {
+        $cdata->{str} .= $data;
+        extract_line( $comm_data, $cdata );
+    }
+}
+
+sub handle_event_from_port {
+    my ( $comm_data, $h ) = @_;
+
+    my $new = $h->accept();
+    $comm_data->{sel}->add($new);
+    my %cdata;
+    $cdata{str}                 = "";
+    $cdata{line_cb}             = \&hello_from_inner;
+    $cdata{event_cb}            = \&handle_event_from_socket;
+    $comm_data->{sockets}{$new} = \%cdata;
+}
+
  sub go_parallel {
      my $jobid = shift;
      my $cmd   = shift;
@@ -3403,6 +3448,10 @@
          my $hostname = hostname();
          $cmd .= " --outer=$hostname:$port";
          $sel->add($sl);
+
+        my %cdata;
+        $cdata{event_cb} = \&handle_event_from_port;
+        $comm_data->{sockets}{$sl} = \%cdata;
      }

      my $errors = 0;
@@ -3435,77 +3484,33 @@
      $comm_data->{sel} = $sel;
      my $start = time();

+    my %op;
+    $op{str}                              = "";
+    $op{line_cb}                          = \&inner_stdout_cb;
+    $op{eof_cb}                           = \&eof_from_fd;
+    $op{fd_desc}                          = "Inner stdout";
+    $op{event_cb}                         = \&handle_event_from_socket;
+    $comm_data->{sockets}{ $pcmd->{out} } = \%op;
+
+    my %ep;
+    $ep{str}                              = "";
+    $ep{line_cb}                          = \&inner_stderr_cb;
+    $ep{eof_cb}                           = \&eof_from_fd;
+    $ep{fd_desc}                          = "Inner stderr";
+    $ep{event_cb}                         = \&handle_event_from_socket;
+    $comm_data->{sockets}{ $pcmd->{err} } = \%ep;
+
      while ( $sel->count() > 1 ) {
          while ( my @live = $sel->can_read(5) ) {
              foreach my $h (@live) {
-                if ( $h eq $pcmd->{out} ) {
-                    my $line = $h->getline();
-                    if ( not defined $line ) {
-                        if ( not $comm_data->{state} eq "shutdown" ) {
-                            printf("Warning, EOF from ofd\n");
-                        }
-                        $sel->remove($h);
-                        $h->close();
-                        next;
-                    }
-                    my @words = split( " ", $line );
-                    if ( $#words == 3 and $words[0] eq "connect" ) {
-
-                        handle_signon( $comm_data, $words[1], $words[2],
-                            $words[3] );
-                        next;
-                    } elsif ( $words[0] eq "debug" ) {
-                        my $count = $sel->count();
-                        print("There are $count sockets\n");
-                        next;
-                    }
-                    print("inner: $line");
-                } elsif ( $h eq $pcmd->{err} ) {
-                    my $line = $h->getline();
-
-                    if ( not defined $line ) {
-                        if ( not $comm_data->{state} eq "shutdown" ) {
-                            printf("Warning, EOF from efd\n");
-                        }
-                        $sel->remove($h);
-                        $h->close();
-                        next;
-                    }
-                    printf("einner:$line");
-                } elsif ( defined $comm_data->{sockets}{$h} ) {
+                if ( defined $comm_data->{sockets}{$h} ) {
                      my $cdata = $comm_data->{sockets}{$h};
-
-                    my $data;
-                    my $nb = sysread( $h, $data, 1024 );
-
-                    if ( $nb == 0 ) {
-                        if ( defined( $cdata->{eof_cb} ) ) {
-                            $cdata->{eof_cb}( $comm_data, $cdata );
-                        }
-                        $sel->remove($h);
-                        $h->close();
-                    } else {
-                        $cdata->{str} .= $data;
-                        extract_line( $comm_data, $cdata );
-                    }
-                } elsif ( exists( $comm_data->{listen} )
-                    and $h eq $comm_data->{listen} )
-                {
-
-                    # It's a new socket on our listening port.
-                    my $new = $h->accept();
-                    $sel->add($new);
-                    my %cdata;
-                    $cdata{str}     = "";
-                    $cdata{line_cb} = \&hello_from_inner;
-
-                    $comm_data->{sockets}{$new} = \%cdata;
-
+                    $cdata->{event_cb}( $comm_data, $h );
                  } else {
                      printf("Responce from unknown fd $h\n");
                      exit(1);
                  }
-            }    #for...
+            }
          }
          my $t2    = time() - $start;
          my $count = $sel->count();




More information about the padb-devel mailing list