Hướng dẫn non blocking socket python - Python ổ cắm không chặn

Ngày: Ngày 31 tháng 5 năm 2015

Là một người làm việc với ngăn xếp web và các ngôn ngữ như Python hoặc Ruby, có những cơ hội cao mà bạn đã nghe nói về I/O không chặn. Bạn cũng có thể sử dụng nó với một số dự án của bạn hoặc đã thử tay với các thư viện như Gevent hoặc Tornado. Nhưng làm thế nào để các thư viện này làm cho các yêu cầu mạng không chặn. Đây là điều mà tôi đã luôn tự hỏi khi tôi thử Gevent. Tôi chỉ không thể có được cái đầu của mình xung quanh thực tế là khi bạn gửi một cái gì đó đến một ổ cắm hoặc nhận từ nó, nó sẽ chặn thực thi trong ít nhất thời gian cần thiết để truyền dữ liệu. Vậy làm thế nào để tôi có thể thực hiện một cái gì đó khác trong khi I/O đang xảy ra? Vì vậy, tôi bắt đầu đào sâu, cố gắng hiểu cách thực hiện một số yêu cầu mạng không chặn trong Python.

Với loạt bài viết này, tôi sẽ cố gắng giới thiệu chủ đề và đi vào càng nhiều chi tiết càng tốt.

Không chặn I/O là gì?

Vì vậy, trước tiên hãy xem những gì đang chặn? Một chức năng đang chặn nếu nó phải chờ một cái gì đó hoàn thành. Có, mọi chức năng đều bị chặn - bất kể bạn đang thực hiện I/O hoặc thực hiện nhiệm vụ CPU. Mọi thứ đều mất một thời gian. Nếu một chức năng đang thực hiện một số nhiệm vụ đang làm cho CPU hoạt động, thì nó đang chặn chức năng trở lại. Tương tự, nếu một chức năng đang cố gắng lấy một cái gì đó từ cơ sở dữ liệu, thì nó sẽ chờ kết quả đến và sẽ chặn cho đến lúc đó để tiếp tục xử lý. Nhưng điều đó xảy ra là máy chủ không sử dụng CPU trong khi nó đang chờ cơ sở dữ liệu gửi phản hồi.Blocking? A function is blocking if it has to wait for something to complete. Yes, every function is blocking — no matter if you are doing I/O or doing CPU task. Everything takes some time. If a function is doing some task which is making the CPU work, then it is blocking the function from returning. Similarly, if a function is trying to get something from the database, then it is going to wait for the result to come and will block until then to continue the processing. But it so happens that the server is not making any use of the CPU while it is waiting for the database to send the response.

Vì vậy, nếu một chức năng bị chặn (vì bất kỳ lý do gì), nó có khả năng trì hoãn việc thực thi các tác vụ khác. Và tiến trình tổng thể của toàn bộ hệ thống có thể bị ảnh hưởng. Nếu chức năng bị chặn vì nó đang thực hiện một số tác vụ CPU, thì chúng ta không thể làm được gì nhiều. Nhưng nếu nó bị chặn vì I/O, chúng ta biết rằng CPU không hoạt động và có thể được sử dụng để bắt đầu một nhiệm vụ khác cần CPU.

Hãy xem một ví dụ về việc chặn yêu cầu mạng. Tôi có một máy chủ TCP rất đơn giản được viết bằng Python:

import socket
import sys

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

port = 1234 if len(sys.argv) == 1 else int(sys.argv[1])
sock.bind(('localhost', port))
sock.listen(5)

try:
while True:
conn, info = sock.accept()

data = conn.recv(1024)
while data:
print data
data = conn.recv(1024)
except KeyboardInterrupt:
sock.close

Và đây là một máy khách đơn giản để kết nối với máy chủ của chúng tôi:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

