Monday, July 18, 2016

Python Concurrency and Parallelism: building a custom ProcessPoolExecutor

At Ostorlab we scan hundreds of Mobile Applications each day, each scan is very resource intensive but at the same time, since the beginning,  we had to optimize the code for speed and maximize use of cloud ressources.

Afficher l'image d'origine

Without re-discussing concurrency and parallelism on Python, there are plenty of excellent ressources on the subject, my favorite is "Fluent Python" by Luciano Ramalho.

But here are some quick notes:
  • In CPython, the GIL prevents multiple native threads from executing Python bytecodes at once. This means that Threading is inappropriate for CPU-bound parallelism. I recommend these talk for more information on the subject "Understanding the Python GIL" and "Removing Python's GIL: The Gilectomy"
  • Processing is more adapted to CPU-bound parallelism, however spawning a process results in a significant overhead, if the goal is to run multiple tasks both concurrently and in parallel, running each task in a separate process is highly inefficient
To solve that issue, concurrent.futures library was introduced in Python 3.2 and back-ported to Python 2.5. The library allows spawning a number of workers, default is the number of cores on your computer, to which you can pass tasks for processing. Tasks are sent through queues after being serialized using Pickle.

This is an example from Fluent Python public Github:

"""Download flags of top 20 countries by population
ProcessPoolExecutor version
Sample run::
    $ python3 flags_threadpool.py    BD retrieved.    EG retrieved.    CN retrieved.    ...    PH retrieved.    US retrieved.    IR retrieved.    20 flags downloaded in 0.93s
"""# BEGIN FLAGS_PROCESSPOOLfrom concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:  # <1>        
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))


if __name__ == '__main__':
    main(download_many)
# END FLAGS_PROCESSPOOL

Playing with this, we rapidly faced a lot of limitations because of Pickle serialization limitations, the two main issues were:
For the second issue, we had to think of a way to pass arguments to tasks without going through the queue and the serialization process.

Before using the ProcessPoolExecutor, we used simple Processes which received the arguments in args parameter:

p = multiprocessing.Process(
        target=_process_worker,
        args=(self._call_queue,
              self._result_queue,
              self._initial_args,
              self._initial_kwargs))


Passing arguments this way posed no issue on the size and requires of course no serialization. Python allows to pass queues, pipes and even shared memory using the types multiprocessing.Value and multiprocessing.Array (see multiprocessing)

The particularity of our code was that the passed argument were always the same and significant in size, so we  decided to modify ProcessPoolExecutor to pass these arguments at the initialization which he then passes to the spawned working processes.

Tasks will be executed with these argument as initial parameters, here are the main changes, add initial_args and initial_kwargs attributes:

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, *args, **kwargs):
        """Initializes a new ProcessPoolExecutor instance.
        Args:            max_workers: The maximum number of processes that can be used to                execute the given calls. If None or not given then as many                worker processes will be created as the machine has processors.        """        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to        # prevent the worker processes from idling. But don't make it too big        # because futures in the call queue cannot be cancelled.        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None        self._processes = set()

        # Shutdown is a two-step process.        self._shutdown_thread = False        self._shutdown_lock = threading.Lock()
        self._queue_count = 0        self._pending_work_items = {}
        self._initial_args = args
        self._initial_kwargs = kwargs

Pass the attributes to the spawned processes:

def _adjust_process_count(self):
    for _ in range(len(self._processes), self._max_workers):
        p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initial_args,
                      self._initial_kwargs))
        p.start()
        self._processes.add(p)

Use the parameters to run each task:

def _process_worker(call_queue, result_queue, args, kwargs):
    """Evaluates calls from call_queue and places the results in result_queue.
    This worker is run in a separate process.
    Args:        call_queue: A multiprocessing.Queue of _CallItems that will be read and            evaluated by the worker.        result_queue: A multiprocessing.Queue of _ResultItems that will written            to by the worker.        shutdown: A multiprocessing.Event that will be set as a signal to the            worker that it should exit when call_queue is empty.    """    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread            result_queue.put(None)
            return        try:
            args += call_item.args
            kwargs.update(**call_item.kwargs)
            r = call_item.fn(*args, **kwargs)
        except BaseException:
            e = sys.exc_info()[1]
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

The benefit was even greater as we removed all the extra processing due to serialization of the same object.

The same code went from executing for more than 20 mins to less than a minute thanks to the use of multiprocessing, fixed process spawning overhead and bypassing all the extra serialization.

Going further, we plan on experimenting with Go and Unikernels to see if we can gain more performance.

Popular Posts