Hướng dẫn does threading make python faster? - luồng có làm cho python nhanh hơn không?

Bây giờ tôi đang nghiên cứu cách tìm nạp dữ liệu từ trang web càng nhanh càng tốt. Để có được tốc độ nhanh hơn, tôi đang xem xét sử dụng nhiều luồng. Dưới đây là mã tôi đã sử dụng để kiểm tra sự khác biệt giữa bài viết đa luồng và đơn giản.

import threading import time import urllib import urllib2 class Post: def __init__(self, website, data, mode): self.website = website self.data = data #mode is either "Simple"(Simple POST) or "Multiple"(Multi-thread POST) self.mode = mode def post(self): #post data req = urllib2.Request(self.website) open_url = urllib2.urlopen(req, self.data) if self.mode == "Multiple": time.sleep(0.001) #read HTMLData HTMLData = open_url.read() print "OK" if __name__ == "__main__": current_post = Post("//forum.xda-developers.com/login.php", "vb_login_username=test&vb_login_password&securitytoken=guest&do=login", \ "Simple") #save the time before post data origin_time = time.time() if(current_post.mode == "Multiple"): #multithreading POST for i in range(0, 10): thread = threading.Thread(target = current_post.post) thread.start() thread.join() #calculate the time interval time_interval = time.time() - origin_time print time_interval if(current_post.mode == "Simple"): #simple POST for i in range(0, 10): current_post.post() #calculate the time interval time_interval = time.time() - origin_time print time_interval

