[padb] r388 committed - Extend padb to work with (a subset of) LSF jobs....

padb at googlecode.com padb at googlecode.com
Mon Feb 15 18:09:14 GMT 2010


Revision: 388
Author: apittman
Date: Mon Feb 15 10:08:15 2010
Log: Extend padb to work with (a subset of) LSF jobs.

Note that this commit does not allow padb to attach to all LSF jobs;
rather, padb is now able to detect two types of wrapper script commonly
used with LSF and extract the information it needs from the wrapper
script.  As such this is a step forward but not a general-case solution
for LSF support and will not meet everybody's needs.

This commit is largely the work of Thipadin @ Bull, as sent to the
developers list on 9 Feb 2010, with the following changes:

The resource manager has been renamed to the simpler 'lsf'.
lsf_mpi has been replaced with lsf_mode, with valid values being mpich2
  or openmpi.
slurp_remote_cmd() has been added instead of calling slurp_cmd() with
  "ssh host ..."

http://code.google.com/p/padb/source/detail?r=388

Modified:
  /trunk/src/padb

=======================================
--- /trunk/src/padb	Tue Feb  2 08:17:46 2010
+++ /trunk/src/padb	Mon Feb 15 10:08:15 2010
@@ -370,7 +370,8 @@

  # Config options the inner knows about, only forward options if they are in
  # this list.
-my @inner_conf = qw(edb edbopt rmgr scripts slurm_job_step pbs_server);
+my @inner_conf =
+  qw(edb edbopt rmgr scripts slurm_job_step pbs_server lsf_mode  
lsfmpi_server lsfmpi_mpirpid lsfmpi_port);

  # More config options the inner knows about, these are forwarded on the
  # command line rather than over the sockets.
@@ -507,6 +508,13 @@
      find_pids       => \&pbs_find_pids,
  };

+$rmgr{lsf} = {
+    is_installed    => \&lsfmpi_is_installed,
+    get_active_jobs => \&lsfmpi_get_jobs,
+    setup_job       => \&lsfmpi_setup_pcmd,
+    find_pids       => \&lsfmpi_find_pids,
+};
+
   
###############################################################################
  #
  # Config options
@@ -905,6 +913,11 @@
      close $CFD;
      return @out;
  }
+
+sub slurp_remote_cmd {
+    my ( $host, $cmd ) = @_;
+    return slurm_cmd("ssh $host $cmd");
+}

  sub slurp_dir {
      my ($dir) = @_;
@@ -2839,6 +2852,344 @@

      return %pcmd;
  }
