站長資訊網(wǎng)
        最全最豐富的資訊網(wǎng)站

        node中的stream(流)有幾種類型

        node stream有4種類型:1、Readable(可讀流)。需要實現(xiàn)“_read”方法來返回內(nèi)容;2、Writable(可寫流),需要實現(xiàn)“_write”方法來接受內(nèi)容;3、Duplex(可讀可寫流),需要實現(xiàn)“_read”和“_write”方法來接受和返回內(nèi)容;4、Transform(轉(zhuǎn)換流),需要實現(xiàn)“_transform”方法來把接受的內(nèi)容轉(zhuǎn)換之后返回內(nèi)容。

        node中的stream(流)有幾種類型

        本教程操作環(huán)境:windows7系統(tǒng)、nodejs16版,DELL G3電腦。

        流(Stream)在 Nodejs 中是個十分基礎(chǔ)的概念,很多基礎(chǔ)模塊都是基于流實現(xiàn)的,扮演著十分重要的角色。同時流也是是一個十分難以理解的概念,這主要是相關(guān)的文檔比較缺少,對于 NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對于大部分 NodeJs 使用者來說,僅僅是用來開發(fā) Web 應用,對流的不充分認識并不影響使用。但是,理解流能夠?qū)?NodeJs 中的其他模塊有更好的理解,同時在某些情況下,使用流來處理數(shù)據(jù)會有更好的效果。

        Stream

        Stream 是在 Node.js 中處理流數(shù)據(jù)的抽象接口。Stream 并不是一個實際的接口,而是對所有流的一種統(tǒng)稱。實際的接口有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。

        interface ReadableStream extends EventEmitter {     readable: boolean;     read(size?: number): string | Buffer;     setEncoding(encoding: BufferEncoding): this;     pause(): this;     resume(): this;     isPaused(): boolean;     pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;     unpipe(destination?: WritableStream): this;     unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;     wrap(oldStream: ReadableStream): this;     [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>; }  interface WritableStream extends EventEmitter {     writable: boolean;     write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;     write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;     end(cb?: () => void): this;     end(data: string | Uint8Array, cb?: () => void): this;     end(str: string, encoding?: BufferEncoding, cb?: () => void): this; }  interface ReadWriteStream extends ReadableStream, WritableStream { }

        可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的接口(ts中接口是可以繼承類的,因為他們只是在進行類型的合并)。

        上面這些接口對應的實現(xiàn)類分別是 Readable、Writable 和 Duplex

        NodeJs中的流有4種:

        • Readable 可讀流(實現(xiàn)ReadableStream)

        • Writable 可寫流(實現(xiàn)WritableStream)

        • Duplex 可讀可寫流(繼承Readable后實現(xiàn)WritableStream)

        • Transform 轉(zhuǎn)換流(繼承Duplex)

        它們都有要實現(xiàn)的方法:

        • Readable 需要實現(xiàn) _read 方法來返回內(nèi)容

        • Writable 需要實現(xiàn) _write 方法來接受內(nèi)容

        • Duplex 需要實現(xiàn) _read 和 _write 方法來接受和返回內(nèi)容

        • Transform 需要實現(xiàn) _transform 方法來把接受的內(nèi)容轉(zhuǎn)換之后返回

        Readable

        可讀流(Readable)是流的一種類型,他有兩種模式三種狀態(tài)

        兩種讀取模式:

        • 流動模式:數(shù)據(jù)會從底層系統(tǒng)讀取寫入到緩沖區(qū),當緩沖區(qū)被寫滿后自動通過 EventEmitter 盡快的將數(shù)據(jù)傳遞給所注冊的事件處理程序中

        • 暫停模式:在這種模式下將不會主動觸發(fā) EventEmitter 傳輸數(shù)據(jù),必須顯示的調(diào)用 Readable.read() 方法來從緩沖區(qū)中讀取數(shù)據(jù),read 會觸發(fā)響應到 EventEmitter 事件。

        三種狀態(tài):

        • readableFlowing === null(初始狀態(tài))

        • readableFlowing === false(暫停模式)

        • readableFlowing === true(流動模式)

        初始時流的 readable.readableFlowingnull

        添加data事件后變?yōu)?true 。調(diào)用 pause()、unpipe()、或接收到背壓或者添加 readable 事件,則 readableFlowing 會被設(shè)為 false ,在這個狀態(tài)下,為 data 事件綁定監(jiān)聽器不會使 readableFlowing 切換到 true。

        調(diào)用 resume() 可以讓可讀流的 readableFlowing 切換到 true

        移除所有的 readable 事件是使 readableFlowing 變?yōu)?null 的唯一方法。

        事件名 說明
        readable 當緩沖區(qū)有新的可讀取數(shù)據(jù)時觸發(fā)(每一個想緩存池插入節(jié)點都會觸發(fā))
        data 每一次消費數(shù)據(jù)后都會觸發(fā),參數(shù)是本次消費的數(shù)據(jù)
        close 流關(guān)閉時觸發(fā)
        error 流發(fā)生錯誤時觸發(fā)
        方法名 說明
        read(size) 消費長度為size的數(shù)據(jù),返回null表示當前數(shù)據(jù)不足size,否則返回本次消費的數(shù)據(jù)。size不傳遞時表示消費緩存池中所有數(shù)據(jù)
        const fs = require('fs');  const readStreams = fs.createReadStream('./EventEmitter.js', {     highWaterMark: 100// 緩存池浮標值 })  readStreams.on('readable', () => {     console.log('緩沖區(qū)滿了')     readStreams.read()// 消費緩存池的所有數(shù)據(jù),返回結(jié)果并且觸發(fā)data事件 })   readStreams.on('data', (data) => {     console.log('data') })

        https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

        當 size 為 0 會觸發(fā) readable 事件。

        當緩存池中的數(shù)據(jù)長度達到浮標值 highWaterMark 后,就不會在主動請求生產(chǎn)數(shù)據(jù),而是等待數(shù)據(jù)被消費后在生產(chǎn)數(shù)據(jù)

        暫停狀態(tài)的流如果不調(diào)用 read 來消費數(shù)據(jù)時,后續(xù)也不會在觸發(fā) datareadable,當調(diào)用 read 消費時會先判斷本次消費后剩余的數(shù)據(jù)長度是否低于 浮標值,如果低于 浮標值 就會在消費前請求生產(chǎn)數(shù)據(jù)。這樣在 read 后的邏輯執(zhí)行完成后新的數(shù)據(jù)大概率也已經(jīng)生產(chǎn)完成,然后再次觸發(fā) readable,這種提前生產(chǎn)下一次消費的數(shù)據(jù)存放在緩存池的機制也是緩存流為什么快的原因

        流動狀態(tài)下的流有兩種情況

        • 生產(chǎn)速度慢于消費速度時:這種情況下每一個生產(chǎn)數(shù)據(jù)后一般緩存池中都不會有剩余數(shù)據(jù),直接將本次生產(chǎn)的數(shù)據(jù)傳遞給 data 事件即可(因為沒有進入緩存池,所以也不用調(diào)用 read 來消費),然后立即開始生產(chǎn)新數(shù)據(jù),待上一次數(shù)據(jù)消費完后新數(shù)據(jù)才生產(chǎn)好,再次觸發(fā) data ,一只到流結(jié)束。
        • 生產(chǎn)速度快于消費速度時:此時每一次生產(chǎn)完數(shù)據(jù)后一般緩存池都還存在未消費的數(shù)據(jù),這種情況一般會在消費數(shù)據(jù)時開始生產(chǎn)下一次消費的數(shù)據(jù),待舊數(shù)據(jù)消費完后新數(shù)據(jù)已經(jīng)生產(chǎn)完并且放入緩存池

        他們的區(qū)別僅僅在于數(shù)據(jù)生產(chǎn)后緩存池是否還存在數(shù)據(jù),如果存在數(shù)據(jù)則將生產(chǎn)的數(shù)據(jù) push 到緩存池等待消費,如果不存在則直接將數(shù)據(jù)交給 data 而不加入緩存池。

        值得注意的是當一個緩存池中存在數(shù)據(jù)的流從暫停模式進入的流動模式時,會先循環(huán)調(diào)用 read 來消費數(shù)據(jù)只到返回 null

        暫停模式

        node中的stream(流)有幾種類型

        暫停模式下,一個可讀流讀創(chuàng)建時,模式是暫停模式,創(chuàng)建后會自動調(diào)用 _read 方法,把數(shù)據(jù)從數(shù)據(jù)源 push 到緩沖池中,直到緩沖池中的數(shù)據(jù)達到了浮標值。每當數(shù)據(jù)到達浮標值時,可讀流會觸發(fā)一個 " readable " 事件,告訴消費者有數(shù)據(jù)已經(jīng)準備好了,可以繼續(xù)消費。

        一般來說, 'readable' 事件表明流有新的動態(tài):要么有新的數(shù)據(jù),要么到達流的盡頭。所以,數(shù)據(jù)源的數(shù)據(jù)被讀完前,也會觸發(fā)一次 'readable' 事件;

        消費者 " readable " 事件的處理函數(shù)中,通過 stream.read(size) 主動消費緩沖池中的數(shù)據(jù)。

        const { Readable } = require('stream')  let count = 1000 const myReadable = new Readable({     highWaterMark: 300,     // 參數(shù)的 read 方法會作為流的 _read 方法,用于獲取源數(shù)據(jù)     read(size) {         // 假設(shè)我們的源數(shù)據(jù)上 1000 個1         let chunk = null         // 讀取數(shù)據(jù)的過程一般是異步的,例如IO操作         setTimeout(() => {             if (count > 0) {                 let chunkLength = Math.min(count, size)                 chunk = '1'.repeat(chunkLength)                 count -= chunkLength             }             this.push(chunk)         }, 500)     } }) // 每一次成功 push 數(shù)據(jù)到緩存池后都會觸發(fā) readable myReadable.on('readable', () => {     const chunk = myReadable.read()//消費當前緩存池中所有數(shù)據(jù)     console.log(chunk.toString()) })

        值得注意的是, 如果 read(size) 的 size 大于浮標值,會重新計算新的浮標值,新浮標值是size的下一個二次冪(size <= 2^n,n取最小值)

        //  hwm 不會大于 1GB. const MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) {   if (n >= MAX_HWM) {     // 1GB限制     n = MAX_HWM;   } else {     //取下一個2最高冪,以防止過度增加hwm     n--;     n |= n >>> 1;     n |= n >>> 2;     n |= n >>> 4;     n |= n >>> 8;     n |= n >>> 16;     n++;   }   return n; }

        流動模式

        node中的stream(流)有幾種類型

        所有可讀流開始的時候都是暫停模式,可以通過以下方法可以切換至流動模式:

        • 添加 " data " 事件句柄;
        • 調(diào)用 “ resume ”方法;
        • 使用 " pipe " 方法把數(shù)據(jù)發(fā)送到可寫流

        流動模式下,緩沖池里面的數(shù)據(jù)會自動輸出到消費端進行消費,同時,每次輸出數(shù)據(jù)后,會自動回調(diào) _read 方法,把數(shù)據(jù)源的數(shù)據(jù)放到緩沖池中,如果此時緩存池中不存在數(shù)據(jù)則會直接吧數(shù)據(jù)傳遞給 data 事件,不會經(jīng)過緩存池;直到流動模式切換至其他暫停模式,或者數(shù)據(jù)源的數(shù)據(jù)被讀取完了( push(null) );

        可讀流可以通過以下方式切換回暫停模式:

        • 如果沒有管道目標,則調(diào)用 stream.pause() 。
        • 如果有管道目標,則移除所有管道目標。調(diào)用 stream.unpipe() 可以移除多個管道目標。
        const { Readable } = require('stream')  let count = 1000 const myReadable = new Readable({     highWaterMark: 300,     read(size) {         let chunk = null         setTimeout(() => {             if (count > 0) {                 let chunkLength = Math.min(count, size)                 chunk = '1'.repeat(chunkLength)                 count -= chunkLength             }             this.push(chunk)         }, 500)     } })  myReadable.on('data', data => {     console.log(data.toString()) })

        Writable

        相對可讀流來說,可寫流要簡單一些。

        node中的stream(流)有幾種類型

        當生產(chǎn)者調(diào)用 write(chunk) 時,內(nèi)部會根據(jù)一些狀態(tài)(corked,writing等)選擇是否緩存到緩沖隊列中或者調(diào)用 _write,每次寫完數(shù)據(jù)后,會嘗試清空緩存隊列中的數(shù)據(jù)。如果緩沖隊列中的數(shù)據(jù)大小超出了浮標值(highWaterMark),消費者調(diào)用 write(chunk) 后會返回 false,這時候生產(chǎn)者應該停止繼續(xù)寫入。

        那么什么時候可以繼續(xù)寫入呢?當緩沖中的數(shù)據(jù)都被成功 _write 之后,清空了緩沖隊列后會觸發(fā) drain 事件,這時候生產(chǎn)者可以繼續(xù)寫入數(shù)據(jù)。

        當生產(chǎn)者需要結(jié)束寫入數(shù)據(jù)時,需要調(diào)用 stream.end 方法通知可寫流結(jié)束。

        const { Writable, Duplex } = require('stream') let fileContent = '' const myWritable = new Writable({     highWaterMark: 10,     write(chunk, encoding, callback) {// 會作為_write方法         setTimeout(()=>{             fileContent += chunk             callback()// 寫入結(jié)束后調(diào)用         }, 500)     } })  myWritable.on('close', ()=>{     console.log('close', fileContent) }) myWritable.write('123123')// true myWritable.write('123123')// false myWritable.end()

        注意,在緩存池中數(shù)據(jù)到達浮標值后,此時緩存池中可能存在多個節(jié)點,在清空緩存池的過程中(循環(huán)調(diào)用_read),并不會向可讀流一樣盡量一次消費長度為浮標值的數(shù)據(jù),而是每次消費一個緩沖區(qū)節(jié)點,即使這個緩沖區(qū)長度于浮標值不一致也是如此

        const { Writable } = require('stream')   let fileContent = '' const myWritable = new Writable({     highWaterMark: 10,     write(chunk, encoding, callback) {         setTimeout(()=>{             fileContent += chunk             console.log('消費', chunk.toString())             callback()// 寫入結(jié)束后調(diào)用         }, 100)     } })  myWritable.on('close', ()=>{     console.log('close', fileContent) })  let count = 0 function productionData(){     let flag = true     while (count <= 20 && flag){         flag = myWritable.write(count.toString())         count++     }     if(count > 20){         myWritable.end()     } } productionData() myWritable.on('drain', productionData)

        上述是一個浮標值為 10 的可寫流,現(xiàn)在數(shù)據(jù)源是一個 0——20 到連續(xù)的數(shù)字字符串,productionData 用于寫入數(shù)據(jù)。

        • 首先第一次調(diào)用 myWritable.write("0") 時,因為緩存池不存在數(shù)據(jù),所以 "0" 不進入緩存池,而是直接交給 _wirte,myWritable.write("0") 返回值為 true

        • 當執(zhí)行 myWritable.write("1") 時,因為 _wirtecallback 還未調(diào)用,表明上一次數(shù)據(jù)還未寫入完,位置保證數(shù)據(jù)寫入的有序性,只能創(chuàng)建一個緩沖區(qū)將 "1" 加入緩存池中。后面 2-9 都是如此

        • 當執(zhí)行 myWritable.write("10") 時,此時緩沖區(qū)長度為 9(1-9),還未到達浮標值, "10" 繼續(xù)作為一個緩沖區(qū)加入緩存池中,此時緩存池長度變?yōu)?11,所以 myWritable.write("1") 返回 false,這意味著緩沖區(qū)的數(shù)據(jù)已經(jīng)足夠,我們需要等待 drain 事件通知時再生產(chǎn)數(shù)據(jù)。

        • 100ms過后,_write("0", encoding, callback)callback 被調(diào)用,表明 "0" 已經(jīng)寫入完成。然后會檢查緩存池中是否存在數(shù)據(jù),如果存在則會先調(diào)用 _read 消費緩存池的頭節(jié)點("1"),然后繼續(xù)重復這個過程直到緩存池為空后觸發(fā) drain 事件,再次執(zhí)行 productionData

        • 調(diào)用 myWritable.write("11"),觸發(fā)第1步開始的過程,直到流結(jié)束。

        Duplex

        在理解了可讀流與可寫流后,雙工流就好理解了,雙工流事實上是繼承了可讀流然后實現(xiàn)了可寫流(源碼是這么寫的,但是應該說是同時實現(xiàn)了可讀流和可寫流更加好)。

        node中的stream(流)有幾種類型

        Duplex 流需要同時實現(xiàn)下面兩個方法

        • 實現(xiàn) _read() 方法,為可讀流生產(chǎn)數(shù)據(jù)

        • 實現(xiàn) _write() 方法,為可寫流消費數(shù)據(jù)

        上面兩個方法如何實現(xiàn)在上面可寫流可讀流的部分已經(jīng)介紹過了,這里需要注意的是,雙工流是存在兩個獨立的緩存池分別提供給兩個流,他們的數(shù)據(jù)源也不一樣

        以 NodeJs 的標準輸入輸出流為例:

        • 當我們在控制臺輸入數(shù)據(jù)時會觸發(fā)其 data 事件,這證明他有可讀流的功能,每一次用戶鍵入回車相當于調(diào)用可讀的 push 方法推送生產(chǎn)的數(shù)據(jù)。
        • 當我們調(diào)用其 write 方法時也可以向控制臺輸出內(nèi)容,但是不會觸發(fā) data 事件,這說明他有可寫流的功能,而且有獨立的緩沖區(qū),_write 方法的實現(xiàn)內(nèi)容就是讓控制臺展示文字。
        // 每當用戶在控制臺輸入數(shù)據(jù)(_read),就會觸發(fā)data事件,這是可讀流的特性 process.stdin.on('data', data=>{     process.stdin.write(data); })  // 每隔一秒向標準輸入流生產(chǎn)數(shù)據(jù)(這是可寫流的特性,會直接輸出到控制臺上),不會觸發(fā)data setInterval(()=>{     process.stdin.write('不是用戶控制臺輸入的數(shù)據(jù)') }, 1000)

        Transform

        node中的stream(流)有幾種類型

        可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內(nèi)部緩沖區(qū)。讀寫事件獨立發(fā)生。

                                     Duplex Stream                           ------------------|                     Read  <-----               External Source             You           ------------------|                       Write ----->               External Sink                           ------------------|

        Transform 流是雙工的,其中讀寫以因果關(guān)系進行。雙工流的端點通過某種轉(zhuǎn)換鏈接。讀取要求發(fā)生寫入。

                                         Transform Stream                            --------------|--------------             You     Write  ---->                   ---->  Read  You                            --------------|--------------

        對于創(chuàng)建 Transform 流,最重要的是要實現(xiàn) _transform 方法而不是 _write 或者 _read。 _transform 中對可寫流寫入的數(shù)據(jù)做處理(消費)然后為可讀流生產(chǎn)數(shù)據(jù)。

        轉(zhuǎn)換流還經(jīng)常會實現(xiàn)一個 `_flush` 方法,他會在流結(jié)束前被調(diào)用,一般用于對流的末尾追加一些東西,例如壓縮文件時的一些壓縮信息就是在這里加上的
        const { write } = require('fs') const { Transform, PassThrough } = require('stream')  const reurce = '1312123213124341234213423428354816273513461891468186499126412'  const transform = new Transform({     highWaterMark: 10,     transform(chunk ,encoding, callback){// 轉(zhuǎn)換數(shù)據(jù),調(diào)用push將轉(zhuǎn)換結(jié)果加入緩存池         this.push(chunk.toString().replace('1', '@'))         callback()     },     flush(callback){// end觸發(fā)前執(zhí)行         this.push('<<<')         callback()     } })   // write 不斷寫入數(shù)據(jù) let count = 0 transform.write('>>>') function productionData() {     let flag = true     while (count <= 20 && flag) {         flag = transform.write(count.toString())         count++     }     if (count > 20) {         transform.end()     } } productionData() transform.on('drain', productionData)   let result = '' transform.on('data', data=>{     result += data.toString() }) transform.on('end', ()=>{     console.log(result)     // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<< })

        贊(0)
        分享到: 更多 (0)
        網(wǎng)站地圖   滬ICP備18035694號-2    滬公網(wǎng)安備31011702889846號
        主站蜘蛛池模板: 国产精品视频第一区二区三区| 2021国产精品视频网站| 国产精品久久久久久| 久久久久久青草大香综合精品| 8AV国产精品爽爽ⅴa在线观看| 亚洲日韩国产精品乱| 国产精品欧美一区二区三区 | 柠檬福利精品视频导航| 无码国产精品一区二区免费16| 四虎国产精品免费久久| 国产午夜精品一区二区| 亚洲一二成人精品区| 国产成人精品免费视频动漫| 少妇人妻偷人精品免费视频| 欧美精品一区二区在线精品 | 国产精品v片在线观看不卡| 亚洲中文字幕久久精品无码喷水| 国产乱子伦精品免费视频| 久久亚洲国产欧洲精品一| 国产午夜无码精品免费看动漫| 亚洲精品无码永久在线观看你懂的 | 久久精品国产亚洲网站| 国产精品嫩草影院AV| 青草国产精品久久久久久| 一本一本久久a久久综合精品蜜桃 一本一道精品欧美中文字幕 | 亚洲AV无码精品色午夜果冻不卡| 日本精品久久久久影院日本| 国产乱人伦精品一区二区在线观看| 尤物国产在线精品福利一区| 国产精品视频一区二区三区经| 99久久精品国产免看国产一区| 国产精品igao视频网网址| 国产成人精品视频一区二区不卡| 国产精品乱码高清在线观看| 国产麻豆精品一区二区三区v视界| 国产亚洲精品资源在线26u| 国产精品无码久久综合| 99久久人妻无码精品系列蜜桃| 国产精品黄网站| 亚洲精品福利视频| 国产高清在线精品一本大道国产 |