MapReduce
This is a multiprocessing-based MapReduce computing model.
It differs from the standard MapReduce model in several ways:
- The manager fires up mapper and reducer processes simultaneously: mapper output is consumed by reducers as it is produced, so reducers don't need to wait until all mappers finish.
- Data can be fed to mappers incrementally: mappers keep waiting to consume data until the user signals that no more new data will be added.
- Reducing is not performed between two mappers' outputs (though the API presents it to the user that way) but between an output and a context: pickling (serialization) and unpickling (deserialization) for IPC are time-consuming, so as an alternative, each reducer process holds a context that aggregates outputs during the reducing step. Once all outputs are reduced, a final reduce runs among the contexts (see the sketch after this list).
- It doesn't support shuffling or reduce-by-key.
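To make the context-based reducing above concrete, here is a minimal single-process sketch of the idea (illustrative only, not pyrallel's actual implementation; fold_into is a hypothetical helper):

    def reducer(r1, r2):
        return r1 + r2

    def fold_into(context, output):
        # The first output seeds the context; later outputs are folded in
        # locally, so intermediate results never cross process boundaries.
        return output if context is None else reducer(context, output)

    # Two "reducer processes", each folding its share of mapper outputs.
    contexts = [None, None]
    for i in range(10):
        contexts[i % 2] = fold_into(contexts[i % 2], i)

    # Final step: reduce among the contexts themselves.
    print(reducer(contexts[0], contexts[1]))  # 45, the sum of 0..9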
Example:

    import time
    from pyrallel.map_reduce import MapReduce

    def mapper(x):
        time.sleep(0.0001)
        return x

    def reducer(r1, r2):
        return r1 + r2

    mr = MapReduce(8, mapper, reducer)
    mr.start()
    for i in range(10000):
        mr.add_task(i)
    mr.task_done()
    result = mr.join()
    print(result)
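With this mapper and reducer, the printed result should be 49995000, the sum of the integers 0 through 9999.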
class pyrallel.map_reduce.MapReduce(num_of_process: int, mapper: Callable, reducer: Callable, mapper_queue_size: int = 0, reducer_queue_size: int = 0)

    Parameters:
        - num_of_process (int) – Number of processes for both mappers and reducers.
        - mapper (Callable) – Mapper function. Its signature is mapper(*args, **kwargs) -> object.
        - reducer (Callable) – Reducer function. Its signature is reducer(object, object) -> object; the object arguments are return values of the mappers.
        - mapper_queue_size (int, optional) – Maximum size of the mapper queue; 0 (default) means unlimited.
        - reducer_queue_size (int, optional) – Maximum size of the reducer queue; 0 (default) means unlimited (bounded queues are sketched below).
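Bounded queues can be useful when tasks are produced faster than they are consumed; for instance (a sketch reusing the mapper and reducer from the example above, and assuming a full queue makes add_task block rather than fail):

    # Cap pending work: at most 1000 unmapped tasks and 100 unreduced
    # outputs are buffered at any time, bounding memory use.
    mr = MapReduce(4, mapper, reducer,
                   mapper_queue_size=1000, reducer_queue_size=100)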
add_task(*args, **kwargs)

    Add data for one mapper invocation (see the sketch below).

    Parameters:
        - args – Same as the args of the mapper function.
        - kwargs – Same as the kwargs of the mapper function.
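Because args and kwargs are forwarded to the mapper, mappers with keyword arguments work naturally. A small sketch (the weighted mapper here is hypothetical, not part of pyrallel):

    from pyrallel.map_reduce import MapReduce

    def mapper(x, weight=1.0):
        return x * weight

    def reducer(r1, r2):
        return r1 + r2

    mr = MapReduce(4, mapper, reducer)
    mr.start()
    for i in range(100):
        mr.add_task(i, weight=0.5)  # forwarded as mapper(i, weight=0.5)
    mr.task_done()
    print(mr.join())  # 2475.0, i.e. 0.5 * sum(0..99)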
join()

    This method blocks until all mappers and reducers finish.

    Returns: The final reduced object.
    Return type: object
start()

    Start all child processes.
task_done()

    Signal that no more new tasks will be added.
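In the typical lifecycle, start() is called first, tasks are added with add_task(), task_done() marks the end of input, and join() collects the result. Calling join() without first calling task_done() will presumably block forever, since mappers keep waiting for more data.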