+
+###############################################################################
+#
+# lsf-mpich2 wrapper  support
+# the jobs launched by mpich2_wrapper thanks to mpirun.lsf and #BSUB -a  
mpich2
+# The job submission file looks like:
+#
+##! /bin/bash
+##BSUB -J "JOB_NAME"
+##BSUB -o JOB_NAME.%J
+##BSUB -n 4
+##BSUB -e JOB_NAME_err.%J
+##BSUB -a mpich2
+#mpirun.lsf ./mpi_prog
+#
+# lsf-ompi-wrapper support thanks to #BSUB -a openmpi and mpirun.lsf.
+# The job submission file looks like:
+##! /bin/bash
+##BSUB -J "PP_SNDRCV"
+##BSUB -o PP_SNDRCV.%J
+##BSUB -n 4
+##BSUB -e PP_SNDRCVerr.%J
+##BSUB -a openmpi
+#mpirun.lsf ./pp_sndrcv_spbl
+#
+###############################################################################
+
+my %lsfmpi_tabjobs;
+
+sub lsfmpi_is_installed {
+    return ( find_exe('mpirun.lsf')
+          and ( find_exe('mpich2_wrapper') or find_exe('openmpi_wrapper')  
) );
+}
+
+sub lsf_get_line_ppid {
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    my $ret_line;
+    my $pid;
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+        if ( $champs[$rank_ppid] == $ppid ) {
+            $pid      = $champs[$rank_pid];
+            $ret_line = $line;
+            last;
+        }
+    }
+    return ( $ret_line, $pid );
+}
+
+sub lsfmpi_get_mpiport {
+    my ( $host, $portpath ) = @_;
+    my $portfound = 0;
+    my $port;
+    my @handle = slurp_remote_cmd( $host, "cat $portpath" );
+    foreach my $line (@handle) {
+        if ( $line =~ /TaskStarter/ ) {
+            my @champs = split( " ", $line );
+            foreach my $word (@champs) {
+                if ( $word eq "-p" ) {  # don't use =~ because may take  
--prefix
+                    $portfound = 1;
+                    next;
+                }
+                if ( $portfound == 1 ) {
+                    $port = $word;
+                    last;
+                }
+            }
+            last;
+        }
+    }
+    return $port;
+}
+
+sub lsfmpi_get_mpiproc {
+    my ( $ppid, $host, $job ) = @_;
+    my $rank_pid  = 0;
+    my $rank_ppid = 1;
+    my $proc;
+    my $path_file;
+    my $count_line = 0;
+    my $mode;
+
+    #get ps from the leading host(the one that start mpirun.lsf)
+    my @handle =
+      slurp_remote_cmd( $host, "ps -o pid=,ppid=,cmd= -u $target_user" );
+
+    $count_line = @handle;
+    for ( my $i = 0 ; $i < $count_line ; $i++ ) {    # to avoid loop
+        my ( $line, $pid );
+        next if ( !defined $ppid );
+        ( $line, $pid ) =
+          lsf_get_line_ppid( $ppid, $rank_pid, $rank_ppid, @handle );
+        next if ( !defined $line );
+        if ( $line =~ /mpi/ && $line =~ /-configfile/ ) {
+            my @champs = split( " ", $line );
+            foreach my $word (@champs) {
+                if ( $word eq "-configfile" ) {
+                    $mode = 'mpich2';
+                    next;
+                }
+                if ( defined $mode ) {
+                    $path_file = $word;    # get path of -configfile
+                    $proc      = $pid;
+                    last;
+                }
+            }
+            if ( $path_file =~ /$job\.newconf$/ )
+            {    # format is .mpich2_wrapper.jobid.newconf
+                last;
+            }
+            $path_file = undef;
+        } elsif ( $line =~ /mpi/ && $line =~ /-app/ ) {
+            my @champs = split( " ", $line );
+            foreach my $word (@champs) {
+                if ( $word eq "--app" ) {
+                    $mode = 'openmpi';
+                    next;
+                }
+                if ( defined $mode ) {
+                    $path_file = $word;    # get path file of --app param
+                    $proc      = $pid;
+                    last;
+                }
+            }
+            if ( $path_file =~ /$job$/ ) {    # format  
is .openmpi_appfile_jobid
+                last;
+            }
+            $path_file = undef;
+        } else {
+            $ppid = $pid;
+        }
+    }
+    return ( $proc, $path_file, $mode );
+}
+
+sub lsf_get_jobpgid {
+    my ($jobid) = @_;
+    my $resfound = 0;
+    my @proc;
+    my $cmd    = "bjobs -l $jobid ";
+    my @handle = slurp_cmd($cmd);
+    foreach my $line (@handle) {
+        if ( $line =~ /Resource usage collected./i ) {
+            $resfound = 1;
+            next;
+        }
+        if ( $resfound == 1 ) {
+            $line =~ s/^ +//;    # take off space at start
+            if ( $line =~ /^PGID:/i ) {
+                my @champs = split( " ", $line );
+                my $pgid = $champs[1];
+                chop($pgid) if ( $pgid =~ /;$/ );
+                push( @proc, $pgid );
+                my $firstpid = 0;
+                foreach my $word (@champs) {
+                    if ( $word =~ /^PIDs:/ ) {
+                        $firstpid = 1;
+                        next;
+                    }
+                    if ( $firstpid == 1 ) {
+                        push( @proc, $word );
+                    }
+                }
+                last;
+            }
+        }
+    }
+    return (@proc);
+}
+
+sub lsfmpi_get_hostport {
+    my $job = shift;
+    my $d   = lsfmpi_get_data();
+    my $host;
+    my $port;
+    my $mpirunpid;
+    my $path_port;
+    my $lsf_mode;
+
+    my @hosts = @{ $d->{$job}{hosts} } if ( defined $d->{$job}{hosts} );
+
+    $host = $hosts[0] if ( defined $hosts[0] );
+
+    #get the pgid of the job(first job pid)
+    my @pgid = lsf_get_jobpgid($job);
+    my $ppid = $pgid[0];
+
+    #get the port of the leading proc (mpirun proc port)
+    if ( defined $ppid and defined $host ) {
+        ( $mpirunpid, $path_port, $lsf_mode ) =
+          lsfmpi_get_mpiproc( $ppid, $host, $job );
+        $d->{$job}{lsf_mode} = $lsf_mode;    # can be 'mpich2' or 'openmpi'
+        $port = lsfmpi_get_mpiport( $host, $path_port )
+          if ( defined($path_port) );
+    }
+    return ( $host, $mpirunpid, $port );
+}
+
+sub lsfmpi_get_lbjobs {
+    my $jobidfound  = 0;
+    my $found_title = 0;
+    my $jobid;
+    my $rank_jobid   = 0;
+    my $rank_user    = 1;
+    my $rank_stat    = 2;
+    my $rank_ehost   = 5;
+    my $rank_jobname = 6;
+    my $cmd          = "bjobs -r -u $target_user ";
+    my @output       = slurp_cmd($cmd);
+    foreach my $line (@output) {
+        $line =~ s/^ +//;    # suppress blank in front of line
+        my @champs = split( /\s+/, $line );
+        next if ( $champs[$rank_jobid] eq 'JOBID' );
+        next if ( $#champs == -1 );    # empty line
+        if ( $#champs != 0 ) {         # line with many fields is first  
line
+            $jobid = undef;
+            $jobid = $champs[$rank_jobid];
+            my @ehosts = split( '\*', $champs[$rank_ehost] );
+            $lsfmpi_tabjobs{$jobid}{nproc} = $ehosts[0];
+            my $exec_host = $ehosts[1];
+            push( @{ $lsfmpi_tabjobs{$jobid}{hosts} }, $exec_host )
+              if ( defined($exec_host) );
+        } elsif ( defined $jobid )
+        {    # line with one field, should be continued line(exec_host)
+            my @ehosts = split( '\*', $champs[0] );
+            my $exec_host = $ehosts[1];
+            chomp($exec_host);
+            $lsfmpi_tabjobs{$jobid}{nproc} += $ehosts[0];    # nprocess
+            push( @{ $lsfmpi_tabjobs{$jobid}{hosts} }, $exec_host );
+        }
+    }
+}
+
+sub lsfmpi_get_data {
+    return \%lsfmpi_tabjobs if ( keys %lsfmpi_tabjobs != 0 );
+    lsfmpi_get_lbjobs();    # get job list by bjobs
+    return \%lsfmpi_tabjobs;
+}
+
+sub lsfmpi_get_jobs {
+    my $user = shift;
+    my @ret_jobs;
+    my $d    = lsfmpi_get_data();
+    my @jobs = keys %{$d};
+
+    # filter other jobs that aren't launched by mpich2_wrapper
+    # (for exemple by mpd; mpiexec; in the submitted job)
+    # to do this we have criteria below
+    # jobs launched by mpich2_wrapper will have -configfile parameter
+    # jobs launched by ompi_wrapper will have --app parameter
+    foreach my $job (@jobs) {
+        my ( $server, $mpirpid, $port ) = lsfmpi_get_hostport($job);
+        if ( defined($mpirpid) and defined($port) and defined($server) ) {
+            $d->{$job}{server}  = $server;
+            $d->{$job}{mpirpid} = $mpirpid;
+            $d->{$job}{port}    = $port;
+            push( @ret_jobs, $job );
+        }
+    }
+    return @ret_jobs;
+}
+
+sub lsfmpi_setup_pcmd {
+    my $job = shift;
+    my $cmd;
+    my $index = 0;
+    my %pcmd;
+    my $d = lsfmpi_get_data();
+
+    my ( $server, $mpirpid, $port );
+
+    $server  = $d->{$job}{server};
+    $mpirpid = $d->{$job}{mpirpid};
+    $port    = $d->{$job}{port};
+    config_set_internal( 'lsf_mode',       $d->{$job}{lsf_mode} );
+    config_set_internal( 'lsfmpi_server',  $server );
+    config_set_internal( 'lsfmpi_mpirpid', $mpirpid );
+    config_set_internal( 'lsfmpi_port',    $port );
+    my @hosts = @{ $d->{$job}{hosts} };
+    $pcmd{nprocesses} = $d->{$job}{nproc};
+    $pcmd{nhosts}     = @hosts;
+    @{ $pcmd{host_list} } = @hosts;
+
+    return %pcmd;
+}
+
+sub get_pids_ppid {
+
+    #   get all pids from ppid be careful about defunct
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    my $pid;
+    my @proc;
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+        next if ( $champs[$rank_pid] eq 'PID' );
+        if ( $champs[$rank_ppid] == $ppid ) {
+            $pid = $champs[$rank_pid];
+            if ( $line =~ /defunct/i ) {
+                next;
+            }
+            push( @proc, $pid );
+        }
+    }
+    return (@proc);
+}
+
+sub get_pids_fromport {
+
+    #   get all pids from port -p host:port_num
+    my ( $port, $rank_pid, $rank_ppid, $rank_cmd, @handle ) = @_;
+    my $portfound = 0;
+    my @proc;
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off space at start
+        my @champs = split( /\s+/, $line );
+        my $cmd    = $champs[$rank_cmd];
+        my $base   = basename($cmd);
+        if ( $base eq "TaskStarter" ) {
+            if ( $line =~ /$port/ ) {
+                $portfound = 0;
+                foreach my $word (@champs) {
+                    if ( $word eq "-p" )
+                    {        # don't use =~ because may take --prefix
+                        $portfound = 1;
+                        next;
+                    }
+                    if ( $portfound == 1 ) {
+                        push( @proc, $champs[$rank_pid] ) if ( $word eq  
$port );
+                        last;
+                    }
+                }
+            }
+        }
+    }
+    return @proc;
+}

  # open support.
  #