Giống như bạn có thể thấy, đây là một mã rất đơn giản. Đầu tiên tôi đặt chế độ thành "đơn giản" và tôi có thể nhận được khoảng thời gian: 50 giây (có thể tốc độ của tôi hơi chậm :(). Sau đó, tôi đặt chế độ thành "nhiều" và tôi nhận được khoảng thời gian: 35. Từ đó tôi có thể thấy, nhiều luồng thực sự có thể tăng tốc độ, nhưng kết quả không tốt như tôi tưởng tượng. Tôi muốn có được tốc độ nhanh hơn nhiều.50s(maybe my speed is a little slow :(). then i set the mode to "Multiple", and i get the time interval: 35. from that i can see, multi-thread can actually increase the speed, but the result isnt as good as i imagine. i want to get a much faster speed.

Từ việc gỡ lỗi, tôi thấy rằng chương trình chủ yếu chặn tại dòng: import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 0, dòng mã này mất rất nhiều thời gian để đăng và nhận dữ liệu từ trang web được chỉ định. Tôi đoán có lẽ tôi có thể có được tốc độ nhanh hơn bằng cách thêm import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 1 và sử dụng nhiều luồng bên trong hàm import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 2, nhưng tôi không thể làm điều đó bởi vì chức năng riêng của Python.

Nếu không xem xét các giới hạn có thể sử dụng mà máy chủ chặn tốc độ bài, tôi có thể làm gì khác để có được tốc độ nhanh hơn? Hoặc bất kỳ mã nào khác tôi có thể sửa đổi? cảm ơn nhiều!

Lưu ý: Theo yêu cầu phổ biến mà tôi chứng minh một số kỹ thuật thay thế --- bao gồm async/chờ đợi, chỉ có sẵn kể từ khi Python 3.5 --- tôi đã thêm một số cập nhật vào cuối bài viết. Vui thích! By popular request that I demonstrate some alternative techniques---including async/await, only available since the advent of Python 3.5---I've added some updates at the end of the article. Enjoy!

Các cuộc thảo luận chỉ trích Python thường nói về việc khó sử dụng Python cho công việc đa luồng, chỉ tay vào cái được gọi là khóa phiên dịch toàn cầu (được gọi một cách trìu mến là GIL) ngăn chặn nhiều chủ đề của mã Python chạy đồng thời. Do đó, mô -đun đa luồng Python không hoàn toàn hoạt động theo cách bạn mong đợi nếu bạn không phải là nhà phát triển Python và bạn đến từ các ngôn ngữ khác như C ++ hoặc Java. Cần phải làm rõ rằng người ta vẫn có thể viết mã bằng Python chạy đồng thời hoặc song song và tạo ra sự khác biệt rõ rệt về hiệu suất dẫn đến, miễn là một số điều nhất định được xem xét. Nếu bạn chưa đọc nó, tôi khuyên bạn nên xem bài viết của Eqbal Quran, về sự đồng thời và song song trong Ruby ở đây trên blog kỹ thuật Toptal.

Trong hướng dẫn đồng thời Python này, chúng tôi sẽ viết một kịch bản Python nhỏ để tải xuống các hình ảnh phổ biến hàng đầu từ Imgur. Chúng tôi sẽ bắt đầu với một phiên bản tải xuống hình ảnh tuần tự hoặc một lần. Như một điều kiện tiên quyết, bạn sẽ phải đăng ký một ứng dụng trên Imgur. Nếu bạn chưa có tài khoản Imgur, vui lòng tạo một tài khoản trước.

Các tập lệnh trong các ví dụ luồng này đã được thử nghiệm với Python 3.6.4. Với một số thay đổi, họ cũng nên chạy với Python 2, ur Surb là những gì đã thay đổi nhất giữa hai phiên bản Python này.

Bắt đầu với Python MultiThreading

Chúng ta hãy bắt đầu bằng cách tạo một mô -đun Python, được đặt tên là import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 3. Tệp này sẽ chứa tất cả các chức năng cần thiết để tìm nạp danh sách các hình ảnh và tải xuống chúng. Chúng tôi sẽ chia các chức năng này thành ba chức năng riêng biệt:

  • import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 4
  • import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 5
  • import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 6

Hàm thứ ba, import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 6, sẽ được sử dụng để tạo thư mục đích tải xuống nếu nó không tồn tại.

API IMGUR yêu cầu yêu cầu HTTP chịu tiêu đề import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 8 với ID máy khách. Bạn có thể tìm thấy ID máy khách này từ bảng điều khiển của ứng dụng mà bạn đã đăng ký trên IMGUR và phản hồi sẽ được mã hóa JSON. Chúng ta có thể sử dụng thư viện JSON tiêu chuẩn Python, để giải mã nó. Tải xuống hình ảnh là một nhiệm vụ thậm chí còn đơn giản hơn, vì tất cả những gì bạn phải làm là lấy hình ảnh bằng URL của nó và ghi nó vào một tệp.

Đây là những gì kịch bản trông như thế nào:

import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir

Tiếp theo, chúng ta sẽ cần viết một mô -đun sẽ sử dụng các chức năng này để tải xuống các hình ảnh, từng cái một. Chúng tôi sẽ đặt tên cho import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 9 này. Điều này sẽ chứa chức năng chính của phiên bản tải xuống hình ảnh đầu tiên của chúng tôi. Mô -đun sẽ truy xuất ID máy khách IMGUR trong biến môi trường import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 0. Nó sẽ gọi import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 6 để tạo thư mục đích tải xuống. Cuối cùng, nó sẽ tìm thấy một danh sách các hình ảnh bằng hàm import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 4, lọc tất cả các URL GIF và album, sau đó sử dụng import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 5 để tải xuống và lưu từng hình ảnh đó vào đĩa. Đây là những gì import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 9 trông giống như:

import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Trên máy tính xách tay của tôi, tập lệnh này mất 19,4 giây để tải xuống 91 hình ảnh. Xin lưu ý rằng những con số này có thể thay đổi dựa trên mạng bạn đang bật. 19.4 giây là không dài khủng khiếp, nhưng nếu chúng ta muốn tải xuống nhiều hình ảnh hơn thì sao? Có lẽ 900 hình ảnh, thay vì 90. Với trung bình 0,2 giây mỗi hình ảnh, 900 hình ảnh sẽ mất khoảng 3 phút. Đối với 9000 hình ảnh sẽ mất 30 phút. Tin tốt là bằng cách giới thiệu đồng thời hoặc song song, chúng ta có thể tăng tốc độ này một cách đáng kể.

Tất cả các ví dụ mã tiếp theo sẽ chỉ hiển thị các báo cáo nhập mới và cụ thể cho các ví dụ đó. Để thuận tiện, tất cả các tập lệnh Python này có thể được tìm thấy trong kho GitHub này.

Đồng thời và song song trong Python: Ví dụ về luồng

Chủ đề là một trong những cách tiếp cận nổi tiếng nhất để đạt được sự đồng thời và song song của Python. Chủ đề là một tính năng thường được cung cấp bởi hệ điều hành. Chủ đề nhẹ hơn các quy trình và chia sẻ cùng một không gian bộ nhớ.

Trong ví dụ về luồng python này, chúng tôi sẽ viết một mô -đun mới để thay thế import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 9. Mô -đun này sẽ tạo ra một nhóm gồm tám luồng, tạo ra tổng cộng chín luồng bao gồm cả luồng chính. Tôi đã chọn tám luồng công nhân vì máy tính của tôi có tám lõi CPU và một luồng công nhân trên mỗi lõi dường như là một con số tốt cho số lượng chủ đề để chạy cùng một lúc. Trong thực tế, con số này được chọn cẩn thận hơn nhiều dựa trên các yếu tố khác, chẳng hạn như các ứng dụng và dịch vụ khác chạy trên cùng một máy.

Điều này gần giống như cái trước, ngoại trừ chúng ta hiện có một lớp mới, import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 6, đây là một hậu duệ của lớp Python import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 7. Phương thức chạy đã được ghi đè, chạy một vòng lặp vô hạn. Trên mỗi lần lặp, nó gọi import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 8 để thử và lấy URL từ hàng đợi an toàn cho luồng. Nó chặn cho đến khi có một mục trong hàng đợi cho người lao động xử lý. Khi công nhân nhận được một mục từ hàng đợi, sau đó, nó gọi cùng một phương thức import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 5 đã được sử dụng trong tập lệnh trước để tải hình ảnh vào thư mục hình ảnh. Sau khi tải xuống kết thúc, công nhân báo hiệu hàng đợi rằng nhiệm vụ đó được thực hiện. Điều này rất quan trọng, bởi vì hàng đợi theo dõi số lượng nhiệm vụ đã được thực hiện. Cuộc gọi đến import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 0 sẽ chặn chủ đề chính mãi mãi nếu các công nhân không báo hiệu rằng họ đã hoàn thành một nhiệm vụ.

import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Chạy tập lệnh ví dụ về luồng python này trên cùng một máy được sử dụng kết quả trước đó trong thời gian tải xuống là 4,1 giây! Đó là nhanh hơn 4,7 lần so với ví dụ trước. Mặc dù điều này nhanh hơn nhiều, nhưng điều đáng nói là chỉ có một chủ đề được thực hiện tại một thời điểm trong suốt quá trình này do GIL. Do đó, mã này đồng thời nhưng không song song. Lý do nó vẫn nhanh hơn là vì đây là một nhiệm vụ ràng buộc IO. Bộ xử lý hầu như không đổ mồ hôi trong khi tải xuống những hình ảnh này và phần lớn thời gian được dành để chờ mạng. Đây là lý do tại sao đa luồng Python có thể giúp tăng tốc độ lớn. Bộ xử lý có thể chuyển đổi giữa các luồng bất cứ khi nào một trong số chúng sẵn sàng thực hiện một số công việc. Sử dụng mô -đun luồng trong Python hoặc bất kỳ ngôn ngữ được giải thích nào khác với GIL thực sự có thể dẫn đến giảm hiệu suất. Nếu mã của bạn đang thực hiện một tác vụ ràng buộc CPU, chẳng hạn như giải nén các tệp GZIP, sử dụng mô -đun import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 1 sẽ dẫn đến thời gian thực hiện chậm hơn. Đối với các tác vụ ràng buộc CPU và thực hiện thực sự song song, chúng ta có thể sử dụng mô -đun đa xử lý.

Trong khi việc thực hiện thực tế Python thực hiện, Cpython, có một GIL, nhưng điều này không đúng với tất cả các triển khai Python. Ví dụ, Ironpython, một triển khai Python sử dụng khung .NET, không có GIL và Jython, việc triển khai dựa trên Java cũng không. Bạn có thể tìm thấy một danh sách các triển khai Python hoạt động ở đây.

Đồng thời và song song trong Python Ví dụ 2: Sinh sản nhiều quá trình

Mô -đun đa bộ xử lý dễ dàng giảm hơn so với mô -đun luồng, vì chúng tôi không cần thêm một lớp như ví dụ về luồng Python. Những thay đổi duy nhất chúng ta cần thực hiện là trong chức năng chính.

Để sử dụng nhiều quy trình, chúng tôi tạo ra một đa xử lý import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 2. Với phương thức MAP mà nó cung cấp, chúng tôi sẽ chuyển danh sách các URL đến nhóm, từ đó sẽ sinh ra tám quy trình mới và sử dụng từng quy trình để tải xuống các hình ảnh song song. Đây là sự song song thực sự, nhưng nó đi kèm với một chi phí. Toàn bộ bộ nhớ của tập lệnh được sao chép vào mỗi quy trình con được sinh ra. Trong ví dụ đơn giản này, nó không phải là một vấn đề lớn, nhưng nó có thể dễ dàng trở thành chi phí nghiêm trọng cho các chương trình không tầm thường.

import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main()

Đồng thời và song song trong Python Ví dụ 3: Phân phối cho nhiều công nhân

Mặc dù các mô -đun phân xử và đa bộ xử lý rất tốt cho các tập lệnh đang chạy trên máy tính cá nhân của bạn, bạn nên làm gì nếu bạn muốn công việc được thực hiện trên một máy khác hoặc bạn cần mở rộng lên nhiều hơn CPU trên một máy có thể xử lý? Một trường hợp sử dụng tuyệt vời cho điều này là các nhiệm vụ back-end dài cho các ứng dụng web. Nếu bạn có một số nhiệm vụ chạy dài, bạn không muốn quay một loạt các quy trình phụ hoặc luồng trên cùng một máy cần phải chạy phần còn lại của mã ứng dụng. Điều này sẽ làm giảm hiệu suất của ứng dụng của bạn cho tất cả người dùng của bạn. Điều tuyệt vời là có thể chạy các công việc này trên một máy khác hoặc nhiều máy khác.

Một thư viện Python tuyệt vời cho nhiệm vụ này là RQ, một thư viện rất đơn giản nhưng mạnh mẽ. Trước tiên, bạn tạo ra một hàm và các đối số của nó bằng thư viện. Điều này Pickles biểu diễn cuộc gọi chức năng, sau đó được thêm vào danh sách Redis. Enqueueing công việc là bước đầu tiên, nhưng sẽ không làm gì cả. Chúng tôi cũng cần ít nhất một công nhân để lắng nghe hàng đợi công việc đó.

Bước đầu tiên là cài đặt và chạy máy chủ Redis trên máy tính của bạn hoặc có quyền truy cập vào máy chủ Redis đang chạy. Sau đó, chỉ có một vài thay đổi nhỏ được thực hiện đối với mã hiện có. Trước tiên, chúng tôi tạo một thể hiện của một hàng đợi RQ và chuyển nó một ví dụ của một máy chủ Redis từ thư viện Redis-PY. Sau đó, thay vì chỉ gọi phương thức import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 5 của chúng tôi, chúng tôi gọi import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 4. Phương thức Enqueue có chức năng như đối số đầu tiên của nó, sau đó bất kỳ đối số hoặc đối số từ khóa nào khác được chuyển theo hàm đó khi công việc thực sự được thực hiện.

Một bước cuối cùng chúng ta cần làm là bắt đầu một số công nhân. RQ cung cấp một tập lệnh tiện dụng để chạy công nhân trên hàng đợi mặc định. Chỉ cần chạy import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 5 trong cửa sổ thiết bị đầu cuối và nó sẽ bắt đầu một công nhân nghe trên hàng đợi mặc định. Vui lòng đảm bảo thư mục làm việc hiện tại của bạn giống như nơi các tập lệnh cư trú. Nếu bạn muốn nghe một hàng đợi khác, bạn có thể chạy import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 6 và nó sẽ nghe hàng đợi được đặt tên đó. Điều tuyệt vời về RQ là miễn là bạn có thể kết nối với Redis, bạn có thể chạy nhiều công nhân như bạn muốn trên nhiều máy khác nhau như bạn muốn; Do đó, nó rất dễ dàng để mở rộng quy mô khi ứng dụng của bạn phát triển. Đây là nguồn cho phiên bản RQ:

import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main()

Tuy nhiên, RQ không phải là giải pháp hàng đợi công việc Python duy nhất. RQ rất dễ sử dụng và bao gồm các trường hợp sử dụng đơn giản rất tốt, nhưng nếu cần có nhiều tùy chọn nâng cao hơn, các giải pháp hàng đợi Python 3 khác (như cần tây) có thể được sử dụng.

Python đa luồng so với đa xử lý

Nếu mã của bạn bị ràng buộc IO, cả đa xử lý và đa luồng trong Python sẽ hoạt động cho bạn. Đa bộ xử lý là một điều dễ dàng hơn để giảm so với ren nhưng có chi phí bộ nhớ cao hơn. Nếu mã của bạn bị ràng buộc CPU, đa xử lý rất có thể sẽ là lựa chọn tốt hơn, đặc biệt là nếu máy đích có nhiều lõi hoặc CPU. Đối với các ứng dụng web và khi bạn cần mở rộng quy mô công việc trên nhiều máy, RQ sẽ tốt hơn cho bạn.

Cập nhật

Python import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 7

Một cái gì đó mới kể từ Python 3.2 đã được chạm vào trong bài viết gốc là gói import logging import os from queue import Queue from threading import Thread from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class DownloadWorker(Thread): def __init__(self, queue): Thread.__init__(self) self.queue = queue def run(self): while True: # Get the work from the queue and expand the tuple directory, link = self.queue.get() try: download_link(directory, link) finally: self.queue.task_done() def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # Create a queue to communicate with the worker threads queue = Queue() # Create 8 worker threads for x in range(8): worker = DownloadWorker(queue) # Setting daemon to True will let the main thread exit even though the workers are blocking worker.daemon = True worker.start() # Put the tasks into the queue as a tuple for link in links: logger.info('Queueing {}'.format(link)) queue.put((download_dir, link)) # Causes the main thread to wait for the queue to finish processing all the tasks queue.join() logging.info('Took %s', time() - ts) if __name__ == '__main__': main() 7. Gói này cung cấp một cách khác để sử dụng đồng thời và song song với Python.

Trong bài viết gốc, tôi đã đề cập rằng mô -đun đa bộ xử lý Python sẽ dễ dàng rơi vào mã hiện có so với mô -đun luồng. Điều này là do mô -đun luồng Python 3 yêu cầu phân nhóm lớp import logging import os from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) for link in links: download_link(download_dir, link) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 7 và cũng tạo ra import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 0 cho các luồng để theo dõi công việc.

Sử dụng đồng thời.

import logging import os from concurrent.futures import ThreadPoolExecutor from functools import partial from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) # By placing the executor inside a with block, the executors shutdown method # will be called cleaning up threads. # # By default, the executor sets number of workers to 5 times the number of # CPUs. with ThreadPoolExecutor() as executor: # Create a new partially applied function that stores the directory # argument. # # This allows the download_link function that normally takes two # arguments to work with the map function that expects a function of a # single argument. fn = partial(download_link, download_dir) # Executes fn concurrently using threads on the links iterable. The # timeout is for the entire process, not a single call, so downloading # all images must complete within 30 seconds. executor.map(fn, links, timeout=30) if __name__ == '__main__': main()

