Python Concurrency and Parallelism: building a custom ProcessPoolExecutor


At Ostorlab we scan hundreds of mobile applications each day. Each scan is very resource-intensive, and from the beginning we have had to optimize the code for speed and make the most of our cloud resources.

I won't re-discuss concurrency and parallelism in Python here, as there are plenty of excellent resources on the subject; my favorite is "Fluent Python" by Luciano Ramalho.

But here are some quick notes:

- In CPython, the GIL (Global Interpreter Lock) prevents multiple native threads from executing Python bytecode at once, which makes threading inappropriate for CPU-bound parallelism. For more on the subject, I recommend the talks "Understanding the Python GIL" and "Removing Python's GIL: The Gilectomy".
- Multiprocessing is better suited to CPU-bound parallelism. However, spawning a process has significant overhead: if the goal is to run many tasks both concurrently and in parallel, running each task in a separate process is highly inefficient.
- To solve that issue, the concurrent.futures library was introduced in Python 3.2 and back-ported to Python 2.5. It spawns a number of workers (by default, the number of cores on your computer) to which you pass tasks for processing. Tasks are serialized using Pickle and sent to the workers through queues.
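To see the first two notes in practice, the sketch below runs the same CPU-bound function through a ThreadPoolExecutor and a ProcessPoolExecutor. The helpers count_primes and timed_run are hypothetical, and the timings depend entirely on your machine and core count, so no figures are given here. On a multi-core machine the process pool would be expected to finish noticeably faster, because each worker process has its own interpreter and its own GIL.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_primes(limit):
    """CPU-bound task: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def timed_run(executor_cls, limits):
    """Map count_primes over `limits` with executor_cls; return (results, seconds)."""
    start = time.perf_counter()
    # max_workers is left unset, so each executor uses its own default pool size.
    with executor_cls() as executor:
        results = list(executor.map(count_primes, limits))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    limits = [30_000] * 4
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, seconds = timed_run(cls, limits)
        print(f"{cls.__name__}: {results} in {seconds:.2f}s")
```

The process version only works because count_primes is a module-level function: the executor pickles it, by reference, to send it to the workers.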

Here is an example from the public Fluent Python GitHub repository:

"""Download flags of top 20 countries by population

ProcessPoolExecutor version

Sample run::

    $ python3 flags_threadpool.py
    BD retrieved.
    EG retrieved.
    CN retrieved.
    ...
    PH retrieved.
    US retrieved.
    IR retrieved.
    20 flags downloaded in 0.93s
"""
# BEGIN FLAGS_PROCESSPOOL

from concurrent import futures
from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:  # pool size defaults to the number of CPUs
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))


if __name__ == '__main__':
    main(download_many)
# END FLAGS_PROCESSPOOL

Playing with this, we quickly ran into Pickle's serialization limitations. The two main issues were:

- Some tasks were decorated with parameterized decorators, which caused errors similar to Error pickling <function>.
- The arguments passed were sometimes quite big and resulted in a recursion limit error.

For the second issue, we had to find a way to pass arguments to tasks without going through the queue and the serialization process.
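Both failures can be reproduced in a few lines. Since the original task code is not shown, the sketch assumes the parameterized decorators did not use functools.wraps; scaled, scaled_wraps, broken_task, fixed_task, pickle_error and nested_list are hypothetical names. Pickle serializes functions by reference, recording their module and qualified name, so a wrapper created inside a decorator cannot be looked up again. Pickle also walks containers recursively, so a deeply nested argument exceeds the recursion limit. The exact exception type varies between Python versions, which is why the helper only reports it.

```python
import functools
import pickle


def scaled(factor):
    """Hypothetical parameterized decorator that does NOT use functools.wraps."""
    def decorator(fn):
        def wrapper(x):
            return fn(x) * factor
        return wrapper
    return decorator


def scaled_wraps(factor):
    """Same decorator; functools.wraps copies __module__/__qualname__ to the wrapper."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(x):
            return fn(x) * factor
        return wrapper
    return decorator


@scaled(10)
def broken_task(x):
    return x + 1


@scaled_wraps(10)
def fixed_task(x):
    return x + 1


def pickle_error(obj):
    """Return the type of the exception raised by pickle.dumps(obj), or None."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # AttributeError, PicklingError or RecursionError
        return type(exc)
    return None


def nested_list(depth):
    """Build [[[...]]] nested `depth` levels deep, without recursion."""
    obj = []
    for _ in range(depth):
        obj = [obj]
    return obj


if __name__ == "__main__":
    # Qualified name is 'scaled.<locals>.decorator.<locals>.wrapper': not importable.
    print("decorated without wraps:", pickle_error(broken_task))
    # Qualified name is 'fixed_task', which resolves to the wrapper itself.
    print("decorated with wraps:", pickle_error(fixed_task))
    # Pickle recurses once per nesting level.
    print("deeply nested argument:", pickle_error(nested_list(100_000)))
```

functools.wraps copies the decorated function's __module__ and __qualname__ onto the wrapper, so pickle finds the wrapper under the original name; this is often enough to fix the first issue. For the second, raising the limit with sys.setrecursionlimit tends to only postpone the problem until a deeper object comes along.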

Before using ProcessPoolExecutor, we used plain multiprocessing.Process objects, which received their arguments through the args parameter:

p = multiprocessing.Process(
        target=_process_worker,
        args=(self._call_queue,
              self._result_queue,
              self._initial_args,
              self._initial_kwargs))

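Passing arguments this way posed no issue with size and, with the fork start method (long the default on Linux), requires no serialization at all: the child process starts with a copy of the parent's memory. Under the spawn start method (the default on Windows and macOS), the arguments are pickled once when the process starts. Python also lets you pass queues, pipes and even shared memory, through multiprocessing.Value and multiprocessing.Array, to a process this way (see the multiprocessing documentation).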
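The sketch below illustrates the point about start methods. run_in_forked_child and _child are hypothetical helpers, and the code forces the fork start method, so it assumes a POSIX system. It hands a lambda, which Pickle cannot serialize, to a child process through args and reads the result back through a multiprocessing.Value allocated with lock=False (a plain shared-memory integer). This works only because fork copies the parent's memory; under spawn, the Process object and its arguments are pickled to start the child, and the same call fails.

```python
import multiprocessing


def _child(fn, out):
    # Runs in the child: fn arrived through Process(args=...), not through a queue.
    out.value = fn(21)


def run_in_forked_child(fn):
    """Call fn(21) in a forked child process and return the result.

    Assumes a POSIX system: the "fork" start method is not available on Windows.
    """
    ctx = multiprocessing.get_context("fork")
    # Shared-memory C int; lock=False gives a raw ctypes object without a lock.
    out = ctx.Value("i", 0, lock=False)
    p = ctx.Process(target=_child, args=(fn, out))
    p.start()
    p.join()
    if p.exitcode != 0:
        raise RuntimeError(f"child exited with code {p.exitcode}")
    return out.value


if __name__ == "__main__":
    big = list(range(1_000_000))
    # Neither the lambda nor the list it closes over is pickled: fork copies them.
    print(run_in_forked_child(lambda x: big[x] * 2))
```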

What made our code particular was that the arguments passed were always the same and large, so we decided to modify ProcessPoolExecutor to accept these arguments at initialization and forward them to the worker processes it spawns.

Every task is then executed with these arguments as its initial parameters. Here are the main changes, starting with the new initial_args and initial_kwargs attributes:

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, *args, **kwargs):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            *args, **kwargs: Initial arguments handed to every worker process.
                *args are prepended to each task's positional arguments and
                **kwargs are merged into its keyword arguments.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers + EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

        # New: arguments shared by every task, handed to the workers at spawn time.
        self._initial_args = args
        self._initial_kwargs = kwargs

Pass the attributes to the spawned processes:

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initial_args,
                      self._initial_kwargs))
        p.start()
        self._processes.add(p)

Use the parameters to run each task:

def _process_worker(call_queue, result_queue, args, kwargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        args: Initial positional arguments prepended to every call.
        kwargs: Initial keyword arguments merged into every call.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            # Build fresh containers for each call: rebinding args with += or
            # updating kwargs in place would leak one task's arguments into the next.
            call_args = args + call_item.args
            call_kwargs = dict(kwargs)
            call_kwargs.update(call_item.kwargs)
            r = call_item.fn(*call_args, **call_kwargs)
        except BaseException as e:
            result_queue.put(_ResultItem(call_item.work_id, exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
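Because the worker loop only calls get() and put() on its queues, its argument handling can be exercised in a single process with queue.Queue, without starting any child process. The sketch below uses simplified, hypothetical stand-ins (CallItem, ResultItem, describe, run_inline) rather than the executor's real internal classes, and builds a fresh argument tuple and keyword dict for every task so that nothing from one task leaks into the next.

```python
import queue
from collections import namedtuple

# Simplified, hypothetical stand-ins for the executor's internal work records.
CallItem = namedtuple("CallItem", "work_id fn args kwargs")
ResultItem = namedtuple("ResultItem", "work_id result exception")


def process_worker(call_queue, result_queue, args, kwargs):
    """Trimmed worker loop: run every call with the initial args/kwargs first."""
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            result_queue.put(None)  # signal that this worker has exited
            return
        try:
            # Fresh tuple and dict per task, so nothing carries over between tasks.
            call_args = args + call_item.args
            call_kwargs = dict(kwargs)
            call_kwargs.update(call_item.kwargs)
            r = call_item.fn(*call_args, **call_kwargs)
        except BaseException as e:
            result_queue.put(ResultItem(call_item.work_id, None, e))
        else:
            result_queue.put(ResultItem(call_item.work_id, r, None))


def describe(big_dataset, item, label="task"):
    """Hypothetical task: the shared object always arrives as the first argument."""
    return f"{label}:{item}:{len(big_dataset)}"


def run_inline(initial_args, initial_kwargs, call_items):
    """Drive process_worker in the current process with ordinary queue.Queue objects."""
    calls, results = queue.Queue(), queue.Queue()
    for item in call_items:
        calls.put(item)
    calls.put(None)  # sentinel: stop after the last task
    process_worker(calls, results, initial_args, initial_kwargs)
    out = []
    while True:
        item = results.get_nowait()
        if item is None:
            return out
        out.append(item)


if __name__ == "__main__":
    big = list(range(1000))
    items = [CallItem(1, describe, ("a",), {"label": "first"}),
             CallItem(2, describe, ("b",), {})]
    for r in run_inline((big,), {}, items):
        print(r.work_id, r.result)
```

This prints 1 first:a:1000 and then 2 task:b:1000: the second task falls back to the default label because the first task's keyword argument did not carry over.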

The benefit went beyond fixing the errors: we also removed all the extra processing spent serializing the same object for every task.
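That serialization overhead can be measured directly. The sketch below counts how many times a shared argument is pickled; Payload, PICKLE_COUNT, run and the task functions are hypothetical, and the fork start method is assumed, so it needs a POSIX system. It also shows a standard-library route to a similar result, which is a swapped-in alternative rather than the approach described in this post: since Python 3.7, ProcessPoolExecutor accepts initializer and initargs, which are handed to each worker process when it starts. With fork, the initargs are not pickled at all; with spawn or forkserver they are pickled once per worker instead of once per task.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

PICKLE_COUNT = 0     # times a Payload was pickled in this (parent) process
_WORKER_DATA = None  # per-worker copy of the payload, set by the initializer


class Payload:
    """Hypothetical stand-in for a large argument shared by every task."""

    def __init__(self, data):
        self.data = data

    def __getstate__(self):
        # Called each time the object is pickled.
        global PICKLE_COUNT
        PICKLE_COUNT += 1
        return self.__dict__


def task_with_arg(payload, i):
    return len(payload.data) + i


def _init_worker(payload):
    # Runs once in each worker process when it starts.
    global _WORKER_DATA
    _WORKER_DATA = payload


def task_with_global(i):
    return len(_WORKER_DATA.data) + i


def run(n_tasks, use_initializer):
    """Return (results, how many times the payload was pickled)."""
    global PICKLE_COUNT
    PICKLE_COUNT = 0
    payload = Payload(list(range(100_000)))
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX system
    if use_initializer:
        # Python 3.7+: initargs travel with each worker's Process arguments.
        with ProcessPoolExecutor(2, mp_context=ctx, initializer=_init_worker,
                                 initargs=(payload,)) as executor:
            results = list(executor.map(task_with_global, range(n_tasks)))
    else:
        # The payload is pickled into the call item of every single task.
        with ProcessPoolExecutor(2, mp_context=ctx) as executor:
            futures = [executor.submit(task_with_arg, payload, i)
                       for i in range(n_tasks)]
            results = [f.result() for f in futures]
    return results, PICKLE_COUNT


if __name__ == "__main__":
    for use_initializer in (False, True):
        _, pickles = run(6, use_initializer)
        print(f"initializer={use_initializer}: payload pickled {pickles} time(s)")
```

On a POSIX system, the run without the initializer should report one pickle per task and the run with it none at all.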

The same code went from running for more than 20 minutes to under a minute, thanks to multiprocessing, paying the process-spawning overhead only once for a fixed pool of workers, and bypassing all the extra serialization.

Going further, we plan on experimenting with Go and Unikernels to see if we can gain more performance.