nodejs的tream(流)解析與模擬文件讀寫流源碼實現

  • 什麼是流?
  • 可讀流於可寫流
  • 雙工流於轉換流
  • 背壓機制與文件流模擬實現

 一、什麼是流?

關於流的概念早在1964年就有記錄被提出了,簡單的說「流」就是控制數據傳輸過程的程序,比如在那篇記錄中有這樣的描述:

在編寫代碼時,我們應該有一些方法將程序像連接水管一樣連接起來 — 當我們需要獲取一些數據時,可以去通過”擰”其他的部分來達到目的。這也應該是IO應有的方式」。

——Doug McIlroy. October 11, 1964

在nodejs中將這種「流」的概念抽象成一個模型,可以實現有序的、可控制大小的、可按需獲取的方式實現數據傳輸,該模型在nodejs中實現的JavaScript層就是tream模塊。

1.流的基本模型

 在nodejs中基於流的基本模型又實現了四種基本流類型:

Writable(可寫流):可以理解為將數據存儲到指定的地方中,例如fs.crateWriteStream()
Readable(可讀流):可以理解為將數據從指定的地方取出來,例如fs.crateReadStream()
Duplex(雙工流):例如net.Socket
Transform(轉換流):例如zlib.crateDeflate()

這個四個流的基本類型在stream模塊上都以類的形式作為導出API,我們可以基於這四個基本類型自定義自己的流的具體實現,比如下面通過可讀流(Readable)作為示例:

自定義可讀流的思考:數據來源、如何獲取數據?

數據來源採用一個數組模擬,獲取數據的方式流程上stream已經實現了,但具體的獲取功能它在內部提供了定義接口,具體功能並沒有實現,這就是接下來我們需要實現的部分(假設每次從數組中讀取一個數組):

 1 //自定義可讀流
 2 const {Readable} = require('stream');
 3 //模擬數據來源
 4 let source = ['a','b','c'];
 5 //自定義繼承Readable
 6 class MyReadable extends Readable{
 7     constructor(source){
 8         super();
 9         this.source = source;
10     }
11     _read(){
12         let data = this.source.shift() || null; 
13         this.push(data);
14     }
15 };

在Readable上有一個read()方法可以獲取指定量數據,先不討論它的具體實現過程,看下面的幾個測試結果:

測試一:

//實例化自定義讀取流
let myReadable = new MyReadable(source);
console.log(myReadable.read(1)); //<Buffer 61>
console.log(myReadable.read(1)); //<Buffer 62>
console.log(myReadable.read(1)); //<Buffer 63>

測試二:

let myReadable = new MyReadable(source);
console.log(myReadable.read());  //<Buffer 61>
console.log(myReadable.read(1)); //<Buffer 62>
console.log(myReadable.read(1)); //<Buffer 63>

測試三:

//測試三
console.log(myReadable.read(2));  //null
console.log(myReadable.read(1)); //<Buffer 61>
console.log(myReadable.read(1)); //<Buffer 62>

測試四:

//測試四
console.log(myReadable.read(1));  //<Buffer 61>
console.log(myReadable.read(2)); //<null>
console.log(myReadable.read(1)); //<Buffer 62>

首先通過打印結果來看,讀取流並不是將數組中的元素值直接返回,而返回的是Buffer。這是因為在流內部使用了Buffer作為數據中轉容器,通過push傳入的數據如果不是Buffer則會使用Buffer.from()轉換後再緩存到這個Buffer內。

至於為什麼read(size)在前面為什麼讀出的null,首先要理解size的意思是從緩存內讀出指定位元組長度的數據,當中轉Buffer內沒有足夠未讀的數據就不會返回數據,而是返回null。然後就是要了解Buffer內的數據是什麼時候被傳入的,當我們每調用一次read()本質上內部就會執行一次_read(),也就是實現向Buffer緩存數據的操作,當Buffer中緩存足夠指定讀出位元組長度時就會成功返回數據,否則就會返回null,比如下面的示例:

console.log(myReadable.read(2));  //null--這次一次向Buffer內存了一個位元組,但不夠要讀的位元組長度是2,所以返回null
console.log(myReadable.read(2)); //<Buffer 61 62>--這次又向Buffer內存了一個位元組,加上次存的位元組剛好滿足要讀的位元組長度2,所以成功返回兩個位元組的數據

通過read()返回數據肯定不是符合流的模式,根據前面前面的基本模型來看,它應該是一種可以自動的將數據源中的數據源源不斷的返回,nodejs的流tream底層是通過事件的機制來實現的,所以正確獲取數據的方式是通過data、readable事件來獲取數據:

myReadable.on('data',(chunk)=>{
    console.log(chunk);
});
//打印結果
//<Buffer 61>
//<Buffer 62>
//<Buffer 63> 

