極簡 Node.js 入門 – 4.4 可寫流

  • 2020 年 9 月 29 日
  • 筆記

極簡 Node.js 入門系列教程://www.yuque.com/sunluyong/node

本文更佳閱讀體驗://www.yuque.com/sunluyong/node/writable

什麼是可寫流

可寫流是對數據流向設備的抽象,用來消費上游流過來的數據,通過可寫流程式可以把數據寫入設備,常見的是本地磁碟文件或者 TCP、HTTP 等網路響應。看一個之前用過的例子

process.stdin.pipe(process.stdout);

process.stdout 是一個可寫流,程式把可讀流 process.stdin 傳過來的數據寫入的標準輸出設備。在了解了可讀流的基礎上理解可寫流非常簡單,流就是有方向的數據,其中可讀流是數據源,可寫流是目的地,中間的管道環節是雙向流。

可寫流使用

調用可寫流實例的 _write() _方法就可以把數據寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');

rs.setEncoding('utf-8');

rs.on('data', chunk => {
  ws.write(chunk);
});

前面提到過監聽了可讀流的 data 事件就會使可讀流進入流動模式,我們在回調事件里調用了可寫流的 write() 方法,這樣數據就被寫入了可寫流抽象的設備中,也就是當前目錄下的 copy.js 文件

write() 方法有三個參數

  • chunk {String| Buffer},表示要寫入的數據
  • encoding 當寫入的數據是字元串的時候可以設置編碼
  • callback 數據被寫入之後的回調函數

自定義可寫流

和自定義可讀流類似,簡單的自定義可寫流只需要兩步

  1. 繼承 stream 模組的 Writable
  2. 實現 _write() 方法

用個簡單例子演示可寫流實現,把傳入可寫流的數據轉成大寫之後輸出到標準輸出設備 stdout

const Writable = require('stream').Writable
class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入標準輸出設備
        process.stdout.write(chunk.toString().toUpperCase());
        // 此處不嚴謹,應該是監聽寫完之後才調用 done
        process.nextTick(done);
    }
}
module.exports = OutputStream;

和最終可寫流暴露出來的 write() 方法一樣, _write() 方法有三個參數,作用類似

  • chunk 寫入的數據,大部分時候是 buffer,除非 decodeStrings 被設置為 false
  • encoding 如果數據是字元串,可以設置編碼,buffer 或者 object 模式會忽略
  • callback 數據寫入後的回調函數,可以通知流傳入下一個數據;當出現錯誤的時候也可以設置一個 error 參數

除了在流實現中的 _write() 之外,還可以實現 _writev() 方法,一次處理多個數據塊,這個方法用於被滯留的數據寫入隊列調用,可以不實現

實例化可寫流 options

有了可寫流的類之後可以實例化使用了,實例化可寫流的時候有幾個 option 可選,了解一下接下來要用到的三個核心 options

  • objectMode 默認是 false, 設置成 true 後 writable.write() 方法除了寫入 string 和 buffer 外,還可以寫入任意 JavaScript 對象。很有用的一個選項,後面介紹 transform 流的時候詳細介紹
  • highWaterMark 每次最多寫入的數據量, Buffer 的時候默認值 16kb, objectMode 時默認值 16
  • decodeStrings 是否把傳入的數據轉成 Buffer,默認是 true

這樣就更清楚的知道 _write() 方法傳入的參數的含義了,而且對後面介紹 back pressure 機制的理解很有幫助。

事件

和可讀流一樣,可寫流也有幾個常用的事件,有了可讀流的基礎,理解起來比較簡單
**pipe**  當可讀流調用 pipe() 方法向可寫流傳輸數據的時候會觸發可寫流的 pipe 事件
**unpipe**  當可讀流調用 unpipe() 方法移除數據傳遞的時候會觸發可寫流的 unpipe 事件
這兩個事件用於通知可寫流數據將要到來和將要被切斷,在通常情況下使用的很少

writeable.write() 方法是有一個 bool 的返回值的,前面提到了 highWaterMark,當要求寫入的數據大於可寫流的 highWaterMark 的時候,數據不會被一次寫入,有一部分數據被滯留,這時候 writeable.write() 就會返回 false,如果可以處理完就會返回 true
**drain** 當之前存在滯留數據,也就是 writeable.write() 返回過 false,經過一段時間的消化,處理完了積壓數據,可以繼續寫入新數據的時候觸發(drain 的本意即為排水、枯竭,挺形象的)

除了 write() 方法可寫流還有一個常用的方法 end(),參數和 write() 方法相同,但也可以不傳入參數,表示沒有其它數據需要寫入,可寫流可以關閉了
**finish** 當調用 writable.end() 方法,並且所有數據都被寫入底層後會觸發 finish 事件,同樣出現錯誤後會觸發 error ** **事件

back pressure

了解了這些事件,結合上之前提到的可讀流的一些知識,就能探討一些有意思的話題了。前面章節提到過用流相對於直接操作文件的好處之一是不會把記憶體壓爆,那麼流是怎麼做到的呢?

很容易聯想到流不是一次性把所有數據載入記憶體處理,而是一邊讀一邊寫。但一般數據讀取的速度會遠遠快於寫入的速度,那麼 pipe() 方法是怎麼做到供需平衡的呢?主要靠以下三個要點

  1. 可讀流有流動和暫停兩種模式,可以通過 **pause()  resume() **方法切換
  2. 可寫流的 **write() **方法會返回是否能處理當前的數據,每次可以處理多少是 highWatermark 決定的
  3. 當可寫流處理完了積壓數據會觸發 drain 事件

可以利用這三點來做到數據讀取和寫入的同步,還是使用之前的例子,但為了使消費速度降下來,刻意隔一秒再通知完成

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入標準輸出設備
        process.stdout.write(chunk.toString().toUpperCase());
        // 故意延緩通知繼續傳遞數據的時間,造成寫入速度慢的現象
        setTimeout(done, 1000);
    }
}

使用一下自定義的兩個類

const RandomNumberStream = require('./RandomNumberStream');
const OutputStream = require('./OutputStream');

const rns = new RandomNumberStream(100);
const os = new OutputStream({
    highWaterMark: 8 // 把水位降低,默認16k還是挺大的
});

rns.on('data', chunk => {
    // 當待處理隊列大於 highWaterMark 時返回 false
    if (os.write(chunk) === false) { 
        console.log('pause');
        rns.pause(); // 暫停數據讀取
    }
});

// 當待處理隊列小於 highWaterMark 時觸發 drain 事件
os.on('drain', () => {
    console.log('drain')
    rns.resume(); // 恢複數據讀取
});

結合前面的三點和注釋很容易看懂上面程式碼,這就是 pipe() 方法起作用的核心原理,官方教程中也有對 back presure 機制的詳細講解

對數據的來源的去向有了大概了解,就可以學習使用雙向流對數據進行加工了

  • duplex
  • transform