Concurrent Python

2 minute read

Multithreading

Because of the GIL (global interpreter lock), Python bytecode cannot run in parallel. This means that multithreading doesn’t improve performance for CPU-bound tasks; however, it is useful for handling multiple IO requests, where our programs normally spend a lot of time waiting.

Basic multithreading is similar to threads in C++:

from threading import Thread


def printer(a, b):
    """Print the two arguments separated by a space."""
    print(f"{a} {b}")


# Create one thread per message, start them all, then wait for each to finish.
threads = [
    Thread(target=printer, args=("Hello", "World")),
    Thread(target=printer, args=("Good", "night")),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

In many cases, managing threads is simplified using a thread pool executor:

from concurrent.futures import ThreadPoolExecutor


def printer(a, b):
    """Print the two arguments separated by a space."""
    print(f"{a} {b}")


# The executor manages a pool of 10 worker threads; leaving the `with`
# block waits for every submitted task to complete.
with ThreadPoolExecutor(max_workers=10) as pool:
    for n in range(100):
        pool.submit(printer, "Hello", n)

Data Sharing

Condition variables

from concurrent.futures import ThreadPoolExecutor
import threading
import time

# Shared state: `value` holds the item in flight, `done` signals shutdown.
# Both are guarded by the condition variable `cv` (defined below).
value = None
done = False

def producer():
    """Produce ten integers, handing each one to the consumer in turn."""
    global value, done
    with cv:
        for i in range(10):
            time.sleep(0.1)
            value = i # Produce a new value
            cv.notify()
            # Block until the consumer has reset `value` to None.
            # wait_for releases the lock while waiting, which is what
            # lets the consumer run even though we are inside `with cv:`.
            cv.wait_for(lambda: value is None)
        done = True
        cv.notify()  # Wake the consumer one final time so it can exit.

def consumer():
    """Consume values until the producer signals completion via `done`."""
    global value, done
    while True:
        with cv:
            # Sleep until there is either a value to consume or the
            # producer has finished.
            cv.wait_for(lambda: value is not None or done)
            if done:
                return
            print(f"consumed {value}")
            value = None
            cv.notify()  # Tell the producer it may produce the next value.

# `cv` is only referenced when producer/consumer are *called*, so
# defining it here — after the function definitions — is safe.
cv = threading.Condition()
with ThreadPoolExecutor() as executor:
    executor.submit(producer)
    executor.submit(consumer)

Queues

from concurrent.futures import ThreadPoolExecutor
import queue
import time

def producer():
    """Put ten integers on the queue, then a None sentinel."""
    for item in range(10):
        time.sleep(0.1)
        q.put(item)
    q.put(None)  # Sentinel: tells the consumer to stop.

def consumer():
    """Pop items off the queue until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"consumed {item}")

q = queue.Queue()
with ThreadPoolExecutor() as executor:
    executor.submit(producer)
    executor.submit(consumer)

Multiprocessing

Multiprocessing spawns new processes, each with its own GIL, so we can run CPU-bound tasks in parallel.

from multiprocessing import Process

def printer(a, b):
    """Print the two arguments separated by a space."""
    print(f"{a} {b}")

if __name__ == '__main__':
    # Spawn one child process per message. The __main__ guard stops the
    # children from re-running this section when they import the module.
    processes = [
        Process(target=printer, args=("Hello", "World")),
        Process(target=printer, args=("Good", "night")),
    ]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()

Pools are also available. There are a few ways of using these, such as pool.map, which applies a function to every item of an iterable. Callbacks can also be supplied to collect results as tasks complete.

from multiprocessing import Pool

def formatter(a, b):
    """Return the two arguments joined by a space."""
    return f"{a} {b}"

if __name__ == '__main__':
    with Pool(processes=10) as pool:
        # apply_async returns immediately; each result is delivered to
        # the callback (here we simply print it).
        for n in range(100):
            pool.apply_async(formatter, ("Hello", n), callback=print)

        pool.close()  # No more tasks will be submitted.
        pool.join()   # Wait for all workers to finish.

Queues

from multiprocessing import Process, Queue

def printer(q):
    """Print pairs taken from the queue until a None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:
            break
        a, b = item
        print(f"{a} {b}")

if __name__ == '__main__':
    q = Queue()
    worker = Process(target=printer, args=(q,))

    worker.start()
    q.put(("Hello", "World"))
    q.put(("Good", "Night"))
    q.put(None)  # Sentinel: tells the worker to exit its loop.
    worker.join()

Pipes

from multiprocessing import Process, Pipe

def piper(conn):
    """Receive one message on the connection, reply, then close it."""
    inbound = conn.recv()
    print(f"process in {inbound}")
    conn.send(["Good", "Night"])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    child = Process(target=piper, args=(child_conn,))

    child.start()
    parent_conn.send(["Hello", "World"])
    reply = parent_conn.recv()
    print(f"process out {reply}")
    parent_conn.close()
    child.join()