可讀流中data事件獲取數據的機制:每一次_read()實現一次數據緩存時,就會觸發一次data事件,並將當前_read()緩存到Buffer內的數據全部傳遞給data事件回調函數作為實參chunk。

可讀流readable事件獲取數據的機制:該事件會在啟動流操作緩存空間寫入數據觸發一次(也就是_read()執行一次後),然後再是在數據源的數據全部被寫入到Buffer後觸發一次(但需要注意這個全部寫入描述並不準確,因為它是在當_read()向Buffer寫入null時觸發)。因為任務回調是異步操,當它觸發後回調執行時Buffer內的數據量是不確定的,從nodejs流的設計機制來說readable並不是為了獲取數據而存在,而僅僅是作為傳遞流操作的狀態信息,但也可以通過這個事件配合read()方法獲取數據:

myReadable.on('readable',()=>{
    let data = null;
    while((data = myReadable.read(1)) !== null){
        console.log(data);
    }
});

測試在數據源上出現一個null元素對readable事件的影響:

//更新模擬數據來源
let source = ['a','b','c',null,'d','e'];
//測試readable事件(還是使用上一個示例的輸出代碼)
<Buffer 61>
<Buffer 62>
<Buffer 63>

通過輸出結果可以看到,在數組中添加的null,’d’,’e’沒有被成功輸出,這是因為當push()向Buffer內緩存數據時寫入的是null,表示不在為流提供消費數據,類似終止數據源向緩存中提供數據,流操作隨之結束。所以,push(null)可以理解為流操作結束的最終閥門,並且無法重啟,所以在定義流時push(null)用來表示流的數據源傳輸數據到達終點。

然後就是控制器(閥門的兩個操作):

pause()暫停:這個方法會暫停_read()方法向緩存區寫數據時觸發data事件,也就是說調用這個方法只能中斷向消費者輸出數據,而不會中斷_read()從數據源中讀取數據寫入緩存區。並且這個方法調用會觸發pause事件,既暫停消費數據事件。

resume()啟動:這個方法可以重啟被pause()暫停的數據消費,重新啟動data事件從緩存區讀出數據。但需要注意這個方法會觸發resume事件,但不是在重啟時觸發這個事件,而是在表示啟動時觸發resume事件,關於這個事件的詳細內容在下一節解析,這裡只需要了解resume()可以重啟被pause()暫停的data事件。

 1 myReadable.on('data',(chunk)=>{
 2     //消費數據
 3     console.log(chunk);
 4     if(chunk.toString() === "a"){
 5         console.log("暫停數據消費")
 6         myReadable.pause();
 7     }
 8 });
 9 myReadable.on("pause",()=>{
10     //暫停消費數據時觸發
11     console.log("重啟數據消費");
12     myReadable.resume();    //重啟消費數據
13 });
14 //測試結果
15 <Buffer 61>
16 暫停數據消費
17 重啟數據消費
18 <Buffer 62> 
19 <Buffer 63> 

通過上面對可讀流的簡單測試和解析,可以了解到流就是將數據從一個地方源源不斷的傳輸到另一個地方,並且可以通過控制傳輸單個數據節點的大小來控制傳輸速度,也可以隨時暫停和啟動來實現傳輸的可控性。但關於中轉緩存部分沒有做任何解析,而這部分直接關係到內存的分配和使用,這也是影響I/O性能的核心部分,接下來逐個解析可讀流、可寫流、雙工流、轉換流。

 二、可讀流與寫入流

2.1可寫流

 1 //自定義可寫流
 2 const {Writable} = require('stream');
 3 let buf = Buffer.alloc(10,0,'utf-8');    //數據寫入的目的地
 4 class MyWritable extends Writable {
 5     constructor(options){
 6         super(options);
 7         this.buf = buf;
 8         this.offset = 0;
 9     }
10     _write(chunk, en, done){
11         setTimeout(()=>{
12             this.buf.write(chunk.toString(),this.offset);  //因為中間緩存寫入將數據轉換成了Buffer,這裡buf接收數據的編碼是utf-8,所以又要轉換成字符串
13             this.offset++;
14             done();
15         });
16     }
17 };
18 //構造寫入流實例
19 let myWritable = new MyWritable({
20     highWaterMark:1
21 });
22 //寫入數據
23 myWritable.write('a',()=>{
24     console.log(buf);    //<Buffer 61 00 00 00 00 00 00 00 00 00>
25 });
26 myWritable.write('b',()=>{
27     console.log(buf);    //<Buffer 61 62 00 00 00 00 00 00 00 00>
28 });
29 myWritable.write('c',()=>{
30     console.log(buf);    //<Buffer 61 62 63 00 00 00 00 00 00 00>
31 });

根可讀流一樣,在寫入數據流中包含數據源(write()寫入的數據)、中間緩存、目的地(最後數據存儲的地方,示例中用buffer作為這個數據的寫入目的地),同樣也可以實現源源不斷的向數據存儲資源中寫入數據。

