Index: padb
===================================================================
--- padb	(revision 387)
+++ padb	(working copy)
@@ -370,7 +370,7 @@
 
 # Config options the inner knows about, only forward options if they are in
 # this list.
-my @inner_conf = qw(edb edbopt rmgr scripts slurm_job_step pbs_server);
+my @inner_conf = qw(edb edbopt rmgr scripts slurm_job_step pbs_server lsf_mpi lsfmpi_server lsfmpi_mpirpid lsfmpi_port);
 
 # More config options the inner knows about, these are forwarded on the
 # command line rather than over the sockets.
@@ -507,6 +507,13 @@
     find_pids => \&pbs_find_pids,
 };
 
+$rmgr{"lsf-mpiwr"} = {
+    'is_installed'    => \&lsfmpi_is_installed,
+    'get_active_jobs' => \&lsfmpi_get_jobs,
+    'setup_job'       => \&lsfmpi_setup_pcmd,
+    'find_pids'       => \&lsfmpi_find_pids,
+};
+
 ###############################################################################
 #
 # Config options
@@ -2840,6 +2847,358 @@
     return %pcmd;
 }
 
+###############################################################################
+#
+# lsf-mpich2 wrapper support
+# Jobs launched by mpich2_wrapper through mpirun.lsf and "#BSUB -a mpich2".
+# The job submission file looks like:
+#
+##! /bin/bash
+##BSUB -J "JOB_NAME"
+##BSUB -o JOB_NAME.%J
+##BSUB -n 4
+##BSUB -e JOB_NAME_err.%J
+##BSUB -a mpich2
+#mpirun.lsf ./mpi_prog
+#
+# lsf-ompi-wrapper support
+# Jobs launched by openmpi_wrapper through mpirun.lsf and "#BSUB -a openmpi".
+# The job submission file looks like:
+#
+##! /bin/bash
+##BSUB -J "PP_SNDRCV"
+##BSUB -o PP_SNDRCV.%J
+##BSUB -n 4
+##BSUB -e PP_SNDRCVerr.%J
+##BSUB -a openmpi
+#mpirun.lsf ./pp_sndrcv_spbl
+#
+###############################################################################
+
+# Job table filled by lsfmpi_get_lbjobs(), keyed by LSF job id.
+my %lsfmpi_tabjobs;
+
+# Resource-manager hook: true when mpirun.lsf and at least one of
+# mpich2_wrapper / openmpi_wrapper are found by find_exe().
+sub lsfmpi_is_installed {
+    return ( find_exe('mpirun.lsf')
+          and ( find_exe('mpich2_wrapper') or find_exe('openmpi_wrapper') ) );
+}
+
+# Return the word following option $opt in @words, or undef when $opt is
+# absent or is the last word.  The comparison is exact, so looking for "-p"
+# never picks up "--prefix".
+sub _lsfmpi_word_after {
+    my ( $opt, @words ) = @_;
+    my $seen = 0;
+    foreach my $word (@words) {
+        if ( $word eq $opt ) {
+            $seen = 1;
+            next;
+        }
+        return $word if ($seen);
+    }
+    return;
+}
+
+# Find the first line of the ps output @handle whose PPID column is $ppid.
+#   $rank_pid, $rank_ppid - indexes of the PID and PPID columns
+# Returns ( $line, $pid ) for that line, or ( undef, undef ) if none matches.
+sub get_line_ppid {
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+
+        # Skip blank or short lines and the header, so that the numeric
+        # comparison never sees an undefined or non-numeric value.
+        next if ( !defined $champs[$rank_pid] );
+        next if ( !defined $champs[$rank_ppid] );
+        next if ( $champs[$rank_pid] eq 'PID' );
+        next if ( $champs[$rank_ppid] !~ /^\d+$/ );
+        if ( $champs[$rank_ppid] == $ppid ) {
+            return ( $line, $champs[$rank_pid] );
+        }
+    }
+    return ( undef, undef );
+}
+
+# Read $portpath on $host (over ssh) and return the word following "-p" on
+# its first TaskStarter line, or undef if there is none.
+sub lsfmpi_get_mpiport {
+    my ( $host, $portpath ) = @_;
+    my $cmd = "ssh $host cat $portpath ";
+    foreach my $line ( slurp_cmd($cmd) ) {
+        next if ( $line !~ /TaskStarter/ );
+
+        # Only the first TaskStarter line is looked at.
+        my $port = _lsfmpi_word_after( "-p", split( " ", $line ) );
+        return $port;
+    }
+    return;
+}
+
+# Walk down the process tree on $host, starting with the children of $ppid,
+# until the mpirun/mpiexec started by the LSF wrapper for $job is found:
+#   mpich2 wrapper : ... -configfile .mpich2_wrapper.JOBID.newconf
+#   openmpi wrapper: ... --app .openmpi_appfile_JOBID
+# Returns ( $pid, $path_file, $kind ), $kind being 1 for mpich2 and 2 for
+# openmpi, or ( undef, undef, 0 ) when no such process is found.
+sub lsfmpi_get_mpiproc {
+    my ( $ppid, $host, $job ) = @_;
+    my $rank_pid  = 0;
+    my $rank_ppid = 1;
+
+    # Option introducing the wrapper file, what that file name must look
+    # like for this job, and the value reported to the caller.
+    my @launchers = (
+        { opt => '-configfile', file => qr/\Q$job\E\.newconf$/, kind => 1 },
+        { opt => '--app',       file => qr/\Q$job\E$/,          kind => 2 },
+    );
+
+    # get ps from the leading host (the one that started mpirun.lsf)
+    my $cmd    = "ssh $host ps -o pid,ppid,cmd -u $target_user ";
+    my @handle = slurp_cmd($cmd);
+
+    # At most one step per line of output, so an odd process table can
+    # never make this loop forever.
+    foreach my $step ( 1 .. scalar @handle ) {
+        last if ( !defined $ppid );
+        my ( $line, $pid ) =
+          get_line_ppid( $ppid, $rank_pid, $rank_ppid, @handle );
+        last if ( !defined $line );
+
+        if ( $line =~ /mpi/ ) {
+            my @champs = split( " ", $line );
+            foreach my $launcher (@launchers) {
+                my $path_file =
+                  _lsfmpi_word_after( $launcher->{opt}, @champs );
+                next if ( !defined $path_file );
+                if ( $path_file =~ $launcher->{file} ) {
+                    return ( $pid, $path_file, $launcher->{kind} );
+                }
+            }
+        }
+
+        # Not the launcher of this job (the flags found on one candidate are
+        # never carried over to the next): go down to its first child.
+        $ppid = $pid;
+    }
+    return ( undef, undef, 0 );
+}
+
+# Return ( $pgid, @pids ) for job $jobid as listed by "bjobs -l": the values
+# are taken from the first line starting with "PGID:" that follows the
+# "Resource usage collected." line, the pids being the words after "PIDs:".
+# Returns an empty list if no such line (or no pgid on it) is found.
+sub lsf_get_jobpgid {
+    my ($jobid) = @_;
+    my $resfound = 0;
+    my @proc;
+    my $cmd    = "bjobs -l $jobid ";
+    my @handle = slurp_cmd($cmd);
+    foreach my $line (@handle) {
+        if ( $line =~ /Resource usage collected./i ) {
+            $resfound = 1;
+            next;
+        }
+        next if ( !$resfound );
+        $line =~ s/^ +//;    # take off space at start
+        next if ( $line !~ /^PGID:/i );
+
+        my @champs = split( " ", $line );
+        my $pgid = $champs[1];
+        last if ( !defined $pgid );
+        $pgid =~ s/;$//;     # drop the ";" that ends the pgid field
+        push( @proc, $pgid );
+
+        my $firstpid = 0;
+        foreach my $word (@champs) {
+            if ( $word =~ /^PIDs:/ ) {
+                $firstpid = 1;
+                next;
+            }
+            push( @proc, $word ) if ($firstpid);
+        }
+        last;
+    }
+    return @proc;
+}
+
+# Locate the launcher of job $job.  Returns ( $host, $mpirunpid, $port ): the
+# first execution host of the job, the pid found there by
+# lsfmpi_get_mpiproc() and the port read by lsfmpi_get_mpiport().  Any of the
+# three may be undef.  Also stores the MPI flavour in the job table ({lsf_mpi}).
+sub lsfmpi_get_hostport {
+    my $job = shift;
+    my $d   = lsfmpi_get_data();
+    my ( $mpirunpid, $port );
+
+    # Declared apart from the conditional assignment: the behaviour of
+    # "my @x = ... if COND" is undefined in Perl (see perlsyn).
+    my @hosts;
+    @hosts = @{ $d->{$job}{hosts} } if ( defined $d->{$job}{hosts} );
+    my $host = $hosts[0];
+
+    # get the pgid of the job (first job pid)
+    my @pgid = lsf_get_jobpgid($job);
+    my $ppid = $pgid[0];
+
+    # get the port of the leading proc (mpirun proc port)
+    if ( defined $ppid and defined $host ) {
+        my ( $path_port, $lsf_mpi );
+        ( $mpirunpid, $path_port, $lsf_mpi ) =
+          lsfmpi_get_mpiproc( $ppid, $host, $job );
+        $d->{$job}{lsf_mpi} = $lsf_mpi;    # 1 for mpich2, 2 for openmpi
+        if ( defined $path_port ) {
+            $port = lsfmpi_get_mpiport( $host, $path_port );
+        }
+    }
+    return ( $host, $mpirunpid, $port );
+}
+
+# Split one EXEC_HOST entry of bjobs into ( $nproc, $host ).  "4*hostA" gives
+# ( 4, "hostA" ); an entry without a count is taken as one process on that
+# host instead of being mistaken for a process count.
+sub _lsfmpi_split_exec_host {
+    my ($entry) = @_;
+    return ( $1, $2 ) if ( $entry =~ /^(\d+)\*(.+)$/ );
+    return ( 1, $entry );
+}
+
+# Fill %lsfmpi_tabjobs from "bjobs -r -u $target_user".  For every job it
+# records {nproc}, the sum of the process counts, and {hosts}, the execution
+# hosts.  A line with several fields starts a job (job id in the first
+# column, execution host in the sixth); a line with a single field is taken
+# as one more execution host of the job started above it.
+sub lsfmpi_get_lbjobs {
+    my $rank_jobid = 0;
+    my $rank_ehost = 5;
+    my $jobid;
+    my $cmd    = "bjobs -r -u $target_user ";
+    my @output = slurp_cmd($cmd);
+    foreach my $line (@output) {
+        $line =~ s/^ +//;    # suppress blank in front of line
+        my @champs = split( /\s+/, $line );
+
+        # Test for an empty line first so the header test has a value.
+        next if ( !@champs );
+        next if ( $champs[$rank_jobid] eq 'JOBID' );
+
+        my $entry;
+        if ( @champs > 1 ) {
+            $jobid = $champs[$rank_jobid];
+            $entry = $champs[$rank_ehost];
+            $lsfmpi_tabjobs{$jobid}{nproc} = 0;
+        } else {
+            $entry = $champs[0];
+        }
+        next if ( !defined $jobid or !defined $entry );
+
+        my ( $nproc, $exec_host ) = _lsfmpi_split_exec_host($entry);
+        $lsfmpi_tabjobs{$jobid}{nproc} += $nproc;
+        push( @{ $lsfmpi_tabjobs{$jobid}{hosts} }, $exec_host );
+    }
+    return;
+}
+
+# Return a reference to the job table, running bjobs first if it is empty.
+sub lsfmpi_get_data {
+    lsfmpi_get_lbjobs() if ( !%lsfmpi_tabjobs );    # get job list by bjobs
+    return \%lsfmpi_tabjobs;
+}
+
+# Resource-manager hook: return the ids of the jobs that were started through
+# the mpich2 or openmpi LSF wrapper.  Jobs started otherwise (for example by
+# calling mpd or mpiexec in the job script) are filtered out: a job is kept
+# only if its launcher host, launcher pid and port were all found.  These
+# three values are stored in the job table for lsfmpi_setup_pcmd().
+sub lsfmpi_get_jobs {
+    my $user = shift;    # not used: bjobs is run for $target_user
+    my @ret_jobs;
+    my $d = lsfmpi_get_data();
+    foreach my $job ( keys %{$d} ) {
+        my ( $server, $mpirpid, $port ) = lsfmpi_get_hostport($job);
+        next if ( !defined $server );
+        next if ( !defined $mpirpid or !defined $port );
+        $d->{$job}{server}  = $server;
+        $d->{$job}{mpirpid} = $mpirpid;
+        $d->{$job}{port}    = $port;
+        push( @ret_jobs, $job );
+    }
+    return @ret_jobs;
+}
+
+# Resource-manager hook: pass the launcher details recorded for $job on to
+# the inner processes with config_set_internal() and return the %pcmd
+# description of the job (nprocesses, nhosts, host_list).
+sub lsfmpi_setup_pcmd {
+    my $job = shift;
+    my %pcmd;
+    my $d = lsfmpi_get_data();
+
+    config_set_internal( 'lsf_mpi',        $d->{$job}{lsf_mpi} );
+    config_set_internal( 'lsfmpi_server',  $d->{$job}{server} );
+    config_set_internal( 'lsfmpi_mpirpid', $d->{$job}{mpirpid} );
+    config_set_internal( 'lsfmpi_port',    $d->{$job}{port} );
+
+    # A job without recorded hosts gets an empty list rather than a
+    # dereference of an undefined value.
+    my @hosts;
+    @hosts = @{ $d->{$job}{hosts} } if ( defined $d->{$job}{hosts} );
+    $pcmd{nprocesses} = $d->{$job}{nproc};
+    $pcmd{nhosts}     = @hosts;
+    @{ $pcmd{host_list} } = @hosts;
+
+    return %pcmd;
+}
+
+# Return the pids of the children of $ppid listed in the ps output @handle,
+# leaving out those whose line is marked defunct.
+#   $rank_pid, $rank_ppid - indexes of the PID and PPID columns
+sub get_pids_ppid {
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    my @proc;
+    return @proc if ( !defined $ppid );
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+
+        # Skip blank or short lines and the header, so that the numeric
+        # comparison never sees an undefined or non-numeric value.
+        next if ( !defined $champs[$rank_pid] );
+        next if ( !defined $champs[$rank_ppid] );
+        next if ( $champs[$rank_pid] eq 'PID' );
+        next if ( $champs[$rank_ppid] !~ /^\d+$/ );
+        next if ( $champs[$rank_ppid] != $ppid );
+        next if ( $line =~ /defunct/i );
+        push( @proc, $champs[$rank_pid] );
+    }
+    return @proc;
+}
+
+# Return the pids of the TaskStarter processes in the ps output @handle whose
+# "-p" argument is exactly $port.
+#   $rank_pid, $rank_cmd - indexes of the PID and command columns
+#   $rank_ppid           - not used here
+sub get_pids_fromport {
+    my ( $port, $rank_pid, $rank_ppid, $rank_cmd, @handle ) = @_;
+    my @proc;
+    return @proc if ( !defined $port );
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off space at start
+        my @champs = split( /\s+/, $line );
+        my $cmd = $champs[$rank_cmd];
+        next if ( !defined $cmd or basename($cmd) ne "TaskStarter" );
+
+        # The port is compared as a string and never used as a pattern, so
+        # characters such as "." in it cannot match anything else.
+        my $word = _lsfmpi_word_after( "-p", @champs );
+        next if ( !defined $word or $word ne $port );
+        push( @proc, $champs[$rank_pid] );
+    }
+    return @proc;
+}
 # open support.
 #
 ###############################################################################
@@ -8752,7 +9111,85 @@
     }
     return;
 }
+#
+# LSF-openmpi support and LSF-mpich2
+#
+
+# Return the pids of the application processes of the job on this host.
+# On the launcher host they are looked for as grandchildren of mpirun
+# (mpirun -> TaskStarter -> application).  On other hosts, or when that
+# finds nothing, they are the children of the TaskStarter processes that
+# were started with the port of the job.
+sub lsfmpi_get_proc {
+    my $job = shift;    # not used: the job is described by %inner_conf
+    my $rank_pid   = 1;
+    my $rank_ppid  = 2;
+    my $rank_cmd   = 3;
+    my $port       = $inner_conf{lsfmpi_port};
+    my $server     = $inner_conf{lsfmpi_server};
+    my $mpirun_pid = $inner_conf{lsfmpi_mpirpid};
+    my $cmd        = "ps -o uid,pid,ppid,cmd -u $target_user";
+    my @handle     = slurp_cmd($cmd);
+    my @proc;
+
+    if ( defined $server and hostname() eq $server ) {
+        my @ppid_proc =
+          get_pids_ppid( $mpirun_pid, $rank_pid, $rank_ppid, @handle );
+        foreach my $pid (@ppid_proc) {
+            push( @proc,
+                get_pids_ppid( $pid, $rank_pid, $rank_ppid, @handle ) );
+        }
+    }
+    return @proc if (@proc);
+
+    # All other cases, including the launcher host when the search from
+    # mpirun found nothing: go through the TaskStarters matching the port.
+    my @ppid_proc =
+      get_pids_fromport( $port, $rank_pid, $rank_ppid, $rank_cmd, @handle );
+    foreach my $pid (@ppid_proc) {
+        push( @proc, get_pids_ppid( $pid, $rank_pid, $rank_ppid, @handle ) );
+    }
+    return @proc;
+}
+
+# Resource-manager hook (inner side): register every process found by
+# lsfmpi_get_proc() under its MPI rank.  The rank is read from the process
+# environment: PMI_RANK for the mpich2 wrapper (lsf_mpi is 1), and only when
+# LSB_JOBID is $job; OMPI_COMM_WORLD_RANK otherwise.  If the variables
+# expected for the wrapper are not all present, the environment is fetched
+# again with get_remote_env_bygdb().
+sub lsfmpi_find_pids {
+    my $job       = shift;
+    my $lsf_mpi   = $inner_conf{lsf_mpi};
+    my $is_mpich2 = ( defined $lsf_mpi and $lsf_mpi == 1 );
+    my @wanted =
+      $is_mpich2
+      ? qw(LSB_JOBID PMI_RANK)
+      : qw(OMPI_COMM_WORLD_SIZE OMPI_COMM_WORLD_RANK);
+    my %vps;
+
+    # Iterate over all processes of the job on this host
+    foreach my $pid ( lsfmpi_get_proc($job) ) {
+        my %env = get_remote_env($pid);
+        if ( grep { !defined $env{$_} } @wanted ) {
+            %env = get_remote_env_bygdb($pid);
+        }
+
+        my $vp;
+        if ( !$is_mpich2 ) {
+            $vp = $env{OMPI_COMM_WORLD_RANK};
+        } elsif ( defined $env{LSB_JOBID} and $env{LSB_JOBID} eq $job ) {
+            $vp = $env{PMI_RANK};
+        }
+        $vps{$vp} = $pid if ( defined $vp );
+    }
+    foreach my $vp ( keys %vps ) {
+        register_target_process( $vp, $vps{$vp} );
+    }
+    return;
+}
+
 sub rms_find_pids {
     my $jobid = shift;