哇咔咔乾貨來啦:PowerJob 原理剖析之 Akka Toolkit

本文適合有 Java 基礎知識的人群

作者:HelloGitHub-Salieri

HelloGitHub 推出的《講解開源項目》系列。

Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala.

上面這段文字摘抄自 Akka 官網(akka.io),翻譯成中文也就是:「Akka 是一個為 Java 和 Scala 構建高並發、分佈式和彈性消息驅動應用程序的工具包」。而 Akka 具有的一切特性,其實都源自於一個用於處理並發計算問題的模型——Actor 模型。

PowerJob 項目地址:

//github.com/KFCFans/PowerJob

一、Actor 模型

Actor 模型在 1973 年於 Carl Hewitt、Peter Bishop 及 Richard Steiger 的論文中提出,現在已經被用作並發計算的理論理解框架和並發系統的實際實現基礎。

在計算機科學中,Actor 模型是一種並發運算上的模型。Actor 是一種程序上的抽象概念,被視為並發運算的基本單元:當一個 Actor 接收到一則消息,它可以做出一些決策、創建更多的 Actor 、發送更多的消息、決定要如何處理接下來的消息。Actors 可以修改它們自己的私有狀態,但是只能通過消息間接的相互影響(避免了基於鎖的同步)。

每一個 Actor 都由狀態(State)、行為(Behavior)和郵箱(MailBox,其實就是一個消息隊列)三部分組成:

  • 狀態:Actor 中的狀態指 Actor 對象的變量信息,狀態由 Actor 自己管理,避免了並發環境下的鎖和內存原子性等問題。
  • 行為:Actor 中的計算邏輯,通過 Actor 接收到的消息來改變 Actor 的狀態。
  • 郵箱:郵箱是 Actor 和 Actor 之間的通信橋樑,郵箱內部通過 FIFO(先入先出)消息隊列來存儲發送方 Actor 消息,接受方 Actor 從郵箱隊列中獲取消息。

前面說了一大堆晦澀難懂的概念,相信大家看的也都雲里霧裡的。這裡結合我自己的理解用白話文講一下:其實 Actor 模型的設計思想就是事件驅動,可以簡單理解為線程級的消息中間件。所有 Actor 之間不共享數據,只通過消息溝通,因此不用關心傳統並發程序編寫過程中的並發安全問題(因為根本沒有共享的數據)。同時,得益於 Actor 底層輕巧的設計(這部分其實屬於具體實現了,不過目前所有的實現 Actor 設計都很輕量),使得單機可以存在百萬量級的 Actor,因此能夠帶來極好的並發性能。

此外,由於 Actor 模型中萬物都是 Actor,所以它是天然支持分佈式的,即不同機器之間的 Actor 通訊和本地 Actor 之間的通訊沒有實質上的區別。

因此,只要你掌握了事件驅動的編程思想,利用 Actor 模型,結合具體的實現框架(比如 JVM 系的 Akka),能夠輕鬆編寫出高性能的分佈式應用。

二、Akka Toolkits

Akka Toolkit 也就是 Akka 工具包,其實就是 JVM 平台上對 Actor 模型的一種實現。Akka 本身提供了完整的 Actor 模型支持,包括對並發/並行程序的簡單的、高級別的抽象、異步、非阻塞、高性能的事件驅動編程模型和非常輕量的事件驅動處理。同時,作為一個「工具包」,Akka 還額外提供了許多功能,由於篇幅有限,這裡就簡單介紹幾個包,有興趣可以前往官網(見參考文檔)詳細了解~

  • akka-streams:流處理組件,提供直觀、安全的方式來進行異步、非阻塞的背壓流處理。

  • akka-http:HTTP 組件,現代、快速、異步、流媒體優先的 HTTP 服務器和客戶端。

  • akka-cluster:集群組件,包括集群成員管理、彈性路由等。

  • akka-remote(artery-remoting):通訊組件,也是 PowerJob 所使用的核心組件,然而官網並不推薦直接使用(直接使用 remote 啟動還會警告使用了過於底層的 API),普通分佈式應用推薦直接使用 cluster。

  • akka-persistence:持久化組件,提供「至少投遞一次」的能力來保證消息的可靠送達。

三、Akka 簡單使用

接下來是關於 Akka 的一個超簡明教程,幫助大家初步理解併入門 Akka,其內容涵蓋了所有 PowerJob 中用到的 API,也就是說,看懂這部分,源碼中的 Akka 就不再可怕嘍~

3.1 開發 Actor

首先,不得不提的一點是,Akka 從 2.6 版本開始,維護了 2 套 API(算上 Scala 和 Java 版本就 4 套了…看着IDE的智能提示就頭大…),分別叫 classic 和 typed。typed 與原先的 classic 相比,最大的特色就是其具有了類型(Java 范型)。每一個 Actor 處理的消息類型可以直接由范型規定,從而有效限制程序 bug(將錯誤從運行期提前到了編譯期)。然而,對於複雜系統要處理的消息不勝枚舉,強類型就限制了一個 Actor 只能處理一種類型的消息。雖然從邏輯上來講確實清晰,但實際工程實現中,必然導致代碼閱讀困難,整體結構鬆散(個人感覺這一點也是計算機科學與工程之間存在分歧的表現,當然也可能是我學藝不精,不了解正確的用法)。解釋了那麼多,終於可以點明主旨了~作者比較喜歡 classic,因此 PowerJob 只使用 AKKA classic API,本文也只涉及 AKKA classic API,反正官網說了會長期維護~