然後就是關於寫入流如何控制,這個控制涉及兩個方面:一是單次向程序寫入的數據量如何控制,二是什麼時候應該停止寫入數據。

關於寫入流的單次寫入量可以通過兩種方式:

第一種方式可以在write()寫入時控制寫入數據塊的大小,但這不能實現對內存消耗的精準把控,單個數據塊寫入多少是合適的,寫入的頻率如何掌控都只能從大概的推測上來實現。

第二種方式是基於流的drain事件配合highwaterMark設置中間緩存大小來實現控制,當使用write()寫入數據時,如果當前寫入到中間緩存的數據到達highwaterMark設置的最大值就會返回false,這時候就意味着中間緩存空間已經佔滿,但中間緩存依然還在內部持續的向目標空間寫入,如果寫入完畢中間緩存清空就會觸發drain事件。基於流的這樣實現就可以在write()寫入返回false時停止寫入,等待drain事件觸發再寫入數據,這樣就實現了控制數據寫入的啟停操作,當然單個數據塊大小也可以基於highwaterMark的設置來考慮。

 更具前面的解析和流程圖再來看下面的具體示例:

 1 //自定義可寫流(實現可控的數據寫入操作)
 2 const {Writable} = require('stream');
 3 let buf = Buffer.alloc(10,0,'utf-8');
 4 class MyWritable extends Writable {
 5     constructor(options){
 6         super(options);
 7         this.buf = buf;
 8         this.offset = 0;
 9     }
10     _write(chunk, en, done){
11         setTimeout(()=>{
12             this.buf.write(chunk.toString(),this.offset);
13             this.offset++;
14             done();
15         });
16     }
17 };
18 
19 
20 let myWritable = new MyWritable({
21     highWaterMark:1
22 });
23 let source = 'abc'.split("");
24 let flag = true;
25 let num = 0;
26 
27 function fun(){
28     while(flag && num < source.length){
29        flag = myWritable.write('a',()=>{
30             console.log(buf);
31         });
32         num ++;
33     }
34 }
35 
36 myWritable.on('drain',()=>{
37     console.log("drain 執行了"); //表示中間緩存數據已經完全寫入到目的地,可以重啟數據寫入操作
38     flag = true;
39     fun();
40 });
41 fun();

最後介紹一下關於寫入流的其他方法、事件、配置屬性:

//方法、實例化配置
write():實現數據寫入操作。
cork():該方法用於強制所有寫入數據緩存在中間緩存中,只有調用uncork()、end()方法後才會重新啟動向目的地寫入數據
uncork():重新啟動向目的地寫入數據,用於關閉之前cork()的操作。
end():表示不在寫入數據,但這個方法依然還可以像write()方法一樣寫入最後一個數據塊。這個方法執行後,當中間緩存的數據全部寫入到目的地後觸發finish事件。
destroy():銷毀流,用於強制關閉流操作。如果此時中間緩存還有數據沒寫入到目的地也不會再寫入,並且會報ERR_STREAM_DESTROYED錯誤。
new Writable([options]):實例話可寫流,這裡重點來關注options參數,該參數的屬性可以用來定製流的具體操作,同等與繼承stream.Writable然後在構造內定義具體操作,比如前面示例中定義_write()方法,也可通過這個參數的_write掛載實現,但這裡重點來關注一些配置屬性:
--highWaterMark:設置中間緩存的容量,默認16KB
--decodeStrings:用於設置write()向中間緩存寫入字符串時是否使用write的編碼轉換成Buffer,默認為true,這時候_write()接收到的chunk就是Buffer,除String類型以外其他類型不會被轉換;如果設置為false,_write()接收到的chunk還是字符串。
--defaultEncoding:指定默認編碼格式,如果write()沒有指定編碼就會使用該默認編碼,默認utf8。
--objectMode:是否支持寫入字符串、Buffer、Uint8Array以外的JavaScript值,即使為true也不能支持null值。
--emitClose:被銷毀後是否觸發close事件,默認為true,即銷毀流會觸發close事件。
--autoDestroy:在結束流操作(即end()指定)以後是否自動使用destroy()銷毀流,默認為true。

//事件
dirin:當被寫滿的中間緩存重新被清空後觸發該事件,但第一次write寫入之前也會觸發這個事件。
finish:在調用end()方法之後,並且所有數據都刷新到底層系統之後觸發該事件
colose:當流及其任何底層資源已關閉時,觸發該事件,意味者所有流操作結束(如果寫入流使用emitClose創建,則始終觸發該事件)
error:在寫入或管道數據時發生錯誤會觸發該事件
pipe:當可讀流上調用pipe方法將此寫入流添加到其目標集時,觸發該事件
unpipe:當可讀流通過unpipe()取消該寫入流的源流,觸發該事件,也就是從可讀流的目標集合中刪除這個目標時觸發事件

