Đa xử lý Python không ghi vào tệp

Mỗi chương trình Python có ít nhất một luồng thực thi được gọi là luồng chính. Cả quy trình và luồng đều được tạo và quản lý bởi hệ điều hành bên dưới

Đôi khi, chúng tôi có thể cần tạo các luồng bổ sung trong chương trình của mình để thực thi mã đồng thời

Python cung cấp khả năng tạo và quản lý các luồng mới thông qua mô-đun luồng và luồng. lớp chủ đề

Bạn có thể tìm hiểu thêm về Python thread trong hướng dẫn

  • Luồng trong Python. Hướng dẫn hoàn chỉnh

Trong lập trình đồng thời, chúng ta có thể cần ghi vào một tệp từ nhiều luồng

Việc ghi vào tệp có an toàn theo luồng không?

Chạy các vòng lặp của bạn bằng cách sử dụng tất cả các CPU, tải xuống cuốn sách MIỄN PHÍ của tôi để tìm hiểu cách thực hiện

Ghi vào tệp không an toàn cho chủ đề

Ghi vào tệp có nghĩa là mở đường dẫn tệp và ghi hoặc nối thêm dữ liệu vào tệp

Một tệp có thể được mở bằng công cụ tích hợp chỉ định đường dẫn tệp và chế độ mở, chẳng hạn như 'w' để viết văn bản hoặc 'a' để nối thêm văn bản. Sau khi mở, dữ liệu có thể được ghi thông qua các hàm write() hoặc writelines()

Cách tốt nhất là mở tệp bằng giao diện trình quản lý ngữ cảnh để tệp được đóng tự động sau khi mọi thao tác ghi kết thúc

Ví dụ

1

2

3

4

5

.. .

# mở tệp để nối thêm

với mở('đường dẫn/đến/tệp. txt', 'a') as file:

# ghi văn bản vào dữ liệu

tệp. viết('Kiểm tra')

Việc ghi vào cùng một tệp từ nhiều luồng đồng thời không phải là luồng an toàn và có thể dẫn đến tình trạng dồn đuổi

An toàn luồng có nghĩa là việc ghi hoặc nối thêm vào cùng một tệp từ nhiều luồng có thể dẫn đến tình trạng tương tranh

Kết quả của một tình trạng tương tranh trong khi ghi hoặc nối thêm vào tệp có thể là dữ liệu được ghi vào tệp bị ghi đè hoặc bị hỏng, dẫn đến mất dữ liệu

Bây giờ chúng ta đã biết rằng việc ghi vào một tệp không an toàn theo luồng, hãy xem cách chúng ta có thể làm cho nó an toàn theo luồng

Bối rối với API mô-đun phân luồng?
Tải xuống bảng cheat PDF MIỄN PHÍ của tôi

Cách ghi vào tệp an toàn theo chủ đề

Việc ghi vào một tệp có thể được thực hiện an toàn theo luồng bằng cách sử dụng khóa loại trừ lẫn nhau (mutex)

Bất kỳ mã nào mở và ghi vào tệp hoặc nối thêm vào tệp đều có thể được coi là một phần quan trọng của mã tùy thuộc vào điều kiện chủng tộc

Mã này có thể được bảo vệ khỏi các điều kiện chủng tộc bằng cách yêu cầu luồng truy cập trước tiên phải có khóa mutex trước khi thực hiện phần quan trọng

Một khóa mutex chỉ có thể được lấy bởi một luồng tại một thời điểm và sau khi được lấy sẽ ngăn chặn bất kỳ luồng nào khác lấy được nó cho đến khi khóa được giải phóng

Điều này có nghĩa là chỉ một luồng duy nhất có thể ghi vào tệp tại một thời điểm, làm cho việc ghi vào luồng tệp an toàn

Điều này có thể đạt được bằng cách sử dụng luồng. khóa lớp

Đầu tiên, một khóa có thể được tạo và chia sẻ giữa tất cả các mã cần truy cập vào cùng một tệp

1

2

3

.. .

# tạo khóa

khóa = phân luồng. Khóa()

Sau khi được tạo, một luồng sau đó có thể lấy khóa trước khi ghi vào tệp bằng cách gọi hàm thu thập (). Sau khi ghi vào tệp xong, khóa có thể được giải phóng bằng cách gọi hàm release()

