IN3200/IN4200: Chapter 9
Distributed-memory parallel programming with MPI
Textbook: Hager & Wellein, Introduction to High Performance Computing for Scientists and
Engineers
Objective
The concept of explicit “message passing”
Introduction to MPI (the Message Passing Interface)
Message passing
Shared-memory programming (such as OpenMP) does not
work for distributed-memory parallel computers
There is no way for one process to directly access the
address space of another process
Explicit “message passing” is required
This can also be a programming model for shared-memory or
hybrid systems
Basic features of message-passing programming
The same program runs on all processes (Single Program
Multiple Data, or SPMD); no difference from OpenMP
programming in this regard
The work of each process is implemented in a sequential
language (such as C)
Data exchange (sending and receiving messages) is done via
calls to an appropriate library
All variables in a process are local to this process (nothing is
shared)
“Messages” carry data to be exchanged between processes
Information needed about a message:
Which process is sending the message?
Where is the data on the sending process?
What kind of data is being sent?
How much data constitutes the message?
Which process is going to receive the message?
Where should the data be placed on the receiving process?
What amount of data is the receiving process prepared to
accept?
MPI (Message Passing Interface)
MPI is a library standard for programming distributed-memory systems
MPI implementation(s) available on almost every major
parallel platform (also on shared-memory machines)
Portability, good performance & functionality
Collaborative computing by a group of individual processes
Each process has its own local memory
Explicit message passing enables information exchange and
collaboration between processes
MPI C language binding
#include <mpi.h>
rv = MPI_Xxxxx(parameter, ... )
Beware of the case-sensitive naming pattern.
The return value (rv) transports information about the success of
the MPI operation. (MPI_SUCCESS is returned if the MPI
routine completed successfully.)
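For example, a minimal sketch (not from the slides) of checking the return value, using MPI_Init from the next slide; in practice the default MPI error handler aborts the program on failure, so such checks are mostly illustrative:
int rv = MPI_Init(&argc, &argv);   /* argc/argv as passed to main() */
if (rv != MPI_SUCCESS) {
  fprintf(stderr, "MPI_Init did not complete successfully\n");
  return 1;
}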
MPI_Init
The MPI_Init function initializes the parallel environment.
int MPI_Init( int *argc, char ***argv )
The MPI_Init function takes pointers to the main() function’s
arguments so that the library can evaluate and remove any
additional command line arguments that may have been added by
the MPI startup process.
MPI communicator
An MPI communicator is a "communication universe" for a
group of processes
MPI_COMM_WORLD – name of the default MPI communicator,
i.e., the collection of all processes
Each process in a communicator is identified by its rank
Almost every MPI command needs to provide a communicator
as input argument
MPI process rank
Each process has a unique rank, i.e. an integer identifier,
within a communicator
The rank value is between 0 and #procs-1
The rank value is used to distinguish one process from another
Commands MPI_Comm_size & MPI_Comm_rank are very useful
int size, my_rank;
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
MPI_Finalize
An MPI parallel program is shut down by a call to
MPI_Finalize().
Note that no MPI process except rank 0 is guaranteed to execute
any code beyond MPI_Finalize().
“Hello world” in MPI
#include <stdio.h>
#include <mpi.h>
int main (int nargs, char** args)
{
int size, my_rank;
MPI_Init (&nargs, &args);
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &my_rank);
printf("Hello world, I’ve rank %d out of %d procs.\n",
my_rank,size);
MPI_Finalize ();
return 0;
}
Example running result of “Hello world”
Example of compilation: mpicc hello_world_mpi.c
Example of parallel execution: mpirun -np 4 ./a.out
Example running result (note: no deterministic order of the
output):
Hello world, I've rank 2 out of 4 procs.
Hello world, I've rank 1 out of 4 procs.
Hello world, I've rank 3 out of 4 procs.
Hello world, I've rank 0 out of 4 procs.
Note: Compilation and execution can vary from system to system.
The abstract “picture” of parallel execution
[Figure: the same "Hello world" code from the previous slide is replicated on Process 0, Process 1, ..., Process P-1; every process executes its own copy of the identical program.]
MPI message
An MPI message is simply an array of elements of a particular MPI
data type.
Data types can either be standard types (pre-defined) or derived
types, which must be defined by appropriate MPI calls.
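As an illustration (a sketch, not part of the slides), a derived type describing a block of four contiguous doubles could be created like this:
MPI_Datatype block_t;
MPI_Type_contiguous(4, MPI_DOUBLE, &block_t);  /* four consecutive doubles */
MPI_Type_commit(&block_t);                     /* a derived type must be committed before use */
/* ... block_t can now be used as the datatype argument of send/receive calls ... */
MPI_Type_free(&block_t);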
Point-to-point communication
One “sender” and one “receiver”: point-to-point communication
Both the sender and receiver are identified by their ranks
(within an MPI communicator)
Each point-to-point message can carry an additional integer
label, the so-called tag
The simplest MPI send command
int MPI_Send(void *buf, int count,
MPI_Datatype datatype,
int dest, int tag,
MPI_Comm comm);
This blocking send function returns when the data has been
delivered to the system and the buffer can be reused. The message
may not have been received by the destination process.
The simplest MPI receive command
int MPI_Recv(void *buf, int count,
MPI_Datatype datatype,
int source, int tag,
MPI_Comm comm,
MPI_Status *status);
This blocking receive function waits until a matching message
is received from the system so that the buffer contains the
incoming message.
A receive matches a message based on the data type, the source
process (or MPI_ANY_SOURCE), and the message tag (or MPI_ANY_TAG).
Receiving fewer datatype elements than count is ok, but
receiving more elements is an error.
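A minimal sketch of a matching send/receive pair (assuming MPI has been initialized and my_rank is set as in the "Hello world" example, with at least two processes):
int value;
MPI_Status status;
if (my_rank == 0) {
  value = 42;
  MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);           /* send to rank 1 with tag 0 */
} else if (my_rank == 1) {
  MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);  /* receive from rank 0 with tag 0 */
}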
MPI_Status
The MPI_Recv function has an additional output argument: the
status object, which can be used to determine “unknown”
parameters (if any).
For example, the source or tag of a received message may not be
known if wildcard values were used in the receive function.
To query the information stored in a status object:
status.MPI_SOURCE
status.MPI_TAG
MPI_Get_count (MPI_Status *status,
MPI_Datatype datatype,
int *count);
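For example (a sketch assuming a receive posted with both wildcards), the sender, tag and actual message length can be recovered afterwards:
double buf[100];
MPI_Status status;
int nrecv;
MPI_Recv(buf, 100, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG,
         MPI_COMM_WORLD, &status);
MPI_Get_count(&status, MPI_DOUBLE, &nrecv);   /* number of doubles actually received */
printf("message from rank %d, tag %d, %d elements\n",
       status.MPI_SOURCE, status.MPI_TAG, nrecv);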
MPI parallelization of numerical integration
int rank, size, i;
double a=0.0, b=1.0, mya, myb, psum;
MPI_Status status;
MPI_Init (&nargs, &args);
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
mya = a + rank*(b-a)/size;
myb = mya + (b-a)/size;
psum = integrate(mya,myb); // integrate over its "subinterval"
if (rank==0) {
double res = psum;
for (i=1; i<size; i++) {
MPI_Recv(&psum,1,MPI_DOUBLE,i,0,MPI_COMM_WORLD,&status);
res += psum;
}
printf("Result: %g\n", res);
}
else {
MPI_Send(&psum,1,MPI_DOUBLE,0,0,MPI_COMM_WORLD);
}
MPI_Finalize();
Some explanations
Each MPI process is assigned a “subinterval”: [mya, myb] to
work on.
Each MPI process then computes its partial result (psum)
Thereafter, the partial results need to be summed up
The strategy chosen by the preceding MPI example is to let
each process (except rank 0) send its partial result to process
rank 0, which sums up all the partial results
Additional remarks
The preceding MPI example can be improved in several ways:
Rank 0 receives in total size-1 messages, one from each of
the other processes, but the order in which it posts the receives
is fixed and probably not the same as the order in which the
messages arrive. Using MPI_ANY_SOURCE (wildcard) instead of a
prescribed sender rank is more appropriate (see the sketch after
this list).
Rank 0 calls MPI_Recv after its own calculation
(integrate(mya,myb)). If some of the other processes finish
their computation earlier, communication cannot proceed, and
it cannot be overlapped with computation on rank 0.
(Non-blocking point-to-point communication will be more
appropriate, see Section 9.2.4.)
Rank 0 can easily become a “bottleneck”, because it is
responsible for receiving all the partial results. (Use of
collective communication, such as that specifically designed for
“reduction”, will be more appropriate, see Section 9.2.3.)
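A sketch of the first improvement, receiving the partial results in whatever order they arrive (same variable names as in the example above):
if (rank==0) {
  double res = psum;
  for (i=1; i<size; i++) {
    MPI_Recv(&psum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, 0,
             MPI_COMM_WORLD, &status);   /* accept whichever message arrives first */
    res += psum;
  }
  printf("Result: %g\n", res);
}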
“Uncertainties/risks” with MPI_Send
While MPI_Send is easy to use, one should be aware that the MPI
standard allows for a considerable amount of freedom in its actual
implementation.
Internally it may work completely synchronously, meaning
that the call cannot return until a message transfer has at
least started after a “handshake” with the receiver.
However, it may also copy the message to an intermediate
buffer and return right away, leaving the “handshake” and data
transmission to another mechanism, like a background thread.
It may even change its behavior depending on any explicit or
hidden parameters.
Scenarios of “handshake”
[Figure: non-buffered blocking message passing with a “request to send” / “okay to send” handshake, shown for three scenarios: (a) the sender comes first, causing idling at the sender; (b) sender and receiver arrive at about the same time, so idling is minimized; (c) the receiver comes first, causing idling at the receiver.]
Source: A. Grama, A. Gupta, G. Karypis, and V. Kumar. Introduction to Parallel Computing. Addison
Wesley, 2003
Possibilities for deadlock
Deadlocks may occur if the possible synchronousness of
MPI_Send is not taken into account. A typical communication
pattern where this may become crucial is a “ring shift”:
int rank, size, left, right, in_buf[N], out_buf[N];
MPI_Status status;
// .......
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
left = rank==size-1 ? 0 : rank+1;
right = rank==0 ? size-1 : rank-1;
MPI_Send(out_buf,N,MPI_INT,left,0,MPI_COMM_WORLD);
MPI_Recv(in_buf,N,MPI_INT,right,0,MPI_COMM_WORLD,&status);
Depiction of the “ring shift” pattern
If MPI_Send is synchronous (and there is no buffering), all
processes call it first and then wait forever for a matching
receive to be posted—deadlock.
However, the ring shift may run without problems if the
messages are sufficiently short. In fact, most MPI
implementations provide a (small) internal buffer for short
messages and switch to synchronous mode when the buffer is
full or too small.
One simple solution
int rank, size, left, right, in_buf[N], out_buf[N];
MPI_Status status;
// .......
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
left = rank==size-1 ? 0 : rank+1;
right = rank==0 ? size-1 : rank-1;
if (rank%2) {
MPI_Recv(in_buf,N,MPI_INT,right,0,MPI_COMM_WORLD,&status);
MPI_Send(out_buf,N,MPI_INT,left,0,MPI_COMM_WORLD);
}
else {
MPI_Send(out_buf,N,MPI_INT,left,0,MPI_COMM_WORLD);
MPI_Recv(in_buf,N,MPI_INT,right,0,MPI_COMM_WORLD,&status);
}
Depiction of the simple solution
Special MPI functions for “ring shifts”
int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
int dest, int sendtag,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
int source, int recvtag,
MPI_Comm comm, MPI_Status *status)
int MPI_Sendrecv_replace(void *buf, int count, MPI_Datatype datatype,
int dest, int sendtag, int source, int recvtag,
MPI_Comm comm, MPI_Status *status)
Both routines use blocking communication, but they are guaranteed
not to be subject to the deadlock effects that may occur with
separate send and receive calls.
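With MPI_Sendrecv, the ring shift from the previous slides becomes a single, deadlock-free call (same variables as before):
MPI_Sendrecv(out_buf, N, MPI_INT, left,  0,   /* send to the "left" neighbor  */
             in_buf,  N, MPI_INT, right, 0,   /* receive from the "right" one */
             MPI_COMM_WORLD, &status);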
Collective communication in MPI
Recall the numerical integration example: summing up the partial
sums is a reduction operation.
MPI has mechanisms that make reductions much simpler and in
most cases more efficient than looping over all ranks and collecting
results.
Since a reduction is a procedure that involves all ranks in a
communicator, it belongs to the so-called collective
communication operations in MPI.
A collective MPI routine must be called by all ranks in a
communicator.
MPI_Barrier
MPI_Barrier is the simplest collective in MPI; it does not actually
perform any real data transfer.
int MPI_Barrier(MPI_Comm comm)
The barrier synchronizes the members of the communicator, i.e., all
processes must call it before they are allowed to return to the user
code.
Don’t over-use MPI_Barrier! There are other MPI routines that
allow for implicit or explicit synchronization with finer control.
MPI_Bcast
int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root,
MPI_Comm comm )
MPI_Bcast sends a message from one process (the “root”) to all
others in the communicator.
The buffer argument to MPI_Bcast() is a send buffer on the root
and a receive buffer on any other process.
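A small sketch (assuming my_rank is set as before): rank 0 holds a parameter, e.g. a problem size, that all other ranks need.
int n;
if (my_rank == 0)
  n = 1000;                                      /* so far known only on the root */
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);    /* afterwards n is valid on every rank */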
Depiction of MPI_Bcast
MPI_Gather & MPI_Scatter
Examples of more advanced MPI collective calls that are concerned
with global data distribution:
MPI_Gather() collects the send buffer contents of all
processes and concatenates them in rank order into the receive
buffer of the root process
MPI_Scatter() does the reverse, distributing equal-sized
chunks of the root’s send buffer
MPI_Gatherv() and MPI_Scatterv() support arbitrary
per-rank chunk sizes
MPI_Gather & MPI_Scatter syntax
int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
MPI_Comm comm)
The syntax of MPI_Gatherv() and MPI_Scatterv() is more
complex, not listed here.
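A minimal sketch of MPI_Scatter followed by MPI_Gather (assumptions: N is divisible by size, and the full array exists only on rank 0):
int chunk = N/size;
double *my_chunk  = malloc(chunk*sizeof(double));
double *full_data = NULL;
if (my_rank == 0)
  full_data = malloc(N*sizeof(double));   /* filled with input data on the root */
MPI_Scatter(full_data, chunk, MPI_DOUBLE,
            my_chunk,  chunk, MPI_DOUBLE, 0, MPI_COMM_WORLD);
/* ... every process works on its own chunk ... */
MPI_Gather(my_chunk,  chunk, MPI_DOUBLE,
           full_data, chunk, MPI_DOUBLE, 0, MPI_COMM_WORLD);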
Depiction of MPI_Gather & MPI_Scatter
[Figure: (a) equal-size gather and scatter: with A = {1,8}, {4,1}, {4,6}, {0,3} on P0..P3, Gather(A, B, P2) produces B = {1,8,4,1,4,6,0,3} on P2, and Scatter(B, A, P2) distributes the chunks back. (b) unequal-size gather and scatter: with per-process chunks {1}, {8,2,4,1}, {4,6}, {9,0,3}, Gather(A, B, P1) produces B = {1,8,2,4,1,4,6,9,0,3} on P1, and Scatter(B, A, P1) reverses it.]
MPI_Reduce
int MPI_Reduce(const void *sendbuf, void *recvbuf,
int count, MPI_Datatype datatype,
MPI_Op op, int root, MPI_Comm comm)
MPI_Reduce() combines the contents of the sendbuf array on all
processes, element-wise, using the operator encoded by the op
argument, and stores the result in recvbuf on the root process.
Examples of predefined operators: MPI_MAX, MPI_MIN,
MPI_SUM and MPI_PROD
Depiction of MPI_Reduce
Rewrite of the “numerical integration” example
int rank, size;
double a=0.0, b=1.0, mya, myb, psum, res=0.;
MPI_Init (&nargs, &args);
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
mya = a + rank*(b-a)/size;
myb = mya + (b-a)/size;
psum = integrate(mya,myb); // integrate over its "subinterval"
MPI_Reduce(&psum,&res,1,MPI_DOUBLE,MPI_SUM,0,MPI_COMM_WORLD);
if (rank==0)
printf("Result: %g\n", res);
MPI_Finalize();
MPI_Allreduce
int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
The result of the reduction operation (recvbuf) is available on all
MPI ranks.
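In the rewritten integration example, replacing the MPI_Reduce call with the line below would make res valid on every rank, so the if (rank==0) guard would only be needed to avoid printing the result more than once:
MPI_Allreduce(&psum, &res, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);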
Nonblocking point-to-point communication
Point-to-point communication can be performed with nonblocking
semantics.
A nonblocking point-to-point call merely initiates a message
transmission and returns very quickly to the user code.
In an efficient implementation, waiting for data to arrive and
the actual data transfer occur in the background, leaving
resources free for computation.
In other words, nonblocking MPI is a way in which
communication may be overlapped with computation if
implemented efficiently.
The message buffer must not be used as long as the user
program has not been notified that it is safe to do so (which
can be checked by suitable MPI calls).
Nonblocking and blocking MPI calls are mutually compatible,
i.e., a message sent via a blocking send can be matched by a
nonblocking receive.
MPI_Isend
int MPI_Isend(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag,
MPI_Comm comm, MPI_Request *request)
As opposed to the blocking send (MPI_Send()), MPI_Isend()
has an additional output argument: the request handle (of type
MPI_Request).
It serves as an identifier by which the program can later refer
to the “pending” communication request.
MPI_Irecv
int MPI_Irecv(void *buf, int count, MPI_Datatype datatype,
int source, int tag,
MPI_Comm comm, MPI_Request * request)
The status object known from MPI_Recv() is not needed for
MPI_Irecv(); the status information becomes available later via
MPI_Wait() or MPI_Test() once the request has completed.
MPI_Test & MPI_Wait
Checking a pending communication for completion can be done via
the MPI_Test() and MPI_Wait() functions. The former only tests
for completion and returns a flag, while the latter blocks until the
buffer can be used.
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)
int MPI_Wait(MPI_Request *request, MPI_Status *status)
The status object contains useful information only if the pending
communication is a completed receive (i.e., in the case of
MPI_Test() the value of flag must be true).
MPI_Waitall
In the case of multiple non-blocking communication operations
(multiple requests pending), it is more convenient to use the
MPI_Waitall function:
int MPI_Waitall(int count, MPI_Request array_of_requests[],
MPI_Status array_of_statuses[])
The “numerical integration” example, yet again
// ...
double *tmp;
MPI_Request *request_array;
MPI_Status *status_array;
if (rank==0) {
// allocate arrays of tmp, request_array & status_array...
for (i=1; i<size; i++)
MPI_Irecv(&tmp[i-1],1,MPI_DOUBLE,i,0,MPI_COMM_WORLD,&request_array[i-1]);
}
mya = a + rank*(b-a)/size;
myb = mya + (b-a)/size;
psum = integrate(mya,myb); // integrate over its "subinterval"
if (rank==0) {
double res = psum;
MPI_Waitall (size-1,request_array,status_array);
for (i=1; i<size; i++)
res += tmp[i-1];
printf("Result: %g\n", res);
}
else {
MPI_Send(&psum,1,MPI_DOUBLE,0,0,MPI_COMM_WORLD);
}
Benefits of nonblocking communication
Nonblocking communication provides an obvious way to
overlap communication, i.e., overhead, with useful work.
The possible performance advantage, however, depends on
many factors, and may even be nonexistent.
But even if there is no real overlap, multiple outstanding
nonblocking requests may improve performance because the
MPI library can decide which of them gets serviced first.
Nonblocking communication helps to avoid deadlock.
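For instance, the “ring shift” pattern from earlier can be written deadlock-free with nonblocking calls (a sketch using the same variables as before):
MPI_Request reqs[2];
MPI_Status  stats[2];
MPI_Isend(out_buf, N, MPI_INT, left,  0, MPI_COMM_WORLD, &reqs[0]);
MPI_Irecv(in_buf,  N, MPI_INT, right, 0, MPI_COMM_WORLD, &reqs[1]);
/* ... independent computation could be placed here ... */
MPI_Waitall(2, reqs, stats);   /* both buffers are safe to access afterwards */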
A short summary
Virtual topologies
MPI is very well suited to implementing domain decomposition
(Section 5.2.1) on distributed-memory parallel computers.
However, setting up the process grid and keeping track of
which ranks have to exchange halo data is nontrivial.
MPI contains some functionality to support this recurring task
in the form of virtual topologies.
These provide a convenient process naming scheme, which fits
the required communication pattern.
Cartesian topologies
Example: a 2D global Cartesian mesh of size 3000 × 4000. Suppose
we want to use 3 × 4 = 12 MPI processes to divide the global
mesh, each holding a piece of 1000 × 1000.
Cartesian topologies (2)
As shown in the preceding figure, each process can either be
identified by its rank or its Cartesian coordinates.
Each process has a number of neighbors, which depends on
the grid’s dimensionality. (In our example, the number of
dimensions is two, which leads to at most four neighbors per
process.)
MPI can help with establishing the mapping between ranks
and Cartesian coordinates in the process grid.
MPI_Cart_create
int MPI_Cart_create(MPI_Comm comm_old, int ndims, const int dims[],
const int periods[], int reorder, MPI_Comm * comm_cart)
A new, “Cartesian” communicator comm_cart is generated,
which can be used later to refer to the topology.
The periods array specifies which Cartesian directions are
periodic, and the reorder parameter allows, if true, for rank
reordering so that the rank of a process in communicators
comm_old and comm_cart may differ.
Here, MPI merely keeps track of the topology information.
MPI_Cart_coords & MPI_Cart_rank
int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[])
int MPI_Cart_rank(MPI_Comm comm, const int coords[], int *rank)
These are two “service” functions responsible for the translation
between Cartesian process coordinates and an MPI rank.
MPI_Cart_coords() calculates the Cartesian coordinates for a
given rank.
The reverse mapping, i.e., from Cartesian coordinates to an
MPI rank, is performed by MPI_Cart_rank().
Example of 2D Cartesian grid
int rank, size;
MPI_Comm comm;
int dim[2], period[2], reorder;
int coord[2], id;
// ....
dim[0]=4; dim[1]=3;
period[0]=0; period[1]=1;
reorder=1;
MPI_Cart_create(MPI_COMM_WORLD, 2, dim, period, reorder, &comm);
if (rank == 5) {
MPI_Cart_coords(comm, rank, 2, coord);
printf("Rank %d coordinates are %d %d\n", rank, coord[0], coord[1]);
}
if(rank==0) {
coord[0]=3; coord[1]=1;
MPI_Cart_rank(comm, coord, &id);
printf("The processor at position (%d, %d) has rank %d\n", coord[0], coord[1], id);
}
MPI_Cart_shift
A regular task with domain decomposition is to find out who the
next neighbors of a certain process are along a certain Cartesian
dimension.
int MPI_Cart_shift(MPI_Comm comm, int direction, int disp, int *rank_source,
int *rank_dest)
1D example
// ....
dims[0] = size;
periods[0] = 1;
MPI_Cart_create( MPI_COMM_WORLD, 1, dims, periods, 0, &comm );
MPI_Cart_shift( comm, 0, 1, &source, &dest );
if (source != ((rank - 1 + size) % size)) {
errs++;
printf( "source for shift 1 is %d\n", source );fflush(stdout);
}
if (dest != ((rank + 1) % size)) {
errs++;
printf( "dest for shift 1 is %d\n", dest );fflush(stdout);
}
MPI_Cart_shift( comm, 0, -1, &source, &dest );
if (source != ((rank + 1) % size)) {
errs++;
printf( "source for shift -1 is %d\n", source );fflush(stdout);
}
if (dest != ((rank - 1 + size) % size)) {
errs++;
printf( "dest for shift -1 is %d\n", dest );fflush(stdout);
}
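Putting the pieces together, a sketch (not from the slides) of a halo exchange along one dimension of a 2D Cartesian communicator; send_row, recv_row and M are placeholder names for the boundary buffer, the halo buffer and their length:
int prev, next;
MPI_Status status;
/* neighbors along dimension 0 with displacement 1:
   prev has coordinate-1, next has coordinate+1 */
MPI_Cart_shift(comm, 0, 1, &prev, &next);
/* send own boundary row to "next", receive the halo row from "prev";
   at a non-periodic boundary the neighbor is MPI_PROC_NULL and that
   part of the operation is simply skipped */
MPI_Sendrecv(send_row, M, MPI_DOUBLE, next, 0,
             recv_row, M, MPI_DOUBLE, prev, 0,
             comm, &status);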