2.2可讀流

可讀流的兩種模式:流動模式、暫停模式。

–流動模式:基於data事件實現的不中斷式的提供數據,數據從源到中間緩存,再到數據讀出到消費層不做任何控制處理,數據源源不斷流向消費層。

–暫停模式:基於pause事件實現可中斷式的提供數據模式,數據流中間採用pause()暫停/pause事件/resume()重啟控制可讀流,數據傳輸過程中根據需要可控的往消費層傳輸。

可讀流的三種狀態:readableFlowing–>null(不提供數據狀態)、false(暫停提供數據狀態)、true(持續提供數據狀態)。

–不提供數據狀態:通過綁定data事件、pipe()、resume()、readableFlowing=true,可以使可讀流開始主動觸發數據讀出事件。

–暫停提供數據狀態:該狀態由pause()、readableFlowing=false、unpipe()觸發,該狀態只阻止中間緩存向數據消費層提供數據,並不停止數據源向中間緩存寫入數據。

–持續提供數據狀態:即由data事件、pipe()、resume()、readableFlowing=true實現數據持續流向消費層,該狀態下流持續通過data事件、read()向消費層提供數據。

關於可讀流在第一節中有詳細的示例,這裡就不重複了,下面介紹一下可讀流相關事件、方法、配置屬性:

//方法
destroy():銷毀流,用於強制關閉流操作。如果此時中間緩存區有未向外讀出的數據也不會再讀出了。
isPaused():用於判斷當前流是否處於暫停狀態,返回boolean值。
pause():用於暫停當前流操作,但不會暫停底層向中間緩存區寫入數據。
pipe():管道,該方法用於將可寫流綁定到當前的可讀流上,
read():用於向外讀取指定大小數據塊的方法,一般用於暫停模式,不建議與流動模式的data事件讀取數據混合使用。
resume():重啟當前流操作,用於重啟被pause()暫停的流操作。
setEncoding():用於設置讀取數據的編碼操作。
unpipe():用於分離先前使用pipe()綁定的可寫流。
unshift():將已讀取出來的數據推回到中間緩存區中,用於解決已經被消耗的數據重新被其他數據消費者使用。
wrap():用於兼容舊的nodejs可讀流,創建使用舊流作為其數據源的Readable流。
readable[Symbol.asyncIterator]():返回<AsyncIterator>以完全消費流,流將以大小等於highWaterMark的讀取塊往外讀取。