Ví dụ

1

2

3

4

5

6

7

8

9

.. .

# lấy khóa

khóa. có được()

# mở tệp để nối thêm

với mở('đường dẫn/đến/tệp. txt', 'a') as file:

# ghi văn bản vào dữ liệu

tệp. viết('Kiểm tra')

# nhả khóa

khóa. phát hành()

Giống như mở tệp, giao diện trình quản lý bối cảnh có thể được sử dụng để đảm bảo rằng khóa luôn được giải phóng sau khi thoát mã kèm theo khối

Ví dụ

1

2

3

4

5

6

7

.. .

# lấy khóa

với khóa.

# mở tệp để nối thêm

với mở('đường dẫn/đến/tệp. txt', 'a') as file:

# ghi văn bản vào dữ liệu

tệp. viết('Kiểm tra')

Nếu một luồng cố lấy khóa trong khi nó đã được lấy, thì nó phải chặn hoặc đợi cho đến khi khóa được giải phóng. Quá trình chờ đợi này được thực hiện tự động bởi chuỗi khi cố gắng lấy khóa, không yêu cầu mã bổ sung

Bạn có thể tìm hiểu thêm về cách bảo vệ các phần quan trọng bằng khóa mutex trong hướng dẫn này

  • Cách sử dụng Khóa Mutex trong Python

Tiếp theo, hãy xem xét một số ví dụ đã hoạt động


Khóa học lập trình Python miễn phí

Tải xuống bảng cheat API luồng của tôi và như một phần thưởng, bạn sẽ có quyền truy cập MIỄN PHÍ vào khóa học email 7 ngày của tôi

Khám phá cách sử dụng mô-đun phân luồng Python, bao gồm cách tạo và bắt đầu các luồng mới cũng như cách sử dụng khóa và ngữ nghĩa mutex

Tìm hiểu thêm
 


Ví dụ về Thread-Unsafe Ghi vào tập tin

Trước khi chúng ta xem cách ghi vào tệp an toàn theo chuỗi, hãy xem cách nó không an toàn theo chuỗi

Chúng ta có thể phát triển một ví dụ chứng minh rằng việc thêm vào một tệp từ nhiều luồng không phải là luồng an toàn và dẫn đến mất dữ liệu

Trong ví dụ này, chúng tôi sẽ tạo 1.000 luồng, mỗi luồng sẽ tạo 1.000 số ngẫu nhiên trong khoảng từ 0 đến 1 và viết chúng dưới dạng một dòng văn bản vào cùng một tệp

Có một vài cách để cấu trúc chương trình này, ví dụ

  • Mỗi luồng mở tệp mỗi khi cần viết một dòng văn bản
  • Mỗi luồng mở tệp một lần và ghi từng dòng khi được tạo
  • Ứng dụng mở tệp một lần và mỗi luồng ghi bằng cách sử dụng cùng một tệp xử lý

Cách tiếp cận cụ thể được thực hiện thực sự phụ thuộc vào các chi tiết của ứng dụng

Trong trường hợp này, chúng tôi sẽ mở tệp một lần trong ứng dụng và yêu cầu tất cả các luồng ghi vào tệp bằng cách sử dụng cùng một trình xử lý tệp. Điều này thực sự gây hấn và rất dễ hiển thị tình trạng cạnh tranh khi nhiều luồng ghi vào cùng một tệp

Đầu tiên, chúng ta có thể định nghĩa một chức năng tác vụ sẽ được thực thi bởi mỗi luồng

Hàm sẽ lấy một số nguyên duy nhất để xác định luồng và xử lý tệp làm đối số. Tác vụ sẽ lặp trong 1.000 lần lặp và mỗi lần lặp nó sẽ tạo một số ngẫu nhiên bằng cách sử dụng ngẫu nhiên. random() và sau đó viết số này như một phần của dòng văn bản vào tệp

Hàm task() bên dưới thực hiện điều này

1

2

3

4

5

6

7

8

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, file):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # ghi vào tệp

        tệp. ghi(f'Chủ đề {số} có {giá trị}. \n')

Tiếp theo, trong luồng chính, chúng ta có thể mở tệp để nối thêm