Bây giờ chúng tôi đã có tất cả các hình ảnh này được tải xuống với Python import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 1 của chúng tôi, chúng tôi có thể sử dụng chúng để kiểm tra tác vụ ràng buộc CPU. Chúng ta có thể tạo các phiên bản hình thu nhỏ của tất cả các hình ảnh trong cả một tập lệnh đơn, đơn, sau đó kiểm tra một giải pháp dựa trên đa xử lý.

Chúng tôi sẽ sử dụng thư viện gối để xử lý việc thay đổi kích thước của hình ảnh.

Đây là kịch bản ban đầu của chúng tôi.

import logging from pathlib import Path from time import time from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. E.g.: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ image = Image.open(path) image.thumbnail(size) path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image.save(thumbnail_path) def main(): ts = time() for image_path in Path('images').iterdir(): create_thumbnail((128, 128), image_path) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Tập lệnh này lặp lại trên các đường dẫn trong thư mục import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 2 và cho mỗi đường dẫn, nó chạy chức năng created_thumbnail. Chức năng này sử dụng gối để mở hình ảnh, tạo hình thu nhỏ và lưu hình ảnh mới, nhỏ hơn với cùng tên với bản gốc nhưng với import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 3 được gắn vào tên.

Chạy tập lệnh này trên 160 hình ảnh tổng cộng 36 triệu mất 2,32 giây. Hãy xem liệu chúng ta có thể tăng tốc độ này bằng cách sử dụng ProcessPoolExecutor hay không.