iterator():返回<AsyncIterator>消費流。用於實現迭代方式往外提供數據,該方法用於迭代器的方式為用戶提供取消流銷毀的選項。(16版以上新增)
//類上的靜態方法(16版以上新增)
map(fn):返回<Readable>使用函數fn映射的流(promise)
filter(fn):此方法允許過濾流,對於流中每個條目,都會調用fn函數,如果其返回真值,則該條目將被傳給結果流。(可以實現promise)
new Readable([options):實例化可讀流。這裡重點來關注options參數,該參數的屬性可以用來定製流的具體操作:
--highWaterMark:用於配置可讀流中間緩存容量,默認16KB。
--encoding:用於配置緩存區中使用指定的編碼解碼為字符串,默認null。
--objectMode:是否表現為對象流,默認false。如果該配置為false,read(n)返回單個大小為n的Buffer。
--emitClose:被銷毀後是否觸發close事件,默認true。
--autoDestroy:是否在流操作結束後自動調用destroy()銷毀流,默認為true。
--signal:表示可能取消的信號

//事件
close:當流及其任何底層資源已關閉時,觸發該事件,意味者流操作結束。如果寫入流基於emitClose創建的,則始終觸發該事件。也就是當end事件觸發後並且緩存區的數據全部讀出後觸發該事件。
data:當流將數據塊移交給消費者時,則觸發該事件。也就是當緩存區向外提供數據時通過觸發該事件來實現。
end:當流中沒有更多數據可供消費,則觸發該事件。也就是當read()向緩存區寫入null時觸發該事件。
error:如果底層流內部故障無法生成數據時,或者當流嘗試推送無效數據塊時,可能觸發該事件。
pause:通過pause()暫停流動模式時,觸發該事件,任何可用數據都將保留在內部緩衝區。
readable:當有從流中讀取的數據或以到達末尾時,觸發該事件。也就是每次向中間緩存區緩存數據後觸發該事件,緩存區沒有數據觸發該事件是為了實現read()向緩存區寫入null來觸發end事件,實現結束可讀流操作。
resume:當調用resume()重啟從中間緩存中向外提供數據時觸發該事件

 三、雙工流與轉換流

 3.1雙工流Duplex

所謂雙工流就是在一個流上同時實現可讀流和可寫流。

//雙工流
const {Duplex} = require('stream');
let buf = Buffer.alloc(10,0,'utf-8');//開闢一個內存空間,作為雙工流的底層數據存儲設備
buf.write("abc");//初識化一些數據
class MyDuplex extends Duplex{
    constructor(options){
        super(options);
        this.buf = buf;
        this.offset = 3;        //設置寫入偏移到初識化數據的末尾處
        this.readStart = 0;     //初識化可讀流的起始位置
        this.readEnd = 1;       //初識化可讀流的結束位置
    }
    _write(chunk, en, done){
        process.nextTick(()=>{
            this.buf.write(chunk.toString(),this.offset);
            this.offset += chunk.length;
            done();
        });
    }
    _read(){
        let data = this.readStart > this.buf.length || this.readStart >= this.offset ?  null : this.buf.subarray(this.readStart,this.readEnd);
        this.push(data,'utf-8');
        this.readStart ++;
        this.readEnd ++;
    }
};
let myDuplex = new MyDuplex();
//測試讀數據
// myDuplex.on('data',(chunk)=>{
//     console.log(chunk);
// });
//測試寫入再讀數據
myDuplex.write("defghijk",()=>{
    myDuplex.on('data',(chunk)=>{
        console.log(chunk.toString());
    });
});

雙工流的特點就是sources、destination共用一個資源,雖然它底層依然管理兩個中間緩存(讀取流的中間緩存區、寫入流的中間緩存區),但它不能兩個操作同時進行,因為任意一個寫入流還是讀取流關閉後就不能重啟,而且中途有寫有讀那就不是流模式了,違背了流源源不斷的往一個方向傳輸數據的原則。所以雙工流本質上就是實現讀寫流兩個操作各消費一次,也就是說要麼執行完讀再執行寫,要麼執行完寫再執行讀。

最典型的雙工流具體功能實現,在nodejs中就是TCP套位元組socket的接收與響應數據操作,程序先將網絡資源全部讀出然後再將響應數據全部寫入,

3.2轉換流Transform

轉換流是在雙工流的基礎上實現的,基於先執行寫入流,再執行讀取流的方式,實現將寫入的數據在內部做一些轉換,再將轉換過的數據通過讀取流的data事件響應回來。

轉換流不會單獨執行數據讀出,而是必須先執行寫入操作,才會觸發data事件將轉換後的數據傳輸出來。

 1 const {Transform} = require('stream');
 2 let buf = Buffer.alloc(10,0,'utf-8');
 3 class MyTransform extends Transform{
 4     constructor(options){
 5         super(options);
 6         this.buf = buf;
 7         this.offset = 0;        //初識化寫入流的ishi位置
 8     }
 9     _transform(chunk, en, cb){
10         let str = chunk.toString().toUpperCase();
11         process.nextTick(()=>{
12             this.buf.write(str,this.offset);
13             this.offset += chunk.length;
14         });
15         this.push(str);
16         cb();
17     }
18 }
19 
20 let myTransform = new MyTransform();
21 myTransform.on('data',(chunk)=>{
22     console.log(chunk.toString());  //ABCDEFG
23 });
24 myTransform.write('abcdefg');

基於轉換流,nodejs內置實現了壓縮流(zlib)和加密流(crypto)。

 四、背壓機制與文件流模擬實現

 4.1背壓機制

在很多業務中都有這樣的需求,先從一個地方讀出一些數據,然後再將這些數據寫入到另一個地方,典型的操作就是文件拷貝。由於數據讀取速度遠遠大於數據寫入速度,這就可能導致讀出到緩存中的數據超過設置緩存限制的最大值,從而造成內存溢出報錯、垃圾回收器(GC)頻繁調用、導致其他進程變慢,為了解決這種需求可能存在的潛在風險提出了一種解決方案,這個解決方案就是背壓機制。

背壓機制的原理:

基於流的I/O操作背壓機制,在nodejs的實現中就是可讀流的pipe(),其內部同時管理兩個流模型:可讀流、可寫流,可讀流的目的地(destination)是可寫流的數據源(sources);可讀流有自己的數據源(sources),可寫流有自己的流目的地(destination)。在這兩個流模型中都有自己獨立的中間緩存區,當可讀流的讀取速度大於可寫流的寫入速度時,可讀流會先將來不及寫入目的地(destination)的數據緩存到可寫流的中間緩存空間中,當可讀流的中間緩存空間也寫滿以後就會通知可讀流暫停向可寫流提供數據,這時候可讀流就會將自身從自己的數據源中讀出的數據寫到可讀流自己的中間緩存空間中。

當可寫流將自己中間緩存中的數據全部寫入到自己的目的地以後,可寫流又開始通知可讀流向自己提供數據。如果這個過程中可讀流自身的中間緩存空間都寫滿了,可讀流還沒等到可寫流通知向它提供數據,這時候可讀流會停止從自己的數據源中讀取數據的操作,直到等到可寫流通知向它提供數據,可讀流將自身中間緩存中的數據讀出傳輸給可寫流,直到可讀流將自身中間緩存中的數據全部清空然後,然後可讀流再開始從自己的數據源中讀取數據提供給可寫流。

循環以上操作,直到數據傳輸完成,這就是流操作的背壓機制。上面的描述看起會比較複雜,下面提供一個簡單的流程圖:

 

下面是基於EventEmitter和 fs.read/fs.write模擬實現的文件流源碼:

  1 //自定義文件讀取流readFile.js
  2 const fs = require('fs');
  3 const EventEmitter = require('events').EventEmitter;
  4 const {Queue} = require('./linked.js');
  5 
  6 class MyFileReadStream extends EventEmitter {
  7     constructor(path, optons={}){
  8         super();
  9         this.path = path;                   //綁定要讀取的文件
 10         this.flags = optons.flags || 'r';   //文件件操作模式:讀取模式
 11         this.mode = optons.mode || 438;     //文件操作權限
 12         this.autoClose = optons.autoClose || true;  //是否關閉(銷毀)當前文件
 13         this.start = optons.start || 0;     //開始讀取的位置
 14         this.end = optons.end;              //結束讀取的位置
 15         this.highWaterMark = optons.highWaterMark || 64 * 1024; //可讀流最大可緩存的位元組數
 16         
 17         this.readOffset = 0;        //從什麼位置讀出
 18         this.cache = new Queue();   //用於緩存數據的隊列
 19         this.readableFlowing = null;    //當前流的狀態:null(不提供數據狀態)、true(持續輸出數據)、false(暫停輸出數據)
 20 
 21         this.open();
 22         this.on('newListener',(type)=>{ //當在MyFileReadStream實例上添加監聽事件時觸發該事件,並將當前監聽事件的名稱傳遞給回調
 23             if(type === 'data'){
 24                 this.read();
 25             }
 26         });
 27     }
 28     open(){
 29         //原生open方法打開指定位置上的文件
 30         fs.open(this.path, this.flags, this.mode, (err, fd)=>{
 31             if(err){
 32                 this.emit('error', err);
 33             }
 34             this.fd = fd;
 35             this.readableFlowing = true;    //當文件正常打開時將流的狀態置為持續數據數據的狀態
 36             this.emit('open',fd);
 37         });
 38     }
 39     read(){
 40         //負責將要讀取的數據通過data事件輸出
 41         if(typeof this.fd !== 'number'){   
 42              //當第一次觸發事件
 43             return this.once('open',this.read);     //在通過open事件獲取到文件標識符後,重新調用數據讀取操作
 44         }
 45         if(!this.readableFlowing){  //當流處於暫停模式時,需要通過resume()重啟流
 46             return ;
 47         }
 48         if(this.cache.size > 0){    //如果可讀流的中間緩存中有數據,就從中間緩存中拿數據通過data事件響應出去
 49             this.emit("data",this.cache.deQueue());
 50             return this.read();
 51         }
 52         //實現數據讀取操作:        
 53         let buf = Buffer.alloc(this.highWaterMark);
 54         let howMuchToRead ;
 55         if(this.end){
 56             howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark);
 57         }else{
 58             howMuchToRead = this.highWaterMark;
 59         }
 60         //調用原生文件讀取方法fs.read獲取數據
 61         fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset,(err,readBytes)=>{
 62             if(readBytes){
 63                 this.readOffset += readBytes;
 64                 //這裡需要判斷是否通過data事件輸出數據,還是將數據緩存到中間緩存中
 65                 if(this.readableFlowing){
 66                     //當流處於數據持續輸出狀態,直接將數據通過data事件輸出,並繼續調用read()讀取數據
 67                     this.emit('data',buf.slice(0,readBytes));
 68                     this.read();
 69                 }else{
 70                     //當流處於數據暫停輸出狀態,將數據緩存到中間緩存(即緩存隊列)
 71                    this.cache.enQueue(buf.slice(0,readBytes));
 72                 }
 73             }else{
 74                 //當沒有數據可讀時觸發end事件,並關閉文件標識符
 75                 this.emit('end');
 76                 this.close();
 77             }
 78         });
 79     }
 80     close(){
 81         //關閉文件流操作
 82         fs.close(this.fd,()=>{
 83             this.autoClose= false; //表示文件已被關閉
 84             this.emit('close');
 85         });
 86     }
 87     pause(){
 88         //暫停流操作
 89         this.readableFlowing = false;
 90     }
 91     resume(){
 92         //重啟流操作
 93         this.readableFlowing = true;
 94         this.read();
 95     }
 96     pipe(ws){
 97         this.on('data',(data)=>{
 98             let flag = ws.write(data);
 99             if(!flag){
100                 this.pause();
101             }
102         });
103         this.on("end",()=>{
104             ws.end();
105         });
106         this.on("close",()=>{
107             console.log("文件讀取完成,正常關閉");
108         });
109         ws.on('drain',()=>{
110             this.resume();
111         });
112     }
113 }
114 
115 module.exports = MyFileReadStream;

