MPI topic: Process management

Experimental html version of Parallel Programming in MPI, OpenMP, and PETSc by Victor Eijkhout. Download the textbook at https://theartofhpc.com/pcse
8.1 : Process spawning
8.1.1 : MPI startup with universe
8.1.2 : MPMD
8.2 : Socket-style communications
8.2.1 : Server calls
8.2.2 : Client calls
8.2.3 : Published service names
8.2.4 : Unix sockets
8.3 : Sessions
8.3.1 : Short description of the session model
8.3.2 : Session creation
8.3.2.1 : Session info
8.3.2.2 : Session error handler
8.3.3 : Process sets and communicators
8.3.4 : Example
8.4 : Functionality available outside init/finalize
Back to Table of Contents

8 MPI topic: Process management

In this course we have up to now only considered the SPMD model of running MPI programs. In some rare cases you may want to run in MPMD mode instead. This can be achieved either on the OS level, using options of the mpiexec mechanism, or through MPI's built-in process management. Read on if you're interested in the latter.
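For the OS-level approach, the mpiexec syntax from the MPI standard accepts colon-separated launch specifications. A sketch, where the executable names manager and worker are hypothetical:

```shell
# Start 1 copy of 'manager' and 4 copies of 'worker' as one MPI job.
# All 5 processes share a single MPI_COMM_WORLD; the MPI_APPNUM
# attribute tells each process which executable it belongs to.
mpiexec -n 1 ./manager : -n 4 ./worker
```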

8.1 Process spawning

crumb trail: > mpi-proc > Process spawning

The first version of MPI did not contain any process management routines, even though the earlier PVM project did have that functionality. Process management was later added in the MPI-2 standard.

Unlike what you might think, newly added processes do not become part of MPI_COMM_WORLD ; rather, they get their own communicator, and an intercommunicator (section  7.6  ) is established between this new group and the existing one. The first routine is

MPI_Comm_spawn  , which tries to fire up multiple copies of a single named executable. Errors in starting up these codes are returned in an array of integers, or, if you're feeling sure of yourself, specify MPI_ERRCODES_IGNORE  .

It is not immediately clear whether there is opportunity for spawning new executables; after all, MPI_COMM_WORLD contains all your available processors. You can probably tell your job starter to reserve space for a few extra processes, but that is installation-dependent (see below). However, there is a standard mechanism for querying whether such space has been reserved. The attribute MPI_UNIVERSE_SIZE  , retrieved with MPI_Comm_get_attr (section  15.1.2  ), will tell you the total number of hosts available.

If this option is not supported, you can determine yourself how many processes you want to spawn. However, if you exceed the hardware resources, your multi-tasking operating system (which is some variant of Unix for almost everyone) will use time-slicing to start the spawned processes, but you will not gain any performance.
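The arithmetic of the preceding paragraphs can be captured in a small helper; this function is our own sketch, not part of MPI:

```c
/* Number of workers that can be spawned without oversubscribing:
   the universe size minus the processes already running.
   Clamped at zero for the case where no room was reserved. */
int workers_to_spawn(int universe_size,int world_size) {
  int work_n = universe_size - world_size;
  return work_n > 0 ? work_n : 0;
}
```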

Here is an example of a work manager. First we query how much space we have for new processes, using the flag to see if this option is supported:

int universe_size, *universe_size_attr,uflag;
MPI_Comm_get_attr
  (comm_world,MPI_UNIVERSE_SIZE,
   &universe_size_attr,&uflag);
if (uflag) {
  universe_size = *universe_size_attr;
} else {
  printf("This MPI does not support UNIVERSE_SIZE.\nUsing world size");
  universe_size = world_n;
}
int work_n = universe_size - world_n;
if (world_p==0) {
  printf("A universe of size %d leaves room for %d workers\n",
         universe_size,work_n);
  printf(".. spawning from %s\n",procname);
}

(See section  15.1.2 for that dereference behavior.)

Then we actually spawn the processes:

const char *workerprogram = "./spawnapp";
MPI_Comm_spawn(workerprogram,MPI_ARGV_NULL,
               work_n,MPI_INFO_NULL,
               0,comm_world,&comm_inter,NULL);
## spawnmanager.py
try :
    universe_size = comm.Get_attr(MPI.UNIVERSE_SIZE)
    if universe_size is None:
        print("Universe query returned None")
        universe_size = nprocs + 4
    else:
        print("World has {} ranks in a universe of {}"\
              .format(nprocs,universe_size))
except :
    print("Exception querying universe size")
    universe_size = nprocs + 4
nworkers = universe_size - nprocs

itercomm = comm.Spawn("./spawn_worker.py", maxprocs=nworkers)

A process can detect whether it was a spawning or a spawned process by using MPI_Comm_get_parent : the resulting intercommunicator is MPI_COMM_NULL on the parent processes.
// spawnapp.c
MPI_Comm comm_parent;
MPI_Comm_get_parent(&comm_parent);
int is_child = (comm_parent!=MPI_COMM_NULL);
if (is_child) {
  int nworkers,workerno;
  MPI_Comm_size(MPI_COMM_WORLD,&nworkers);
  MPI_Comm_rank(MPI_COMM_WORLD,&workerno);
  printf("I detect I am worker %d/%d running on %s\n",
         workerno,nworkers,procname);
}

The spawned program looks very much like a regular MPI program, with its own initialization and finalize calls.

// spawnworker.c
MPI_Comm_size(MPI_COMM_WORLD,&nworkers);
MPI_Comm_rank(MPI_COMM_WORLD,&workerno);
MPI_Comm_get_parent(&parent);
## spawnworker.py
parentcomm = comm.Get_parent()
nparents = parentcomm.Get_remote_size()

Spawned processes wind up with an MPI_COMM_WORLD of their own, but managers and workers can find each other regardless. The spawn routine returns the intercommunicator to the parent; the children can find it through MPI_Comm_get_parent (section  7.6.3  ). The number of spawning processes can be found through MPI_Comm_remote_size on the parent communicator.

Running spawnapp with usize=12, wsize=4
A universe of size 12 leaves room for 8 workers
 .. spawning from c209-026.frontera.tacc.utexas.edu
Worker deduces 8 workers and 4 parents
I detect I am worker 0/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 1/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 2/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 3/8 running on c209-027.frontera.tacc.utexas.edu
I detect I am worker 4/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 5/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 6/8 running on c209-028.frontera.tacc.utexas.edu
I detect I am worker 7/8 running on c209-028.frontera.tacc.utexas.edu

8.1.1 MPI startup with universe

crumb trail: > mpi-proc > Process spawning > MPI startup with universe

You could start up a single copy of this program with

mpiexec -n 1 spawnmanager

but with a hostfile that has more than one host.

TACC note Intel MPI requires you to pass an option -usize to mpiexec indicating the universe size. With the TACC job starter ibrun do the following:

export FI_MLX_ENABLE_SPAWN=yes
# specific
MY_MPIRUN_OPTIONS="-usize 8" ibrun -np 4 spawnmanager
# more generic
MY_MPIRUN_OPTIONS="-usize ${SLURM_NPROCS}" ibrun -np 4 spawnmanager
# using mpiexec:
mpiexec -np 2 -usize ${SLURM_NPROCS} spawnmanager

8.1.2 MPMD

crumb trail: > mpi-proc > Process spawning > MPMD

Instead of spawning a single executable, you can spawn multiple with MPI_Comm_spawn_multiple  . In that case each process can use the attribute MPI_APPNUM to retrieve which of the executables it is running; section  15.1.2  .
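A sketch of such a call, with hypothetical executable names and no error handling (not run here, since it needs an MPI installation and job starter):

```c
#include <mpi.h>

/* Sketch: spawn 2 copies of one executable and 4 of another in a
   single call. The spawned processes share one MPI_COMM_WORLD. */
void spawn_two_programs(MPI_Comm comm,MPI_Comm *intercomm) {
  char *commands[2] = { "./worker_a","./worker_b" };
  int maxprocs[2] = { 2,4 };
  MPI_Info infos[2] = { MPI_INFO_NULL,MPI_INFO_NULL };
  MPI_Comm_spawn_multiple
    ( 2,commands,MPI_ARGVS_NULL,maxprocs,infos,
      0,comm,intercomm,MPI_ERRCODES_IGNORE );
}

/* In a spawned executable: which of the commands am I? */
int my_appnum(void) {
  int *appnum_attr,flag,appnum = -1;
  MPI_Comm_get_attr(MPI_COMM_WORLD,MPI_APPNUM,&appnum_attr,&flag);
  if (flag) appnum = *appnum_attr; /* 0 for worker_a, 1 for worker_b */
  return appnum;
}
```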

8.2 Socket-style communications

crumb trail: > mpi-proc > Socket-style communications

It is possible to establish connections with running MPI programs that have their own world communicator.

  • The server process establishes a port with

    MPI_Open_port  , and calls MPI_Comm_accept to accept connections to its port.

  • The client process specifies that port in an MPI_Comm_connect call. This establishes the connection.

8.2.1 Server calls

crumb trail: > mpi-proc > Socket-style communications > Server calls

The server calls

MPI_Open_port  , yielding a port name. Port names are generated by the system and copied into a character buffer of length at most MPI_MAX_PORT_NAME  .

The server then needs to call

MPI_Comm_accept prior to the client doing a connect call. This is collective over the calling communicator. It returns an intercommunicator (section  7.6  ) that allows communication with the client.

MPI_Comm intercomm;
char myport[MPI_MAX_PORT_NAME];
MPI_Open_port( MPI_INFO_NULL,myport );
int portlen = strlen(myport);
MPI_Send( myport,portlen+1,MPI_CHAR,1,0,comm_world );
printf("Host sent port <<%s>>\n",myport);
MPI_Comm_accept( myport,MPI_INFO_NULL,0,comm_self,&intercomm );
printf("host accepted connection\n");

The port can be closed with MPI_Close_port  .

8.2.2 Client calls

crumb trail: > mpi-proc > Socket-style communications > Client calls

After the server has generated a port name, the client needs to connect to it with

MPI_Comm_connect  , again specifying the port through a character buffer. The connect call is collective over its communicator.

char myport[MPI_MAX_PORT_NAME];
if (work_p==0) {
  MPI_Recv( myport,MPI_MAX_PORT_NAME,MPI_CHAR, 
            MPI_ANY_SOURCE,0, comm_world,MPI_STATUS_IGNORE );
  printf("Worker received port <<%s>>\n",myport);
}
MPI_Bcast( myport,MPI_MAX_PORT_NAME,MPI_CHAR,0,comm_work );

/* The workers collectively connect over the intercommunicator */
MPI_Comm intercomm;
MPI_Comm_connect( myport,MPI_INFO_NULL,0,comm_work,&intercomm );
if (work_p==0) {
  int manage_n;
  MPI_Comm_remote_size(intercomm,&manage_n);
  printf("%d workers connected to %d managers\n",work_n,manage_n);
}

If the named port does not exist (or has been closed), MPI_Comm_connect raises an error of class MPI_ERR_PORT  .

The client can sever the connection with MPI_Comm_disconnect  .

Running the above code on 5 processes gives:

# exchange port name:
Host sent port <<tag#0$OFA#000010e1:0001cde9:0001cdee$rdma_port#1024$rdma_host#10:16:225:0:1:205:199:254:128:0:0:0:0:0:0$>>
Worker received port <<tag#0$OFA#000010e1:0001cde9:0001cdee$rdma_port#1024$rdma_host#10:16:225:0:1:205:199:254:128:0:0:0:0:0:0$>>

# Comm accept/connect
host accepted connection
4 workers connected to 1 managers

# Send/recv over the intercommunicator
Manager sent 4 items over intercomm
Worker zero received data  


8.2.3 Published service names

crumb trail: > mpi-proc > Socket-style communications > Published service names

More elegantly than the port mechanism above, it is possible to publish a named service, with

MPI_Publish_name  , which can then be discovered by other processes.

// publishapp.c
MPI_Comm intercomm;
char myport[MPI_MAX_PORT_NAME];
MPI_Open_port( MPI_INFO_NULL,myport );
MPI_Publish_name( service_name, MPI_INFO_NULL, myport );
MPI_Comm_accept( myport,MPI_INFO_NULL,0,comm_self,&intercomm );

Worker processes connect to the intercommunicator by

char myport[MPI_MAX_PORT_NAME];
MPI_Lookup_name( service_name,MPI_INFO_NULL,myport );
MPI_Comm intercomm;
MPI_Comm_connect( myport,MPI_INFO_NULL,0,comm_work,&intercomm );

For this it is necessary to have a name server running.

Intel note Start the hydra name server and use the corresponding MPI starter:

hydra_nameserver &
MPIEXEC=mpiexec.hydra

There is an environment variable, but that doesn't seem to be needed.

export I_MPI_HYDRA_NAMESERVER=`hostname`:8008

It is also possible to specify the name server as an argument to the job starter.

At the end of a run, the service should be unpublished with

MPI_Unpublish_name  . Unpublishing a nonexistent or already unpublished service gives an error code of MPI_ERR_SERVICE  .

MPI provides no guarantee of fairness in servicing connection attempts. That is, connection attempts are not necessarily satisfied in the order in which they were initiated, and competition from other connection attempts may prevent a particular connection attempt from being satisfied.

8.2.4 Unix sockets

crumb trail: > mpi-proc > Socket-style communications > Unix sockets

It is also possible to create an intercommunicator from an existing Unix socket with

MPI_Comm_join  .

8.3 Sessions

crumb trail: > mpi-proc > Sessions

The most common way of initializing MPI, with MPI_Init (or MPI_Init_thread  ) and MPI_Finalize  , is known as the world model  , which can be described as:

  1. There is a single call to MPI_Init or MPI_Init_thread ;

  2. There is a single call to MPI_Finalize ;

  3. With very few exceptions, all MPI calls appear in between the initialize and finalize calls.

This model suffers from some disadvantages:

  1. There is no error handling during MPI_Init  .

  2. MPI cannot be finalized and restarted;

  3. If multiple libraries are active, they cannot initialize or finalize MPI, but have to base themselves on subcommunicators; section  7.2.2  .

  4. There is no thread-safe way of initializing MPI: a library can't safely do

    MPI_Initialized(&flag);
    if (!flag) MPI_Init(0,0);
    

    if it is running in a multi-threaded environment.

MPI-4 note The session model described in the remainder of this section was added in the MPI-4 standard.

In addition to the world model  , where all MPI calls are bracketed by MPI_Init (or MPI_Init_thread  ) and MPI_Finalize  , there is the session model  , where entities such as libraries can start/end their MPI session independently.

The two models can be used in the same program, but there are limitations on how they can mix.

8.3.1 Short description of the session model

crumb trail: > mpi-proc > Sessions > Short description of the session model

In the session model  , each session starts and finalizes MPI independently, giving each a separate MPI_COMM_WORLD  . The world model then becomes just one way of starting MPI. You can create a communicator using the world model in addition to starting multiple sessions, each on their own set of processes, possibly identical or overlapping. You can also create sessions without having an MPI_COMM_WORLD created by the world model.

You cannot mix in a single call objects from different sessions, from a session and from the world model, or from a session and from MPI_Comm_get_parent or MPI_Comm_join  .

8.3.2 Session creation

crumb trail: > mpi-proc > Sessions > Session creation

An MPI session is initialized and finalized with

MPI_Session_init and MPI_Session_finalize  , somewhat similar to MPI_Init and MPI_Finalize  .

MPI_Session the_session;
MPI_Session_init
  ( session_request_info,MPI_ERRORS_ARE_FATAL,
    &the_session );
MPI_Session_finalize( &the_session );

This call is thread-safe, in view of the above reasoning.

8.3.2.1 Session info

crumb trail: > mpi-proc > Sessions > Session creation > Session info

The MPI_Info object that is passed to MPI_Session_init can be null, or it can be used to request a threading level:

// session.c
MPI_Info session_request_info = MPI_INFO_NULL;
MPI_Info_create(&session_request_info);
char thread_key[] = "mpi_thread_support_level";
MPI_Info_set(session_request_info,
             thread_key,"MPI_THREAD_MULTIPLE");

Other info keys can be implementation-dependent, but the thread support level key used here is pre-defined.

Info keys can be retrieved again with MPI_Session_get_info :

MPI_Info session_actual_info;
MPI_Session_get_info( the_session,&session_actual_info );
char thread_level[100]; int info_len = 100, flag;
MPI_Info_get_string( session_actual_info,
                     thread_key,&info_len,thread_level,&flag );

8.3.2.2 Session error handler

crumb trail: > mpi-proc > Sessions > Session creation > Session error handler

The error handler argument accepts a pre-defined error handler (section  15.2.2  ) or one created by MPI_Session_create_errhandler  .

8.3.3 Process sets and communicators

crumb trail: > mpi-proc > Sessions > Process sets and communicators

A session has a number of process sets  . Process sets are indicated with a URI  , where the URIs mpi://WORLD and mpi://SELF are always defined.

You query the psets with MPI_Session_get_num_psets and MPI_Session_get_nth_pset  .
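The corresponding example snippet did not survive conversion; the following is our own sketch of the query loop, assuming a session the_session already created with MPI_Session_init (it needs an MPI-4 installation, so it is not run here):

```c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

/* Sketch: print the names of all process sets of a session. */
void list_psets(MPI_Session the_session) {
  int npsets;
  MPI_Session_get_num_psets( the_session,MPI_INFO_NULL,&npsets );
  for (int ipset=0; ipset<npsets; ipset++) {
    /* with *pset_len zero on input, only the needed length is returned */
    int len = 0;
    MPI_Session_get_nth_pset( the_session,MPI_INFO_NULL,ipset,&len,NULL );
    char *name = (char*)malloc(len);
    MPI_Session_get_nth_pset( the_session,MPI_INFO_NULL,ipset,&len,name );
    printf("pset %d: <<%s>>\n",ipset,name);
    free(name);
  }
}
```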

The following partial code creates a communicator equivalent to MPI_COMM_WORLD in the session model:

MPI_Group world_group = MPI_GROUP_NULL;
MPI_Comm  world_comm  = MPI_COMM_NULL;
MPI_Group_from_session_pset
  ( the_session,world_name,&world_group );
MPI_Comm_create_from_group
  ( world_group,"victor-code-session.c",
    MPI_INFO_NULL,MPI_ERRORS_ARE_FATAL,
    &world_comm );
MPI_Group_free( &world_group );
int procid = -1, nprocs = 0;
MPI_Comm_size(world_comm,&nprocs);
MPI_Comm_rank(world_comm,&procid);

However, comparing communicators (with MPI_Comm_compare  ) from the session and world model, or from different sessions, is undefined behavior.

Get the info object (section  15.1.1  ) from a process set: MPI_Session_get_pset_info  . This info object always has the key mpi_size  .

8.3.4 Example

crumb trail: > mpi-proc > Sessions > Example

As an example of the use of sessions, we declare a library class, where each library object starts and ends its own session:

// sessionlib.cxx
class Library {
private:
  MPI_Comm world_comm; MPI_Session session;
public:
  Library() {
    MPI_Info info = MPI_INFO_NULL;
    MPI_Session_init
      ( MPI_INFO_NULL,MPI_ERRORS_ARE_FATAL,&session );
    char world_name[] = "mpi://WORLD";
    MPI_Group world_group;
    MPI_Group_from_session_pset
      ( session,world_name,&world_group );
    MPI_Comm_create_from_group
      ( world_group,"world-session",
	MPI_INFO_NULL,MPI_ERRORS_ARE_FATAL,
	&world_comm );
    MPI_Group_free( &world_group );
  };
  ~Library() { MPI_Session_finalize(&session); };

Now we create a main program, using the world model, which activates two libraries, passing data to them by parameter:

int main(int argc,char **argv) {

  Library lib1,lib2;
  MPI_Init(0,0);
  MPI_Comm world = MPI_COMM_WORLD;
  int procno,nprocs;
  MPI_Comm_rank(world,&procno);
  MPI_Comm_size(world,&nprocs);
  auto sum1 = lib1.compute(procno);
  auto sum2 = lib2.compute(procno+1);

Note that no MPI calls can pass between the main program and either of the libraries, or between the two libraries, but that makes sense in this scenario.


8.4 Functionality available outside init/finalize

crumb trail: > mpi-proc > Functionality available outside init/finalize

The following routines can be used outside init/finalize: MPI_Initialized , MPI_Finalized , MPI_Get_version , MPI_Get_library_version , MPI_Info_create , MPI_Info_create_env , MPI_Info_set , MPI_Info_delete , MPI_Info_get , MPI_Info_get_valuelen , MPI_Info_get_nkeys , MPI_Info_get_nthkey , MPI_Info_dup , MPI_Info_free , MPI_Info_f2c , MPI_Info_c2f , MPI_Session_create_errhandler , MPI_Session_call_errhandler , MPI_Errhandler_free , MPI_Errhandler_f2c , MPI_Errhandler_c2f , MPI_Error_string , MPI_Error_class . Also all routines starting with MPI_T_ (the tools interface).

Back to Table of Contents