Mã trên sẽ chặn trong một thời gian dài. Nếu có mã sau lần cuối cùng, nó sẽ không được thực thi cho đến khi phương thức send trả về. Chuyện gì đang xảy ra ở đây? Phương pháp send() sẽ cố gắng truyền tất cả dữ liệu trong khi bộ đệm ghi sẽ được điền vào. Hạt nhân sẽ đặt quá trình vào giấc ngủ cho đến khi dữ liệu trong bộ đệm được chuyển đến đích và bộ đệm lại trống. Khi bộ đệm trở nên trống, kernel sẽ đánh thức quá trình này một lần nữa để có được phần dữ liệu tiếp theo sẽ được chuyển. Nói tóm lại, mã của bạn sẽ chặn và nó sẽ không để bất cứ điều gì khác tiến hành.

Hãy làm cho ví dụ trên không khối:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

Khi bạn chạy máy khách trên, bạn sẽ nhận thấy rằng nó hoàn toàn không chặn được. Nhưng có một vấn đề với khách hàng - nó không gửi tất cả dữ liệu. Phương thức socket.send Trả về số byte được gửi. Khi bạn thực hiện một ổ cắm không chặn bằng cách gọi setblocking(0), nó sẽ không bao giờ chờ đợi hoạt động hoàn thành. Vì vậy, khi bạn gọi phương thức send(), nó sẽ đặt càng nhiều dữ liệu trong bộ đệm càng tốt và trả về. Vì điều này được đọc bởi kết nối từ xa, dữ liệu được xóa khỏi bộ đệm. Nếu bộ đệm đầy đủ và chúng tôi tiếp tục gửi dữ liệu,

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

0 sẽ được nâng lên. Khi bạn cố gắng gửi dữ liệu nhiều hơn bộ đệm có thể chứa, chỉ số lượng dữ liệu có thể được cung cấp thực sự được gửi và send() trả về số byte được gửi. Điều này rất hữu ích để chúng tôi có thể cố gắng gửi dữ liệu còn lại khi bộ đệm trở nên trống. Hãy để cố gắng đạt được điều đó:full and we continue to send data,
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

0 will be raised. When you try to send data more than the buffer can accommodate, only the amount of data that can be accommodated is actually sent and send() returns the number of bytes sent. This is useful so that we can try to send the remaining data when the buffer becomes empty. Let’s try to achieve that:

import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

Trong ví dụ trên, chúng tôi đảm bảo rằng chúng tôi tiếp tục cố gắng gửi dữ liệu còn lại miễn là chúng tôi chưa gửi tất cả. Khi bộ đệm ghi đầy đủ và không thể chứa nhiều dữ liệu hơn, lỗi

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

2 được nêu ra yêu cầu chúng tôi thử lại. Nếu bạn kiểm tra đối tượng ngoại lệ, thông báo ngoại lệ là tài nguyên tạm thời không có sẵn. Vì vậy, chúng tôi tiếp tục cố gắng gửi dữ liệu còn lại cho đến khi chúng tôi đã gửi tất cả.

Hiểu chọn ()

Dòng cuối cùng của ví dụ trên giới thiệu mô -đun chọn. Chọn Mô -đun giúp chúng tôi xử lý nhiều mô tả tệp cùng một lúc. Mô -đun được chọn bao gồm các triển khai của

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

3,
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

4,
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

5 và
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

6, được sử dụng bởi các thư viện như Eventlet, Twisted, Tornado và các thư viện khác. Chúng tôi sẽ xem xét chúng sau trong các bài viết sắp tới của loạt bài này. Vì chúng tôi đã tạo ra ổ cắm không chặn, chúng tôi không biết khi nào chúng tôi thực sự có thể viết thư cho nó trừ khi chúng tôi tiếp tục cố gắng viết thư cho nó và hy vọng nó sẽ không thất bại. Đây là một sự lãng phí lớn của thời gian CPU. Trong ví dụ trên, chúng tôi gọi hàm
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 để tránh chính xác điều đó.

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 Mong đợi ba đối số - Danh sách các mô tả tệp để xem để đọc, danh sách các mô tả tệp để xem để viết và danh sách các mô tả tệp để xem lỗi. Thời gian chờ có thể được thông qua như một đối số thứ 4 tùy chọn có thể được sử dụng để ngăn chặn
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 chặn vô thời hạn. Nó trả về một tập hợp con của cả ba danh sách được truyền theo cùng một thứ tự, tức là tất cả các mô tả tệp đã sẵn sàng để đọc, viết hoặc đã gây ra một số lỗi.

