
Chào các "dev-er" tương lai của anh Creyt! Hôm nay, chúng ta sẽ "đào" sâu vào một khái niệm mà nhiều bạn trẻ thường bỏ qua, nhưng nó lại là "xương sống" của các ứng dụng Node.js xịn sò, đó là stream.Readable. Nghe cái tên có vẻ "hàn lâm" đúng không? Nhưng thật ra, nó cực kỳ thực tế và hữu ích, đặc biệt khi bạn cần xử lý những "núi" dữ liệu mà không muốn làm "nghẹt thở" cái máy tính của mình.
Tưởng tượng thế này: Bạn đang xem một video TikTok dài 10 phút. Bạn có muốn đợi nó tải toàn bộ 10 phút về máy rồi mới xem không? Hay bạn muốn xem luôn trong khi nó vẫn đang tải từng đoạn nhỏ? Chắc chắn là cái thứ hai rồi, đúng không? Đó chính là tinh thần của stream.Readable!
Nó giống như một "ống dẫn nước" (pipe) hoặc một "băng chuyền" (conveyor belt) vậy. Thay vì "bê" cả cái hồ nước về nhà (tải hết dữ liệu vào RAM), bạn chỉ cần mở vòi và nước (dữ liệu) sẽ chảy ra từ từ, vừa đủ dùng. Khi nào cần thêm, bạn lại "kéo" tiếp. Cái cơ chế "kéo" (pull-based) này chính là điểm mấu chốt của Readable stream.
stream.Readable Là Gì và Để Làm Gì?
Về cơ bản, stream.Readable trong Node.js là một abstract base class (lớp cơ sở trừu tượng) để tạo ra các đối tượng có khả năng đọc dữ liệu theo luồng. Tức là, nó cho phép bạn đọc dữ liệu từng phần một, thay vì đọc toàn bộ vào bộ nhớ cùng lúc.
Tại sao phải làm thế?
- Tiết kiệm bộ nhớ (RAM): Khi làm việc với các file siêu to khổng lồ (video 4K, log file hàng GB, dataset hàng triệu bản ghi), việc tải hết vào RAM là bất khả thi hoặc sẽ làm ứng dụng của bạn sập nguồn. Stream giúp bạn xử lý từng "miếng" nhỏ.
- Tăng tốc độ phản hồi: Người dùng không phải chờ đợi toàn bộ dữ liệu được xử lý. Họ nhận được phản hồi ngay lập tức khi những phần đầu tiên của dữ liệu sẵn sàng. Giống như xem TikTok vậy!
- Xử lý dữ liệu liên tục: Rất lý tưởng cho các ứng dụng cần xử lý dữ liệu theo thời gian thực hoặc từ các nguồn không xác định kích thước trước (như input của người dùng, dữ liệu từ sensor).
Cơ chế hoạt động (hơi sâu một chút):
Readable stream có một "bộ đệm" (buffer) nội bộ. Khi bạn "kéo" dữ liệu, nó sẽ cố gắng lấp đầy bộ đệm này đến một mức nhất định (gọi là highWaterMark). Khi bộ đệm đầy, nó sẽ ngừng đọc từ nguồn cho đến khi bạn tiêu thụ bớt dữ liệu đi. Đây chính là cơ chế "backpressure" giúp hệ thống không bị quá tải.
Code Ví Dụ Minh Hoạ: Tạo Ra Dòng Chảy Số Đếm
Để bạn dễ hình dung, anh Creyt sẽ hướng dẫn bạn tạo một Readable stream đơn giản, nó sẽ "phát ra" các số từ 0 đến N.
const { Readable } = require('stream');
// Tạo một custom Readable stream
class CounterStream extends Readable {
constructor(options) {
super(options);
this.currentNumber = 0;
this.maxNumber = options.maxNumber || 10; // Giới hạn số đếm
}
// Phương thức _read() là trái tim của mọi Readable stream
// Nó được gọi khi stream cần thêm dữ liệu để đẩy vào buffer nội bộ
_read(size) { // 'size' là gợi ý về lượng byte mong muốn, nhưng không bắt buộc phải tuân thủ
if (this.currentNumber <= this.maxNumber) {
const chunk = Buffer.from(String(this.currentNumber) + '\n'); // Chuyển số thành Buffer và thêm xuống dòng
this.push(chunk); // Đẩy dữ liệu vào internal buffer
console.log(`[Producer] Đã đẩy số: ${this.currentNumber}`);
this.currentNumber++;
} else {
this.push(null); // Khi không còn dữ liệu, đẩy null để báo hiệu kết thúc stream
console.log('[Producer] Đã hết số để đẩy. Stream kết thúc.');
}
}
}
// Khởi tạo stream và đặt giới hạn
const myCounterStream = new CounterStream({ maxNumber: 5 });
console.log('--- Bắt đầu đọc dữ liệu từ CounterStream ---');
// Cách 1: Sử dụng sự kiện 'data' (Chế độ chảy - flowing mode)
// Đây là cách phổ biến và dễ dùng nhất.
// Khi có dữ liệu, sự kiện 'data' sẽ bắn ra.
myCounterStream.on('data', (chunk) => {
console.log(`[Consumer] Đã nhận: ${chunk.toString().trim()}`);
});
// Sự kiện 'end' được bắn ra khi stream kết thúc (nhận được push(null))
myCounterStream.on('end', () => {
console.log('--- CounterStream đã kết thúc ---');
});
// Sự kiện 'error' để bắt lỗi nếu có
myCounterStream.on('error', (err) => {
console.error('Lỗi xảy ra:', err);
});
/*
// Cách 2: Chế độ tạm dừng (paused mode) - Ít dùng trực tiếp hơn, nhưng quan trọng để hiểu
// Trong chế độ này, bạn phải tự gọi .read() để kéo dữ liệu
console.log('--- Bắt đầu đọc dữ liệu từ CounterStream (Paused Mode) ---');
let data;
while (null !== (data = myCounterStream.read())) {
console.log(`[Consumer Paused] Đã nhận: ${data.toString().trim()}`);
}
console.log('--- CounterStream (Paused Mode) đã kết thúc ---');
*/
Giải thích code:
class CounterStream extends Readable: Chúng ta tạo một class mới kế thừa từReadable.constructor: Khởi tạo các biến trạng thái (currentNumber,maxNumber)._read(size): Đây là phương thức "thần thánh" mà bạn phải implement khi tạoReadablestream. Node.js sẽ gọi_read()khi nó cảm thấy "đói" dữ liệu (tức là bộ đệm nội bộ đang cạn). Trong phương thức này, bạn sẽ lấy dữ liệu từ nguồn gốc của mình (ở đây là biếncurrentNumber), chuyển nó thànhBuffer, và dùngthis.push(chunk)để đẩy vào bộ đệm của stream.this.push(null): Cực kỳ quan trọng! Khi không còn dữ liệu để đọc, bạn phải gọithis.push(null)để báo hiệu rằng stream đã kết thúc. Điều này sẽ kích hoạt sự kiệnendcho các listener.myCounterStream.on('data', ...): Đây là cách thông thường để tiêu thụ dữ liệu từ mộtReadablestream. Mỗi khi stream có dữ liệu mới trong bộ đệm và sẵn sàng, sự kiệndatasẽ được kích hoạt.myCounterStream.on('end', ...): Bắn ra khi stream đã hoàn thành việc đẩy dữ liệu.myCounterStream.on('error', ...): Để bắt các lỗi có thể xảy ra trong quá trình đọc.