Lưu ý, chúng tôi sẽ ghi vào một tệp có tên là đầu ra. txt” trong thư mục làm việc hiện tại. Đây là cùng một thư mục chứa tập lệnh Python của bạn cho chương trình

1

2

3

4

5

.. .

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# Mở tập tin

tệp = mở(đường dẫn tệp, 'a')

Sau đó, chúng tôi có thể tạo 1.000 luồng mới được định cấu hình để gọi hàm task(), mỗi luồng có một mã định danh số nguyên duy nhất và xử lý tệp được chia sẻ

Điều này có thể được thực hiện trong một danh sách hiểu

1

2

3

.. .

# cấu hình nhiều chủ đề

luồng = [Luồng(target=task, args=(i,file)) for i in range(1000)]

Sau đó, luồng chính có thể bắt đầu tất cả các luồng bằng cách gọi phương thức start() và sau đó đợi cho đến khi tất cả các luồng kết thúc bằng cách gọi phương thức join()

1

2

3

4

5

6

7

.. .

# bắt đầu chủ đề

cho chuỗi trong chuỗi.

    chuỗi. bắt đầu()

# chờ chủ đề kết thúc

cho chuỗi trong chuỗi.

    chuỗi. tham gia()

Cuối cùng, luồng chính sẽ đóng tệp

1

2

3

.. .

# đóng tệp

tệp. đóng()

Liên kết điều này lại với nhau, ví dụ hoàn chỉnh được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

#Trăn Siêu Nhanh. com

# ví dụ về việc ghi vào tệp không an toàn cho luồng

từ ngẫu nhiên nhập ngẫu nhiên

từ thời gian nhập ngủ

từ luồng nhập Luồng

 

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, file):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # ghi vào tệp

        tệp. ghi(f'Chủ đề {số} có {giá trị}. \n')

 

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# Mở tập tin

tệp = mở(đường dẫn tệp, 'a')

# cấu hình nhiều chủ đề

luồng = [Luồng(target=task, args=(i,file)) for i in range(1000)]

# bắt đầu chủ đề

cho chuỗi trong chuỗi.

    chuỗi. bắt đầu()

# chờ chủ đề kết thúc

cho chuỗi trong chuỗi.

    chuỗi. tham gia()

# đóng tệp

tệp. đóng()

Trước tiên, chạy ví dụ sẽ mở tệp để nối thêm

Tiếp theo, 1.000 luồng được tạo và bắt đầu, sau đó luồng chính sẽ chặn cho đến khi tất cả các tác vụ hoàn tất

Mỗi luồng lặp và ghi 1.000 dòng vào tệp

Tất cả các nhiệm vụ đã hoàn thành và tệp đã đóng

Kỳ vọng là tệp chứa 1.000.000 dòng, với 1.000 dòng được viết bởi mỗi luồng

Chúng tôi có thể kiểm tra điều này bằng cách mở tệp trong trình soạn thảo văn bản và kiểm tra nội dung

1

2

3

4

5

6

7

8

9

10

11

Chủ đề 0 nhận 0. 11577738406435278

Chủ đề 0 nhận 0. 09757771957904438

Chủ đề 0 nhận 0. 5200579840159526

Chủ đề 0 nhận 0. 5369696807463148

Chủ đề 0 nhận 0. 5260154027798022

Chủ đề 0 nhận 0. 4863295793428698

Chủ đề 0 nhận 0. 36290927042061494

Chủ đề 0 nhận 0. 8170862775710401

Chủ đề 0 nhận 0. 5448456180398039

Chủ đề 0 nhận 0. 48267153945021646

...

Nhập lệnh sau trên hệ điều hành POSIX (e. g. Linux hoặc MacOS) để báo cáo tổng số dòng trong tệp

1

đầu ra của mèo. txt. wc -l

Trong trường hợp này, chúng ta có thể thấy rằng tệp chứa ít hơn số dòng dự kiến

Dữ liệu bị mất. Điều này nhấn mạnh rằng có nhiều luồng nối vào cùng một tệp (sử dụng cùng một tệp xử lý) không phải là luồng an toàn

1

993495

Xóa tệp và chạy lại chương trình

Bạn có thể sẽ thấy một số dòng khác nhau mỗi khi mã được chạy, do tính chất không thể đoán trước của điều kiện cuộc đua

1

998346

Tiếp theo, hãy xem cách chúng ta có thể ghi vào tệp theo luồng an toàn

Choáng ngợp trước các API đồng thời của python?
Để tìm sự giải thoát, hãy tải xuống Bản đồ tư duy về đồng thời Python MIỄN PHÍ của tôi

Ví dụ về ghi an toàn luồng vào tệp

Chúng tôi có thể viết hoặc nối một tệp theo cách an toàn cho luồng bằng cách sử dụng khóa mutex

Ví dụ trong phần trước có thể được cập nhật để đảm bảo an toàn cho luồng bằng cách yêu cầu mỗi luồng trước tiên phải có khóa trước khi ghi vào tệp

Điều này có thể đạt được bằng cách chuyển khóa tới chức năng tác vụ làm đối số và sau đó lấy khóa trước mỗi lệnh gọi write() trên phần xử lý tệp được chia sẻ

Phiên bản cập nhật của hàm task() với những thay đổi này được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, file, lock):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # ghi vào tệp

        với khóa.

            tệp. ghi(f'Chủ đề {số} có {giá trị}. \n')

Trong luồng chính, sau đó chúng ta có thể tạo phiên bản khóa được chia sẻ

1

2

3

.. .

# tạo khóa chung

khóa = Khóa()

Sau đó, khóa có thể được chuyển đến từng luồng dưới dạng đối số cho hàm task()

1

2

3

.. .

# cấu hình nhiều chủ đề

luồng = [Luồng(target=task, args=(i,file,lock)) for i in range(1000)]

Liên kết điều này lại với nhau, ví dụ hoàn chỉnh về cách ghi an toàn luồng vào tệp được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

#Trăn Siêu Nhanh. com

# ví dụ về cách ghi an toàn luồng vào tệp

từ ngẫu nhiên nhập ngẫu nhiên

từ luồng nhập Luồng

từ luồng nhập Khóa

 

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, file, lock):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # ghi vào tệp

        với khóa.

            tệp. ghi(f'Chủ đề {số} có {giá trị}. \n')

 

# tạo khóa chung

khóa = Khóa()

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# Mở tập tin

tệp = mở(đường dẫn tệp, 'a')

# cấu hình nhiều chủ đề

luồng = [Luồng(target=task, args=(i,file,lock)) for i in range(1000)]

# bắt đầu chủ đề

cho chuỗi trong chuỗi.

    chuỗi. bắt đầu()

# chờ chủ đề kết thúc

cho chuỗi trong chuỗi.

    chuỗi. tham gia()

# đóng tệp

tệp. đóng()

Trước tiên, chạy ví dụ sẽ tạo khóa dùng chung và xử lý tệp dùng chung

Các luồng mới được tạo và định cấu hình để thực thi hàm task(), nhận xử lý tệp được chia sẻ và khóa làm đối số

Chủ đề chính bắt đầu các chủ đề và sau đó đợi các tác vụ hoàn thành

Mỗi luồng lặp 1.000 lần, mỗi lần lặp lại tạo ra một số ngẫu nhiên, lấy khóa và ghi một dòng vào tệp. Nếu khóa đã được mua, thì chuỗi sẽ đợi cho đến khi khóa khả dụng

Điều này thêm một số độ trễ cho chương trình, tăng thời gian chạy

mở đầu ra. txt trong một tệp văn bản và kiểm tra nội dung

1

2

3

4

5

6

7

8

9

10

11

Chủ đề 0 nhận 0. 22420494281917025

Chủ đề 0 nhận 0. 21289722108152498

Chủ đề 0 nhận 0. 8053214755874063

Chủ đề 0 nhận 0. 19071164153864895

Chủ đề 0 nhận 0. 5812611973492516

Chủ đề 0 nhận 0. 47701010872803906

Chủ đề 0 nhận 0. 6587265088322402

Chủ đề 0 nhận 0. 2080983771931354

Chủ đề 0 nhận 0. 8302360563792428

Chủ đề 0 nhận 0. 678505225691399

...

Xem lại tổng số mục nhập trong tệp

Sẽ có 1.000.000 dòng trong tệp. Đã tránh được tình trạng chạy đua vì giờ đây mỗi luồng ghi vào cùng một tệp theo cách an toàn cho luồng

Điều quan trọng là 1.000.000 dòng được ghi vào tệp mỗi khi chương trình được chạy, giúp nó nhất quán

1

1000000

Lưu ý, các số ngẫu nhiên được ghi vào tệp sẽ khác nhau mỗi khi chương trình được chạy

Có được khóa là một nút cổ chai trong mã, làm tăng thời gian chạy chung của chương trình

Một cách tiếp cận khác sẽ dành cho mỗi luồng để lấy khóa khi bắt đầu tác vụ, sau đó tạo và viết tất cả các số ngẫu nhiên

Điều này sẽ khiến khóa được lấy 1.000 lần, một lần cho mỗi luồng, thay vì 1.000.000 lần, một lần cho mỗi lần ghi vào tệp

Ví dụ: phiên bản cập nhật của hàm task() với thay đổi này được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, file, lock):

    # giành được khóa

    với khóa.

        # vòng lặp nhiệm vụ

        cho i trong phạm vi(1000):

            # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

            giá trị = ngẫu nhiên()

            # ghi vào tệp

            tệp. ghi(f'Chủ đề {số} có {giá trị}. \n')

Tiếp theo, hãy xem cách chúng ta có thể ghi một cách an toàn vào tệp bằng chuỗi chuyên dụng

Ví dụ về chủ đề ghi tệp chuyên dụng an toàn

Một cách tiếp cận khác để ghi vào tệp theo cách an toàn theo luồng là sử dụng luồng ghi tệp chuyên dụng

Nếu chỉ có một luồng duy nhất chịu trách nhiệm ghi vào tệp, thì tất cả việc ghi tệp sẽ an toàn cho luồng

Điều này có thể đạt được bằng cách tạo một luồng mới lặp lại trong suốt thời gian chương trình đọc các dòng từ các luồng khác và ghi chúng vào tệp. Các luồng công nhân có thể giao tiếp với luồng trình ghi tệp bằng cách sử dụng cấu trúc dữ liệu hàng đợi an toàn cho luồng. Công nhân sẽ đặt tin nhắn vào hàng đợi và chuỗi trình ghi tệp sẽ nhận tin nhắn từ hàng đợi và ghi chúng vào tệp

Đầu tiên, chúng ta có thể viết một hàm để được thực thi bởi luồng trình ghi tệp duy nhất

Hàm sẽ lấy đường dẫn đến tệp và thể hiện hàng đợi làm đối số

Đầu tiên, tệp sẽ được mở bằng trình quản lý bối cảnh. Sau đó, nhiệm vụ sẽ lặp lại mãi mãi. Mỗi lần lặp lại, nó sẽ lấy một dòng văn bản từ hàng đợi bằng cách gọi get(), ghi nó vào tệp, xóa nội dung của bộ đệm tệp (viết văn bản được lưu vào bộ đệm), sau đó đánh dấu tác vụ là xong

Xóa nội dung sau mỗi lần ghi rất tốn kém về mặt tính toán, nhưng đảm bảo rằng nếu tệp bị đóng, chẳng hạn như bởi quy trình Python, thì sẽ không có nội dung nào chờ được ghi

Hàm file_writer() thực hiện điều này được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

# nhiệm vụ viết tập tin chuyên dụng

def file_writer(filepath, queue):

    # mở tệp

    với mở(đường dẫn tệp, 'w') as file:

        # chạy vĩnh viễn

        trong khi Đúng.

            # lấy một dòng văn bản từ hàng đợi

            dòng = hàng đợi. nhận()

            # ghi vào tệp

            tệp. viết(dòng)

            # xóa bộ đệm

            tệp. xả()

            # đánh dấu đơn vị công việc là hoàn thành

            hàng đợi. task_done()

Tiếp theo, chúng ta có thể cập nhật hàm task() được thực thi bởi worker thread

Hàm phải nhận hàng đợi làm đối số, sau đó thêm các dòng vào hàng đợi mỗi lần lặp bằng cách gọi hàm put()

Hàm task() được cập nhật với những thay đổi này được liệt kê bên dưới

1

2

3

4

5

6

7

8

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, queue):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # đưa kết quả vào hàng đợi

        hàng đợi. đặt(f'Chủ đề {số} có {giá trị}. \n')

Tiếp theo, trong luồng chính, chúng ta có thể tạo hàng đợi được chia sẻ

Đây sẽ là một thể hiện của hàng đợi. lớp xếp hàng. Đây là một triển khai an toàn theo luồng của cấu trúc dữ liệu hàng đợi, cho phép nhiều luồng gọi get() và put() mà không cần điều kiện chủng tộc

Điều quan trọng là hàm get() sẽ chặn cho đến khi có tác phẩm mới trong hàng đợi. Điều này rất hữu ích, vì nó có nghĩa là chuỗi trình ghi tệp của chúng tôi sẽ kiên nhẫn chờ đợi và không tiêu tốn tài nguyên trong khi chờ các dòng văn bản được in

1

2

3

.. .

# tạo hàng đợi chung

hàng đợi = Hàng đợi()

Tiếp theo, chúng ta có thể tạo và bắt đầu một chuỗi mới để thực thi hàm file_writer() của mình

Điều quan trọng, luồng mới này sẽ được đánh dấu là luồng daemon, đây là một tác vụ nền. Điều này có nghĩa là quy trình Python có thể thoát trong khi chuỗi này vẫn đang chạy

1

2

3

4

5

6

.. .

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# tạo và bắt đầu chủ đề ghi tệp

writer_thread = Chủ đề(mục tiêu=file_writer, args=(filepath,queue), daemon=True)

writer_thread. bắt đầu()

Sau đó, chúng tôi có thể tạo và định cấu hình chuỗi công nhân, chuyển phiên bản của hàng đợi được chia sẻ

1

2

3

.. .

# cấu hình chủ đề công nhân

luồng = [Luồng(target=task, args=(i,queue)) for i in range(1000)]

Cuối cùng, sau khi chuỗi công nhân kết thúc, chuỗi chính có thể đợi trên hàng đợi để xử lý tất cả các tác vụ đã gửi

Điều này có thể đạt được bằng cách gọi hàm join()

1

2

3

.. .

# đợi tất cả các tác vụ trong hàng đợi được xử lý

hàng đợi. tham gia()

Mỗi lệnh gọi put() của luồng công nhân sẽ tăng một bộ đếm trong hàng đợi và mỗi lệnh gọi task_done() tiếp theo của luồng máy in sẽ giảm bộ đếm, cho biết đơn vị công nhân đã được xử lý hoàn tất

Sau khi chuỗi công nhân kết thúc, không còn dòng nào để thêm vào hàng đợi. Do đó, khi bộ đếm bên trong của hàng đợi về 0, chúng tôi biết rằng tất cả công việc đã được xử lý

Chức năng tham gia () sẽ chặn cho đến khi tất cả công việc đã gửi trên hàng đợi được xử lý

Liên kết điều này lại với nhau, ví dụ hoàn chỉnh về ghi vào tệp theo cách an toàn luồng bằng cách sử dụng luồng chuyên dụng và hàng đợi được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

#Trăn Siêu Nhanh. com

# ví dụ về cách ghi an toàn cho luồng vào tệp bằng luồng trình ghi chuyên dụng

từ ngẫu nhiên nhập ngẫu nhiên

từ luồng nhập Luồng

từ hàng đợi nhập Hàng đợi

 

# nhiệm vụ viết tập tin chuyên dụng

def file_writer(filepath, queue):

    # mở tệp

    với mở(đường dẫn tệp, 'w') as file:

        # chạy vĩnh viễn

        trong khi Đúng.

            # lấy một dòng văn bản từ hàng đợi

            dòng = hàng đợi. nhận()

            # ghi vào tệp

            tệp. viết(dòng)

            # xóa bộ đệm

            tệp. xả()

            # đánh dấu đơn vị công việc là hoàn thành

            hàng đợi. task_done()

 

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, queue):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # đưa kết quả vào hàng đợi

        hàng đợi. đặt(f'Chủ đề {số} có {giá trị}. \n')

 

# tạo hàng đợi chung

hàng đợi = Hàng đợi()

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# tạo và bắt đầu chủ đề ghi tệp

writer_thread = Chủ đề(mục tiêu=file_writer, args=(filepath,queue), daemon=True)

writer_thread. bắt đầu()

# cấu hình chủ đề công nhân

luồng = [Luồng(target=task, args=(i,queue)) for i in range(1000)]

# bắt đầu chủ đề

cho chuỗi trong chuỗi.

    chuỗi. bắt đầu()

# chờ chủ đề kết thúc

cho chuỗi trong chuỗi.

    chuỗi. tham gia()

# đợi tất cả các tác vụ trong hàng đợi được xử lý

hàng đợi. tham gia()

Chạy ví dụ đầu tiên sẽ tạo hàng đợi được chia sẻ

Tiếp theo, luồng trình ghi tệp được tạo và định cấu hình để thực thi hàm file_writer() của chúng tôi dưới dạng luồng trình nền. Nó được bắt đầu và chạy mãi mãi, đọc các mục từ hàng đợi được chia sẻ và ghi chúng vào tệp

Chuỗi công nhân được bắt đầu và chuyển một phiên bản của hàng đợi được chia sẻ. Mỗi luồng lặp lại 1.000 lần, tạo số và viết dòng vào hàng đợi được chia sẻ

Chủ đề chính chờ tất cả các chủ đề công nhân hoàn thành. Sau đó, nó sẽ chặn trên hàng đợi được chia sẻ cho đến khi tất cả các dòng đã gửi được ghi vào tệp

Luồng chính kết thúc, luồng này sau đó kết thúc luồng daemon nền, đóng mạnh tệp đang mở

mở đầu ra. txt trong trình soạn thảo văn bản và xem lại nội dung

1

2

3

4

5

6

7

8

9

10

11

Chủ đề 0 nhận 0. 14775478937834163

Chủ đề 0 nhận 0. 09069478905986883

Chủ đề 0 nhận 0. 9265023972899946

Chủ đề 0 nhận 0. 8192999500888586

Chủ đề 0 nhận 0. 042930675313449296

Chủ đề 0 nhận 0. 9344168225488523

Chủ đề 0 nhận 0. 6227562426387442

Chủ đề 0 nhận 0. 1159773782005542

Chủ đề 0 nhận 0. 8186760081390562

Chủ đề 0 nhận 0. 2684847766800359

...

Kiểm tra tổng số dòng trong tệp

Chúng ta có thể thấy rằng nó có 1.000.000 dòng dự kiến. Điều quan trọng là nó sẽ có cùng số dòng mỗi khi chương trình được chạy vì không có điều kiện chạy đua nào khi ghi vào tệp

1

1000000

Cách chúng tôi xử lý việc đóng tệp hơi thô

Tốt hơn là thực sự đóng tệp như một phần của hoạt động bình thường của chương trình

Điều này có thể đạt được bằng cách cập nhật hàm file_writer(). Chúng tôi có thể kiểm tra một tin nhắn đặc biệt và nếu nhận được, hãy đánh dấu nó là đã xử lý và trả về từ chức năng, thao tác này sẽ chấm dứt luồng trình ghi tệp. Giá trị Không có là một đối tượng mà chúng ta có thể sử dụng làm thông báo đặc biệt

Ví dụ

1

2

3

4

5

.. .

# kiểm tra xem chúng ta đã hoàn tất chưa

nếu dòng Không có.

    # thoát khỏi vòng lặp

   nghỉ

Sau khi thoát khỏi vòng lặp, tệp được đóng lại và chúng tôi có thể đánh dấu thông báo cuối cùng được đưa vào hàng đợi là đã xử lý

1

2

3

.. .

# đánh dấu tín hiệu thoát là đã xử lý, sau khi đóng tệp

hàng đợi. task_done()

Hàm file_writer() được cập nhật với những thay đổi này được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

# nhiệm vụ viết tập tin chuyên dụng