import logging from pathlib import Path from time import time from functools import partial from concurrent.futures import ProcessPoolExecutor from PIL import Image logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def create_thumbnail(size, path): """ Creates a thumbnail of an image with the same name as image but with _thumbnail appended before the extension. E.g.: >>> create_thumbnail((128, 128), 'image.jpg') A new thumbnail image is created with the name image_thumbnail.jpg :param size: A tuple of the width and height of the image :param path: The path to the image file :return: None """ path = Path(path) name = path.stem + '_thumbnail' + path.suffix thumbnail_path = path.with_name(name) image = Image.open(path) image.thumbnail(size) image.save(thumbnail_path) def main(): ts = time() # Partially apply the create_thumbnail method, setting the size to 128x128 # and returning a function of a single argument. thumbnail_128 = partial(create_thumbnail, (128, 128)) # Create the executor in a with block so shutdown is called when the block # is exited. with ProcessPoolExecutor() as executor: executor.map(thumbnail_128, Path('images').iterdir()) logging.info('Took %s', time() - ts) if __name__ == '__main__': main()

Phương pháp import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 4 giống hệt với tập lệnh cuối cùng. Sự khác biệt chính là việc tạo ra một import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 5. Phương thức bản đồ của người thực thi được sử dụng để tạo hình thu nhỏ song song. Theo mặc định, import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 5 tạo ra một quy trình con trên mỗi CPU. Chạy tập lệnh này trên cùng 160 hình ảnh mất 1,05 giây nhanh hơn 2,2 lần!