前面說過,對於 Actor 模型個人認為最簡單的理解方式就是消息中間件。Actor 的本質是事件驅動,即接收消息並處理。反映到編程上,Actor 的開發也類似於消息中間件 consumer 的開發,無非是換了個接口、多幾個功能罷了。

話不多說,看代碼:

public class FriendActor extends AbstractActor {
  
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Ping.class, this::onReceivePing)
                .matchAny(obj -> log.warn("unknown request: {}.", obj))
                .build();
    }

    private void onReceivePing(Ping ping) {
        getSender().tell(AskResponse.succeed(null), getSelf());
    }
}

首先自然是新建類並實現接口 AbstractActor,該接口需要重寫 createReceive 方法,該方法需要一個 Receive 對象作為返回值。對於開發者而言,需要做的就是構建這個 Receive 對象,也就是指明該 Actor 接受到什麼類型的消息時進行什麼樣的處理。

3.2 初始化 ActorSystem

Actor 作為處理消息的「角色」,就像工廠中的一個個工人,每個人各司其職,兢兢業業地接收指令完成任務。然而群龍不能無首,就像現實生活中工人需要由工廠來組織管理一樣,Actor 也需要自己的工廠—— ActorSystem。為此,創建 Actor 之前,首先需要創建 ActorSystem。

PowerJob 使用以下方法創建 ActorSystem。其中,第一個參數指明了該 ActorSystem 的名稱,第二個參數則傳入了該 ActorSystem 所使用的配置信息,包括工作端口、序列化方式、日誌級別等。

actorSystem = ActorSystem.create("powerjob-server", akkaConfig);

完成 ActorSystem 這個「工廠」的創建後,就可以正式開始創建 Actor 了,代碼如下所示:

actorSystem.actorOf(Props.create(FriendActor.class), "friend_actor");

其中,第一個參數Props是一個用來在創建 Actor 時指定選項的配置類;

第二個參數則指定了該 Actor 的名稱,通過該 Actor 的名稱和其 ActorSystem 的名稱,我們就可以構建出路徑 akka://powerjob-server/user/server_actor(本地路徑,遠程路徑需要變更協議並添加地址),然後輕鬆得根據該路徑找到該 Actor,實現通信。

3.3 信息交互

完成 ActorSystem 的初始化和 Actor 的創建後,就可以正式使用 Akka 框架了。PowerJob 主要使用 Akka 框架的 remote 組件,用於完成系統中各個分佈式節點的通訊。

String actorPath = "akka://[email protected]/user/friend_actor";
ActorSelection actorSelect = actorSystem.actorSelection(actorPath);
actorSelect.tell(startTaskReq, null);

和其他通訊方式一樣,進行通訊前,需要首先獲取目標地址。根據 akka-remote 的語法規範,指定目標 Actor 的名稱、其所在的 ActorSystem 名稱和目標機器地址,即可獲取用於通訊的 URI。得到 URI 後,便可通過 actorselection() 方法獲取 Actorselection 對象。通過 Actorselection 對象,調用 tell 方法就可以向目標 Actor 發送消息了。

那麼細心的小夥伴肯定要問了,PowerJob 之所以採用 akka-remote 作為底層通訊框架,是看上了它極簡的通訊 API,看到這裡,也沒發現有多簡單啊。發送一個 HTTP 請求,用高層封裝庫其實也就差不多三行代碼的樣子,你這用個 Akka 前置準備工作還那麼多,說好的簡單呢?那麼下面就帶大家來一探究竟,akka-remote 到底簡單在哪裡~

首先,如果不選擇現有的協議,自己用 Netty 造輪子,那光 server、client、listener、codec 就一大堆代碼了。如果使用現有協議如 HTTP,發送也許 3 行代碼能搞定,但接收一定遠不止三行。HTTP 全稱超文本傳輸協議,那麼傳輸的自然已經是經過序列化的文本數據了,所以接收方需要自行進行解碼、解析,更別提異常處理、失敗重試等功能了。而 akka-remote 呢?從剛剛 Actor 的代碼中可以看出,match 方法後面跟的是一個具體的類,也就是說 Akka 自動幫你完成了反序列化,作為消息的接收方,是真正的拿到就能用,沒有任何多餘代碼。同時,Akka 已經幫你搞定了各種異常後的處理。也就是說,使用 akka-remote,可以讓數據接收方非常的簡單,只專註邏輯的實現。

其次,在分佈式環境中,通訊往往不是單向的。尤其是 PowerJob 這種追求高可用的框架,有時候為了確認消息送達,往往需要應答機制。akka-remote 提供了難以置信的 API 來回復請求:

AskResponse response = new AskResponse(true, "success");
getSender().tell(response, getSelf());

通過 getSender() 方法,就能獲取到消息發送方的 Actor 引用對象,並通過該對象回複信息。

四、最後

那麼以上就是本篇文章全部的內容啦~

通過本篇文章,我相信大家已經了解了 Actor 模型的基礎概念,同時掌握了 JVM 上 Actor 模型的實現——Akka 框架的簡單使用。

下一篇文章,就是萬眾期待的 PowerJob 調度層原理分析啦(小夥伴進群必問榜 TOP 1)~我將會為大家揭秘是什麼支撐着 PowerJob 的調度,讓我能放肆「吹牛」說調度性能秒殺現有一切框架~

那我們下期再見嘍~拜拜~

五、參考文獻

作者遊記


HelloGitHub 交流群現已全面開放(作者在 Java 群),添加微信號:HelloGitHub 為好友入群,可同前端、Java、Go 等各界大佬談笑風生、切磋技術~