All of this info is here on one page and not broken up into
cross-linked pages to be easier to understand and potentially more
eye-pleasing. Deal. I'm a programmer, not a web page designer.
Overview
So how does this work? Or, what actually happens when you run
this parallel BladeEnc on (for example) 4 machines?
Essentially, the parallelization was not too difficult. The MP3
encoding was already broken down into performing the mathematical
operations over small portions of the file. It was essentially
something like:
while (!eof(input_file)) {
    read(next_portion_of_input_file);
    encode(that_portion);
    write(results_to_output_file);
}
The difficult part of parallelization is typically working out how to
split the problem up into independent (or nearly independent) parts.
The structure shown above - assuming that the encode()
function was independent of previous calls to encode() -
is trivial to parallelize.
For example, if we want to run on four machines, we can split the
input file into four parts, give 1/4 of the file to each machine, and
let each machine loop over encode() for its portion of
the file. Then take the output from each machine, put it in the right
order, and write it out to a single output file. Done.
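The even split described above can be sketched as follows. This is only an illustration, not code from BladeEnc: the function name split_range() and the idea of counting the input in "chunks" (one chunk per call to encode()) are assumptions made for the example.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: compute the half-open chunk range
 * [*start, *start + *count) that rank `rank` out of `nranks` should
 * encode, given `total` chunks in the input file. */
void split_range(size_t total, size_t nranks, size_t rank,
                 size_t *start, size_t *count)
{
    assert(nranks > 0 && rank < nranks);

    size_t base = total / nranks;   /* every rank gets at least this many */
    size_t extra = total % nranks;  /* the first `extra` ranks get one more */

    *count = base + (rank < extra ? 1 : 0);
    *start = rank * base + (rank < extra ? rank : extra);
}
```

For 10 chunks on 4 ranks this hands out 3, 3, 2, and 2 chunks, starting at chunks 0, 3, 6, and 8, so concatenating the per-rank output in rank order covers the whole file exactly once.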
With such a scheme, the more work that you throw at it, the more
efficient it becomes. Hence, trying this scheme with small MP3 files
will probably not result in any noticeable speedup (in fact, it may be
slower than running in serial, because of the added
overhead for working in parallel). It is necessary to give the
parallel engine enough work to offset the overhead added by the
parallel framework. Generally, this is not very much overhead (read
on to find out why), but parallel is not free.
Of course, it's not quite that simple, but that's the general idea
that I used.
Overlap
One of the problems that occurred is that MP3 is a differential
encoding algorithm. For all of you non-math types out there, this
means that calls to encode() do depend
on the results of previous calls to encode(). Hence,
splitting the file into four parts and giving one part to each rank is
not quite sufficient. Roughly speaking, if nothing else, the first
call to encode() on each rank (other than the rank that
got the first 1/4 of the file) will generate incorrect results, since
that rank doesn't have the results from the last section of the previous
quarter of the file. That is, visualize the splitting of the file as
follows:
SERIAL PROCESSING OF THE INPUT:
|---------  ---------  ---------  ---------|
                   rank 0
PARALLEL PROCESSING OF THE INPUT:
|---------||---------||---------||---------|
  rank 0     rank 1     rank 2     rank 3
(spaces were added in the serial diagram just to make it line up
nicely with the parallel diagram; the only thing that matters is the
number of dashes)
Imagine that each "-" in the picture represents a
call to encode(). If the file were processed in serial
(i.e., on one rank), the first call to
encode() in the section that is marked for rank 1 would
come after the last call to encode() in
the section that is marked for rank 0.
However, when the different sections of the file are processed on
different ranks, the first call to encode() in the
section that is marked for rank 1 will not have any basis - there will
be no differential (or, more specifically, the differential will be
from zero) from the previous sample because from rank 1's point of
view, there was no previous sample.
Conclusion: simply splitting the file into multiple chunks and
encoding them separately doesn't work. Indeed, doing that produces
very obvious "clicks" in the output audio, since you're effectively
resetting the differential at the boundary point for each rank.
Solution: send some "overlap" to each rank (except to rank 0).
That is, use a distribution something like:
           rank 1             rank 3
        |- ---------|      |- --------|
|---------|       |- --------|
  rank 0             rank 2
So each rank gets the last chunk full of samples from the rank
before it. Hence, it can build up a differential so that the first
"real" frame that it does (i.e., the second frame)
will be based upon the previous call to encode(), and we
don't get an annoying "click" in the output because the differential
between the different ranks (i.e., different parts of the input file)
is actually correct.
Sending such redundant chunks, and the time required to process
them, does add a bit of overhead, but it is negligible when compared to
the amount of real work that the rank computes.
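To see why the overlap fixes the boundary, here is a toy sketch. It is not the MP3 algorithm: toy_encode() is a made-up stand-in that simply emits the difference from the previous sample, which is enough to model "each call depends on the previous call". The names toy_codec, toy_encode(), and encode_part() are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for the encoder state: just remembers the last sample. */
typedef struct { int prev; } toy_codec;

/* Toy stand-in for encode(): output is the difference from the
 * previous sample, so the result depends on the call before it. */
static int toy_encode(toy_codec *c, int sample)
{
    int diff = sample - c->prev;
    c->prev = sample;
    return diff;
}

/* Encode in[start .. start+count) into out[start ..], the way one rank
 * would. The codec starts from zero, then is warmed up on `overlap`
 * samples that precede `start`; the warm-up output is thrown away. */
void encode_part(const int *in, size_t start, size_t count,
                 size_t overlap, int *out)
{
    toy_codec c = { 0 };

    assert(overlap <= start);
    for (size_t i = start - overlap; i < start; ++i)
        (void) toy_encode(&c, in[i]);      /* redundant chunk: discard */
    for (size_t i = start; i < start + count; ++i)
        out[i] = toy_encode(&c, in[i]);    /* "real" output */
}
```

Encoding the second half of an array with overlap 0 gives a first value measured from zero (the "click"); with overlap 1 the second half matches what a serial pass over the whole array produces.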
Load balancing
The master / slave approach is by no means perfect. But it does solve
a lot of problems, and creates a convenient (and fairly simple)
programming abstraction. But how do we choose M (the number of
portions to divide the file into)? We must consider the overhead that
it introduces. Let me digress to a short discussion about networking...
Every time you send a message, there is some additional control
information sent with it (e.g., the packet header), such as who the
message is for, the length of the message, etc. Messages longer than
a certain size will get broken up into a series of smaller messages.
For example, on a typical Ethernet network, TCP messages longer than
roughly 1,500 bytes will get broken up into as many 1,500 byte
sub-messages as are necessary to send the entire
message. Each sub-message is called a packet. So when you send a
message, it is broken down into one or more packets, sent to the
destination, and then reassembled back into the original, single message.
But the overhead for each packet is a constant number of bytes.
So in the case of TCP, whether my message is one byte long or 1,500
bytes long, the overhead is the same - H bytes.
So which is more efficient, sending 1,500 one-byte messages, or
sending one 1,500 byte message? Consider: sending 1,500 one-byte
messages actually sends 1,500 * (1 + H) bytes, whereas sending one 1,500
byte message sends 1 * (1,500 + H) bytes.
O = overhead byte
D = data byte
1,500 one byte messages: |OOOOOOOOD|
                         |OOOOOOOOD|
                         |OOOOOOOOD|
                         ...etc...
                         |OOOOOOOOD|
One 1,500 byte message:  |OOOOOOOODDDDDDD...etc...DDDDDD|
Clearly, sending one big message is more efficient (and we're not
even talking about operating system delay or other sources of latency
here), because the number of D's is always the same, but
the number of O's is not. Generalize this a bit more:
sending a small number of large messages is more efficient than
sending a large number of small messages.
Note that this is essentially the same for shared memory
communication (or any message passing kinds of communication). Hence,
it doesn't just apply to TCP networks.
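The arithmetic above is easy to check with a few lines of C. This is just a sketch: wire_bytes() is a hypothetical helper, and the header size of 8 bytes used below is only there to match the eight O's in the diagram, not a real TCP header size.

```c
#include <assert.h>

/* Hypothetical helper: total bytes sent when `num_messages` messages
 * each carry `payload_bytes` of data plus a fixed `header_bytes` of
 * per-message overhead (the "H" in the text). */
unsigned long wire_bytes(unsigned long num_messages,
                         unsigned long payload_bytes,
                         unsigned long header_bytes)
{
    assert(num_messages > 0);
    return num_messages * (payload_bytes + header_bytes);
}
```

With H = 8, wire_bytes(1500, 1, 8) is 13,500 bytes while wire_bytes(1, 1500, 8) is 1,508 bytes: the same 1,500 data bytes either way, but 12,000 overhead bytes in the first case against 8 in the second.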
With that in mind, back to the master / slave discussion.
So we need to choose an M that nicely divides the file so that we
can get good performance out of both fast and slow slaves, but doesn't
introduce too much network overhead. How the heck do we do that?
I chose to do an end-run around this question - don't try to
calculate M. Instead, we initially choose the size of the portion
that we send to each rank. This number should be fairly low, and,
unless you conduct some benchmarks, is somewhat arbitrary because you
don't know the speed of the slaves beforehand (currently, I send 32
chunks to each rank in the master's first iteration).
As each slave reports that it is done, the master doubles the size
of the portion that it gave to that slave last time, reads in that
amount from the input file, and sends it to the slave. In this way,
we keep sending larger portions of data to the slaves as they request
more. The faster slaves who keep asking for more portions will be
given larger and larger portions of work. That is, the size of our
messages increases so that our networking efficiency increases (there's
a cutoff point, of course, but you get the idea).
Note that there is a max size that the portions will grow to - the
current implementation grows to a max of 256 chunks. This is a
compromise between a not-too-short amount of time on a fast slave and
a not-too-long amount of time on a slow slave.
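The doubling rule can be sketched like this. The 32 and 256 come from the text above; the names INITIAL_CHUNKS, MAX_CHUNKS, and next_portion() are made up for the example and are not taken from parallel.c.

```c
#include <assert.h>

enum {
    INITIAL_CHUNKS = 32,   /* first portion sent to every slave */
    MAX_CHUNKS = 256       /* portions never grow beyond this */
};

/* Hypothetical helper: how many chunks to give a slave that just
 * finished a portion of `last_portion` chunks, when `chunks_left`
 * chunks remain unread in the input file. Returns 0 when the input
 * is exhausted. */
int next_portion(int last_portion, int chunks_left)
{
    assert(last_portion > 0 && chunks_left >= 0);

    int next = last_portion * 2;      /* double what it had last time */
    if (next > MAX_CHUNKS)
        next = MAX_CHUNKS;            /* ...but respect the cap */
    if (next > chunks_left)
        next = chunks_left;           /* ...and the end of the file */
    return next;
}
```

A slave that keeps coming back therefore sees portions of 32, 64, 128, 256, 256, ... until the file runs out, while a slow slave that only reports once or twice never gets handed a huge portion.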
This approach also solves the "too many ranks" problem. Consider
what was discussed above - if you try to parallelize a problem that is
too small, your run time will likely be longer than
if you had run in serial because of the overhead that parallelism
introduces. The same problem occurs even if you have a fairly large
piece of work to perform, but try to use too many ranks. That is,
each rank's piece of work is fairly small, thereby reducing the
overall efficiency. Since each rank will perform its work quickly,
the ranks will all be in contention to return their output to the
master (which is an inherently serial process).
More to the point, the overall efficiency of the parallel program
is dependent on the efficiency of its parts. If each rank has lousy
efficiency (say because they only have a small amount of work to do),
the overall efficiency will be lousy. So instead of dividing the
total amount of work by the number of processors, we split it into
fixed sizes and dole out the work until there is no more to perform.
This guarantees that there is some minimum size of work that will be
sent to each slave, and provides a guaranteed lower bound on
efficiency. Chosen properly, this lower bound can be as close to
optimal (i.e., the upper bound) as possible.
Output file ordering
So we haven't discussed how the output MP3 data is gathered by the
master and written out to the output file yet - we just did a bit of
hand-waving in the previous sections saying "don't worry about it; the
output happens somehow." In reality, here's what happens...
As each slave finishes the work that the master gave to it, it
effectively sends the resulting MP3 output data back to the master. The master
receives and stores the MP3 data in memory and, if there is any input
left in the input file, gives the slave a new chunk of work to do.
However, consider the following:
- The size of the MP3 output data may be different for each input
set.
- Slaves may not operate at the same speed.
- Even if slaves are the same speed, workstations on a LAN can only
transmit to the master one at a time (regardless of the underlying
network media). So even if multiple ranks finish their work
"simultaneously", only one of them will be able to immediately send
the results to the master - the rest will automatically be queued up
due to normal networking/message passing protocols.
- The issue is essentially the same for SMPs as well; the master
only talks to one slave at a time.
Why is this important? Consider what happens when rank 5 returns
its MP3 output data before rank 3 returns its MP3 output data. The
master cannot write rank 5's data to the output file, because it has
not written the previous ranks' output data (at the very least, it has
not written rank 3's data), and since it doesn't know the length of
rank 3's output data, it cannot speculatively write rank 5's data "in the
right place" ahead of time.
INPUT FILE:
|------||------||------||------||------|...
 rank 1  rank 2  rank 3  rank 4  rank 5
OUTPUT FILE:
|---||-||----||--||---|...
rank 1  rank 3    rank 5
   rank 2    rank 4
More to the point, the output file must be written in the same
order that the input file was read. That is, we cannot write rank 5's
MP3 output data until the MP3 output data has been written from ranks
1, 2, 3, and 4.
Specifically, the master has to keep track of which portions of
the input file it has given to which slave. As slaves return their
output data, the master has to check which portion of the input file
that slave was encoding and whether all preceding portions of the
output file have been written yet. If they have, the master can write
out the new output data. If the preceding sections have not all been
written yet, the new output data must be queued up for later writing.
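The bookkeeping described above can be sketched as a small reorder buffer. This is an illustration under assumptions, not the code in parallel.c: the names reorder, pending, and reorder_submit() are hypothetical, sequence numbers are assumed to start at 0, and a fixed in-memory buffer stands in for the output file.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* One result that arrived before its turn. */
typedef struct pending {
    int seq;
    size_t len;
    char *data;
    struct pending *next;
} pending;

typedef struct {
    int next_seq;     /* sequence number that must be written next */
    pending *head;    /* early results, kept sorted by seq */
    char out[256];    /* stand-in for the output file */
    size_t out_len;
} reorder;

static void emit(reorder *r, const char *data, size_t len)
{
    assert(r->out_len + len <= sizeof r->out);
    memcpy(r->out + r->out_len, data, len);
    r->out_len += len;
}

/* Hand one slave's output to the buffer. Returns 0 on success,
 * -1 if memory for queueing could not be allocated. */
int reorder_submit(reorder *r, int seq, const char *data, size_t len)
{
    if (seq != r->next_seq) {
        /* Too early: copy it and insert into the sorted list. */
        pending *p = malloc(sizeof *p);
        if (p == NULL)
            return -1;
        p->data = malloc(len ? len : 1);
        if (p->data == NULL) {
            free(p);
            return -1;
        }
        memcpy(p->data, data, len);
        p->seq = seq;
        p->len = len;

        pending **pp = &r->head;
        while (*pp != NULL && (*pp)->seq < seq)
            pp = &(*pp)->next;
        p->next = *pp;
        *pp = p;
        return 0;
    }

    /* It is this result's turn: write it, then drain any queued
     * results that have now become writable. */
    emit(r, data, len);
    r->next_seq++;
    while (r->head != NULL && r->head->seq == r->next_seq) {
        pending *p = r->head;
        r->head = p->next;
        emit(r, p->data, p->len);
        r->next_seq++;
        free(p->data);
        free(p);
    }
    return 0;
}
```

Starting from a zero-initialized reorder, submitting sequence numbers 2 and 1 writes nothing; submitting 0 then writes all three results in order and leaves the queue empty.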
Using SMPs / multithreading
So does this scheme work with SMPs?
Absolutely. One of the nice things about MPI is that it
intentionally doesn't distinguish between whether ranks are on the
same physical machine or not. When you use MPI to send a message,
you just rely on the MPI implementation to do the fastest thing to
send the message to the destination rank (regardless of whether the
source and destination ranks are on the same machine or not).
So when running BladeEnc in parallel, simply start up one rank per CPU
(plus 1 for the master). So if you have 4 uniprocessors, start 5
ranks. If you have 4 dual processors, start 9 ranks. If you have 2
uniprocessors and 2 dual processors, start 7 ranks, but be sure to
distribute them properly so that there is one slave on each of the
uniprocessors and two on each of the dual processor machines.
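The rank-counting rule above boils down to "sum the CPUs, add one for the master". A minimal sketch (ranks_to_start() is a hypothetical helper, not part of BladeEnc):

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: given the CPU count of each machine, return how
 * many ranks to start: one slave per CPU plus one master. */
int ranks_to_start(const int *cpus_per_node, size_t num_nodes)
{
    int slaves = 0;

    for (size_t i = 0; i < num_nodes; ++i) {
        assert(cpus_per_node[i] > 0);
        slaves += cpus_per_node[i];
    }
    return slaves + 1;   /* +1 for the master */
}
```

Feeding it { 1, 1, 1, 1 }, { 2, 2, 2, 2 }, and { 1, 1, 2, 2 } gives 5, 9, and 7, matching the three cases in the text.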
Tord and I have briefly discussed this scheme as well as a
different potential multi-threading scheme for BladeEnc. That is,
someone else has proposed a multi-threading scheme for BladeEnc that
is more of a pipelined model, where each thread does a different stage
of the computation in the MP3 encoding. Input and output data is
therefore simply passed between threads in the pipeline.
After giving this some thought (and again giving the disclaimer
that I don't know much about the MP3 algorithm), I'm not sure whether
this model would be a performance win. Let me explain.
The model that I have given above works for SMPs as well. It
could certainly be improved such that one instance of BladeEnc runs
per node, not per CPU, and each rank
is responsible for having "sub-worker" threads. That is, the rank
spawns K threads, where K is the number of CPUs on the machine that
the rank physically resides on. Each of the K threads loops over
encode() on its portion of the input that that rank
received. This is just a logical extension of the previous
subsetting.
This would arguably be a better approach than having one rank per
CPU, because it could potentially avoid some network interface
congestion/contention between ranks on the same physical machine. I
did not take this approach, however, because there are some global
variables that are used in the encoding code (making it non-thread
safe), and I didn't want to muck around with that [yet].
Using the pipeline thread approach has a few drawbacks (IMHO):
- Each stage in the pipeline is likely to have different amounts of
work, unless the stages are very carefully instrumented. If each
stage has a different amount of work, it will result in what could
effectively be termed pipeline stalls - shorter stages will be forced
to wait for work from previous longer stages, and thereby use the CPU
less than all the time.
- There are (will be) a fixed number of stages in the encoding
algorithm - let's call it A. On a given machine, you have a fixed
number of available CPUs - K. When (A == K), life is good. But when
(A > K), you'll have more threads than CPUs, which will cause
thrashing. If (A < K), you'll have fewer threads than CPUs, meaning
that you'll be wasting (K - A) CPUs since you won't be using them.
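The first drawback can be put into rough numbers. The sketch below assumes the simplest possible model: one thread per stage, each on its own CPU, running in steady state, so the slowest stage sets the pace for everyone. The function name and the stage times are hypothetical; nothing here was measured on BladeEnc.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model: fraction of total CPU time spent doing useful
 * work in an n-stage pipeline where stage i needs stage_time[i] units
 * per item. Every stage can only advance as fast as the slowest one,
 * so each item costs n * slowest CPU-time units but only needs the
 * sum of the stage times. */
double pipeline_utilization(const double *stage_time, size_t n)
{
    double sum = 0.0, slowest = 0.0;

    assert(n > 0);
    for (size_t i = 0; i < n; ++i) {
        assert(stage_time[i] > 0.0);
        sum += stage_time[i];
        if (stage_time[i] > slowest)
            slowest = stage_time[i];
    }
    return sum / ((double) n * slowest);
}
```

With perfectly balanced stages the result is 1.0. With two stages taking 1 and 3 time units, it drops to 4/6, about 67%: the short stage sits idle two thirds of the time waiting for the long one.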
The point that I'm making is that the pipeline approach may not
scale nicely to any number of CPUs where (K != A). Yes, there
probably are solutions (particularly for the [A < K] case) to make it
scale nicely, but it could get quite complicated and ugly quite fast.
So it's not clear to me that this pipeline model is a good one.
The nice thing about the master / slave model (either as it is now, or
if we introduce slave sub-threads) is that it scales nicely with this
kind of algorithm.
However, it is quite possible that I missed something; Tord and I
really only spoke very briefly about this. This is my $0.02.
LAM/MPI shortcuts / conveniences
One of the newer features of MPI is that an MPI application can launch
other MPI applications (although not all MPI implementations have
incorporated this feature yet). This feature is quite convenient in
BladeEnc's case, because, when combined with another MPI feature that
allows the querying of how many compute ranks are available, it allows
the automatic parallelization of BladeEnc without the user knowing
that anything magic has happened.
For example, in LAM/MPI's case, once you lamboot to
fire up the MPI run-time environment on some number of nodes, you
can simply:
% bladeenc foo.wav bar.wav baz.wav
just like you have always done in the past. Additionally, your
favorite GUI front end that uses BladeEnc for its back-end MP3
encoding will also work in parallel - no special command line flags or
mpirun is necessary.
Specifically, if parallel BladeEnc detects that there is only one
rank when it starts, and BladeEnc's configure
determined that the underlying MPI supports the spawning feature,
BladeEnc will automatically start MPI_UNIVERSE_SIZE
slaves. MPI_UNIVERSE_SIZE is typically the number of
CPUs available, but it may be the number of machines available - it is
dependent upon the MPI implementation (LAM 6.3.2 uses number of nodes,
but LAM 6.3.3 will use number of CPUs).
However, if BladeEnc detects only one rank and the spawning
capability is not available in the underlying MPI,
you'll essentially be using the stock serial version of BladeEnc.
The same is true if there are only two ranks.
That is, it doesn't make much sense for there to be one master and one
slave - the master would simply send the data to the slave, the slave
would process it, and send it back. All we're doing is incurring
network overhead; the process is not sped up at all. Hence, if there
are only two ranks, the second rank will be ignored and the master
will proceed in serial mode.
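The startup decision described in this section can be summarized in a small function. This is a sketch of the rules as stated in the text, not the logic in parallel.c: slaves_to_use() is a hypothetical name, a return value of 0 is used here to mean "run in serial", and the requirement of at least 2 slaves before spawning is an assumption based on the "one master and one slave is pointless" argument.

```c
#include <assert.h>

/* Hypothetical helper: how many slaves will be used.
 *   ranks_started  - ranks present when BladeEnc starts (>= 1)
 *   can_spawn      - nonzero if the MPI supports spawning
 *   universe_size  - value of MPI_UNIVERSE_SIZE, if known
 * Returns 0 for serial mode. */
int slaves_to_use(int ranks_started, int can_spawn, int universe_size)
{
    assert(ranks_started >= 1);

    if (ranks_started == 1) {
        /* Alone: spawn slaves if we can and it is worthwhile. */
        if (can_spawn && universe_size >= 2)
            return universe_size;
        return 0;
    }
    if (ranks_started == 2)
        return 0;            /* one master + one slave gains nothing */

    return ranks_started - 1;   /* everyone but the master is a slave */
}
```

So a plain "bladeenc foo.wav" with spawning available and a universe size of 4 uses 4 slaves, the same command without spawning support runs in serial, and starting exactly two ranks by hand also falls back to serial.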
The code itself
The majority of parallel code is in its own file,
parallel.c (and parallel.h). A few hooks
had to be inserted in main.c and some other files, but
nothing big. grep for "#if WANT_MPI" in the
.c files to see where parallel logic has been added.
You should also read the file bladeenc/JMS-NOTES in
the source code distribution for some notes on the parallel algorithm
and whatnot.
main.c
Since I only added a few hooks in main.c, it's
easiest to talk about them with just a list of bullets. All the added
code in main.c is surrounded by "#if
WANT_MPI", so it's easy to find.
- First off, we have to include
<mpi.h> to get
all the MPI function prototypes, types, and extern constant
declarations.
- There are a few global variables instantiated:
  - num_desired_slaves: The number of slaves that the
    user set with the -slaves=N command line switch.
  - num_slaves: The number of slaves that are actually
    being used.
  - slave_comm: A private MPI communicator for the
    slaves to communicate in. (Sidenote: a "communicator" is essentially
    a private communications space inside of MPI that contains a set of
    ranks [i.e., processes] and a unique context that makes it private.)
- We have to do a few things if we don't want
MPI, too. Recall that if the configure script doesn't
find support for MPI, you'll get a plain vanilla version of BladeEnc. So
the first "#if !WANT_MPI" section defines a few variables
that have to be used in the main loop if we're not going to go
parallel.
- Right at the top of
main(), we call
parallel_startup(). Slave ranks entering this call will
not return -- they will be dispatched to the "worker" code. The
master will return.
- Further down, after we have parsed the command line to check
for the
-slaves=N argument, we call
parallel_spawn(). This will spawn slaves if the following
conditions are met:
- The underlying MPI supports the MPI-2 function
MPI_Comm_spawn().
- Parallel BladeEnc was started with just one rank.
- The number of desired slaves is greater than or equal to 2.
- A little further down, we print out a tagline specifying whether
we're encoding in parallel or in serial.
- The main loop for encoding a single song has been moved to
parallel.c. There are two main functions:
serial_master() (for encoding with just one CPU) and
parallel_master(). Depending on how many slaves we have,
call the appropriate routine (if we were compiled without MPI support,
unconditionally call serial_master()).
- There's a little more bookkeeping (specifically, whether we want
"{" and "}" or not) if we compile with
parallel support.
- Finally, when BladeEnc is completely finished, we tell
the slaves that we are finished (i.e., distribute a special "We're
done!" message to each of the slaves), and then shut down MPI in an
orderly fashion with calls to
parallel_master_finish()
and parallel_shutdown().
- Additionally, in the quit() routine (which is
called for abnormal termination, IIRC), we also have to call
parallel_master_finish() and
parallel_shutdown() so that all the slaves shut down when
the master shuts down.
- In the
readGlobalSwitches() routine, a little
code was added to check for the -slaves=N command line
switch, and to set num_desired_slaves as appropriate (or
print a warning if BladeEnc wasn't compiled with MPI support, or if
the MPI doesn't support spawning). Similar logic was added in the
printUsage() routine.
parallel.c
This code is entirely original; it's not just a few hooks into the
original BladeEnc code (the tabbing style is obviously different, for
one thing ;-). Hence, I'm not going to go into as much detail as I
did for main.c -- there are tons of comments in the file
itself for that. Instead, I'll just describe each function and the
general algorithm that it implements.
Just about the entire file is surrounded by "#if
WANT_MPI", so even though parallel.c is compiled
regardless of whether configure finds support for MPI,
this file will essentially be empty after the preprocessor chunks
through it if we are compiling a serial BladeEnc (with the exception
that the serial_master() is always included).
I do apologize for all the debugging code in this file (all the
#if MASTER_OUT and #if SLAVE_OUT and
whatnot). Trust me in that debugging in parallel is a real pain in
the... watch your mouth! (I'm just talking about Shaft...)
-
serial_master()
This function is essentially the same as the main per-song loop
that is in the main BladeEnc distribution. I made it a separate
routine simply for ease of logic and reduction of clutter in
main.c.
-
make_mpi_datatypes()
MPI can create maps of arbitrary datatypes (i.e., C structures) --
even if they are not contiguous in memory -- and therefore
[potentially] send them with a single memory copy. This is opposed to
copying (possibly non-contiguous) memory into a single, contiguous
buffer, and then sending that second buffer (which can be quite
expensive). There's actually a lot more to the story than this, but
this is the general idea.
This routine makes 5 MPI datatype maps for use later in the code:
-
codecInitInDatatype: This datatype is used only to
build the jobDatatype datatype, below.
-
splInDatatype: This datatype is used only to build
the jobDatatype datatype, below.
-
jobDatatype: This datatype is used only to build the
slaveJobDatatype, below.
-
slaveJobDatatype: This datatype is used to send
information on a set of chunks to a slave.
-
slaveReturnDatatype: This datatype is used to send
information on MP3 output data back to the master when a slave
finishes with its set of input chunks.
-
parallel_startup()
This function is called by main(). It starts up MPI
and sees how many ranks are running. If we only have one rank
running, see how many ranks are available (i.e., check the MPI
attribute MPI_UNIVERSE_SIZE). If we're going to be
running in parallel, call make_mpi_datatypes(). Finally,
if this rank is a slave, call parallel_slave() (which
never returns). Hence, only the master returns to
main().
-
parallel_spawn()
This function is called by main() to spawn slaves if
only one rank was started. This function effects the "automagically
parallel" functionality -- if you just startup BladeEnc with
"./bladeenc foo.wav", we'll try to figure out how many CPUs MPI is
allowed to use, and then spawn across all of them.
The logic is a bit twisted (yes, Virginia, it has to be that way),
but there are plenty of comments. The end result is that the slaves
are spawned (if they don't already exist) and we get a private MPI
communicator to use with the slaves (this is not strictly necessary,
but as a parallel library writer, it's force of habit for me).
-
parallel_shutdown()
Just like the name implies, this function closes out MPI. It
frees all the MPI datatype maps (because all
righteous code cleans up after itself -- relying on
exit() is lame), and then shuts down MPI by calling
MPI_Finalize().
-
parallel_master()
This is the heart of the beast.
It is important to note that this function is currently invoked
once for each input file -- it is not invoked once for
all input files.
The master initially sends out initialSamples chunks
to each slave. All but the first slave additionally receive
redundantFrames chunks (bad choice of variable name --
since the first set of samples doesn't need any redundant chunks, see
Overlap, above, for why we need redundant
chunks). The master loops, calculating the data members of a slave_job
structure, reading from the input file, and sending the
data to a slave. Each set of input is labeled with a sequence number.
Each portion is sent in two or three messages: the first message is
the slave_job structure. If there are any redundant
chunks, they are sent in the second message. The last message sends
the chunks that are to be processed by that slave.
Additional bookkeeping is done to store the sequence number that
was given to each slave. This bookkeeping is necessary for writing
the output MP3 data in the correct order (see Output File Ordering, above).
After the initial samples are sent, we copy the last bit into the
redundant chunks buffer, and enter the main loop. We actually loop on
the number of slaves that are performing work (as the work runs out,
this value will go down to zero). This turns out to be a convenient
measure; trust me.
The master posts an open-ended receive; it waits to hear back
from any slave rank. It will eventually receive a
slave_return structure from some rank (since there are
still slaves working). It then identifies which slave
sent the slave_return, and allocates memory to
receive the slave's output MP3 data. The MP3 data is received into
this temporary buffer, and passed to queue_result() for
writing out to the output MP3 file.
The master then doubles the number of input samples that was last
sent to this slave (but bounds it by a maximum value), and tries to
read in that many more samples from the input file. It sends a new
slave_job structure to the slave telling it how many
chunks to expect, and follows that with the redundant chunks and real
chunks. Bookkeeping is performed to record the serial number of this
input data. We also do some clever buffer swapping (if I do say so
myself) to keep the redundant chunks for the next slave that comes to
turn in output.
If there is no more input to be processed, the master doesn't
send anything back to the slave (this works nicely because the slave
will block waiting for something from the master; see the description
of parallel_master_finish(), below); it just decrements
the number of working slaves. See? I told you that the looping on
the number of working slaves was a Good Thing. :-)
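To see why looping on the number of working slaves is convenient, here is a toy simulation of the master's loop with no MPI in it at all. Everything in it is an assumption made for illustration: the name simulate_master(), the round-robin order standing in for "whichever slave reports first", and counting work in chunks. Only the 32-chunk first portion, the doubling, and the 256-chunk cap come from the text.

```c
#include <assert.h>

enum { FIRST_PORTION = 32, PORTION_CAP = 256, MAX_SLAVES = 64 };

/* Hypothetical simulation: hand out `total_chunks` of work to
 * `num_slaves` slaves and return how many portions were sent. */
int simulate_master(int total_chunks, int num_slaves)
{
    int last[MAX_SLAVES] = { 0 };  /* last portion size; 0 = not working */
    int remaining = total_chunks;
    int working = 0, portions = 0, s;

    assert(num_slaves > 0 && num_slaves <= MAX_SLAVES);
    assert(total_chunks >= 0);

    /* First iteration: one initial portion per slave while input lasts. */
    for (s = 0; s < num_slaves && remaining > 0; ++s) {
        int n = remaining < FIRST_PORTION ? remaining : FIRST_PORTION;
        last[s] = n;
        remaining -= n;
        ++working;
        ++portions;
    }

    /* Main loop: keep going for as long as any slave is still working. */
    s = 0;
    while (working > 0) {
        if (last[s] > 0) {              /* slave s turns in its output */
            if (remaining > 0) {
                int n = last[s] * 2;    /* double, bounded by the cap */
                if (n > PORTION_CAP)
                    n = PORTION_CAP;
                if (n > remaining)
                    n = remaining;
                last[s] = n;
                remaining -= n;
                ++portions;
            } else {
                last[s] = 0;            /* no input left: send nothing */
                --working;
            }
        }
        s = (s + 1) % num_slaves;
    }
    return portions;
}
```

The loop needs no special end-of-file case: every slave that was given work eventually reports back, finds nothing left, and is counted out, so the loop ends exactly when the last result has been received. For example, 1,000 chunks on a single slave go out as 32, 64, 128, 256, 256, 256, and 8, which is 7 portions.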
Finally, when the master is finished, it frees up some temporary
memory and returns. As noted in the code, the beauty of the file
output routine is that it is guaranteed to be finished by the time the
master reaches this point -- there is no need to drain it.
-
parallel_master_finish()
This routine exists solely to tell the slaves to quit. A special
"we're done!" slave_job message is sent to all of the
slaves. This is necessary because the current implementation loops
over calling parallel_master() for each song. So in
between each song, each slave will go into a waiting state, waiting
for the master to indicate whether there will be another song or
whether we are all done and need to quit.
-
parallel_slave()
Surprisingly, this function is actually fairly straightforward.
There's some MPI bookkeeping in the beginning to create the
private MPI communicator, get our slave number, etc.
The main loop is forever -- we loop on a while(1) and
only break when the master sends the special slave_job
indicating that the entire run is completed.
For each iteration of the loop, we receive a
slave_job from the master. The codec is initialized and
other setup functions are performed. The slave receives the redundant
chunks (if any), and then receives the real chunks. If there were any
redundant chunks, process them, and then flush the output MP3 data.
Essentially, we record where to begin keeping the output MP3 data, and
then start processing the real chunks.
When the slave finishes the real chunks, it sends back a
slave_return structure to the master telling it a) to
expect some data from this slave, and b) how much data to expect. The
slave then sends the actual output data back to the master.
This completes the slave loop iteration; it loops back around to
receive another slave_job structure from the master.
Note that the slave is inherently stateless -- it loops receiving a
set of chunks (plus potential redundant chunks) from the master, and
quits when the master tells it to. The slave has no concept of songs
or files. It's much simpler this way.
When the slave receives the "we're done!" message, it breaks out
of the loop, invokes parallel_shutdown() and then
exit().
-
reset_result_queue()
This is a trivial function that is invoked from
parallel_master to reset the state of the result queue
(see queue_result(), below). It is only here to hide the
internals of the result queue from the main master code
(object-oriented habit on my part).
-
queue_result()
This is a neat little function to implement what was described in
the Output file ordering section, above.
This function is called by parallel_master() to write
data to the output MP3 file. It checks the serial number of the new
data against the serial number of the last chunk written. If the new
serial number is one greater than the last one written, it writes out
the new data immediately. It then tries to progress the queue by
looking for the next serial number. The queue is stored in order by
serial number, so this is quite easy (standard linked list
manipulation stuff).
If the data chunk is not the next segment, it is added to
the list (in the Right Place). More standard linked list manipulation
stuff (oh for the STL... but this is plain vanilla C, so we are denied
this pleasure here).
And that's it. It's that simple.
Ok, it's not quite simple. But it's cool. :-)