Chúng tôi gọi hàm

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 và truyền nó mô tả tệp yêu cầu cho chúng tôi biết cái nào trong số này sẵn sàng để đọc hoặc viết. Trong ví dụ trên, các khối
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 nếu không có mô tả tệp đã sẵn sàng để làm việc. Bạn có thể nói rằng điều này vẫn đang chặn việc thực hiện chương trình của chúng tôi nhưng đây chỉ là nền tảng để xây dựng những điều tốt hơn. Đến bây giờ,
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 sẽ chỉ chặn cho đến khi đối tượng sock của chúng tôi trở lại có thể ghi lại. Nếu chúng tôi loại bỏ dòng đó, tập lệnh của chúng tôi sẽ tiếp tục hoạt động nhưng vô dụng hơn rất nhiều trong khi các lần lặp lặp sẽ được chạy vì hầu hết chúng sẽ dẫn đến ngoại lệ.

Nhưng làm thế nào để

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 thực sự hoạt động? Chà,
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 không là gì ngoài giao diện cho cuộc gọi hệ thống UNIX
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7. Và nó khá dễ hiểu vì việc sử dụng không khác nhiều so với giao diện Python. Đối với những người tò mò, bạn có thể đọc thêm về trang người đàn ông để chọn và tại các liên kết sau:

  • http://en.wikipedia.org/wiki/Select_(Unix)
  • http://www.quora.com/Network-Programming/How-is-select-implemented

Giới thiệu về các vòng lặp sự kiện cho các sự kiện mạng

Bây giờ chúng tôi đã hiểu

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

3 tốt hơn, hãy sử dụng nó để làm tốt hơn ví dụ cuối cùng của chúng tôi, nơi chúng tôi thực sự sử dụng một ổ cắm không chặn. Chúng tôi sẽ sử dụng các trình tạo để đảm bảo rằng tập lệnh của chúng tôi không chặn thực thi những thứ khác và cho phép mã khác cũng tiến hành. Xem xét ví dụ này:

import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

Để chạy ví dụ này, chúng tôi sẽ chạy hai trường hợp máy chủ và làm cho tập lệnh của chúng tôi thực hiện đồng thời hai điều - chạy một hàm chỉ tăng một biến trong một vòng lặp và gửi dữ liệu đến hai máy chủ. Cả hai máy chủ về cơ bản là giống nhau. Chúng chỉ đang chạy trên các cổng khác nhau để tập lệnh của chúng tôi có thể kết nối với hai máy chủ cùng một lúc.

Trong ví dụ trên, chúng tôi có hai chức năng

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

7 và
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

8.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

7 cố gắng gửi nhiều dữ liệu đến máy chủ TCP.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

8 chỉ chạy một quầy. Chúng tôi muốn thực hiện cả hai chức năng. Nếu chúng ta thực hiện từng cái một, thì cuối cùng chúng ta sẽ chặn một lần thực hiện sau. Tuy nhiên, chúng ta có thể có cả hai chức năng tiến hành đồng thời. Chúng tôi sử dụng các ổ cắm và máy phát không chặn để thực hiện hai chức năng được hợp tác.

Hàm

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

7 tạo ra một ổ cắm không chặn và cố gắng gửi dữ liệu giống như ví dụ trước của chúng tôi. Sự khác biệt duy nhất là nó
import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

2 khi bộ đệm ghi đầy đủ và một ngoại lệ được nâng lên. Vì chúng tôi không thể gửi thêm dữ liệu thông qua ổ cắm nữa, chúng tôi có thể có một khối quy trình mã khác không phụ thuộc vào chức năng này. Hàm
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

