Skip to content

Queue and Multi Processing Pool - Python

May 2019

Interfacing a multiprocessing queue to delegate tasks to a multiprocessing pool in Python

Code snippet

#!/usr/bin/env python3.7

from multiprocessing import Pool, Queue
from os import getpid
from time import sleep
from queue import Empty
from random import randint


# Maximum time a worker blocks on queue.get() waiting for the next item; once
# it elapses with nothing available the worker stops.
QUEUE_TIMEOUT = 5  # seconds

# Queue the payloads are put on; it is handed to every pool worker through
# the pool's initargs.
proc_queue = Queue()


def worker_fn(queue: Queue):
    """Consume items from ``queue`` until it stays empty for too long.

    Announces itself, then repeatedly takes one item from ``queue``, sleeps
    for a random whole number of seconds between 1 and 4 (inclusive), and
    prints the item together with the current process id.

    Args:
        queue: Queue to read items from. Each ``get`` blocks for at most
            ``QUEUE_TIMEOUT`` seconds.

    Returns:
        None. The function returns once a ``get`` times out with ``Empty``.
    """
    pid = getpid()
    print(f"{pid} working")
    while True:
        try:
            item = queue.get(block=True, timeout=QUEUE_TIMEOUT)
        except Empty:
            # Nothing arrived within the timeout: treat the queue as drained.
            print(f"{pid} stopping")
            return
        sleep(randint(1, 4))
        print(f"{pid} got {item}")


# Only start the pool when this file is run as a script. Under the "spawn"
# start method every child process re-imports the main module, and unguarded
# top-level code would attempt to start a new pool inside each worker.
if __name__ == "__main__":
    # Each of the 4 workers calls worker_fn(proc_queue) as its initializer,
    # so the queue is consumed during worker start-up rather than through
    # tasks submitted to the pool.
    proc_pool = Pool(
        processes=4,
        initializer=worker_fn,
        initargs=(proc_queue,),
    )

    for i in range(10):
        payload = {"id": i}
        proc_queue.put(payload)

    # No tasks are submitted to the pool itself; close() stops it accepting
    # work and join() waits for the worker processes to exit.
    proc_pool.close()
    proc_pool.join()

Using a multiprocessing pool with an asynchronous worker function and a callback

Code snippet

#!/usr/bin/env python3.7

from os import getpid
import multiprocessing
from time import sleep


def worker_fn(payload):
    """Pretend to work on ``payload`` for one second and return a result tag.

    Args:
        payload: Mapping that must contain an ``"id"`` key.

    Returns:
        A string of the form ``"<pid>-<id>"``, where ``<pid>`` is the id of
        the process that ran this call and ``<id>`` is ``payload["id"]``.
    """
    sleep(1)
    pid = getpid()
    print(f"{pid} processing {payload}")
    return f"{pid}-{payload['id']}"


def callback_fn(val):
    """Print ``val`` prefixed with ``callback``.

    Args:
        val: Value to report; it is formatted into the printed message.
    """
    message = f"callback {val}"
    print(message)


if __name__ == "__main__":
    # Fan ten payloads out over a pool of 4 worker processes; each result
    # returned by worker_fn is passed to callback_fn.
    proc_pool = multiprocessing.Pool(4)
    for task_id in range(10):
        proc_pool.apply_async(
            worker_fn,
            args=({"id": task_id},),
            callback=callback_fn,
        )
    # Stop accepting new tasks, then wait for the worker processes to exit.
    proc_pool.close()
    proc_pool.join()