Hướng dẫn concurrent loop python - trăn vòng lặp đồng thời

Cách dễ nhất để song song hóa mã này là gì?

Sử dụng Poolexecutor từ concurrent.futures. So sánh mã gốc với cái này, cạnh nhau. Đầu tiên, cách ngắn gọn nhất để tiếp cận điều này là với executor.map:

...
with ProcessPoolExecutor() as executor:
    for out1, out2, out3 in executor.map(calc_stuff, parameters):
        ...

hoặc chia nhỏ bằng cách gửi từng cuộc gọi riêng lẻ:

...
with ThreadPoolExecutor() as executor:
    futures = []
    for parameter in parameters:
        futures.append(executor.submit(calc_stuff, parameter))

    for future in futures:
        out1, out2, out3 = future.result() # this will block
        ...

Rời khỏi ngữ cảnh báo hiệu cho người thực thi giải phóng tài nguyên

Bạn có thể sử dụng các luồng hoặc quy trình và sử dụng cùng một giao diện.

Một ví dụ làm việc

Dưới đây là mã ví dụ hoạt động, sẽ chứng minh giá trị của:

Đặt cái này vào một tệp - FutureTest.py:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from time import time
from http.client import HTTPSConnection

def processor_intensive(arg):
    def fib(n): # recursive, processor intensive calculation (avoid n > 36)
        return fib(n-1) + fib(n-2) if n > 1 else n
    start = time()
    result = fib(arg)
    return time() - start, result

def io_bound(arg):
    start = time()
    con = HTTPSConnection(arg)
    con.request('GET', '/')
    result = con.getresponse().getcode()
    return time() - start, result

def manager(PoolExecutor, calc_stuff):
    if calc_stuff is io_bound:
        inputs = ('python.org', 'stackoverflow.com', 'stackexchange.com',
                  'noaa.gov', 'parler.com', 'aaronhall.dev')
    else:
        inputs = range(25, 32)
    timings, results = list(), list()
    start = time()
    with PoolExecutor() as executor:
        for timing, result in executor.map(calc_stuff, inputs):
            # put results into correct output list:
            timings.append(timing), results.append(result)
    finish = time()
    print(f'{calc_stuff.__name__}, {PoolExecutor.__name__}')
    print(f'wall time to execute: {finish-start}')
    print(f'total of timings for each call: {sum(timings)}')
    print(f'time saved by parallelizing: {sum(timings) - (finish-start)}')
    print(dict(zip(inputs, results)), end = '\n\n')

def main():
    for computation in (processor_intensive, io_bound):
        for pool_executor in (ProcessPoolExecutor, ThreadPoolExecutor):
            manager(pool_executor, calc_stuff=computation)

if __name__ == '__main__':
    main()

Và đây là đầu ra cho một lần chạy python -m futuretest:

processor_intensive, ProcessPoolExecutor
wall time to execute: 0.7326343059539795
total of timings for each call: 1.8033506870269775
time saved by parallelizing: 1.070716381072998
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

processor_intensive, ThreadPoolExecutor
wall time to execute: 1.190223217010498
total of timings for each call: 3.3561410903930664
time saved by parallelizing: 2.1659178733825684
{25: 75025, 26: 121393, 27: 196418, 28: 317811, 29: 514229, 30: 832040, 31: 1346269}

io_bound, ProcessPoolExecutor
wall time to execute: 0.533886194229126
total of timings for each call: 1.2977914810180664
time saved by parallelizing: 0.7639052867889404
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

io_bound, ThreadPoolExecutor
wall time to execute: 0.38941240310668945
total of timings for each call: 1.6049387454986572
time saved by parallelizing: 1.2155263423919678
{'python.org': 301, 'stackoverflow.com': 200, 'stackexchange.com': 200, 'noaa.gov': 301, 'parler.com': 200, 'aaronhall.dev': 200}

Phân tích chuyên sâu bộ xử lý

Khi thực hiện các tính toán chuyên sâu của bộ xử lý trong Python, mong đợi ProcessPoolExecutor sẽ có hiệu suất hơn so với ThreadPoolExecutor.

Do khóa phiên dịch toàn cầu (a.k.a. GIL), các luồng không thể sử dụng nhiều bộ xử lý, vì vậy mong đợi thời gian cho mỗi tính toán và thời gian tường (thời gian thực trôi qua) sẽ lớn hơn.

Phân tích giới hạn IO

Mặt khác, khi thực hiện các hoạt động ràng buộc IO, hy vọng ThreadPoolExecutor sẽ có hiệu suất hơn ProcessPoolExecutor.

Chủ đề của Python là có thật, OS, chủ đề. Chúng có thể được đưa vào giấc ngủ bởi hệ điều hành và đánh thức lại khi thông tin của họ đến.

Suy nghĩ cuối cùng

Tôi nghi ngờ rằng đa xử lý sẽ chậm hơn trên Windows, vì Windows không hỗ trợ cho việc mỗi quy trình mới phải mất thời gian để khởi chạy.

Bạn có thể làm tổ nhiều luồng bên trong nhiều quy trình, nhưng khuyến nghị không sử dụng nhiều luồng để xoay quanh nhiều quy trình.

Nếu phải đối mặt với một vấn đề xử lý nặng trong Python, bạn có thể tầm thường mở rộng quy mô với các quy trình bổ sung - nhưng không quá nhiều với luồng.