MPI.NET Tutorial: Collective Communication

Collective Communication

Collective communication provides a more structured alternative to point-to-point communication. With collective communications, all of the processes within a communicator collaborate on a single communication operation that fits one of several common communication patterns used in message-passing applications. Collective operations include simple barriers, one-to-all, all-to-one, and all-to-all communications, and parallel reduction operations that combine the values provided by each of the processes in the communication.

Although it is possible to express parallel programs entirely through point-to-point operations (some even call send and receive the "assembly language" of distributed-memory parallel programming), collectives provide several advantages for writing parallel programs, described below. For these reasons, it is generally preferable to use collectives whenever possible, falling back to point-to-point operations only when no suitable collective exists.

Code Readability/Maintainability
It is often easier to write and reason about a program that uses collective communication than about the equivalent program built from point-to-point communication. Collectives express the intent of a communication more clearly (e.g., a Scatter operation is clearly distributing data from one process to all of the other processes), and far fewer collective operations are typically needed to accomplish a task than point-to-point messages (e.g., a single all-to-all operation instead of N² point-to-point operations), which also makes programs that use collectives easier to debug.
Performance
MPI implementations typically contain algorithms for collective operations that are optimized using knowledge of the network topology and hardware, and they may even use hardware-based implementations of some collectives. Such optimizations are hard to reproduce on top of point-to-point operations without the knowledge already available inside the MPI implementation itself. Using collective operations can therefore improve the performance of parallel programs and make that performance more portable to other clusters with different configurations.

Barrier: Marching Computations

In an MPI program, the various processes perform their local computations without regard to the behavior of the other processes in the program, except when they are waiting for some inter-process communication to complete. In many parallel programs, all of the processes work more-or-less independently, but we want to make sure that all of the processes are on the same step at the same time. The Barrier collective operation is used for precisely this purpose. A process that enters the barrier does not exit it until every process in the communicator has entered the barrier. Place a barrier before or after a step of the computation that all processes need to perform at the same time.

In the example program below, each of the iterations of the loop is completely synchronized, so that every process is on the same iteration at the same time.

using System;
using MPI;

class Barrier
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            for (int i = 1; i <= 5; ++i)
            {
                comm.Barrier();
                if (comm.Rank == 0)
                    Console.WriteLine("Everyone is on step " + i + ".");
            }
        }
    }
}

Executing this program with any number of processes will produce the following output (here, we use 8 processes).

C:\Barrier\bin\Debug>mpiexec -n 8 Barrier.exe
Everyone is on step 1.
Everyone is on step 2.
Everyone is on step 3.
Everyone is on step 4.
Everyone is on step 5.

All-to-one: Gathering Data

The MPI Gather operation collects data provided by all of the processes in a communicator on a single process, called the root process. Gather is typically used to bring summary data from a computation back to the process responsible for communicating that information to the user. For example, in the following program, we gather the names of the processors (or hosts) on which each process is executing, then sort and display that information at the root.

using System;
using MPI;

class Hostnames
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            string[] hostnames = comm.Gather(MPI.Environment.ProcessorName, 0);
            if (comm.Rank == 0)
            {
                Array.Sort(hostnames);
                foreach(string host in hostnames) 
                    Console.WriteLine(host);
            }
        }
    }
}

In the call to Gather, each process provides a value (in this case, the string produced by reading the ProcessorName property) to the Gather operation, along with the rank of the "root" process (here, process zero). The Gather operation will return an array of values to the root process, where the ith value in the array corresponds to the value provided by the process with rank i. All other processes receive a null array.

To gather all of the data from all of the nodes, use the Allgather collective. Allgather is similar to Gather, with two differences: first, there is no parameter identifying the "root" process, and second, all processes receive the same array containing the contributions from every process. An Allgather is, therefore, the same as a Gather followed by a Broadcast, described below.
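
For example, the hostname program above can be adapted so that every process receives the complete list of processor names. The sketch below assumes the single-argument Allgather overload, which takes each process's contribution and returns the gathered array on every process (the class name AllHostnames is purely illustrative):

using System;
using MPI;

class AllHostnames
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            // Every process contributes its processor name and receives the full array.
            string[] hostnames = comm.Allgather(MPI.Environment.ProcessorName);
            Console.WriteLine("Process " + comm.Rank + " sees " + hostnames.Length + " hostnames.");
        }
    }
}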

One-to-all: Spreading the Message

While the Gather and Allgather collectives bring together the data from all of the processes, the Broadcast and Scatter collectives distribute data from one process to all of the processes.

The Broadcast operation takes a value from one process and broadcasts it to every other process. For example, imagine a system that takes user input from a single process (rank 0) and distributes that command to all of the processes so that they all execute the command concurrently (and coordinate to complete the command). Such a system could be implemented with MPI as follows, using Broadcast to distribute the command:

using System;
using MPI;

class CommandServer
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            string command = null;
            do 
            {
                if (comm.Rank == 0)
                    command = GetInputFromUser();

                // distribute the command
                comm.Broadcast(ref command, 0);

                // each process handles the command
            } while (command != "quit");
        }
    }

    // Simple stand-in for a real command source: read one command per line
    // from the console at the root process.
    static string GetInputFromUser()
    {
        Console.Write("command> ");
        return Console.ReadLine();
    }
}

The Broadcast operation requires only two arguments. The first is a reference to the value to send (at the root) or to the variable in which the received value will be stored (at every other process); the second, as in Gather, is the rank of the root process, which supplies the value. The pattern used in this example is quite common for Broadcast: all processes define the same variable, but only the root process gives it a meaningful value. The processes then coordinate to broadcast the root's value to every process, after which all processes follow the same code path to handle the data.

The Scatter collective, like Broadcast, distributes values from a root process to every other process. Scatter, however, sends a different value to each process, allowing the root to hand out a different task to each of the other processes. The root process provides an array of values, in which the ith value will be sent to the process with rank i. Every process then receives its own value as the return value of the Scatter operation.
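
As a sketch of how Scatter might be used (assuming the overload that takes the root's array and the root's rank and returns each process's element; the task strings and class name here are purely illustrative), the root can hand a different piece of work to every process:

using System;
using MPI;

class TaskScatter
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            string[] tasks = null;
            if (comm.Rank == 0)
            {
                // Only the root needs to supply the array of values to scatter.
                tasks = new string[comm.Size];
                for (int dest = 0; dest < comm.Size; ++dest)
                    tasks[dest] = "Task for process " + dest;
            }

            // Each process receives the element whose index matches its rank.
            string myTask = comm.Scatter(tasks, 0);
            Console.WriteLine("Process " + comm.Rank + " received: " + myTask);
        }
    }
}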

All-to-all: Something for Everyone

The Alltoall collective transmits data from every process to every other process. Each process will provide an array whose ith value will be sent to the process with rank i. Each process will then receive in return a different array, whose jth value will be the value received from the process with rank j. In the example below, we generate unique strings to be sent from each process to every other process.

string[] data = new string[comm.Size];
for (int dest = 0; dest < comm.Size; ++dest)
    data[dest] = "From " + comm.Rank + " to " + dest;
 
string[] results = comm.Alltoall(data);

When executed with 8 processes, rank 1 will receive an array containing the following strings (in order):

From 0 to 1
From 1 to 1
From 2 to 1
From 3 to 1
From 4 to 1
From 5 to 1
From 6 to 1
From 7 to 1

Combining Results with Parallel Reduction

MPI contains several parallel reduction operations that combine the values provided by each of the processes into a single value that somehow sums up the results. The way in which results are combined is determined by the user, allowing many different kinds of computations to be expressed. For example, one can compute the sum or product of values produced by the processes, find the minimum or maximum of those values, or concatenate the results computed by each process.

The most basic reduction operation is the Reduce collective, which combines the values provided by each of the processes and returns the result at the designated root process. If the process with rank i contributes the value vᵢ, the result of the reduction over n processes is v₀ + v₁ + ... + vₙ₋₁, where + can be any associative operation.
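
For instance, assuming the predefined Operation<int>.Max operation (a sibling of the Operation<int>.Add operation used below) and a hypothetical per-process value localValue, a single call delivers the largest contributed value to the process with rank 0:

int largest = comm.Reduce(localValue, Operation<int>.Max, 0);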

To illustrate the use of Reduce, we're going to use MPI to compute an approximation of pi. The algorithm is relatively simple: inscribe a circle within a square (here, a circle of radius 1 inside a 2×2 square), and then randomly throw darts at the square. The ratio of the number of darts that land within the circle to the total number of darts thrown approaches the ratio of the circle's area to the square's area, which is pi/4; multiplying that ratio by 4 therefore gives an approximation of pi. Using this principle, the following sequential program computes an approximation of pi:

using System;

class SequentialPi
{
    static void Main(string[] args)
    {
        int dartsPerProcessor = 10000;
        Random random = new Random();
        int dartsInCircle = 0;
        for (int i = 0; i < dartsPerProcessor; ++i)
        {
            double x = (random.NextDouble() - 0.5) * 2;
            double y = (random.NextDouble() - 0.5) * 2;
            if (x * x + y * y <= 1.0)
                ++dartsInCircle;
        }

        Console.WriteLine("Pi is approximately {0:F15}.", 
                          4*(double)dartsInCircle/(double)dartsPerProcessor);
    }
}

When running this program, the more darts you throw, the better the approximation to pi. To parallelize this program, we'll use MPI to run several processes, each of which will throw darts independently. Once all of the processes have finished, we'll sum up the results (the total number of darts that landed inside the circle on all processes) to compute pi. The complete code for the parallel calculation of pi follows, but the most important line uses Reduce to sum the total number of darts that landed in the circle across all of the processes:

int totalDartsInCircle = comm.Reduce(dartsInCircle, Operation<int>.Add, 0);

The three arguments to Reduce are the number of darts that landed in the circle locally, the Operation<int>.Add delegate that sums integers, and the rank of the root process (here, 0). Any .NET delegate with the same signature (taking two ints and returning an int) would also work, e.g.,

public static int AddInts(int x, int y) { return x + y; }
// ...
int totalDartsInCircle = comm.Reduce(dartsInCircle, AddInts, 0);

However, using the MPI.NET Operation class permits better optimizations within MPI.NET. Without further delay, here is the complete MPI program for computing an approximation to pi in parallel:

using System;
using MPI;

class Pi
{
    static void Main(string[] args)
    {
        using (new MPI.Environment(ref args))
        {
            Intracommunicator comm = Communicator.world;
            int dartsPerProcessor = 10000;
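            // Seed each process's random number generator differently so that
            // the processes throw different darts.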
            Random random = new Random(5 * comm.Rank);
            int dartsInCircle = 0;
            for (int i = 0; i < dartsPerProcessor; ++i)
            {
                double x = (random.NextDouble() - 0.5) * 2;
                double y = (random.NextDouble() - 0.5) * 2;
                if (x * x + y * y <= 1.0)
                    ++dartsInCircle;
            }

            int totalDartsInCircle = comm.Reduce(dartsInCircle, Operation<int>.Add, 0);
            if (comm.Rank == 0) 
                Console.WriteLine("Pi is approximately {0:F15}.", 
                                  4*(double)totalDartsInCircle/(comm.Size*(double)dartsPerProcessor));
        }
    }
}
Previous: Point-to-Point Communication