Flink中非同步AsyncIO的實現 (源碼分析)
- 2019 年 11 月 15 日
- 筆記
先上張圖整體了解Flink中的非同步io
阿里貢獻給flink的,優點就不說了嘛,官網上都有,就是寫庫不會柱塞性能更好
然後來看一下, Flink 中非同步io主要分為兩種
一種是有序Ordered
一種是無序UNordered
主要區別是往下游output的順序(注意這裡順序不是寫庫的順序既然都非同步了寫庫的順序自然是無法保證的),有序的會按接收的順序繼續往下游output發送,無序就是誰先處理完誰就先往下游發送
兩張圖了解這兩種模式的實現
有序:record數據會通過非同步執行緒寫庫,Emitter是一個守護進程,會不停的拉取queue頭部的數據,如果頭部的數據非同步寫庫完成,Emitter將頭數據往下游發送,如果頭元素還沒有非同步寫庫完成,柱塞
無序:record數據會通過非同步執行緒寫庫,這裡有兩個queue,一開始放在uncompleteedQueue,當哪個record非同步寫庫成功後就直接放到completedQueue中,Emitter是一個守護進程,completedQueue只要有數據,會不停的拉取queue數據往下游發送
可以看到原理還是很簡單的,兩句話就總結完了,就是利用queue和java的非同步執行緒,現在來看下源碼
這裡AsyncIO在Flink中被設計成operator中的一種,自然去OneInputStreamOperator的實現類中去找
於是來看一下AsyncWaitOperator.java
看到它的open方法(open方法會在taskmanager啟動job的時候全部統一調用,可以翻一下以前的文章)
這裡啟動了一個守護執行緒Emitter,來看下執行緒具體做了什麼
1處拉取數據,2處就是常規的將拉取到的數據往下游emit,Emitter拉取數據,這裡先不講因為分為有序的和無序的
這裡已經知道了這個Emitter的作用是循環的拉取數據往下游發送
回到AsyncWaitOperator.java在它的open方法初始化了Emitter,那它是如何處理接收到的數據的呢,看它的ProcessElement()方法
其實主要就是三個個方法
先是!!!將record封裝成了一個包裝類StreamRecordQueueEntry,主要是這個包裝類的構造方法中,創建了一個CompleteableFuture(這個的complete方法其實會等到用戶程式碼執行的時候用戶自己決定什麼時候完成)
1處主要就是講元素加入到了對應的queue,這裡也分為兩種有序和無序的
這裡也先不講這兩種模式加入數據的區別
接著2處就是調用用戶的程式碼了,來看看官網的非同步io的例子
給了一個Future作為參數,用戶自己起了一個執行緒(這裡思考一下就知道了為什麼要新起一個非同步執行緒去執行,因為如果不起執行緒的話,那processElement方法就柱塞了,無法非同步了)去寫庫讀庫等,然後調用了這個參數的complete方法(也就是前面那個包裝類中的CompleteableFuture)並且傳入了一個結果
看下complete方法源碼
這個resultFuture是每個record的包裝類StreamRecordQueueEntry的其中一個屬性是一個CompletableFuture
那現在就清楚了,用戶程式碼在自己新起的執行緒中當自己的邏輯執行完以後會使這個非同步執行緒結束,並輸入一個結果
那這個幹嘛用的呢
最開始的圖中看到有序和無序實現原理,有序用一個queue,無序用兩個queue分別就對應了
OrderedStreamElementQueue類中
UnorderedStreamElementQueue類中
回到前面有兩個地方沒有細講,一是兩種模式的Emitter是如何拉取數據的,二是兩種模式下數據是如何加入OrderedStreamElementQueue的
有序模式:
1.先來看一下有序模式的,Emitter的數據拉取,和數據的加入
其tryPut()方法
onComplete方法
onCompleteHandler方法
這裡比較繞,先將接收的數據加入queue中,然後onComplete()中當上一個非同步執行緒getFuture() 其實就是每個元素包裝類裡面的那個CompletableFuture,當他結束時(會在用戶方法用戶調用complete時結束)非同步調用傳入的對象的 accept方法,accept方法中調用了onCompleteHandler()方法,onCompleteHandler方法中會判斷queue是否為空,以及queue的頭元素是否完成了用戶的非同步方法,當完成的時候,就會將headIsCompleted這個對象signalAll()喚醒
2.接著看有序模式Emitter的拉取數據
這裡有序方式拉取數據的邏輯很清晰,如果為空或者頭元素沒有完成用戶的非同步方法,headIsCompleted這個對象會wait住(上面可以知道,當加入元素的到queue且頭元素完成非同步方法的時候會signalAll())然後將頭數據返回,往下游發送
這樣就實現了有序發送,因為Emitter只拉取頭元素且已經完成用戶非同步方法的頭元素
無序模式:
這裡和有序模式就大同小異了,只是變成了,接收數據後直接加入uncompletedQueue,當數據完成非同步方法的時候就,放到completedQueue裡面去並signalAll(),只要completedqueue裡面有數據,Emitter就拉取往下發
這樣就實現了無序模式,也就是非同步寫入誰先處理完就直接放到完成隊列裡面去,然後往下發,不用管接收數據的順序