ParallelProcessor

ParallelProcessor utilizes multiple CPU cores to process compute-intensive tasks.

If you have some time-consuming statements in a for-loop and no state is shared between iterations, you can map these statements to different processes. Suppose you need to process a couple of files; you can do this in parallel:

from pyrallel.parallel_processor import ParallelProcessor

def mapper(filename):
    with open(filename) as f_in, open(filename + '.out', 'w') as f_out:
        f_out.write(process_a_file(f_in.read()))

pp = ParallelProcessor(2, mapper)
pp.start()

for fname in ['file1', 'file2', 'file3', 'file4']:
    pp.add_task(fname)

pp.task_done()
pp.join()

You don’t need to write a cumbersome loop if you have an iterable (list, generator, etc.). Instead, use map:

pp = ParallelProcessor(2, mapper)
pp.start()

pp.map(['file1', 'file2', 'file3', 'file4'])

pp.task_done()
pp.join()

In practice, some files are small and some are big, so it’s better to keep all cores busy. One way is to send content line by line to the processes (assuming the content is line-separated):

def mapper(line, _idx):
    with open('processed_{}.out'.format(_idx), 'a') as f_out:
        f_out.write(process_a_line(line))

pp = ParallelProcessor(2, mapper, enable_process_id=True)
pp.start()

for fname in ['file1', 'file2', 'file3', 'file4']:
    with open(fname) as f_in:
        for line in f_in:
            pp.add_task(line)

pp.task_done()
pp.join()

One problem here is that a file descriptor has to be acquired every time the mapper is called. To avoid this, use a Mapper class instead of a mapper function. It lets you define how each worker process is set up and torn down:

class MyMapper(Mapper):
    def enter(self):
        self.f = open('processed_{}.out'.format(self._idx), 'w')

    def exit(self, *args, **kwargs):
        self.f.close()

    def process(self, line):
        self.f.write(process_a_line(line))

pp = ParallelProcessor(..., mapper=MyMapper, ...)

In some situations, you may need a collector to send data back from the child processes to the main process:

processed = []

def mapper(line):
    return process_a_line(line)

def collector(data):
    processed.append(data)

pp = ParallelProcessor(2, mapper, collector=collector)
pp.start()

for fname in ['file1', 'file2', 'file3', 'file4']:
    with open(fname) as f_in:
        for line in f_in:
            pp.add_task(line)

pp.task_done()
pp.join()

print(processed)

You can count the number of collector invocations to estimate progress. To get the progress of the mapper itself, create a progress function and pass it to ParallelProcessor:

def progress(p):
    # print('Total task: {}, Added to queue: {}, Mapper Loaded: {}, Mapper Processed {}'.format(
    #    p['total'], p['added'], p['loaded'], p['processed']))
    if p['processed'] % 10 == 0:
        print('Progress: {}%'.format(100.0 * p['processed'] / p['total']))

pp = ParallelProcessor(8, mapper=mapper, progress=progress, progress_total=len(tasks))
pp.start()

for t in tasks:
    pp.add_task(t)

pp.task_done()
pp.join()

class pyrallel.parallel_processor.CollectorThread(instance, collector)

Handle collector in main process. Create a thread and call ParallelProcessor.collect().

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

class pyrallel.parallel_processor.Mapper(idx)

Mapper class.

This defines how the mapper works.

The methods will be called in following order:

enter (one time) -> process (many times) -> exit (one time)

enter()

Invoked when the subprocess is created and starts listening to the queue.

exit(*args, **kwargs)

Invoked when the subprocess is about to exit. Arguments will be set if an exception occurred.

process(*args, **kwargs)

Same as the mapper function, but the self argument can provide additional context (e.g., self._idx).
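
For instance, here is a minimal sketch of this lifecycle (the WordCounter class and its word-counting logic are illustrative, not part of the library): enter opens a per-process log, process returns a value that an optional collector can pick up, and exit closes the log.

from pyrallel.parallel_processor import Mapper

class WordCounter(Mapper):
    def enter(self):
        # Runs once when the worker process starts listening to its queue.
        self.log = open('worker_{}.log'.format(self._idx), 'w')

    def exit(self, *args, **kwargs):
        # Runs once when the worker process is about to exit.
        self.log.close()

    def process(self, line):
        # Runs once per task; the return value is what a collector (if any) receives.
        self.log.write(line)
        return len(line.split())

Pass the class itself (not an instance) as the mapper argument, as in the MyMapper example above.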

class pyrallel.parallel_processor.ParallelProcessor(num_of_processor: int, mapper: Callable, max_size_per_mapper_queue: int = 0, collector: Callable = None, max_size_per_collector_queue: int = 0, enable_process_id: bool = False, batch_size: int = 1, progress=None, progress_total=None, use_shm=False, enable_collector_queues=True, single_mapper_queue: bool = False)
Parameters:
  • num_of_processor (int) – Number of processes to use.
  • mapper (Callable / Mapper) – Function or subclass of Mapper class.
  • max_size_per_mapper_queue (int, optional) – Maximum size of mapper queue for one process. If it’s full, the corresponding process will be blocked. 0 by default means unlimited.
  • collector (Callable, optional) – If the data returned from mapper needs to be collected in the main process (in a separate thread), set this handler; its arguments are the same as the return value(s) of mapper. Results are collected one at a time, in arbitrary order.
  • max_size_per_collector_queue (int, optional) – Maximum size of collector queue for one process. If it’s full, the corresponding process will be blocked. 0 by default means unlimited.
  • enable_process_id (bool, optional) – If True, an additional argument _idx (process id) will be passed to the mapper function. This has no effect for Mapper classes. Defaults to False.
  • batch_size (int, optional) – Batch size, defaults to 1.
  • progress (Callable, optional) – Progress function, which takes a dictionary as input. The dictionary contains the following keys: total can be set via progress_total, added is the number of tasks that have been added to the queue, loaded is the number of tasks that have been loaded by worker processes, and processed is the number of tasks that have been processed by worker processes. Defaults to None.
  • progress_total (int, optional) – Total number of tasks. Defaults to None.
  • use_shm (bool, optional) – When True, and when running on Python version 3.8 or later, use ShmQueue for higher performance. Defaults to False.
  • enable_collector_queues (bool, optional) – When True, create a collector queue for each processor. When False, do not allocate collector queues, saving resources. Defaults to True.
  • single_mapper_queue (bool, optional) – When True, allocate a single mapper queue that will be shared between the worker processes. Sending processes can go to sleep when the mapper queue is full. When False, each process gets its own mapper queue, and CPU-intensive polling may be needed to find a mapper queue which can accept a new request.

Note

  • Do NOT implement heavy compute-intensive operations in collector; they should be in mapper.
  • Tuning the queue size and batch size can improve performance significantly.
  • collector only collects returns from mapper or Mapper.process.
  • How often the progress function is executed depends on the CPU.
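
As an illustrative sketch (the mapper and collector bodies below are placeholders, not part of the library), a construction that exercises several of these parameters might look like:

from pyrallel.parallel_processor import ParallelProcessor

def mapper(line):
    return line.upper()  # placeholder for real compute-intensive work

def collector(result):
    print(result)  # runs in the main process; keep it lightweight

pp = ParallelProcessor(
    4, mapper,
    max_size_per_mapper_queue=100,  # bound the queue so producers block instead of growing it without limit
    collector=collector,
    batch_size=10,                  # ship tasks to the workers in batches of 10
    single_mapper_queue=True,       # one shared queue; avoids polling per-process queues
)
pp.start()
pp.map(['some input', 'more input'])
pp.task_done()
pp.join()
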
add_task(*args, **kwargs)

Add data to a mapper queue.

When a single mapper queue is in use, the calling process is put to sleep if the queue is full. When multiple mapper queues are in use (one per process), CPU-intensive polling (round-robin) is used to find the next queue that can accept a new task. (main process, blocked or unblocked depending upon single_mapper_queue)

collect()

Get data from collector queue sequentially. (main process, unblocked, using round robin to find next available queue)

get_progress()

Get progress information from each mapper. (main process)

join()

Block until processes and threads return.

start()

Start processes and threads.

task_done()

Indicate that everything which needs to be processed has been added via add_task. (main process, blocked)

class pyrallel.parallel_processor.ProgressThread(instance, progress, progress_total, num_of_processor)

Progress information in main process.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.