文件可讀流模擬源碼實現

  1 //自定文件寫入流writeFile.js
  2 const fs = require('fs');
  3 const {EventEmitter} = require('events');
  4 const {Queue} = require('./linked.js');
  5 
  6 class MyFileWriteStream extends EventEmitter{
  7     constructor(path,options={}){
  8         super();
  9         this.path = path;
 10         this.flags = options.flags || 'w';
 11         this.mode = options.mode || 438;
 12         this.autoClose = options.autoClose || true;
 13         this.start = options.start || 0;
 14         this.encoding = options.encoding || 'utf8';
 15         this.highWaterMark = options.highWaterMark || 16*1024;
 16 
 17         this.writeoffset = this.start;  //從什麼位置寫入
 18         this.writing = false;   //是否正在寫入
 19         this.writLen = 0;       //當前MyWriteStream實例要寫入的位元組長度,也就在當的寫入操作中間緩存上有多少個位元組的數據
 20         this.needDrain = false; //中間緩存是否排空了,如果排空了就觸發drain事件
 21         this.cache = new Queue();
 22         this.upstream = false;  //上游狀態是否關閉。比如由上可讀流通過pipe傳輸到當前可寫流上的數據已經到達末尾,可讀流讀取的文件要關閉被打開的文件時,通過可寫流的end()通知可寫流它已經關閉
 23 
 24         this.open();
 25     };
 26     open(){
 27         //原生fs.open
 28         fs.open(this.path, this.flags, (err, fd)=>{
 29             if(err){
 30                 this.emit('error', err);
 31             }
 32             //正常打開文件
 33             this.fd = fd;
 34             this.emit('open', fd);
 35         });
 36     };
 37     //寫入操作的對外接口
 38     write(chunk, encoding, cb){
 39         if(chunk !== null){
 40             chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
 41             this.writLen += chunk.length;   //累計當前要寫入的位元組數(即表示寫入流緩存中的數據位元組長度)
 42             let flag = this.writLen < this.highWaterMark;   //檢查緩存中的數據節點長度是否超出流的最大緩存空間。flag為tuer時表示緩存沒有超出,反之則超出了
 43             this.needDrain = !flag;
 44             if(this.writing){
 45                 //當前正在執行寫入,即將內容緩存到隊列
 46                 this.cache.enQueue({
 47                     chunk:chunk,
 48                     encoding:encoding,
 49                     cb:cb
 50                 });
 51             }else{
 52                 //當前不是正在寫入,即立可以即執行寫入操作
 53                 this._write(chunk,encoding,()=>{
 54                     cb && cb();   //執行寫入回調
 55                     //清空排隊的內容
 56                     this._clearBuffer();
 57                 });
 58             }
 59             return flag;
 60         }else{
 61             cb && cb();
 62             //清空排隊的內容
 63             this._clearBuffer();
 64         }
 65     };
 66     //實現寫入
 67     _write(chunk, encoding, cb){
 68         this.writing = true;    //將寫入狀態置為正在寫入
 69         if(typeof this.fd !== 'number'){
 70             //第一次寫入時,writeFile 對象上沒有fd,這事因為_write是同步任務,而this.fd獲取文件描述符的邏輯在open()執行後的異步回調上
 71             //在open()的異步回調任務中綁定this.fd以後會觸發'open'事件,也就是說這個當'open'事件觸發時就會fd了
 72             //所以在第一次寫入時在'open'事件上添加一個一次性的任務,在這個任務內真正的實現寫入操作。
 73             return this.once('open',()=>{
 74                 this._write(chunk, encoding, cb);
 75             });
 76         }
 77         fs.write(this.fd, chunk, this.start, chunk.length, this.writeoffset,(err,writen)=>{
 78             this.writeoffset += writen; //將寫入的位元組數累計到寫入位置上,為下一次寫入提供寫入位置定位
 79             this.writLen -= writen; //將緩存位元組數記錄減掉寫入的位元組數
 80             this.writing = false;
 81             cb && cb();
 82         });
 83     };
 84     _clearBuffer(){
 85         let data = this.cache.deQueue();
 86         if(data){
 87             //當有數據時持續迭代鏈表節點中的緩存,從而實現將緩存中的數據寫入到磁盤中
 88             this._write(data.element.chunk, data.element.encoding, ()=>{
 89                 data.element.cb && data.element.cb();
 90                 this._clearBuffer();
 91             });
 92         }else{
 93             //當緩存中沒有數據
 94             if(this.upstream){  //當上游調用可寫流的程序已經關閉,說明不會再有數據傳入,這時候應該關閉當前的可寫流
 95                 return this.close();
 96             }
 97             if(this.needDrain){ 
 98                 this.needDrain = false;
 99                 this.emit('drain');
100             }
101         }
102     }
103     end(){
104         this.write(null);
105         this.upstream = true;
106     }
107     close(){
108         fs.close(this.fd,()=>{
109             this.emit("close");
110             console.log("文件寫入完成,正常關閉")
111         });
112     }
113 }
114 module.exports = MyFileWriteStream;