@@ -8752,6 +9103,98 @@
      }
      return;
  }
+
+#
+# LSF-openmpi support and LSF-mpich2
+#
+sub lsfmpi_get_proc {
+    my $job = shift;
+    my @proc;
+    my $rank_pid  = 1;
+    my $rank_ppid = 2;
+    my $rank_cmd  = 3;
+    my $port;
+    my ( $server, $mpirun_pid );
+    $port       = $inner_conf{lsfmpi_port};
+    $server     = $inner_conf{lsfmpi_server};
+    $mpirun_pid = $inner_conf{lsfmpi_mpirpid};
+    my $cmd      = "ps -o uid,pid,ppid,cmd -u $target_user";
+    my @handle   = slurp_cmd($cmd);
+    my $hostname = hostname();
+
+    if ( $hostname eq $server ) {
+
+        #this is the server
+        #get all mpirun children, it should be TaskStarter pids
+        #get all TaskStarter children it should be the appli pids
+        my @ppid_proc;
+        @ppid_proc =
+          get_pids_ppid( $mpirun_pid, $rank_pid, $rank_ppid, @handle );
+        foreach my $pid (@ppid_proc) {
+            my @w_proc = get_pids_ppid( $pid, $rank_pid, $rank_ppid,  
@handle );
+            push( @proc, @w_proc );
+        }
+    }
+    if ( $#proc == -1 ) {    # nothing in @proc so try from port
+            # all cases including case host=server which failed
+            # get all TaskStarter that matched port num
+            # get all TaskStarter children it should be the appli pids
+        my @ppid_proc;
+        @ppid_proc =
+          get_pids_fromport( $port, $rank_pid, $rank_ppid, $rank_cmd,  
@handle );
+        foreach my $pid (@ppid_proc) {
+            my @w_proc = get_pids_ppid( $pid, $rank_pid, $rank_ppid,  
@handle );
+            push( @proc, @w_proc );
+        }
+    }
+    return @proc;
+}
+
+sub lsfmpi_find_pids {
+    my $job = shift;
+    my %vps;
+
+    # Iterate over all processes for this user
+    if ( $inner_conf{lsf_mode} eq 'mpich2' ) {
+        foreach my $pid ( lsfmpi_get_proc($job) ) {
+
+            my $vp;
+            my %env = get_remote_env($pid);
+            if ( !defined( $env{LSB_JOBID} ) || !defined( $env{PMI_RANK} )  
) {
+                %env = get_remote_env_bygdb($pid);
+            }
+
+            if ( $env{LSB_JOBID} eq $job ) {
+                $vp = $env{PMI_RANK};
+            }
+            if ( defined $vp ) {
+                $vps{$vp} = $pid;
+            }
+        }
+    } else {
+
+        # lsf_mode eq 'openmpi'
+        foreach my $pid ( lsfmpi_get_proc($job) ) {
+
+            my $vp;
+            my %env = get_remote_env($pid);
+            if (   !defined( $env{OMPI_COMM_WORLD_SIZE} )
+                || !defined( $env{OMPI_COMM_WORLD_RANK} ) )
+            {
+                %env = get_remote_env_bygdb($pid);
+            }
+
+            $vp = $env{OMPI_COMM_WORLD_RANK};
+            if ( defined $vp ) {
+                $vps{$vp} = $pid;
+            }
+        }
+    }
+    foreach my $vp ( keys %vps ) {
+        my $pid = $vps{$vp};
+        register_target_process( $vp, $pid );
+    }
+}

  sub rms_find_pids {
      my $jobid = shift;




More information about the padb-devel mailing list