Async/Await (chỉ Python 3.5+)

Một trong những mục được yêu cầu nhiều nhất trong các bình luận về bài viết gốc là một ví dụ sử dụng mô -đun Asyncio của Python 3. So với các ví dụ khác, có một số cú pháp Python mới có thể mới đối với hầu hết mọi người và cả một số khái niệm mới. Một lớp phức tạp bổ sung đáng tiếc là do mô-đun import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 7 tích hợp của Python không được đồng bộ hóa. Chúng tôi sẽ cần sử dụng một thư viện HTTP Async để có được các lợi ích đầy đủ của Asyncio. Đối với điều này, chúng tôi sẽ sử dụng Aiohttp.

Hãy để Lừa nhảy ngay vào mã và một lời giải thích chi tiết hơn sẽ theo sau.

import asyncio import logging import os from time import time import aiohttp from download import setup_download_dir, get_links logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def async_download_link(session, directory, link): """ Async version of the download_link method we've been using in the other examples. :param session: aiohttp ClientSession :param directory: directory to save downloads :param link: the url of the link to download :return: """ download_path = directory / os.path.basename(link) async with session.get(link) as response: with download_path.open('wb') as f: while True: # await pauses execution until the 1024 (or less) bytes are read from the stream chunk = await response.content.read(1024) if not chunk: # We are done reading the file, break out of the while loop break f.write(chunk) logger.info('Downloaded %s', link) # Main is now a coroutine async def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() # We use a session to take advantage of tcp keep-alive # Set a 3 second read and connect timeout. Default is 5 minutes async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session: tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)] # gather aggregates all the tasks and schedules them in the event loop await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': ts = time() # Create the asyncio event loop loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) finally: # Shutdown the loop even if there is an exception loop.close() logger.info('Took %s seconds to complete', time() - ts)

