MapReduce

This module implements a multiprocessing-based MapReduce computing model.

It differs from the conventional MapReduce model in the following ways:

  • The manager starts mapper and reducer processes at the same time. A mapper's output has the same form as the reducer's input and output, so reducers can start reducing as soon as outputs arrive instead of waiting for all mappers to finish.
  • Data can be passed to mappers gradually. Mappers keep waiting for new data until the user signals that no more data will be added.
  • Reduction is not performed between the outputs of two mappers (although the user-facing API looks that way) but between an output and a context. Pickling (serialization) and unpickling (deserialization) data for IPC is time-consuming, so instead each reducer process holds a context that aggregates outputs during the reducing step. Once all outputs are reduced, the contexts are reduced with one another.
  • Shuffling and reduce-by-key are not supported.
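The bullets above describe reduction against a per-reducer context rather than between pairs of mapper outputs. The single-process sketch below traces only that data flow; it is not pyrallel's implementation. The function name `simulate_context_reduce` is hypothetical, and the round-robin assignment of outputs to reducers is an assumption standing in for "whichever reducer process takes the output".

```python
from functools import reduce
from typing import Any, Callable, Iterable, List

_EMPTY = object()  # marks a reducer context that has not received any output yet


def simulate_context_reduce(
    mapper: Callable[[Any], Any],
    reducer: Callable[[Any, Any], Any],
    tasks: Iterable[Any],
    num_reducers: int,
) -> Any:
    """Hypothetical single-process trace of context-based reduction."""
    contexts: List[Any] = [_EMPTY] * num_reducers  # one context per reducer

    for i, task in enumerate(tasks):
        output = mapper(task)
        # Round-robin stands in for "whichever reducer takes the output".
        slot = i % num_reducers
        if contexts[slot] is _EMPTY:
            contexts[slot] = output
        else:
            # Fold the new output into this reducer's context; outputs are
            # never reduced pairwise with each other.
            contexts[slot] = reducer(contexts[slot], output)

    # After every output has been reduced, reduce the contexts together.
    filled = [c for c in contexts if c is not _EMPTY]
    if not filled:
        raise ValueError("no tasks were added")
    return reduce(reducer, filled)


print(simulate_context_reduce(lambda x: x, lambda a, b: a + b, range(10000), 8))
# prints 49995000
```

Only `num_reducers` contexts ever need to be combined in the final step, however many tasks were added, which is the point of holding a context per reducer.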

Example:

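import time

from pyrallel.map_reduce import MapReduce


def mapper(x):
    time.sleep(0.0001)  # simulate a little work per task
    return x


def reducer(r1, r2):
    return r1 + r2


if __name__ == '__main__':  # required where child processes are spawned
    mr = MapReduce(8, mapper, reducer)
    mr.start()  # launch mapper and reducer processes

    for i in range(10000):
        mr.add_task(i)  # data can be fed gradually

    mr.task_done()  # no more tasks will be added
    result = mr.join()  # wait for all processes; returns the final result

    print(result)  # 49995000, the sum of 0..9999

class pyrallel.map_reduce.MapReduce(num_of_process: int, mapper: Callable, reducer: Callable, mapper_queue_size: int = 0, reducer_queue_size: int = 0)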
Parameters:
  • num_of_process (int) – Number of processes for both mappers and reducers.
  • mapper (Callable) – Mapper function. The signature is mapper(*args, **kwargs) -> object.
  • reducer (Callable) – Reducer function. The signature is reducer(object, object) -> object. Each object argument is either a value returned by mapper or a previously reduced result.
  • mapper_queue_size (int, optional) – Maximum size of the mapper queue. The default, 0, means unlimited.
  • reducer_queue_size (int, optional) – Maximum size of the reducer queue. The default, 0, means unlimited.
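The reducer receives two objects of the type the mapper returns, and, following the model description, either argument may be an accumulated context. Since reduce-by-key is not supported, one workaround is to make the mapped object itself keyed, for example a `collections.Counter`, so that merging two objects aggregates per key. Such a reducer should also be associative and commutative, because the order in which outputs and contexts are combined is not fixed; that requirement is our inference from the description, not a documented guarantee. The names `count_words` and `merge_counts` are hypothetical, and the check below runs `functools.reduce` in one process rather than `MapReduce`.

```python
from collections import Counter
from functools import reduce


def count_words(line: str) -> Counter:
    """Hypothetical mapper: one line of text -> a Counter of its words."""
    return Counter(line.split())


def merge_counts(a: Counter, b: Counter) -> Counter:
    """Hypothetical reducer: adds counts key by key, returning a new Counter."""
    # Counter addition keeps only positive counts, which is fine for word counts.
    return a + b


lines = ["a b a", "b c", "a c c", "d"]
outputs = [count_words(line) for line in lines]

forward = reduce(merge_counts, outputs)
backward = reduce(merge_counts, reversed(outputs))
# The same totals come out regardless of combination order.
assert forward == backward
print(dict(sorted(forward.items())))  # {'a': 3, 'b': 2, 'c': 3, 'd': 1}
```

With this pattern, `count_words` and `merge_counts` would be passed as `mapper` and `reducer`, and each line as the argument to `add_task`.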
add_task(*args, **kwargs)

Add a task, that is, one set of arguments to be passed to mapper.

Parameters:
  • args – Same as args of the mapper function.
  • kwargs – Same as kwargs of the mapper function.
join()

This method blocks until all mappers and reducers finish.

Returns: The final reduced object.
Return type: object
start()

Start all child processes.

task_done()

Signal that no more tasks will be added.
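The methods above are meant to be called in the order start(), add_task() (any number of times), task_done(), join(). The sketch below mirrors that lifecycle with a hypothetical `ToyMapReduce` class. It uses `threading` and `queue` in place of processes to stay short and portable, so it has none of pyrallel's IPC behavior and is not its implementation. The sentinel objects that tell workers when input has ended are an assumption of this sketch.

```python
import queue
import threading
from functools import reduce
from typing import Any, Callable, List

_STOP = object()        # sentinel: "no more items on this queue"
_NO_CONTEXT = object()  # sentinel: reducer has not received any output yet


class ToyMapReduce:
    """Hypothetical thread-based stand-in mirroring MapReduce's lifecycle."""

    def __init__(self, num_workers: int, mapper: Callable, reducer: Callable,
                 mapper_queue_size: int = 0, reducer_queue_size: int = 0):
        self.num_workers = num_workers
        self.mapper = mapper
        self.reducer = reducer
        # queue.Queue treats maxsize <= 0 as unlimited, like the 0 defaults.
        self.map_q: queue.Queue = queue.Queue(mapper_queue_size)
        self.reduce_q: queue.Queue = queue.Queue(reducer_queue_size)
        self.contexts: List[Any] = []
        self.lock = threading.Lock()
        self.mappers: List[threading.Thread] = []
        self.reducers: List[threading.Thread] = []

    def _map_loop(self) -> None:
        while True:
            item = self.map_q.get()
            if item is _STOP:
                return
            args, kwargs = item
            self.reduce_q.put(self.mapper(*args, **kwargs))

    def _reduce_loop(self) -> None:
        context = _NO_CONTEXT
        while True:
            output = self.reduce_q.get()
            if output is _STOP:
                break
            # Fold each output into this worker's private context.
            context = output if context is _NO_CONTEXT else self.reducer(context, output)
        if context is not _NO_CONTEXT:
            with self.lock:
                self.contexts.append(context)

    def start(self) -> None:
        self.mappers = [threading.Thread(target=self._map_loop) for _ in range(self.num_workers)]
        self.reducers = [threading.Thread(target=self._reduce_loop) for _ in range(self.num_workers)]
        for t in self.mappers + self.reducers:
            t.start()

    def add_task(self, *args, **kwargs) -> None:
        self.map_q.put((args, kwargs))

    def task_done(self) -> None:
        for _ in self.mappers:  # one stop signal per mapper
            self.map_q.put(_STOP)

    def join(self) -> Any:
        for t in self.mappers:
            t.join()
        # Every output is queued now; the FIFO order puts stops after them.
        for _ in self.reducers:
            self.reduce_q.put(_STOP)
        for t in self.reducers:
            t.join()
        # Reduce contexts with each other (raises TypeError if no tasks ran).
        return reduce(self.reducer, self.contexts)


def square(x):
    return x * x


def add(a, b):
    return a + b


mr = ToyMapReduce(4, square, add)
mr.start()
for i in range(100):
    mr.add_task(i)
mr.task_done()
result = mr.join()
print(result)  # 328350, the sum of squares of 0..99
```

Because the stop signals are queued only after the real work, each worker drains everything ahead of its sentinel before exiting, which is one simple way to implement "no more new tasks" followed by a blocking join.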