Đọc tệp CSV lớn trong nodejs

Gần đây, tôi đang thực hiện một tác vụ liên quan đến việc đọc và xử lý một tệp csv lớn (kích thước ~3GB) và tải một loạt hàng lên AWS SQS. Khá thoải mái với Node và cách thức hoạt động của nó. Tôi biết rằng việc đọc trực tiếp một tệp lớn như vậy bằng cách sử dụng fs.readFile() sẽ không hoạt động và tôi cần một thứ khác. Theo đuổi việc tìm ra giải pháp này là những gì bài đăng này nói về

Trước khi nói về giải pháp, hãy xem cách giải quyết vấn đề sai lầm

sai đường

Giải pháp trên sẽ hoạt động với các tệp nhỏ tối đa vài MB nhưng không hoạt động xa hơn vì fs.readFile sẽ tải toàn bộ tệp vào bộ nhớ. Hiện tại, nói chung, Node sẽ gặp sự cố khi mức sử dụng bộ nhớ của bạn vượt quá giới hạn nhất định, nhìn vào cộng đồng các vấn đề như vậy, ở đâu đó khoảng 1. 4GB. Chắc chắn, bạn chắc chắn có thể tăng giới hạn đó bằng cách chuyển cờ dòng lệnh --max-old-space-size khi bắt đầu xử lý nút nhưng đó không phải là cách hiệu quả để khắc phục điều này

Cách hiệu quả (và công cụ phù hợp) cho vấn đề cụ thể này là Node. luồng js

Sử dụng các luồng Node

Bây giờ chúng ta đã xem ví dụ về luồng, hãy xem cách chúng ta có thể xử lý các tệp lớn cho một trường hợp sử dụng cụ thể bao gồm các bước khác nhau. Trong kịch bản cụ thể của tôi, tôi đã phải làm như sau

  1. Đọc các hàng từ một csv lớn
  2. Gửi từng hàng dưới dạng tin nhắn Hàng đợi AWS SQS

Mục tiêu lớn hơn của quy trình trên là đọc từng hàng từ tệp csv đầu vào và gửi một thông báo bao gồm dữ liệu từ hàng đó vào hàng đợi SQS

Sử dụng luồng không chính xác

Vậy tại sao giải pháp trên không chính xác?

Chạy đoạn mã trên trên một tệp có hàng triệu hàng cuối cùng sẽ làm hỏng quy trình Node mặc dù chúng tôi đang sử dụng luồng để đọc nội dung của nó. Lý do là - sau khi tệp được đọc và một hàng được phân tích cú pháp, chúng tôi sẽ gửi một thông báo tới AWS SQS, đây là hoạt động không đồng bộ. Điều này có nghĩa là mỗi yêu cầu mạng có thể mất thời gian riêng để thực hiện

Luồng đang đọc tệp csv không biết điều này và nó tiếp tục đọc các hàng nhanh nhất có thể. Bây giờ, hãy xem xét hàng trăm hàng được đọc và phân tích cú pháp rất nhanh bằng dữ liệu đến từ luồng đọc, điều này có nghĩa là hàng trăm yêu cầu mạng được tạo đồng thời. Vì chúng tôi có băng thông hạn chế, không phải tất cả các yêu cầu đều kết thúc ngay lập tức và vòng lặp sự kiện của Node sẽ phải lưu giữ từng chi tiết yêu cầu trong bộ nhớ cho đến khi hoàn thành

Khi các yêu cầu bắt đầu bị tắc, Node cuối cùng sẽ bị sập khi chúng tôi đạt đến giới hạn bộ nhớ. Vì vậy, hãy khắc phục điều này

đúng cách

Vì vậy, trước khi xem xét giải pháp khả thi, hãy nghĩ về vấn đề mà chúng ta đang cố gắng giải quyết. Mặc dù chúng tôi đang sử dụng luồng để đọc từ tệp csv, nhưng chúng tôi không thể hướng dẫn hoặc thông báo cho luồng đó rằng người tiêu dùng ở đầu bên kia đang chậm và luồng đó cần phải giảm tốc độ và tạm dừng khi tình huống này xảy ra. Vì vậy, chúng tôi cần một số API trong mô-đun luồng của Node cho phép chúng tôi thực hiện việc này

API cụ thể đó là stream.pipe

Nói chung, bạn có thể sử dụng pipe() trên bất kỳ luồng nào có thể đọc được. Ví dụ về luồng có thể đọc được có thể như sau

  • Đọc một tập tin
  • Thực hiện một truy vấn chọn trên cơ sở dữ liệu
  • Phản hồi HTTP
  • Còn nhiều nữa