文件可寫流模擬源碼實現

模擬流操作需要一個實現鏈表結構的基礎隊列模塊,下面是具體實現源碼:

  1 //鏈表結構
  2 //node節點 + head + null
  3 //head頭 -> null
  4 //size鏈表長度
  5 //next下一個節點 element
  6  //增加、刪除、修改、查詢、清空
  7 
  8  //構造節點
  9  class Node{
 10      constructor(element, next){
 11          this.element = element;
 12          this.next = next;
 13      }
 14  }
 15 //構造鏈表
 16 class LinkedList{
 17     constructor(head, size){
 18         this.head = null;
 19         this.size = 0;
 20     }
 21     //獲取指定節點
 22     _getNode(index){
 23         if(index < 0 || index >= this.size){
 24             throw new Error('cross the border');    //拋出越界錯誤
 25         }
 26         let currentNode = this.head;
 27         for(let i = 0; i < index; i++){
 28             currentNode = currentNode.next;
 29         }
 30         return currentNode;
 31     }
 32     //添加節點:可以在指定的位置添加(即插入節點:傳入索引+節點元素兩個參數),如果只傳入節點元素就默認在鏈表的末尾添加節點
 33     add(index, element){
 34        if(arguments.length === 1){
 35            element = index;
 36            index = this.size;   //當沒有傳入插入位置時,將插入位置默認未鏈表的末尾
 37        }
 38        if(index < 0 || index > this.size){
 39             throw new Error('cross the border');    //拋出越界錯誤
 40        }
 41        if(index === 0){
 42            let head = this.head;
 43            this.head = new Node(element,head);
 44        }else{
 45             let prevNode = this._getNode(index -1);
 46             prevNode.next = new Node(element,prevNode.next);
 47        }
 48        this.size++;
 49     }
 50     //刪除節點
 51     remove(index){
 52         let rmNode = null;  //刪除的節點
 53         if(index === 0){
 54             rmNode = this.head;
 55             if(!rmNode){
 56                 return undefined;
 57             }
 58             this.head = rmNode.next;
 59         }else{
 60             let prevNode = this._getNode(index-1);
 61             rmNode = prevNode.next; 
 62             prevNode.next = prevNode.next.next;
 63         }
 64         this.size --;
 65         return rmNode;
 66     }
 67     //修改鏈表節點
 68     set(index,element){
 69         let node = this._getNode(index);
 70         node.element = element;
 71     }
 72     //查詢鏈表節點
 73     get(index){
 74         return this._getNode(index);
 75     }
 76     //清空鏈表
 77     clear(){
 78         this.head = null;
 79         this.size = 0;
 80     }
 81 }
 82 
 83 //構造鏈表隊列
 84 class Queue{
 85     constructor(){
 86         this.linkedList = new LinkedList();
 87     }
 88     //入隊列
 89     enQueue(data){
 90         this.linkedList.add(data);
 91     }
 92     //出隊列
 93     deQueue(){
 94         return this.linkedList.remove(0);
 95     }
 96 }
 97 
 98 module.exports = {
 99     Node:Node,
100     LinkedList:LinkedList,
101     Queue:Queue
102 };

實現鏈表結構的隊列模塊

最後測試代碼:

 1 const fs = require('fs');
 2 const myFileReadStream = require('./readFile.js');
 3 const myWriteStream = require('./writeFile.js');
 4 
 5 // const rs = fs.createReadStream('./筆記(副本).txt',{
 6 //     highWaterMark:4
 7 // });
 8 // const rs = fs.createReadStream('./tst.txt',{
 9 //     highWaterMark:1
10 // });
11 const rs = new myFileReadStream('./筆記(副本).txt',{
12     highWaterMark:4
13 });
14 
15 // const ws = fs.createWriteStream('./筆記.txt',{
16 //     highWaterMark:12
17 // });
18 const ws = new myWriteStream('./筆記.txt',{
19     highWaterMark:12
20 });
21 rs.pipe(ws);

pipe測試代碼

測試的txt文件可以自己修改,
自定義實現的可讀流和可寫流可以與node原生的文件流API實現交互,
可以基於注釋的測試代碼測試:
比如使用fs的createReadStream與自定義的可寫流模塊實現文件流操作
也可以使用fs的createWriteStream與自定義的可讀流模塊實現文件流操作
Tags: