# Copyright 2006 The Trustees of Indiana University.

# Use, modification and distribution is subject to the Boost Software
# License, Version 1.0. (See copy at http://www.boost.org/LICENSE_1_0.txt)

#  Authors: Chris Mueller
#           Andrew Lumsdaine

import string
from types import *

try:
    import mpi
    _mpi_available = True
except ImportError:
    # Only a missing pyMPI module selects the sequential fallback; any other
    # failure while importing mpi is a real error and is allowed to surface.
    _mpi_available = False

    # Setup some bogus values for testing
    class _mpi:
        pass

    mpi = _mpi()
    mpi.rank = 0
    mpi.size = 1

__doc__ = """
The psweep library provides a lightweight framework for managing
computational experiments based on parameter sweeps.  It supplies a
component for enumerating all combinations of values in a parameter
set and performing operations as the values are incremented in both
sequential and parallel/distributed environments.

To use it, provide a list of parameters (anything that iter(obj) is
valid for) and optionally any functions to call when the parameters
change.  Functions can return iterators that will be called before the
next update.  For instance:

  def Brackets(state, index, value):
    print '[', str(value)
    yield None
    print ']'

  i = range(10)
  j = ['house', 'car']
  k = ['one', 'two']
  params = [i, j, k]

  state = BoundState(params)
  state.bind(i, Brackets)
  state.bind(j, Brackets)
  state.bind(k, Brackets)

  e = enumerator(params, state)
  for s in e: pass

This example prints brackets around each value and is equivalent to
the loop:

  for i in range(10):
    print '[', i
    for j in ['house', 'car']:
      print '[', j
      for k in ['one', 'two']:
        print '[', k
        print ']'
      print ']'
    print ']'

The library is designed to make it easy to vary the order parameters
are enumerated in a parameter sweep and have modular control over the
operations that occur as each value is incremented.  The goal is to
declaratively define the parameters and the operations based on the
current task and make it easy to change the application without having
to modify multiple nested loops spread across different functions and
objects.

To run this example in a distributed environment with pyMPI, simply
replace the enumerator with one of the supplied DistributedEnumerators
(or write your own!):

  # Distribute round-robin style to all available processors
  e = round_robin(params, state)

  # Distribute an individual parameter round-robin style to all
  # available processors
  e = domain_distributor(params, domain=j, state=state)

  # Distribute to the next available resource
  e = master_worker(params, state)

The command line to run the examples using pyMPI is:

  % mpirun -np 4 pyMPI psweep.py
"""


# ------------------------------------------------------------
# Enumerator
# ------------------------------------------------------------

def enum_head(seq, state):
    """
    Special link for the first link in the chain.

    Stores each value of seq in state[0] and yields that value.
    """
    for s in seq:
        state[0] = s
        yield s


def enum_link(outer, inner, state, index):
    """
    A generator that forms a 'link' in the chain of generators that
    enumerate values.  Each position in the enumeration result is
    filled by one link.

    State is an externally allocated list that has enough elements for
    all the values to be enumerated.  index is the index of the current
    value being enumerated.

    inner is iterated once per step of outer, so it must be an object
    that can be iterated repeatedly (a list, or a factory such as
    enum_factory), not a one-shot iterator.
    """
    for o in outer:
        for i in inner:
            state[index] = i
            yield state


def enumerator(params, state=None):
    """
    A generator that enumerates all the combinations of params.

    If state is set, it must be a list of len(params).  It is used to
    store the current value.  If state is not set, a list is created.

    The parameters are enumerated in the order they appear in the list,
    so state contains the current values in the same order:

      state = [params[0][i], params[1][j], ..., params[n][k]]

    Each iteration returns the current state.  The same state object is
    yielded every time, so copy it if a snapshot is needed.

    params must contain at least one parameter; params[0] is read
    unconditionally when iteration starts.
    """
    # Make the state if necessary
    if state is None:
        state = [None for p in params]

    # Build the chain of generators
    gen = enum_head(params[0], state)
    for index, p in enumerate(params[1:]):
        gen = enum_link(gen, p, state, index + 1)

    # Iterate!
    for value in gen:
        yield state


class enum_factory:
    """
    Return a new enumerator each time an iter is requested.  This is
    useful for nesting enumerators.
    """

    def __init__(self, params, state=None):
        self._params = params
        self._state = state

    def __iter__(self):
        return enumerator(self._params, self._state)


# ------------------------------------------------------------
# Bound State
# ------------------------------------------------------------

def _step_method(obj):
    """
    Return the bound callable that advances obj by one step, or None if
    obj offers neither the Python 3 ('__next__') nor the Python 2
    ('next') iterator method.
    """
    for name in ('__next__', 'next'):
        method = getattr(obj, name, None)
        if callable(method):
            return method
    return None


class BoundState(list):
    """
    A state list that runs side effects when its items are assigned.

    One effect may be bound to each parameter with bind().  Whenever
    the matching position is assigned, the effect is called as
    effect(state, index, value).  If the effect returns an iterator
    (typically because it is a generator function), the iterator is
    advanced once immediately and once more just before the position is
    next assigned, or when finish() is called.

    Parameters are identified by id(), so two distinct parameter
    objects that compare equal are still bound independently.
    """

    def __init__(self, params):
        size = len(params)
        self.extend([0 for i in range(size)])

        self._effects = [None for i in range(size)]
        self._conts = [None for i in range(size)]

        self._indices = {}  # id(param): index
        for i, p in enumerate(params):
            self._indices[id(p)] = i

    def bind(self, param, effect):
        """
        Register effect to be called when param's position is assigned.
        param must be one of the objects passed to the constructor.
        """
        self._effects[self._indices[id(param)]] = effect

    def _callCont(self, i):
        """
        Advance the pending continuation for position i one step and
        forget it.  Exhaustion (StopIteration) is the expected outcome
        and is ignored.
        """
        try:
            _step_method(self._conts[i])()
        except StopIteration:
            pass

        self._conts[i] = None

    def __setitem__(self, i, value):
        # Call any continuations on the previous item -- this is the case
        # when iteration finished and there's one more step left
        if i < (len(self) - 1) and self._conts[i + 1] is not None:
            self._callCont(i + 1)

        # Call any continuations on this item -- this is the normal case
        if self._conts[i] is not None:
            self._callCont(i)

        # Set the value so the state is correct when the side effects
        # are called
        list.__setitem__(self, i, value)

        # If there's a side effect for this item, call it.  If it returns
        # a continuation (generator), save it to call later.
        effect = self._effects[i]
        if effect is None:
            return

        cont = effect(self, i, value)
        # Nominal conformance (any object with a next-style method) is more
        # flexible here than an exact GeneratorType check.
        step = _step_method(cont)
        if step is None:
            return

        try:
            step()
        except StopIteration:
            # The effect finished without yielding: nothing is left to
            # run later, and the exception must not leak into the
            # enumerating generator that performed this assignment.
            return
        self._conts[i] = cont

    def finish(self):
        """
        Call any remaining side effects, last position first.
        """
        pending = [i for i, c in enumerate(self._conts) if c is not None]
        pending.reverse()
        for i in pending:
            self._callCont(i)


# ------------------------------------------------------------
# DistributedEnumerator
# ------------------------------------------------------------

# Utilities

class _UpdateState(list):
    """
    A simple state tracker that records the last state position that
    was updated along with the current values in state (via list).

    'last' holds the lowest index assigned since the previous reset();
    after reset() it equals len(self), meaning nothing has changed.
    """

    def __init__(self, state):
        self.extend(state)
        self.reset()

    def reset(self):
        self.last = len(self)

    def __setitem__(self, i, value):
        self.last = min(self.last, i)
        list.__setitem__(self, i, value)


class _UpdateStates(list):
    """
    A collection of _UpdateState instances for tracking multiple worker
    processes.  Every assignment is recorded in all of the per-worker
    trackers as well as in this list.
    """

    def __init__(self, state, size):
        self.extend(state)
        self._states = []
        for i in range(size):
            self._states.append(_UpdateState([0 for p in range(len(state))]))

    def last(self, i):
        """Lowest index changed since tracker i was last reset."""
        return self._states[i].last

    def reset(self, i):
        """Mark tracker i as fully up to date."""
        self._states[i].reset()

    def __setitem__(self, i, value):
        for state in self._states:
            state[i] = value
        list.__setitem__(self, i, value)


# ------------------------------
# Round robin distribution
# ------------------------------

def round_robin(params, state=None, rank=None, size=None):
    """
    Round robin distribution in which the states whose position n in
    the enumeration satisfies n % size == rank are returned to each
    worker.  There is no master process.  The local instance of this on
    each processor enumerates the parameters and only updates and
    returns the state for the states assigned to it.

    rank and size default to mpi.rank and mpi.size.  If state is not
    set, a list of len(params) is created.  Only the positions that
    changed since the last state returned are assigned, so effects
    bound to unchanged positions are not re-run.
    """
    if rank is None:
        rank = mpi.rank
    if size is None:
        size = mpi.size
    if state is None:
        state = [None for p in params]

    ustate = _UpdateState([0 for p in params])
    e = enumerator(params, ustate)

    for i, estate in enumerate(e):
        if i % size == rank:
            # Copy from the lowest position that changed since this
            # rank's previous state through to the end.
            for iup in range(ustate.last, len(params)):
                state[iup] = estate[iup]
            ustate.reset()
            yield state


# ------------------------------
# Domain based distribution
# ------------------------------

class _domain_distributor:
    """
    A simple round robin enumerator for distributing based on a single
    domain (parameter).  Does not have the state management overhead of
    the full round_robin and can be restarted by calling iter.  This is
    equivalent in spirit to enum_factory(round_robin).
    """

    def __init__(self, domain, rank, size):
        self._domain = domain
        self._rank = rank
        self._size = size

    def __iter__(self):
        for i, state in enumerate(self._domain):
            if i % self._size == self._rank:
                yield state


def domain_distributor(params, domain, state=None, rank=None, size=None):
    """
    Automated domain decomposition.  Distribute the work round-robin
    style based on the enumeration of the values in domain.  Note that
    if len(domain) < size, only len(domain) processors receive work.

    domain is located in params by identity first, so that parameters
    which merely compare equal (e.g. several [1, 2, 3, 4] lists) are not
    confused with each other; equality is used only as a fallback.
    params must be a list.
    """
    if rank is None:
        rank = mpi.rank
    if size is None:
        size = mpi.size

    # Find the position of the domain within the params
    for i, p in enumerate(params):
        if p is domain:
            break
    else:
        i = params.index(domain)

    # Inject a distributed parameter factory into the stream
    params = params[:i] + \
             [_domain_distributor(params[i], rank, size)] + \
             params[(i + 1):]

    e = enumerator(params, state)

    for state in e:
        yield state


def distributor(params, state=None, domain=None, rank=None, size=None):
    """
    A simple dispatcher for round-robin style distribution.  If domain
    is set, use domain_distributor, otherwise use round_robin.
    """
    if domain is not None:
        return domain_distributor(params, domain, state, rank, size)
    return round_robin(params, state, rank, size)


# ------------------------------
# Master/Worker Distribution
# ------------------------------

# MPI communication tags
STATE = 0  # Master sending state to worker
WORK = 1   # Worker requesting next state
DONE = 2   # Master shutdown signal


def distribute_master(params, state, rank, size):
    """
    Iterate through the parameters and assign states to workers as they
    request them.  Yield each state to the caller immediately before it
    is sent in reply to a work request.

    The state argument is not used for bookkeeping on the master; the
    master enumerates into its own tracking state.  The first batch of
    states (one per worker) is sent without being yielded.
    """
    ustates = _UpdateStates([0 for p in params], size)
    e = enumerator(params, ustates)

    try:
        # Give everyone something to do
        for worker in range(1, size):
            state = next(e)
            # 0 tells the worker to update every position
            mpi.send([state, 0], worker, STATE)

        # Loop until StopIteration
        while True:
            msg, status = mpi.recv()
            state = next(e)
            yield state
            mpi.send([state, ustates.last(status.source)],
                     status.source, STATE)
            ustates.reset(status.source)
    except StopIteration:
        # Send everyone the DONE message
        for worker in range(1, size):
            mpi.send(0, worker, DONE)


def distribute_worker(params, state, rank, size):
    """
    Request work from the master, updating the local state when it
    arrives and yielding the result.  If state is None, a list of
    len(params) is created.  Stops when a DONE message is received.
    """
    if state is None:
        state = [0 for p in params]

    while True:
        data, status = mpi.recv()
        if status.tag == STATE:
            # Unpack the new state and the index to begin updating from
            estate, last = data
            for i in range(last, len(params)):
                state[i] = estate[i]
            yield state

            # Ask for more work
            mpi.send(0, 0, WORK)
        elif status.tag == DONE:
            break


def master_worker(params, state=None, rank=None, size=None):
    """
    Master/worker distributed enumerator.  The master process (rank 0)
    distributes state to the workers and manages updates to maintain
    Bound Task semantics.

    Based on examples from Gropp, et. al., pp 29-36.
    """
    if rank is None:
        rank = mpi.rank
    if size is None:
        size = mpi.size

    if rank == 0:
        return distribute_master(params, state, rank, size)
    return distribute_worker(params, state, rank, size)


# ------------------------------
# Sample Bound functions
# ------------------------------

def IndentPrint(state, index, value):
    """
    Print each value indented to its index level.
    """
    print('%s%s' % (' ' * index, str(value)))


def ParenPrint(state, index, value):
    """
    Print each value indented to its index level with parentheses.
    The closing parenthesis is printed when the continuation resumes.
    """
    indent = ' ' * index
    print('%s(' % (indent,))
    print('%s %s' % (indent, str(value)))
    yield None
    print('%s)' % (indent,))


class DebugPrint:
    """
    Class-based sample effect: an object that mimics a generator by
    swapping its 'next' attribute between setup and cleanup.

    Construction only records the arguments.  The first step prints the
    setup line and the following step prints the cleanup line, matching
    the two steps BoundState takes on a continuation.
    """

    def __init__(self, state, index, value):
        self._state = state
        self._index = index
        self._value = value
        # Setup is the first *step*, not part of construction; running it
        # here would make the step taken by BoundState right after the
        # effect call perform the cleanup prematurely.
        self.next = self.setup

    def __iter__(self):
        self.next = self.setup
        return self

    def __next__(self):
        # Python 3 spelling of the iterator step; delegate to whichever
        # phase is currently installed in self.next.
        return self.next()

    def setup(self):
        print('setup %s %s %s' % (self._state, self._index, self._value))
        self.next = self.cleanup

    def cleanup(self):
        print('cleaup %s %s %s' % (self._state, self._index, self._value))


# ------------------------------------------------------------
# Tests
# ------------------------------------------------------------

def base_ten(state, base):
    """
    Convert a state in one base to base 10.  Useful for testing
    results.  Values are 1-based digits with the least significant
    digit first.
    """
    p = 0
    exp = 1
    for value in state:
        p += exp * (value - 1)
        exp *= base
    return p


def Test():
    """
    Run the sequential example when pyMPI is not available, otherwise
    the three distributed examples.
    """
    i = [1, 2, 3, 4]
    j = [1, 2, 3, 4]
    k = [1, 2, 3, 4]
    params = [i, j, k]
    rule = '------------------------------------------------------------'

    if not _mpi_available:
        print(rule)
        print('Sequential Example')
        print(rule)

        state = BoundState(params)
        state.bind(i, DebugPrint)
        state.bind(j, IndentPrint)
        state.bind(k, ParenPrint)

        e = enumerator(params, state)
        for s in e:
            pass
        state.finish()
    else:
        # Distributed Examples

        def base_ten_print(state, index, value):
            """
            Print the state in base 10
            """
            print(base_ten(state, 4))

        if mpi.rank == 0:
            print('')
            print(rule)
            print('Round Robin Example')
            print(rule)

        state = BoundState(params)
        state.bind(k, base_ten_print)
        e = distributor(params, state)
        for s in e:
            pass
        state.finish()
        mpi.barrier()

        if mpi.rank == 0:
            print('')
            print(rule)
            print('Domain Example')
            print(rule)

        # Build the fresh state first so the enumerator drives the same
        # object that is finished below.
        state = BoundState(params)
        state.bind(k, base_ten_print)
        e = distributor(params, state, domain=j)
        for s in e:
            pass
        state.finish()
        mpi.barrier()

        if mpi.rank == 0:
            print('')
            print(rule)
            print('Master/Worker Example')
            print(rule)

        def base_ten_print(state, index, value):
            if mpi.rank != 0:
                print('%s %s' % (mpi.rank, base_ten(state, 4)))

        params = [i, j, k]
        state = BoundState(params)
        state.bind(k, base_ten_print)
        e = master_worker(params, state)
        for s in e:
            pass
        state.finish()


if __name__ == '__main__':
    Test()