def file_writer(filepath, queue):

    # mở tệp

    với mở(đường dẫn tệp, 'w') as file:

        # chạy cho đến khi sự kiện được đặt

        trong khi Đúng.

            # lấy một dòng văn bản từ hàng đợi

            dòng = hàng đợi. nhận()

            # kiểm tra xem chúng ta đã hoàn tất chưa

            nếu dòng Không:

                # thoát khỏi vòng lặp

                nghỉ

            # ghi vào tệp

            tệp. viết(dòng)

            # xóa bộ đệm

            tệp. xả()

            # đánh dấu đơn vị công việc là hoàn thành

            hàng đợi. task_done()

    # đánh dấu tín hiệu thoát là đã xử lý sau khi đóng tệp

    hàng đợi. task_done()

Trong luồng chính, khi tất cả các luồng công nhân kết thúc, chúng ta có thể báo hiệu rằng không có thêm thông báo nào được mong đợi bằng cách gọi put() trên hàng đợi và chuyển giá trị Không có

1

2

3

.. .

# báo hiệu cho chuỗi trình ghi tệp rằng chúng tôi đã hoàn thành

hàng đợi. đặt(Không có)

Điều này sẽ báo hiệu cho chuỗi trình ghi tệp rằng không có thêm thông báo nào được mong đợi và kết thúc

Sau đó, luồng chính có thể tham gia hàng đợi và đợi luồng trình ghi tệp xử lý tất cả các thông báo, bao gồm cả tín hiệu kết thúc

1

2

3

.. .

# đợi tất cả các tác vụ trong hàng đợi được xử lý

hàng đợi. tham gia()

Liên kết điều này lại với nhau, ví dụ hoàn chỉnh với những cải tiến này được liệt kê bên dưới

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

#Trăn Siêu Nhanh. com

# ví dụ về cách ghi an toàn cho luồng vào tệp bằng luồng trình ghi chuyên dụng

từ ngẫu nhiên nhập ngẫu nhiên

từ luồng nhập Luồng

từ hàng đợi nhập Hàng đợi

 

# nhiệm vụ viết tập tin chuyên dụng

def file_writer(filepath, queue):

    # mở tệp

    với mở(đường dẫn tệp, 'w') as file:

        # chạy cho đến khi sự kiện được đặt

        trong khi Đúng.

            # lấy một dòng văn bản từ hàng đợi

            dòng = hàng đợi. nhận()

            # kiểm tra xem chúng ta đã hoàn tất chưa

            nếu dòng Không:

                # thoát khỏi vòng lặp

                nghỉ

            # ghi vào tệp

            tệp. viết(dòng)

            # xóa bộ đệm

            tệp. xả()

            # đánh dấu đơn vị công việc là hoàn thành

            hàng đợi. task_done()

    # đánh dấu tín hiệu thoát là đã xử lý sau khi đóng tệp

    hàng đợi. task_done()

 

# nhiệm vụ cho chủ đề công nhân

def nhiệm vụ(số, queue):

    # vòng lặp nhiệm vụ

    cho i trong phạm vi(1000):

        # tạo số ngẫu nhiên trong khoảng từ 0 đến 1

        giá trị = ngẫu nhiên()

        # đưa kết quả vào hàng đợi

        hàng đợi. đặt(f'Chủ đề {số} có {giá trị}. \n')

 

# tạo hàng đợi chung

hàng đợi = Hàng đợi()

# làm hỏng đường dẫn tệp được chia sẻ

filepath = 'đầu ra. txt'

# tạo và bắt đầu chủ đề ghi tệp

writer_thread = Chủ đề(mục tiêu=file_writer, args=(filepath,queue), daemon=True)

writer_thread. bắt đầu()

# cấu hình chủ đề công nhân

luồng = [Luồng(target=task, args=(i,queue)) for i in range(1000)]

# bắt đầu chủ đề

cho chuỗi trong chuỗi.

    chuỗi. bắt đầu()

# chờ chủ đề kết thúc

cho chuỗi trong chuỗi.

    chuỗi. tham gia()

# báo hiệu cho chuỗi trình ghi tệp rằng chúng tôi đã hoàn thành

hàng đợi. đặt(Không có)

# đợi tất cả các tác vụ trong hàng đợi được xử lý

hàng đợi. tham gia()

Chạy ví dụ tạo ra cùng một luồng ghi an toàn vào tệp

Điều quan trọng là chúng tôi tự tin rằng tệp được đóng chính xác như một phần của quá trình thực thi bình thường của chương trình