|
Collective Communication
Collective communication provides a more structured alternative to
point-to-point communication. With collective communications, all of
the processes within a communicator collaborate on a single
communication operation that fits one of several common
communication patterns used in message-passing
applications. Collective operations include simple barriers,
one-to-all, all-to-one, and all-to-all communications, and parallel
reduction operations that combine the values provided by each of the
processes in the communication.
Although it is possible to express parallel programs entirely
through point-to-point operations (some even call send and receive
the "assembly language" of distributed-memory parallel programming),
collectives offer two main advantages, readability and performance,
both described below. It is therefore generally preferable to use
collectives whenever possible, falling back to point-to-point
operations only when no suitable collective exists.
- Code Readability/Maintainability
- It is often easier to write and reason about programs that use
collective communication than the equivalent program using
point-to-point communication. Collectives express the intent of a
communication better (e.g., a
Scatter operation is
clearly distributing data from one process to all of the other
processes), and there are often far fewer collective operations
needed to accomplish a task than point-to-point messages (e.g., a
single all-to-all operation instead of N^2 point-to-point
operations), making it easier to debug programs using
collectives.
- Performance
- MPI implementations typically contain optimized algorithms for
collective operations that take advantage of knowledge of the
network topology and hardware, even taking advantage of
hardware-based implementations of some collective operations. These
optimizations are hard to implement directly over point-to-point,
without the knowledge already available in the MPI implementation
itself. Therefore, using collective operations can help improve the
performance of parallel programs and make that performance more
portable to other clusters with different configurations.
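To make the readability point concrete, the sketch below collects one integer from every process at rank 0 twice: first with hand-written point-to-point calls, then with a single Gather. It assumes MPI.NET's Send(value, dest, tag) and Receive<T>(source, tag) methods; the class name GatherByHand and the value each process contributes (its rank squared) are illustrative choices, not part of the original text.

```csharp
using System;
using MPI;

class GatherByHand
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            int value = comm.Rank * comm.Rank; // illustrative per-process value

            // Point-to-point version: the root posts one receive per process,
            // and every other process sends its value to the root.
            int[] byHand = null;
            if (comm.Rank == 0)
            {
                byHand = new int[comm.Size];
                byHand[0] = value;
                for (int source = 1; source < comm.Size; ++source)
                    byHand[source] = comm.Receive<int>(source, 0);
            }
            else
            {
                comm.Send(value, 0, 0);
            }

            // Collective version: a single call states the intent directly.
            int[] gathered = comm.Gather(value, 0);

            if (comm.Rank == 0)
            {
                bool same = true;
                for (int i = 0; i < comm.Size; ++i)
                    same = same && byHand[i] == gathered[i];
                Console.WriteLine("Hand-written and collective results match: " + same);
            }
        }
    }
}
```

The hand-written loop also receives strictly in rank order, one message at a time, whereas the collective leaves the MPI implementation free to choose a better communication schedule.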
Barrier: Marching Computations
In an MPI program, the various processes perform their local
computations without regard to the behavior of the other processes
in the program, except when the processes are waiting for some
inter-process communication to complete. In many parallel programs,
all of the processes work more-or-less independently, but we want to
make sure that all of the processes are on the same step at the same
time. The Barrier collective operation is used for
precisely this purpose. When processes enter the barrier, they do
not exit the barrier until all processes have entered the
barrier. Place barriers before or after a step of the computation
that all processes need to perform at the same time.
In the example program below, each of the iterations of the loop is
completely synchronized, so that every process is on the same
iteration at the same time.
using System;
using MPI;

class Barrier
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            for (int i = 1; i <= 5; ++i)
            {
                // No process starts step i until every process has arrived here.
                comm.Barrier();
                if (comm.Rank == 0)
                    Console.WriteLine("Everyone is on step " + i + ".");
            }
        }
    }
}
Executing this program with any number of processes will produce
the following output (here, we use 8 processes).
C:\Barrier\bin\Debug>mpiexec -n 8 Barrier.exe
Everyone is on step 1.
Everyone is on step 2.
Everyone is on step 3.
Everyone is on step 4.
Everyone is on step 5.
All-to-one: Gathering Data
The MPI Gather operation collects data provided by all
of the processes in a communicator on a single process, called the
root process. Gather is typically used to bring summary
data from a computation back to the process responsible for
communicating that information to the user. For example, in the
following program, we gather the names of the processors (or hosts)
on which each process is executing, then sort and display that
information at the root.
using System;
using MPI;

class Hostnames
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            // Every process contributes its host name; only rank 0 gets the array.
            string[] hostnames = comm.Gather(MPI.Environment.ProcessorName, 0);
            if (comm.Rank == 0)
            {
                Array.Sort(hostnames);
                foreach (string host in hostnames)
                    Console.WriteLine(host);
            }
        }
    }
}
In the call to Gather, each process provides a value
(in this case, the string produced by reading
the ProcessorName property) to the Gather
operation, along with the rank of the "root" node (here, process
zero). The Gather operation will return an array of
values to the root node, where the ith value in
the array corresponds to the value provided by the process with
rank i. All other processes receive a null
array.
To deliver the gathered data to every process rather than only the root, use
the Allgather collective. Allgather is
similar to Gather, with two differences: first, there
is no parameter identifying the "root" process, and
second, all processes receive the same array containing the
contributions from every process. An Allgather is,
therefore, the same as a Gather followed by
a Broadcast, described below.
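As a minimal sketch of the Allgather call described above, the following program lets every process learn the host name of every other process. It assumes the single-argument form comm.Allgather(value) that returns an array; the class name AllHostnames and the choice to print the next rank's host are illustrative only.

```csharp
using System;
using MPI;

class AllHostnames
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;

            // No root argument: every process contributes a value and
            // every process receives the complete array.
            string[] hostnames = comm.Allgather(MPI.Environment.ProcessorName);

            // hostnames[i] was contributed by rank i, so any process can
            // look up any of its peers.
            int next = (comm.Rank + 1) % comm.Size;
            Console.WriteLine("Rank " + comm.Rank + " sees that rank " + next
                + " runs on " + hostnames[next]);
        }
    }
}
```

Because the array is available everywhere, no rank check is needed before using it, unlike the Gather example where only the root holds a non-null result.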
One-to-all: Spreading the Message
While the Gather and Allgather
collectives bring together the data from all of the processes,
the Broadcast and Scatter collectives
distribute data from one process to all of the processes.
The Broadcast operation takes a value from one process
and broadcasts it to every other process. For example, imagine a
system that takes user input from a single process (rank 0) and
distributes that command to all of the processes so that they all
execute the command concurrently (and coordinate to complete the
command). Such a system could be implemented with MPI as follows,
using Broadcast to distribute the command:
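using System;
using MPI;

class CommandServer
{
    // Hypothetical stand-in for application code: read one command from the console.
    static string GetInputFromUser()
    {
        return Console.ReadLine();
    }

    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            string command = null;
            do
            {
                // Only the root obtains a command; everyone else receives it.
                if (comm.Rank == 0)
                    command = GetInputFromUser();
                comm.Broadcast(ref command, 0);
            } while (command != "quit");
        }
    }
}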
The Broadcast operation requires only two arguments;
the second, familiar argument is the rank of the root process, which
will supply the value. The first argument contains the value to send
(at the root) or the place in which the received value will be
stored (on every other process). The pattern used in this example is
quite common for Broadcast: all processes define the
same variable, but only the root process gives it a meaningful
value. Then the processes coordinate to broadcast the root's value
to every process, and all processes follow the same code path to
handle the data.
The Scatter collective, like Broadcast,
sends values from a root process to every other
process. Scatter, however, sends
a different value to each of the processes, allowing
the root to hand out different tasks to each of the other
processes. The root process provides an array of values, in which
the i-th value will be sent to the process with
rank i. Each process, including the root, then receives its own value
as the return value of the Scatter operation.
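The Scatter description above can be sketched as follows. This is an illustration under stated assumptions rather than a definitive usage: it assumes the overload Scatter<T>(T[] values, int root) returns the caller's element and consults the array only at the root, so non-root processes pass null. The class name ScatterTasks and the task strings are hypothetical.

```csharp
using System;
using MPI;

class ScatterTasks
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;

            // Only the root fills in the array, one entry per destination rank.
            // Assumption: the array is ignored on non-root processes.
            string[] tasks = null;
            if (comm.Rank == 0)
            {
                tasks = new string[comm.Size];
                for (int dest = 0; dest < comm.Size; ++dest)
                    tasks[dest] = "task for rank " + dest; // hypothetical work item
            }

            // Every process, including the root, receives its own entry.
            string myTask = comm.Scatter<string>(tasks, 0);
            Console.WriteLine("Rank " + comm.Rank + " received: " + myTask);
        }
    }
}
```

This mirrors the Broadcast pattern: every process declares the same variable, only the root gives it meaningful contents, and all processes make the same collective call.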
All-to-all: Something for Everyone
The Alltoall collective transmits data from every
process to every other process. Each process will provide an array
whose ith value will be sent to the process with
rank i. Each process will then receive in return a different
array, whose jth value will be the value received
from the process with rank j. In the example below, we
generate unique strings to be sent from each process to every other
process.
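// data[dest] is the string this process sends to rank dest.
string[] data = new string[comm.Size];
for (int dest = 0; dest < comm.Size; ++dest)
    data[dest] = "From " + comm.Rank + " to " + dest;
// results[src] is the string this process received from rank src.
string[] results = comm.Alltoall(data);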
When executed with 8 processes, rank 1 will receive an array
containing the following strings (in order):
From 0 to 1
From 1 to 1
From 2 to 1
From 3 to 1
From 4 to 1
From 5 to 1
From 6 to 1
From 7 to 1
Combining Results with Parallel Reduction
MPI contains several parallel reduction operations that combine the
values provided by each of the processes into a single value that
summarizes the results. The way in which results are combined
is determined by the user, allowing many different kinds of
computations to be expressed. For example, one can compute the sum
or product of values produced by the processes, find the minimum or
maximum of those values, or concatenate the results
computed by each process.
The most basic reduction operation is the Reduce
collective, which combines the values provided by each of the
processes and returns the result at the designated root process. If
the process with rank i contributes the
value v_i, the result of the reduction for n
processes (ranks 0 through n-1) is v_0 + v_1 +
... + v_{n-1}, where + can be any associative
operation.
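To show that the "+" in the formula above need not be addition, the following sketch reduces the same per-process value with three different operations. It assumes the predefined delegates Operation<int>.Add, Operation<int>.Max, and Operation<int>.Min from MPI.NET's Operation class; the class name ReduceOps and the contributed value (rank plus one) are illustrative.

```csharp
using System;
using MPI;

class ReduceOps
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            int value = comm.Rank + 1; // rank i contributes i + 1

            // Each call combines one value per process; only the root (rank 0)
            // receives a meaningful result.
            int sum = comm.Reduce(value, Operation<int>.Add, 0);
            int max = comm.Reduce(value, Operation<int>.Max, 0);
            int min = comm.Reduce(value, Operation<int>.Min, 0);

            if (comm.Rank == 0)
                Console.WriteLine("sum = " + sum + ", max = " + max + ", min = " + min);
        }
    }
}
```

With n processes the contributed values are 1 through n, so the root should see a sum of n(n+1)/2, a maximum of n, and a minimum of 1.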
To illustrate the use of Reduce, we're going to use
MPI to compute an approximation of pi. The algorithm is
relatively simple: inscribe a unit circle (radius 1) within a square
of side 2, and then randomly throw darts within the square. The
fraction of darts that land within the circle approximates the ratio
of the area of the circle (pi) to the area of the square (4), so
multiplying that fraction by 4 gives an estimate of pi. Using this
principle, the following sequential program computes an
approximation of pi:
using System;

class SequentialPi
{
    static void Main(string[] args)
    {
        int dartsPerProcessor = 10000;
        Random random = new Random();
        int dartsInCircle = 0;
        for (int i = 0; i < dartsPerProcessor; ++i)
        {
            // Pick a random point in the square [-1, 1) x [-1, 1).
            double x = (random.NextDouble() - 0.5) * 2;
            double y = (random.NextDouble() - 0.5) * 2;
            if (x * x + y * y <= 1.0)
                ++dartsInCircle;
        }
        Console.WriteLine("Pi is approximately {0:F15}.",
                          4 * (double)dartsInCircle / (double)dartsPerProcessor);
    }
}
When running this program, the more darts you throw, the better the
approximation to pi. To parallelize this program, we'll use MPI to
run several processes, each of which will throw darts
independently. Once all of the processes have finished, we'll sum up
the results (the total number of darts that landed inside the circle
on all processes) to compute pi. The complete code for the parallel
calculation of pi follows, but the most important line
uses Reduce to sum the total number of darts that
landed in the circle across all of the processes:
int totalDartsInCircle = comm.Reduce(dartsInCircle, Operation<int>.Add, 0);
The three arguments to Reduce are the number of darts
that landed in the circle locally, a
delegate Operation<int>.Add that sums integers,
and the rank of the root process (here, 0). Any other .NET delegate
would also work, e.g.,
public static int AddInts(int x, int y) { return x + y; }
int totalDartsInCircle = comm.Reduce(dartsInCircle, AddInts, 0);
However, using the MPI.NET Operation class permits
better optimizations within MPI.NET. Without further delay, here is
the complete MPI program for computing an approximation to pi in
parallel:
using System;
using MPI;

class Pi
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            int dartsPerProcessor = 10000;
            // Seed by rank so that each process throws a different set of darts.
            Random random = new Random(5 * comm.Rank);
            int dartsInCircle = 0;
            for (int i = 0; i < dartsPerProcessor; ++i)
            {
                double x = (random.NextDouble() - 0.5) * 2;
                double y = (random.NextDouble() - 0.5) * 2;
                if (x * x + y * y <= 1.0)
                    ++dartsInCircle;
            }
            // Sum the per-process hit counts at the root (rank 0).
            int totalDartsInCircle = comm.Reduce(dartsInCircle, Operation<int>.Add, 0);
            if (comm.Rank == 0)
                Console.WriteLine("Pi is approximately {0:F15}.",
                                  4 * (double)totalDartsInCircle / (comm.Size * (double)dartsPerProcessor));
        }
    }
}
|