Có khá nhiều để giải nén ở đây. Hãy bắt đầu với điểm nhập cảnh chính của chương trình. Điều mới đầu tiên chúng tôi làm với mô -đun Asyncio là có được vòng lặp sự kiện. Vòng lặp sự kiện xử lý tất cả các mã không đồng bộ. Sau đó, vòng lặp được chạy cho đến khi hoàn thành và vượt qua hàm import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 8. Có một phần của cú pháp mới trong định nghĩa của chính: import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 9. Bạn cũng sẽ nhận thấy import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main() 0 và import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main() 1.

Cú pháp Async/Await đã được giới thiệu trong PEP492. Cú pháp import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 9 đánh dấu một chức năng như một coroutine. Trong nội bộ, các coroutines dựa trên các trình tạo Python, nhưng aren hoàn toàn giống nhau. Coroutines trả về một đối tượng Coroutine tương tự như cách các trình tạo trả về một đối tượng máy phát. Khi bạn có một coroutine, bạn có được kết quả của nó với biểu thức import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main() 0. Khi một coroutine gọi import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main() 0, việc thực hiện coroutine bị đình chỉ cho đến khi có thể chờ hoàn thành. Việc đình chỉ này cho phép hoàn thành công việc khác trong khi Coroutine bị đình chỉ đang chờ đợi một số kết quả. Nói chung, kết quả này sẽ là một loại I/O như yêu cầu cơ sở dữ liệu hoặc trong trường hợp của chúng tôi là yêu cầu HTTP.

Hàm import json import logging import os from pathlib import Path from urllib.request import urlopen, Request logger = logging.getLogger(__name__) types = {'image/jpeg', 'image/png'} def get_links(client_id): headers = {'Authorization': 'Client-ID {}'.format(client_id)} req = Request('//api.imgur.com/3/gallery/random/random/', headers=headers, method='GET') with urlopen(req) as resp: data = json.loads(resp.read().decode('utf-8')) return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types] def download_link(directory, link): download_path = directory / os.path.basename(link) with urlopen(link) as image, download_path.open('wb') as f: f.write(image.read()) logger.info('Downloaded %s', link) def setup_download_dir(): download_dir = Path('images') if not download_dir.exists(): download_dir.mkdir() return download_dir 5 phải được thay đổi khá đáng kể. Trước đây, chúng tôi đã dựa vào import logging import os from functools import partial from multiprocessing.pool import Pool from time import time from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): ts = time() client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) download = partial(download_link, download_dir) with Pool(4) as p: p.map(download, links) logging.info('Took %s seconds', time() - ts) if __name__ == '__main__': main() 7 để thực hiện công việc đọc hình ảnh cho chúng tôi. Bây giờ, để cho phép phương pháp của chúng tôi hoạt động đúng với mô hình lập trình ASYNC, chúng tôi đã giới thiệu một vòng lặp import logging import os from redis import Redis from rq import Queue from download import setup_download_dir, get_links, download_link logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.getLogger('requests').setLevel(logging.CRITICAL) logger = logging.getLogger(__name__) def main(): client_id = os.getenv('IMGUR_CLIENT_ID') if not client_id: raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!") download_dir = setup_download_dir() links = get_links(client_id) q = Queue(connection=Redis(host='localhost', port=6379)) for link in links: q.enqueue(download_link, download_dir, link) if __name__ == '__main__': main() 7 đọc các khối hình ảnh tại một thời điểm và đình chỉ thực thi trong khi chờ hoàn thành I/O. Điều này cho phép vòng lặp sự kiện lặp lại thông qua việc tải xuống các hình ảnh khác nhau vì mỗi hình ảnh có sẵn dữ liệu mới trong quá trình tải xuống.

Nên có một - một cách tốt nhất là chỉ có một cách tốt để làm điều đó

Mặc dù Zen of Python nói với chúng ta rằng nên có một cách rõ ràng để làm một cái gì đó, có nhiều cách trong Python để giới thiệu đồng thời vào các chương trình của chúng ta. Phương pháp tốt nhất để chọn là sẽ phụ thuộc vào trường hợp sử dụng cụ thể của bạn. Mô hình không đồng bộ tỷ lệ tốt hơn với khối lượng công việc đối chiếu cao (như máy chủ web) so với luồng hoặc đa xử lý, nhưng nó đòi hỏi mã của bạn (và phụ thuộc) phải không đồng bộ để có lợi hoàn toàn.

Hy vọng rằng các ví dụ về luồng python trong bài viết này, và cập nhật, sẽ chỉ cho bạn đi đúng hướng để bạn có ý tưởng về nơi nhìn vào thư viện tiêu chuẩn Python nếu bạn cần giới thiệu đồng thời vào các chương trình của mình.

Liệu luồng có tăng tốc lên Python không?

Chủ đề và quy trình mất khoảng thời gian cho nhau, và cả hai đều nhanh hơn sử dụng vòng lặp. Trong chức năng này, không giống như giao thức trước, mỗi tác vụ được hoàn thành bởi các luồng mất cùng một lượng thời gian như khi hoàn thành bởi vòng lặp.. In this function, unlike the previous one, each task completed by threads takes the same amount of time as when completed by the loop.

Có phải luồng tốt trong Python?

Để tóm tắt lại, luồng trong Python cho phép nhiều luồng được tạo trong một quy trình duy nhất, nhưng do Gil, không ai trong số chúng sẽ chạy cùng một lúc.Chủ đề vẫn là một lựa chọn rất tốt khi chạy nhiều tác vụ ràng buộc I/O đồng thời.Threading is still a very good option when it comes to running multiple I/O bound tasks concurrently.

Có phải luồng trong Python có chậm không?

Kết quả kiểm tra cho thấy mã đa luồng thực sự chậm hơn đáng kể so với mã đa quy trình hoặc thậm chí thực thi tuần tự hóa.Đáng ngạc nhiên là bài kiểm tra cơ bản không có sự đồng thời ở tất cả các bài kiểm tra có thể tạo ra tất cả các bài kiểm tra luồng.multi-threaded code is indeed significantly slower compared to multi-process code or even serialised execution. Surprisingly the baseline test with no concurrency at all outperformed all of the threaded tests.

Chủ đề có tăng hiệu suất không?

Chúng tôi đã thực hiện một nghiên cứu chi tiết về một trình mô phỏng bộ vi xử lý Intel chạy các ứng dụng song song.Trì hoãn luồng có thể giảm mức tiêu thụ năng lượng từ 4% xuống 44% khi mất hiệu suất không đáng kể.Cân bằng chủ đề có thể tăng hiệu suất 20% hoặc có thể giảm 23% mức tiêu thụ năng lượng.Thread balancing can increase performance by 20% or can reduce energy consumption by 23%.