8 là một hàm đơn giản cho ví dụ này chỉ chạy một bộ đếm trong một vòng lặp. Chức năng này cũng mang lại sau mỗi lần lặp của vòng lặp.

Trong trường hợp

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

7, chúng tôi
import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

5 khi chúng tôi không thể gửi thêm dữ liệu vì bộ đệm ghi của chúng tôi đã đầy. Nhưng khi chúng tôi mang lại, chúng tôi cũng trả lại một tuple với loại hoạt động trên ổ cắm (‘w, để viết,‘ r, để đọc) và chính đối tượng ổ cắm. Khi thực hiện được trả lại cho callee, chúng tôi duy trì một ánh xạ các đối tượng ổ cắm đến trình tạo đã trả lại nó.

Trong trường hợp

import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

6, chúng tôi mang lại sau mỗi lần lặp. Tại sao? Nếu chúng tôi không làm điều đó, chức năng sẽ tiếp tục thực hiện cho đến khi hoàn thành tất cả những gì nó phải làm và chức năng khác của chúng tôi
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

7 sẽ không có cơ hội tiến hành. Vì vậy, chúng tôi có ý thức cố gắng chuyển thực thi sang một chức năng khác bất cứ khi nào có thể. Vì chức năng này không phụ thuộc vào bất kỳ đối tượng FD hoặc ổ cắm nào, chúng tôi không trả lại bất cứ điều gì khi chúng tôi mang lại. Đây chỉ là một cách chúng tôi đã thiết kế triển khai của mình - xấu xí như nó có thể nhìn nhưng nó giữ cho mọi thứ đơn giản để hiểu.

Trong khối chính của chúng tôi, chúng tôi duy trì danh sách các chức năng mà chúng tôi muốn gọi trong danh sách có tên là

import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

8. Nói chính xác, cả hai chức năng của chúng tôi đều sử dụng
import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

5 và do đó các trình tạo trở về khi chúng tôi gọi chúng. Vì vậy,
import errno
import select
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 1024 * 1024
data_size = len(data)
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
print 'Blocking with', len(data), 'remaining'
select.select([], [sock], []) # This blocks until

assert total_sent == data_size # True

8 thực sự duy trì một danh sách các trình tạo được trả về bởi các chức năng mà chúng tôi muốn hợp tác thực thi. Chúng tôi chạy một vòng lặp miễn là các nhiệm vụ của chúng tôi không hoàn thành việc thực hiện của họ. Trên mỗi lần lặp của vòng lặp, chúng tôi chạy từng tác vụ một-một bằng cách sử dụng hàm
import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

1. Chức năng tiếp tục thực thi và mang lại bất cứ khi nào nó có thể.

Trong khi vòng lặp chạy miễn là danh sách các tác vụ không trống hoặc chúng tôi có bất kỳ đối tượng FD hoặc ổ cắm nào để xem. Chúng tôi chạy từng nhiệm vụ từng người một. Khi chúng tôi gọi

import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

2, nó sẽ mang lại một tuple với thao tác (đọc hoặc viết), chúng tôi đã thực hiện trên ổ cắm và chính đối tượng ổ cắm. Chúng tôi giữ đối tượng ổ cắm trong một từ điển gọi là
import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

3 trong đó chúng tôi duy trì hai từ điển khác nhau của các đối tượng - một cho những từ mà chúng tôi đang viết và một từ khác cho những người chúng tôi đang đọc. Sau đó, chúng tôi chạy
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))
sock.setblocking(0)

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # AssertionError

8 và nó không mang lại gì.

Việc thực hiện được trả lại cho khối chính. Sau khi thực hiện các tác vụ, chúng tôi xem liệu có bất kỳ đối tượng hoặc FD nào mà chúng tôi cần xem bằng

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 và gọi nó phù hợp.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1234))

data = 'foobar\n' * 10 * 1024 * 1024 # 70 MB of data
assert sock.send(data) == len(data) # True