Đường ống là một cơ chế trong đó chúng tôi cung cấp đầu ra của một luồng làm đầu vào cho luồng khác. Hãy sửa ví dụ của chúng ta bằng cách sử dụng stream.pipe()

pipe trong thuật ngữ UNIX/Linux cơ bản là một lệnh cho phép bạn chuyển đầu ra của một lệnh làm đầu vào cho một lệnh khác. trong nút. js, chúng ta có thể hiểu nó là khả năng chuyển đầu ra của luồng có thể đọc được sang luồng có thể đọc/ghi khác

Nhìn vào đoạn mã trên, bạn có thể thắc mắc tại sao chúng ta phải tạo một tệp riêng biệt với một số cú pháp lớp kỳ lạ mở rộng một số lớp stream.Transform kỳ lạ khác và tất cả những thứ đó. Hãy thư giãn, tôi có một lời giải thích mà chúng ta sẽ xem xét một chút

Trong đoạn mã trên, điều kỳ diệu xảy ra ở dòng

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
0 trong
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
1. Ở đây chúng tôi đang chuyển dữ liệu nhận được từ luồng có thể đọc được của chúng tôi. e. luồng đang đọc tệp csv của chúng tôi theo từng đoạn và chuyển các đoạn đó sang luồng khác tôi. e.
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
2 lớp mà chúng tôi đã mã hóa bằng tay. Tác dụng của việc sử dụng đường ống ở đây là giờ đây, luồng có thể đọc được của chúng tôi thông minh và biết liệu luồng mà nó đang chuyển dữ liệu vào đang chậm lại hay hoạt động bình thường và theo đó, nó tự tạm dừng/tiếp tục đọc từ tệp csv mà không cần chúng tôi ghi một dòng nào

Hiểu nhu cầu viết luồng Chuyển đổi tùy chỉnh

Bây giờ quay lại với cú pháp tùy chỉnh kỳ lạ đó trong lớp PushToSqs. Bất cứ khi nào chúng ta cần chuyển luồng có thể đọc của mình sang bất kỳ luồng nào khác, luồng khác đó phải là luồng có thể đọc được hoặc luồng có thể ghi hoặc luồng biến đổi hoặc luồng song công. Về cơ bản, điều này có nghĩa là chúng ta không thể có một hàm ngẫu nhiên tùy ý đối số với hàm pipe ở trên. Chúng tôi chỉ cần cung cấp một trong các luồng tôi đã đề cập trước đó

const fs = require('fs');
const zlib = require('zlib');
let readable = fs.createReadStream('large.csv');

readable
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('large.csv.gz'));

Bạn hẳn đã thấy các ví dụ như ví dụ trên qua internet, nơi mọi người sử dụng các thư viện bên thứ 3 khác nhau và hoàn thành công việc của họ. Vậy tại sao chúng ta không thể sử dụng bất kỳ thư viện phổ biến nào như vậy và hoàn thành công việc mà không gặp rắc rối khi tạo luồng của riêng mình?

Nhìn vào ví dụ trên, những gì đang làm là - đọc một tệp csv lớn, nén nó và ghi lại vào đĩa, tất cả đều sử dụng các luồng. Gói bên thứ 3 zlib cung cấp logic nén và gói sẵn có fs thực hiện công việc ghi tệp vào đĩa. Bây giờ, hãy tập trung vào câu lệnh vấn đề mà chúng ta đã xem xét trước đó ở phần đầu của bài viết này - đọc tệp csv và mỗi hàng sẽ được gửi dưới dạng thông báo tới sqs

Tôi không tìm thấy bất kỳ mô-đun bên thứ 3 nào có loại logic nghiệp vụ tùy chỉnh như vậy phù hợp với yêu cầu của tôi và do đó cần phải viết lớp luồng tùy chỉnh của riêng chúng tôi

Tôi sẽ không đi vào chi tiết luồng Chuyển đổi là gì mà tôi sẽ chỉ thêm một số chi tiết để dễ hiểu những gì đang diễn ra trong mã. Luồng biến đổi về cơ bản được sử dụng để chuyển đổi luồng byte từ luồng đến và gửi nó sang luồng tiếp theo. Hàm

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
4 ở dòng 134 trong lớp
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
2 được gọi mỗi khi nó nhận dữ liệu từ thượng nguồn. Trong trường hợp này, chúng tôi thực hiện việc gửi tin nhắn tới AWS SQS và sau đó gọi hàm gọi lại
let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
6 để thông báo cho Node rằng đoạn dữ liệu cụ thể này đã được xử lý thành công và chúng tôi sẵn sàng xử lý đoạn dữ liệu tiếp theo

xử lý lỗi

Nhìn vào câu này từ Node. tài liệu js cho

Một lưu ý quan trọng là nếu luồng Có thể đọc phát sinh lỗi trong quá trình xử lý, thì đích Có thể ghi sẽ không tự động đóng. Nếu xảy ra lỗi, cần phải đóng từng luồng theo cách thủ công để tránh rò rỉ bộ nhớ

Vì vậy, điều đó có ý nghĩa gì đối với giải pháp của chúng tôi là nếu vì lý do nào đó hoặc lý do khác, luồng có thể đọc được của chúng tôi đang đọc từ tệp csv có một số lỗi và tệp không thể đọc được nữa (có thể bị hỏng byte, lỗi đọc đĩa, v.v.) thì giải pháp khác . Điều này có vẻ không quá nghiêm trọng nhưng giả sử chúng tôi có nhiều luồng được dẫn trên luồng có thể đọc được của chúng tôi như bên dưới

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())

Tất cả các luồng đang được dẫn vào một chuỗi hoặc sắp xếp sẽ tiếp tục chạy mặc dù luồng có thể đọc được đã bị lỗi. Tốt nhất, trong trường hợp này, chúng ta cũng nên để tất cả các luồng khác ở cuối dòng có thông tin này rằng đã xảy ra lỗi ở đầu nguồn và bạn cần thoát ra một cách nhẹ nhàng để mọi thứ được dọn sạch và chúng ta không bị rò rỉ bộ nhớ. Nhưng khi sử dụng luồng. pipe, chúng ta phải tự làm điều này bằng cách viết trình xử lý lỗi thích hợp và đóng thủ công một luồng khác bất cứ khi nào xảy ra lỗi

Nhưng có một giải pháp tốt hơn có sẵn kể từ Node. js 10. x -

Sử dụng

let readable = fs.createReadStream('large.csv');

readable
    .pipe(streamA())
	.pipe(streamB())
	.pipe(streamC())
	.pipe(streamD())
	.pipe(streamE())
7 sẽ đóng tất cả các luồng trong trường hợp có lỗi ở bất kỳ luồng nào trong chuỗi và cho phép bạn chạy bất kỳ logic xử lý lỗi nào mà bạn có thể muốn để xóa mọi thứ trước khi thoát, do đó ngăn ngừa rò rỉ bộ nhớ

Phần kết luận

đó là nó. Chúng tôi đã thấy các cách đúng và sai khi xử lý các tệp lớn trong Node. js và tạo một lớp truyền phát thủ công tùy chỉnh phù hợp với nhu cầu của chúng tôi. Tôi hy vọng điều này sẽ giúp ai đó ngoài kia tiết kiệm được vài giờ khi cố gắng xử lý một tệp lớn trong Node

Làm cách nào để đọc các tệp lớn trong nodejs?

luồng js. .
Bước 1. Tạo nút. ứng dụng js. .
Bước 2. Cài đặt phụ thuộc. Tiếp theo, cài đặt các gói fs và readline. .
Bước 3. Đọc tệp. .
Bước 4. Phân tích tệp. .
Bước 5. Xuất dữ liệu được phân tích cú pháp

Làm cách nào để đọc tệp CSV trong nodejs?

Bạn sẽ sử dụng phương thức createReadStream() của mô-đun fs để đọc dữ liệu từ tệp CSV và tạo luồng có thể đọc được . Sau đó, bạn sẽ chuyển luồng này sang luồng khác được khởi tạo bằng mô-đun csv-parse để phân tích cú pháp các khối dữ liệu. Khi các khối dữ liệu đã được phân tích cú pháp, bạn có thể đăng nhập chúng vào bảng điều khiển.

Kích thước bao nhiêu là quá lớn đối với tệp CSV?

tệp csv có giới hạn 32.767 ký tự trên mỗi ô . Excel có giới hạn 1.048.576 hàng và 16.384 cột trên mỗi trang tính.

Cách sử dụng CSV

Cách phân tích tệp CSV .
Mở tệp bằng đường dẫn tệp đầy đủ. .
Đọc bản ghi đầu tiên trong tệp và tải bản ghi vào các biến phù hợp. .
Bắt đầu một vòng lặp kết thúc khi tệp kết thúc