Mẹo Vặt (Best Practices) Từ Anh Creyt Để "Phá Đảo" Readable Streams
-
Đừng chặn ống nước (Don't block the pipe!): Phương thức
_read()phải là non-blocking. Nếu bạn có thao tác I/O nặng (ví dụ: đọc từ database, gọi API) bên trong_read(), hãy đảm bảo nó là bất đồng bộ (asynchronous). Dùngasync/awaithoặc callbacks để không làm treo toàn bộ ứng dụng của bạn.
-
Xử lý lỗi là bạn thân: Luôn luôn lắng nghe sự kiện
error. Dữ liệu có thể đến từ nhiều nguồn khác nhau, và lỗi là điều không thể tránh khỏi. -
Hiểu về
highWaterMarkvà backpressure:highWaterMarklà ngưỡng bộ đệm. Nếu bạn đẩy dữ liệu quá nhanh mà người tiêu thụ không kịp đọc,push()có thể trả vềfalse. Khi đó, bạn nên tạm dừng việc đọc từ nguồn gốc cho đến khi sự kiệndrainđược kích hoạt (đối vớiWritablestream) hoặc đợi Node.js gọi lại_read()(đối vớiReadable). DùReadablestream tự động quản lý_read()nhưng việc hiểu cơ chế này rất quan trọng để tối ưu hiệu suất. -
Sử dụng
pipe()khi có thể: Đây là cách "thanh lịch" nhất để kết nối các stream với nhau. Thay vì tự tay xử lý các sự kiệndata,end,errorgiữa mộtReadablevà mộtWritablestream,pipe()sẽ làm tất cả cho bạn, bao gồm cả quản lý backpressure.// Ví dụ: Đọc file và nén nó, sau đó ghi ra file khác const fs = require('fs'); const zlib = require('zlib'); // Thư viện nén const readStream = fs.createReadStream('large_file.txt'); const gzipStream = zlib.createGzip(); // Một Writable/Readable stream (Transform stream) const writeStream = fs.createWriteStream('large_file.txt.gz'); readStream.pipe(gzipStream).pipe(writeStream) .on('finish', () => console.log('File đã được nén và ghi thành công!')) .on('error', (err) => console.error('Lỗi trong quá trình pipe:', err));
Ứng Dụng Thực Tế: "Stream" Đang Ở Khắp Mọi Nơi!
Bạn có thể không nhận ra, nhưng stream.Readable (hoặc các loại stream khác) đang "chạy ngầm" trong rất nhiều ứng dụng bạn dùng hàng ngày:
- Xem phim/nghe nhạc trực tuyến (Netflix, Spotify, YouTube): Đây là ví dụ kinh điển nhất. Dữ liệu video/audio được stream từng phần nhỏ, giúp bạn xem ngay lập tức mà không cần tải hết về.
- Tải file lớn về máy (Download Manager): Khi bạn tải một file hàng GB, các trình quản lý tải xuống thường sử dụng stream để ghi dữ liệu xuống đĩa mà không cần tải toàn bộ vào RAM trước.
- Xử lý file log (ELK Stack): Các hệ thống thu thập và phân tích log thường phải xử lý hàng terabyte dữ liệu mỗi ngày. Stream giúp đọc, lọc, và chuyển đổi các dòng log một cách hiệu quả.
- API trả về dữ liệu lớn: Một số API trả về kết quả dưới dạng JSON lớn hoặc CSV. Thay vì gửi toàn bộ một lúc, server có thể stream dữ liệu, giúp client nhận và xử lý từng phần.
- Truyền file qua mạng (FTP, HTTP file upload): Khi bạn upload một file lớn lên server, dữ liệu cũng được stream từ client lên server.
Thử Nghiệm và Nên Dùng Cho Case Nào?
Khi nào nên dùng stream.Readable?
- Đọc file từ ổ đĩa (File System): Khi bạn cần đọc các file có kích thước lớn (vài trăm MB đến vài GB).
fs.createReadStream()là mộtReadablestream. - Nhận request body từ HTTP server: Khi client upload file lên server của bạn,
requestobject trong Node.js HTTP server là mộtReadablestream. - Đọc dữ liệu từ database: Một số thư viện database hỗ trợ trả về kết quả dưới dạng stream khi truy vấn dữ liệu lớn.
- Tạo dữ liệu theo yêu cầu: Như ví dụ
CounterStreamở trên, khi bạn cần tạo ra một chuỗi dữ liệu mà không muốn lưu trữ toàn bộ trong bộ nhớ.
Khi nào không nhất thiết phải dùng?
- Dữ liệu nhỏ: Nếu dữ liệu của bạn chỉ vài KB hoặc vài MB, việc đọc toàn bộ vào bộ nhớ (ví dụ:
fs.readFileSync()hoặcfs.promises.readFile()) thường đơn giản và nhanh hơn, không cần đến sự phức tạp của stream. - Dữ liệu cần toàn bộ để xử lý: Nếu bạn bắt buộc phải có toàn bộ dữ liệu trong tay trước khi có thể bắt đầu xử lý (ví dụ: cần tính tổng số phần tử trước khi làm gì đó), thì stream có thể không phải là lựa chọn tối ưu nhất nếu không kết hợp với các kỹ thuật gộp (aggregation).
Thử nghiệm thực tế:
Hãy thử tạo một file văn bản cực lớn (ví dụ: 1GB) bằng cách lặp đi lặp lại một đoạn văn bản. Sau đó, thử đọc nó bằng fs.readFileSync() và so sánh với fs.createReadStream(). Bạn sẽ thấy sự khác biệt rõ rệt về mức độ sử dụng bộ nhớ và thời gian phản hồi. Đó chính là sức mạnh của stream.Readable!
Anh Creyt hy vọng qua bài này, bạn đã có cái nhìn rõ ràng hơn về stream.Readable và tầm quan trọng của nó trong việc xây dựng các ứng dụng Node.js hiệu quả. Hãy nhớ, làm chủ stream là một kỹ năng "level up" đáng giá cho bất kỳ dev nào!
Thuộc Series: Nodejs
Bài giảng này được tự động xuất bản ngẫu nhiên từ thư viện kiến thức. Đừng quên đón xem các Từ khoá Hướng Dẫn tiếp theo nhé!