Index: padb
===================================================================
--- padb	(revision 386)
+++ padb	(working copy)
@@ -370,7 +370,7 @@
 
 # Config options the inner knows about, only forward options if they are in
 # this list.
-my @inner_conf = qw(edb edbopt rmgr scripts slurm_job_step pbs_server);
+my @inner_conf = qw(edb edbopt rmgr scripts slurm_job_step pbs_server lsfompi_server lsfompi_mpirpid lsfompi_port);
 
 # More config options the inner knows about, these are forwarded on the
 # command line rather than over the sockets.
@@ -507,6 +507,13 @@
     find_pids => \&pbs_find_pids,
 };
 
+$rmgr{"lsf-ompi"} = {
+    'is_installed'    => \&lsfompi_is_installed,
+    'get_active_jobs' => \&lsfompi_get_jobs,
+    'setup_job'       => \&lsfompi_setup_pcmd,
+    'find_pids'       => \&lsfompi_find_pids,
+};
+
 ###############################################################################
 #
 # Config options
@@ -2840,6 +2847,329 @@
     return %pcmd;
 }
 
+###############################################################################
+#
+# lsf-ompi-wrapper support.
+# The job submission file looks like:
+##! /bin/bash
+##BSUB -J "PP_SNDRCV"
+##BSUB -o PP_SNDRCV.%J
+##BSUB -n 4
+##BSUB -e PP_SNDRCVerr.%J
+##BSUB -a openmpi
+#mpirun.lsf ./pp_sndrcv_spbl
+#
+###############################################################################
+
+# Cache of running LSF jobs, filled by lsfompi_get_lbjobs():
+#   {$jobid}{nproc}  process count, summed over the EXEC_HOST entries
+#   {$jobid}{hosts}  array ref of execution host names
+my %lsfompi_tabjobs;
+
+# Tell whether find_exe() locates both launcher executables.
+sub lsfompi_is_installed {
+    return ( find_exe("openmpi_wrapper") and find_exe("mpirun.lsf") );
+}
+
+# Find the first ps line whose parent pid column equals $ppid.
+#   $ppid       parent pid to look for
+#   $rank_pid   index of the pid column
+#   $rank_ppid  index of the parent pid column
+#   @handle     ps output lines
+# Returns (line, pid); both are undef when nothing matches.
+sub get_line_ppid {    # same as mpich2_wrapper
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    return ( undef, undef ) if ( !defined $ppid );
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+        my ( $pid, $parent ) = @champs[ $rank_pid, $rank_ppid ];
+
+        # Blank or truncated lines have no pid/ppid columns to compare.
+        next if ( !defined $pid or !defined $parent );
+        next if ( $pid eq 'PID' );    # header line
+        return ( $line, $pid ) if ( $parent == $ppid );
+    }
+    return ( undef, undef );
+}
+
+# Starting from $ppid on $host, follow the first child of each process until
+# one is found whose command line mentions "mpi" and carries "--app <file>".
+# Returns (pid, app file path); both are undef when no such process exists.
+sub lsfompi_get_mpiproc {
+    my ( $ppid, $host ) = @_;
+    my $rank_pid  = 0;
+    my $rank_ppid = 1;
+    my $proc;
+    my $path_file;
+
+    # get ps from the leading host (the one that started mpirun.lsf)
+    my $cmd    = "ssh $host ps -o pid,ppid,cmd -u $target_user";
+    my @handle = slurp_cmd($cmd);
+
+    # One generation per pass; the pass count is capped by the number of ps
+    # lines so that the walk always terminates.
+    foreach my $depth ( 1 .. scalar @handle ) {
+        my ( $line, $pid ) =
+          get_line_ppid( $ppid, $rank_pid, $rank_ppid, @handle );
+
+        # No child left; rescanning with the same $ppid cannot find one.
+        last if ( !defined $line );
+
+        if ( $line !~ /mpi/ or $line !~ /-app/ ) {
+            $ppid = $pid;    # not mpirun yet, go one level down
+            next;
+        }
+
+        # The word following "--app" is the application file.
+        my $found_app = 0;
+        foreach my $word ( split( " ", $line ) ) {
+            if ( $word eq "--app" ) {
+                $found_app = 1;
+                next;
+            }
+            if ($found_app) {
+                $path_file = $word;
+                $proc      = $pid;
+                last;
+            }
+        }
+        last;
+    }
+    return ( $proc, $path_file );
+}
+
+# Parse "bjobs -l $jobid": on the first "PGID:" line that follows the
+# "Resource usage collected." line, collect the PGID and the listed PIDs.
+# Returns (pgid, pid, ...), or an empty list when no such line is found.
+sub lsf_get_jobpgid {    # same mpich2
+    my ($jobid) = @_;
+    my $resfound = 0;
+    my @proc;
+    my $cmd    = "bjobs -l $jobid ";
+    my @handle = slurp_cmd($cmd);
+    foreach my $line (@handle) {
+        if ( $line =~ /Resource usage collected./i ) {
+            $resfound = 1;
+            next;
+        }
+        next if ( !$resfound );
+
+        $line =~ s/^ +//;    # take off space at start
+        next if ( $line !~ /^PGID:/i );
+
+        my @champs = split( " ", $line );
+        my $pgid   = $champs[1];
+        last if ( !defined $pgid );    # "PGID:" with no value
+        $pgid =~ s/;$//;
+        push( @proc, $pgid );
+
+        my $firstpid = 0;
+        foreach my $word (@champs) {
+            if ( $word =~ /^PIDs:/ ) {
+                $firstpid = 1;
+                next;
+            }
+            push( @proc, $word ) if ($firstpid);
+        }
+        last;
+    }
+    return (@proc);
+}
+
+# Read the application file $portpath on $host and return the word that
+# follows "-p" on its first TaskStarter line, or undef when there is none.
+# port has 'host:port' format
+sub lsfompi_get_mpiport {    #same as mpich2
+    my ( $host, $portpath ) = @_;
+    my $portfound = 0;
+    my $port;
+    my $cmd    = "ssh $host cat $portpath ";
+    my @handle = slurp_cmd($cmd);
+    foreach my $line (@handle) {
+        next if ( $line !~ /TaskStarter/ );
+        foreach my $word ( split( " ", $line ) ) {
+            if ( $word eq "-p" ) {    # don't use =~ because may take --prefix
+                $portfound = 1;
+                next;
+            }
+            if ($portfound) {
+                $port = $word;
+                last;
+            }
+        }
+        last;
+    }
+    return $port;
+}
+
+# Resolve what is needed to locate the processes of LSF job $job.
+# Returns (first execution host, mpirun pid, TaskStarter port); any of the
+# three may be undef when it cannot be determined.
+sub lsfompi_get_hostport {
+    my $job = shift;
+    my $d   = lsfompi_get_data();
+    my $mpirunpid;
+    my $path_port;
+    my $port;
+
+    # Plain assignment: "my ... if COND" has undefined behaviour in Perl and
+    # could leak the host list of a previously inspected job into this call.
+    my $hosts_ref = $d->{$job}{hosts};
+    my @hosts     = defined $hosts_ref ? @{$hosts_ref} : ();
+    my $host      = $hosts[0];
+
+    #get the pgid of the job(first job pid)
+    my @pgid = lsf_get_jobpgid($job);
+    my $ppid = $pgid[0];
+
+    #get the port of the leading proc (mpirun proc port)
+    if ( defined $ppid and defined $host ) {
+        ( $mpirunpid, $path_port ) = lsfompi_get_mpiproc( $ppid, $host );
+        $port = lsfompi_get_mpiport( $host, $path_port )
+          if ( defined $path_port );
+    }
+    return ( $host, $mpirunpid, $port );
+}
+
+# Collect the pid column of every ps line whose parent pid column equals
+# $ppid; the arguments are the same as for get_line_ppid().
+sub get_pids_ppid {
+    my ( $ppid, $rank_pid, $rank_ppid, @handle ) = @_;
+    my @proc;
+    return @proc if ( !defined $ppid );
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off leading space
+        my @champs = split( /\s+/, $line );
+        my ( $pid, $parent ) = @champs[ $rank_pid, $rank_ppid ];
+        next if ( !defined $pid or !defined $parent );
+        next if ( $pid eq 'PID' );    # header line
+        push( @proc, $pid ) if ( $parent == $ppid );
+    }
+    return (@proc);
+}
+
+# Collect the pids of the TaskStarter processes started with "-p $port".
+#   $rank_cmd  index of the command column; the other arguments are the
+#              same as for get_pids_ppid() ($rank_ppid is not used here)
+sub get_pids_fromport {
+    my ( $port, $rank_pid, $rank_ppid, $rank_cmd, @handle ) = @_;
+    my @proc;
+
+    # Without a port nothing can match.
+    return @proc if ( !defined $port or $port eq '' );
+    foreach my $line (@handle) {
+        $line =~ s/^ +//;    # take off space at start
+        my @champs = split( /\s+/, $line );
+        my $cmd    = $champs[$rank_cmd];
+        next if ( !defined $cmd or basename($cmd) ne "TaskStarter" );
+
+        # The port is compared as a plain string: it holds 'host:port' and
+        # must not be interpreted as a regular expression.
+        my $portfound = 0;
+        foreach my $word (@champs) {
+            if ( $word eq "-p" ) {    # don't use =~ because may take --prefix
+                $portfound = 1;
+                next;
+            }
+            if ($portfound) {
+                push( @proc, $champs[$rank_pid] ) if ( $word eq $port );
+                last;
+            }
+        }
+    }
+    return @proc;
+}
+
+# Split one EXEC_HOST entry of bjobs into (process count, host name).
+# "4*hostA" gives (4, "hostA"); an entry without "N*" counts as 1 process.
+sub lsfompi_split_exec_host {
+    my ($entry) = @_;
+    return ( $1, $2 ) if ( $entry =~ /^(\d+)\*(.+)$/ );
+    return ( 1, $entry );
+}
+
+# Fill %lsfompi_tabjobs from "bjobs -r -u $target_user". A line with several
+# fields starts a job; a line with a single field is taken as one more
+# EXEC_HOST entry of the job above it.
+sub lsfompi_get_lbjobs {    # same as lsfmpich2wr_get_lbjobs
+    my $jobid;
+    my $rank_jobid = 0;
+    my $rank_ehost = 5;
+    my $cmd        = "bjobs -r -u $target_user ";
+    my @output     = slurp_cmd($cmd);
+    foreach my $line (@output) {
+        $line =~ s/^ +//;    # suppress blank in front of line
+        my @champs = split( /\s+/, $line );
+
+        # Test for an empty line before looking at any field.
+        next if ( !@champs );
+        next if ( $champs[$rank_jobid] eq 'JOBID' );
+
+        my $entry;
+        if ( @champs > 1 ) {    # line with many fields is first line
+            $jobid = $champs[$rank_jobid];
+            $lsfompi_tabjobs{$jobid}{nproc} = 0;
+            $entry = $champs[$rank_ehost];
+        } elsif ( defined $jobid ) {    # continued line (exec_host)
+            $entry = $champs[0];
+        }
+        next if ( !defined $entry );
+
+        my ( $nproc, $exec_host ) = lsfompi_split_exec_host($entry);
+        $lsfompi_tabjobs{$jobid}{nproc} += $nproc;
+        push( @{ $lsfompi_tabjobs{$jobid}{hosts} }, $exec_host );
+    }
+    return;
+}
+
+# Return a reference to %lsfompi_tabjobs, running bjobs first when it is empty.
+sub lsfompi_get_data {
+    lsfompi_get_lbjobs() if ( !%lsfompi_tabjobs );    # get job list by bjobs
+    return \%lsfompi_tabjobs;
+}
+
+# Return the ids of the running jobs that were launched by openmpi_wrapper.
+sub lsfompi_get_jobs {
+    my $user = shift;    # not used: bjobs is run for $target_user
+    my $d    = lsfompi_get_data();
+    my @ret_jobs;
+    foreach my $job ( keys %{$d} ) {
+
+        # filter other jobs that aren't launched by openmpi_wrapper
+        # (for example by mpirun in the submitted job)
+        # to do this we have criteria below:
+        # jobs launched by openmpi_wrapper will have --app file
+        # which contains the wrappers (TaskStarter) processes
+        # and hosts to be launched etc
+        my ( $server, $mpirpid, $port ) = lsfompi_get_hostport($job);
+        push( @ret_jobs, $job ) if ( defined $mpirpid and defined $port );
+    }
+    return @ret_jobs;
+}
+
+# Store the server, mpirun pid and port of $job with config_set_internal()
+# (the three names are listed in @inner_conf) and return the %pcmd hash
+# (nprocesses, nhosts, host_list) describing where the job runs.
+sub lsfompi_setup_pcmd {    # same as lsfmpich2wr_setup_pcmd
+    my $job = shift;
+    my $d   = lsfompi_get_data();
+    my ( $server, $mpirpid, $port ) = lsfompi_get_hostport($job);
+    config_set_internal( 'lsfompi_server',  $server );
+    config_set_internal( 'lsfompi_mpirpid', $mpirpid );
+    config_set_internal( 'lsfompi_port',    $port );
+
+    # A job without a host list yields no hosts instead of an undef deref.
+    my @hosts = @{ $d->{$job}{hosts} || [] };
+
+    my %pcmd;
+    $pcmd{nprocesses} = $d->{$job}{nproc};
+    $pcmd{nhosts}     = @hosts;
+    @{ $pcmd{host_list} } = @hosts;
+
+    return %pcmd;
+}
+
 # open support.
 #
 ###############################################################################
@@ -8691,6 +9021,76 @@
     return;
 }
 
+#
+# LSF-orte support is similar to PBS so using some pbs function
+#
+
+# Return the pids of the application processes of the job on this host.
+# On the host recorded as lsfompi_server they are the grandchildren of the
+# mpirun pid; on any other host they are the children of the TaskStarter
+# processes started with the job's port.
+sub lsfompi_get_proc {
+    my $job        = shift;    # not used: the job data comes from %inner_conf
+    my $rank_pid   = 1;
+    my $rank_ppid  = 2;
+    my $rank_cmd   = 3;
+    my $port       = $inner_conf{lsfompi_port};
+    my $server     = $inner_conf{lsfompi_server};
+    my $mpirun_pid = $inner_conf{lsfompi_mpirpid};
+    my $cmd        = "ps -o uid,pid,ppid,cmd -u $target_user";
+    my @handle     = slurp_cmd($cmd);
+
+    # NOTE(review): hostname() is compared verbatim with the bjobs host name;
+    # confirm both use the same (short or fully qualified) form.
+    my @ppid_proc;
+    if ( defined $server and hostname() eq $server ) {
+
+        #this is the server
+        #get all mpirun children, it should be TaskStarter pids
+        @ppid_proc =
+          get_pids_ppid( $mpirun_pid, $rank_pid, $rank_ppid, @handle );
+    } else {
+
+        # the other
+        # get all TaskStarter that matched port num
+        @ppid_proc =
+          get_pids_fromport( $port, $rank_pid, $rank_ppid, $rank_cmd, @handle );
+    }
+
+    #get all TaskStarter children it should be the appli pids
+    my @proc;
+    foreach my $pid (@ppid_proc) {
+        push( @proc, get_pids_ppid( $pid, $rank_pid, $rank_ppid, @handle ) );
+    }
+    return @proc;
+}
+
+# Register every process of the job found on this host under its MPI rank.
+sub lsfompi_find_pids {
+    my $job = shift;
+    my %vps;
+    foreach my $pid ( lsfompi_get_proc($job) ) {
+        my %env = get_remote_env($pid);
+
+        # Retry with get_remote_env_bygdb() when either Open MPI variable is
+        # missing from what get_remote_env() returned.
+        if (   !defined $env{OMPI_COMM_WORLD_SIZE}
+            or !defined $env{OMPI_COMM_WORLD_RANK} )
+        {
+            %env = get_remote_env_bygdb($pid);
+        }
+
+        my $vp = $env{OMPI_COMM_WORLD_RANK};
+        $vps{$vp} = $pid if ( defined $vp );
+    }
+
+    # %vps holds one pid per rank; a later pid replaces an earlier one.
+    foreach my $vp ( keys %vps ) {
+        register_target_process( $vp, $vps{$vp} );
+    }
+    return;
+}
+
 sub rms_find_pids {
     my $jobid = shift;