打造自己的 Node.js Transform Stream
熟悉並學習實作 Node.js Stream,在 Node.js 開發者生涯裡是一件很重要的事,尤其在資料處理的工作上更是需要運用 Stream。在這些應該用的情境下,若不懂得使用 Stream,我們所開發出來的程式其執行效能及穩定性會相當令人擔心。
而如果你從未自己實作過 Stream,從 Transform Stream 開始入手是一個好選擇,也是一個非常實用的開發技巧。
更多關於 Stream 的說明,可以參閱 Node.js 官網上的文件:https://nodejs.org/api/stream.html
什麼是 Transform Stream?
你可能知道 Node.js 裡有多種 Stream 的機制,但其實主要是 ReadableStream 和 WritableStream 兩種基本 Stream 的組成和變化。而對一般開發者來說,最常自己實作的是 Transform Stream,你可以想像這是一個產品生產線上的加工器,進入 Transform Stream 的資料會被加工後輸出。
而以一個 Stream 而言,Transform Stream 同時具有 ReadableStream(讀入)和 WritableStream(輸出)的特性,俗話說「左耳進右耳出」就是其最佳的寫照。
舉一個 Node.js 官方的例子,利用 Gzip 的 Transform Stream 將通過的資料流進行壓縮:
const zlib = require('zlib');
const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');
inp.pipe(gzip).pipe(out);
實作第一個 Transform Stream
先不必暸解太多 Stream 的專有名詞和機制,若想要實作一個標準的「耳邊風」Stream,程式碼如下:
const Transform = require('stream').Transform;
const util = require('util');
const MyTransform = module.exports = function(options) {
// 直接呼叫時建立一個實例
if (!(this instanceof MyTransform))
return new Parser(options);
// 呼叫原始 Transform 的 constructor
Transform.call(this, options);
};
// 繼承 Transform
util.inherits(MyTransform, Transform);
// 當每一筆資料進來時
MyTransform.prototype._transform = function(data, encoding, callback) {
// 將輸入進來的資料直接推送出去
this.push(data);
// 完成這筆資料的處理工作
callback();
};
實際結合檔案讀取寫入,以使用這個 Transform Stream:
const fs = require('fs');
const myStream = new MyTransform();
// 讀取檔案
const input = fs.createReadStream('/my/file');
const output = fs.createWriteStream('/my/file.out');
// 導入 myStream,輸出後寫入 file.out
input.pipe(myStream).pipe(output);
範例:累積並打包一批資料輸出
有些實際應用中,我們需要累積一定量的資料,然後打包成一包輸出,尤其像是我們在做開放資料的處理時,總是會將整理好的資料一批批的批次寫入資料庫。會這樣做的原因,是因為一筆寫一次太耗時(與資料庫來回的時間),一次 50、100 或更多筆資料寫入資料庫,會有更好的效率。
這時我們可以寫一個自己的 Transform Stream 來做到這件事,如每輸入 10 筆資料後,打包成一個陣列輸出:
const Transform = require('stream').Transform;
const util = require('util');
const BatchStream = module.exports = function(options) {
// 直接呼叫時建立一個實例
if (!(this instanceof MyTransform))
return new Parser(options);
// 啟用 object mode,讓此 Stream 不是用文字或 Binary 格式,而是以物件形式輸入、輸出資料
let opts = Object.assign(options, {
objectMode: true
});
// 呼叫原始 Transform 的 constructor
Transform.call(this, options);
// 建立一個暫存區陣列
this.batch = [];
};
// 繼承 Transform
util.inherits(BatchStream, Transform);
// 當每一筆資料進來時
BatchStream.prototype._transform = function(data, encoding, callback) {
// 放入暫存區
this.batch.push(data);
// 每 10 筆推送出去一次
if (this.batch.length === 10) {
this.push(this.batch);
// 清空暫存區
this.batch = [];
}
// 完成這筆資料
callback();
};
// 當前一個 Stream 的資料輸入已經全部完成時
BatchStream.prototype._flush = function(callback) {
// 將尚未推送出去的資料送出去
if (this.batch.length > 0) {
this.push(this.batch);
// 清空暫存區
this.batch = [];
}
// 完成
callback();
};
實用的 Transform Stream 簡易用法
如果每一個 Transform Stream 都要先設計定義一個原型物件後才能使用,那也太煩人,在實際開發上會相當不便。這時可以運用簡單的方法,建立一個客製化的 Transform Stream:
const fs = require('fs');
const Transform = require('stream').Transform;
const myStream = new MyTransform();
// 讀取檔案
const input = fs.createReadStream('/my/file');
const output = fs.createWriteStream('/my/file.out');
// 導入 myStream,輸出後寫入 file.out
input
.pipe(new Transform({
transform(data, encoding, callback) {
// 將輸入進來的資料直接推送出去
this.push(data);
// 完成這筆資料的處理工作
callback();
}
})
.pipe(output);
精簡!使用 ECMAScript 新支援的 class 關鍵字
在各種新一代的 JavaScript 引擎上,已經可以使用 class
關鍵字來定義物件了,我們也可以使用 class 來定義自己的 Transform Stream,程式碼會看起來精簡許多:
const Transform = require('stream').Transform;
class MyTransform extends Transform {
constructor(options) {
super(options)
}
// 當每一筆資料進來時
_transform(data, encoding, callback) {
// 將輸入進來的資料直接推送出去
this.push(data);
// 完成這筆資料的處理工作
callback();
}
}
再精簡一點的技巧:用 callback 推送資料
用 this.push()
來推送一筆資料出去,有時確實還太囉唆,我們可以用 callback()
一次搞定:
const Transform = require('stream').Transform;
class MyTransform extends Transform {
constructor(options) {
super(options)
}
// 當每一筆資料進來時
_transform(data, encoding, callback) {
// 完成這筆資料的處理工作,同時將輸入進來的資料直接推送出去
callback(null, data);
}
}
疑難排解:怪 Bug?為什麼程式會提前結束、資料有漏?
很多人在玩弄 Stream 時,實作自己的 Transform Stream 時會發現,時常掉資料,或是資料還沒跑完,應用程式就提前結束,感覺相當不穩定。
通常,這得提到 Stream 本身的機制才能夠很完善的說明原因,但簡單來說,Stream 本身會在資料滿載處理不過來時暫停運作(可參考 Node.js Stream 的 highWaterMark 設定),所以如果 Stream 後面沒有下一家 Stream 接手消化資料,這條資料流就會堵塞卡死。
所以,通常發生這樣的情況,肯定是因為你沒有幫自己的 Transform Stream 設定下一家該往哪去,例如:
input.pipe(myStream); // 下面沒有了
尤其是,當 Stream 暫停運作後,事件引擎就沒有新的事件在跑。眾所皆知,當 JavaScript 的事件引擎沒有事件時,Node.js 整個應用程式自然就會結束。
此外,Node.js Stream 的設計上,除了用 .pipe()
來設定下一家是誰之外,還有另一種辦法可排泄資料,那就是使用 .on()
監聽 data
事件:
myStream.on('data', function(data) {
// ...
});
但通常不建議這樣使用,尤其是當你接到資料時,需要做許多非同步(Asynchronous)的複雜工作時。當資料量大時,這樣監聽事件的做法並無法做資料節流,會導致你一瞬間觸發許多非同步工作,進而將你的系統資源耗盡。若在分秒算錢的雲端系統上,你會得到爆量的結果,不是伺服器負荷不過來,就是把你的錢燒盡。
註:如果你想實作一個沒有後面又可以運作的 Stream,你必須要參考 WritableStream 的實作方式,理論上做法大同小異。
後記
之前有人抱怨,搞不太懂 Stream 在做什麼事,或到底怎麼實際開發使用。
其實 Stream 是一個看似單純,但細節很多的機制,而且開發過程中會在觀察者與非觀察者切換,對許多初學者來說更是一大挑戰,更別說會碰上一些掉資料等看似奇怪的行為。所以,很能理解不少人為什麼看不太懂網路上各種說明 Stream 機制的文章,也看不到太多人使用 Stream 機制在實際的開發上。
所以本文以較通俗簡單的範例和方法來說明 Stream 的使用,至於比較嚴格的定義或原理性的說明,就留給讀者自己去翻閱官方相關文件了。:-)
留言
張貼留言