7 Trả về một tập hợp con của ổ cắm/FD có thể được đọc từ hoặc viết thành. Nếu chúng ta có thể đọc từ A hoặc ghi vào bất kỳ ổ cắm/FD nào, chúng ta sẽ tìm trình tạo tương ứng trong từ điển tương ứng trong
import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

3 và nối nó vào danh sách các tác vụ mới sẽ được thực hiện trong lần lặp tiếp theo của chính trong khi vòng lặp. Cuối cùng, chúng tôi chỉ thay thế các nhiệm vụ bằng
import errno
import select
import socket
import time

def other_task():
i = 0
while i < 2000:
i += 1
print i
time.sleep(0.02)
yield

def send_data_task(port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
sock.setblocking(0)

data = (data + '\n') * 1024 * 1024
print 'Bytes to send: ', len(data)

total_sent = 0
while len(data):
try:
sent = sock.send(data)
total_sent += sent
data = data[sent:]
print 'Sending data'
except socket.error, e:
if e.errno != errno.EAGAIN:
raise e
yield ('w', sock)

print 'Bytes sent: ', total_sent

if __name__ == '__main__':
tasks = [
other_task(),
send_data_task(port=1234, data='foo'),
send_data_task(port=5678, data='bar'),
]

fds = dict(w={}, r={})
while len(tasks) or len(fds['w']) or len(fds['r']):
new_tasks = []
for task in tasks:
try:
resp = next(task)
try:
iter(resp)
fds[resp[0]][resp[1]] = task
except TypeError:
# this task has to be done since not
# dependent on any fd
new_tasks.append(task)
except StopIteration:
# function completed
pass

if len(fds['w'].keys()) or len(fds['r'].keys()):
readable, writeable, exceptional = select.select(
fds['r'].keys(), fds['w'].keys(), [], 0)
for readable_sock in readable:
new_tasks.append(fds['r'][fd])
del fds['r'][fd]
for fd in writeable:
new_tasks.append(fds['w'][fd])
del fds['w'][fd]
# ignore exceptional for now

tasks = new_tasks

8 để các vòng trong khi chọn các nhiệm vụ mới.

Điều này tiếp tục chạy cho đến khi chúng tôi không còn nhiệm vụ trong danh sách các nhiệm vụ và không có thêm ổ cắm/FD để xem. Và bằng cách này, các chức năng của chúng tôi hợp tác để cho nhau tiến hành. Ví dụ có lẽ là xấu xí và thậm chí không gần với việc sử dụng trong thế giới thực nhưng nó đơn giản và nó có được ý tưởng.

Lớn trong khi vòng lặp mà bạn thấy trong khối chính của chúng tôi là việc triển khai vòng lặp sự kiện cho tập lệnh của chúng tôi. Nó làm gì? Nó chỉ xem các sự kiện và lịch trình mạng tương ứng các khối mã tương ứng để chạy và khi nào chúng có thể. Tuy nhiên, việc triển khai của chúng tôi là một triển khai rất đơn giản và nó chắc chắn không xử lý những điều phổ biến nhất. Nó chỉ là đủ để đánh giá cao khái niệm.

Ví dụ

Tất cả các ví dụ mã trong bài viết này có thể được tìm thấy ở đây.

Cái gì tiếp theo?

Đó là phần giới thiệu về cách bạn có thể tạo ổ cắm không chặn và sử dụng chức năng Chọn từ mô-đun chọn để xem mô tả tệp để đọc hoặc viết. Chúng tôi đã hiểu cách chọn () hoạt động, tạo một tập lệnh thực hiện mạng I/O không chặn bằng cách sử dụng các ổ cắm không chặn và sử dụng các trình tạo và chọn () để thực hiện vòng lặp I/O rất đơn giản.

Trong bài viết tiếp theo, chúng tôi sẽ xem xét nhiều ví dụ hơn và xem xét cơ sở hạ tầng khác để xử lý các ổ cắm không chặn như Poll và Epoll.