事件驅動的微服務-事件驅動設計
本篇是「事件驅動的微服務」系列的第二篇,主要講述事件驅動設計。如果想要了解總體設計,請看第一篇“事件驅動的微服務-總體設計”
程式流程
我們通過一個具體的例子來講解事件驅動設計。 本文中的程式有兩個微服務,一個是訂單服務(Order Service), 另一個是支付服務(Payment Service)。用戶調用訂單服務的用例createOrder()來創建訂單,創建之後的訂單暫時還沒有支付資訊,訂單服務然後發布命令(Command)給支付服務,支付服務完成支付,發送支付完成(Payment Created)消息。訂單服務收到消息(Event),在Order表裡增加Payment_Id並修改訂單狀態為「已付款」。
下面就是組件圖:
事件處理
事件分成內部事件和外部事件,內部事件是存在於一個微服務內部的事件,不與其他微服務共享。如果用DDD的語言來描述就是在有界上下文(Bounded Context)內的域事件(Domain Event)。外部事件是從一個微服務發布,而被其他微服務接收的事件。如果用DDD的語言來描述就是在不同有界上下文(Bounded Context)之間傳送的域事件(Domain Event)。這兩種事件的處理方式不同。
內部事件:
對於內部事件的處理早已有了成熟的方法,它的基本思路是創建一個事件匯流排(Event Bus),由它來監聽事件。然後註冊不同的事件處理器(Event Handler)來處理事件。這種思路被廣泛地用於各種領域。
下面就是事件匯流排(Event Bus)的介面,它有兩個函數,一個是發布事件(Publish Event),另一個是添加事件處理器(Event Handler)。一個事件可以有一個或多個事件處理器。
type EventBus interface {
PublishEvent(EventMessage)
AddHandler(EventHandler, ...interface{})
}
事件匯流排的程式碼的關鍵部分是載入事件處理器。我們以訂單服務為例,下面就是載入事件處理器(Event Handler)的程式碼,它是初始化容器程式碼的一部分。在這段程式碼中,它只註冊了一個事件,支付完成事件(PaymentCreateEvent),和與之相對應的事件處理器-支付完成事件處理器(PaymentCreatedEventHandler)。
func loadEventHandler(c servicecontainer.ServiceContainer) error {
var value interface{}
var found bool
rluf, err := containerhelper.BuildModifyOrderUseCase(&c)
if err != nil {
return err
}
pceh := event.PaymentCreatedEventHandler{rluf}
if value, found = c.Get(container.EVENT_BUS); !found {
message := "can't find key=" + container.EVENT_BUS + " in container "
return errors.New(message)
}
eb := value.(ycq.EventBus)
eb.AddHandler(pceh,&event.PaymentCreatedEvent{})
return nil
}
由於在處理事件時要調用相應的用例,因此需要把用例注入到事件處理器中。在上段程式碼中,首先從容器中獲得用例,然後創建事件處理器,最後把事件和與之對應的處理器加入到事件匯流排中。
事件的發布是通過調用事件匯流排的PublishEvent()來實現的。下面的例子就是在訂單服務中通過消息中間件來監聽來自外部的支付完成事件(PaymentCreatedEvent),收到後,把它轉化成內部事件,然後發送到事件匯流排上,這樣已經註冊的事件處理器就能處理它了。
eb := value.(ycq.EventBus)
subject := config.SUBJECT_PAYMENT_CREATED
_, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) {
cpm := pce.NewPaymentCreatedDescriptor()
logger.Log.Debug("payload:",pce)
eb.PublishEvent(cpm)
})
那麼事件是怎樣被處理的呢?關鍵就在PublishEvent函數。當一個事件發布時,事件匯流排會把所有註冊到該事件的事件處理器的Handle()函數依次調用一遍, 下面就是PublishEvent()的程式碼。這樣每個事件處理器只要實現Handle()函數就可以了。
func (b *InternalEventBus) PublishEvent(event EventMessage) {
if handlers, ok := b.eventHandlers[event.EventType()]; ok {
for handler := range handlers {
handler.Handle(event)
}
}
}
下面就是PaymentCreatedEventHandler的程式碼。它的邏輯比較簡單,就是從Event里獲得需要的支付資訊,然後調用相應的用例來完成UpdatePayment()功能。
type PaymentCreatedEventHandler struct {
Mouc usecase.ModifyOrderUseCaseInterface
}
func(pc PaymentCreatedEventHandler) Handle (message ycq.EventMessage) {
switch event := message.Event().(type) {
case *PaymentCreatedEvent:
status := model.ORDER_STATUS_PAID
err := pc.Mouc.UpdatePayment(event.OrderNumber, event.Id,status)
if err != nil {
logger.Log.Errorf("error in PaymentCreatedEventHandler:", err)
}
default:
logger.Log.Errorf("event type mismatch in PaymentCreatedEventHandler:")
}
}
我在這裡用到了一個第三方庫“jetbasrawi/go.cqrs”來處理Eventbus。Jetbasrawi是一個事件溯源(Event Sourcing)的庫。事件溯源與事件驅動很容易搞混,它們看起來有點像,但實際上是完全不同的兩個東西。事件驅動是微服務之間的一種調用方式,存在於微服務之間,與RPC的調用方式相對應;而事件溯源是一種編程模式,你可以在微服務內部使用它或不使用它。但我一時找不到事件驅動的庫,就先找一個事件溯源的庫來用。其實自己寫一個也很簡單,但我不覺得能寫的比jetbasrawi更好,那就還是先用它把。不過事件溯源要比事件驅動複雜,因此用Jetbasrawi可能有點大材小用了。
外部事件:
外部事件的不同之處是它要在微服務之間進行傳送,因此需要消息中間件。我定義了一個通用介面,這樣可以支援不同的消息中間件。它的最重要的兩個函數是publish()和Subscribe()。
package gmessaging
type MessagingInterface interface {
Publish(subject string, i interface{}) error
Subscribe(subject string, cb interface{} ) (interface{}, error)
Flush() error
// Close will close the decorated connection (For example, it could be the coded connection)
Close()
// CloseConnection will close the connection to the messaging server. If the connection is not decorated, then it is
// the same with Close(), otherwise, it is different
CloseConnection()
}
由於定義了通用介面,它可以支援多種消息中間件,我這裡選的是“NATS”消息中間件。當初選它是因為它是雲原生計算基金會(“CNCF”)的項目,而且功能強大,速度也快。如果想了解雲原生概念,請參見“雲原生的不同解釋及正確含義”
下面的程式碼就是NATS的實現,如果你想換用別的消息中間件,可以參考下面的程式碼。
type Nat struct {
Ec *nats.EncodedConn
}
func (n Nat) Publish(subject string, i interface{}) error {
return n.Ec.Publish(subject,i)
}
func (n Nat) Subscribe(subject string, i interface{} ) (interface{}, error) {
h := i.(nats.Handler)
subscription, err :=n.Ec.Subscribe(subject, h)
return subscription, err
}
func (n Nat) Flush() error {
return n.Ec.Flush()
}
func (n Nat) Close() {
n.Ec.Close()
}
func (n Nat) CloseConnection() {
n.Ec.Conn.Close()
}
「Publish(subject string, i interface{})」有兩個參數,「subject」是消息中間件的隊列(Queue)或者是主題(Topic)。第二個參數是要發送的資訊,它一般是JSON格式。使用消息中間件時需要一個鏈接(Connection),這裡用的是「*nats.EncodedConn」, 它是一個封裝之後的鏈接,它裡面含有一個JSON解碼器,可以支援在結構(struct)和JSON之間進行轉換。當你調用發布函數時,發送的是結構(struct),解碼器自動把它轉換成JSON文本再發送出去。「Subscribe(subject string, i interface{} )」也有兩個參數,第一個與Publish()的一樣,第二個是事件驅動器。當接收到JSON文本後,解碼器自動把它轉換成結構(struct),然後調用事件處理器。
我把與消息中間件有關的程式碼寫成了一個單獨的第三方庫,這樣不論你是否使用本框架都可以使用這個庫。詳細資訊參見“jfeng45/gmessaging”
命令
命令(Command)在程式碼實現上和事件(Event)非常相似,但他們在概念上完全不同。例如支付申請(Make Payment)是命令,是你主動要求第三方(支付服務)去做一件事情,而且你知道這個第三方是誰。支付完成(Payment Created)是事件,是你在彙報一件事情已經做完,而其他第三方程式可能會根據它的結果來決定是否要做下一步的動作,例如訂單服務當收到支付完成這個事件時,就可以更改自己的訂單狀態為「已支付」。這裡,事件的發送方並不知道誰會對這條消息感興趣,因此這個發送是廣播式發送。而且這個動作(支付)已經完成,而命令是尚未完成的動作,因此接收方可以選擇拒絕執行一條命令。我們平常經常講的事件驅動是松耦合,而RPC是緊耦合,這裡指的是事件方式,而不是命令方式。採用命令方式時,由於你已經知道了要發給誰,因此是緊耦合的。
在實際應用中,我們所看到的大部分的命令都是在一個微服務內部使用,很少有在微服務之間發送命令的,微服務之間傳遞的主要是事件。但由於事件和命令很容易混淆,有不少在微服務之間傳遞的「事件」實際上是「命令」。因此並不是使用事件驅動方式就能把程式變成松耦合的,而要進一步檢查你是否將「命令」錯用成了「事件」。在本程式中會嚴格區分它們。
下面就是命令匯流排(Dispatcher)的介面,除了函數名字不一樣外,其他與事件匯流排幾乎一模一樣。
type Dispatcher interface {
Dispatch(CommandMessage) error
RegisterHandler(CommandHandler, ...interface{}) error
}
我們完全可以把它定義成下面的樣子,是不是就與事件匯流排很像了?下面的介面和上面的是等值的。
type CommandBus interface {
PublishCommand(CommandMessage) error
AddHandler(CommandHandler, ...interface{}) error
}
事件和命令的其他方面,例如定義方式,處理流程,實現方式,傳送方式也幾乎一模一樣。詳細的我就不講了,你可以自己看程式碼進行比較。那我們可不可以只用一個事件匯流排同時處理時間和命令呢?理論上來講是沒有問題的。我開始的時候也是這麼想的,但由於現在的介面(“jetbasrawi/go.cqrs”)不支援,如果要改的話需要重新定義介面,因此就暫時放棄了。另外,他們兩個在概念上還是很不同的,所以在實現上定義不同的介面也是有必要的。
事件和命令設計
下面來講解在設計事件驅動時應注意的問題。
結構設計
事件驅動模式與RPC相比增加的部分是事件和命令。因此首先要考慮的是要對RPC的程式結構做哪些擴充和怎樣擴充。「Event」和「command」從本質上來講是業務邏輯的一部分,因此應屬於領域層。因此在程式結構上也增加了兩個目錄「Event」和「command」分別用來存放事件和命令。結構如下圖所示。
發送和接收的不同處理方式
現在的程式碼在處理外部事件時,在發送端和接收端的方式是不一樣的。
下面就是發送端的程式碼(程式碼在支付服務項目里),整個程式碼功能是創建支付,完成之後再發布「支付完成」消息。它直接通過消息中間件介面把事件發送出去。
type MakePaymentUseCase struct {
PaymentDataInterface dataservice.PaymentDataInterface
Mi gmessaging.MessagingInterface
}
func (mpu *MakePaymentUseCase) MakePayment(payment *model.Payment) (*model.Payment, error) {
payment, err := mpu.PaymentDataInterface.Insert(payment)
if err!= nil {
return nil, errors.Wrap(err, "")
}
pce := event.NewPaymentCreatedEvent(*payment)
err = mpu.Mi.Publish(config.SUBJECT_PAYMENT_CREATED, pce)
if err != nil {
return nil, err
}
return payment, nil
}
下面就是接收端的程式碼例子。是它是先用消息介面接收時間,再把外部事件轉化為內部事件,然後調用事件匯流排的介面在微服務內部發布事件。
eb := value.(ycq.EventBus)
subject := config.SUBJECT_PAYMENT_CREATED
_, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) {
cpm := pce.NewPaymentCreatedDescriptor()
logger.Log.Debug("payload:",pce)
eb.PublishEvent(cpm)
})
為什麼會有這種不同?在接收時,可不可以不生成內部事件,而是直接調用用例來處理外部事件呢?在發送時,如果沒有別的內部事件處理器,那麼直接調用消息中間件來發送是最簡單的方法(這個發送過程是輕量級的,耗時很短)。而接收時可能需要處理比較複雜的業務邏輯。因此你希望把這個過程分成接收和處理兩個部分,讓複雜的業務邏輯在另外一個過程里處理,這樣可以盡量縮短接收時間,提高接收效率。
是否需每個事件都要單獨的事件處理器?
現在的設計是每個事件和事件驅動器都有一個單獨的文件。我見過有些人只用一個文件例如PaymentEvent來存放所有與Payment相關的事件,事件驅動器也是一樣。這兩種辦法都是可行的。現在看來,生成單獨的文件比較清晰,但如果以後事件非常多,也許一個文件存放多個事件會比較容易管理,不過到那時再改也不遲。
事件處理邏輯放在哪?
對於一個好的設計來講,所有的業務邏輯都應該集中在一起,這樣便於管理。在現在的架構里,業務邏輯是放在用例(Use Case)里的,但事件處理器里也需要有業務邏輯,應該怎麼辦?支付事件處理器的主要功能是修改訂單中的付款資訊,這部分的業務邏輯已經體現在修改訂單(Modify Order)用例里,因此支付事件處理器只要調用修改訂單的MakePayment()函數就可以了。實際上所有的事件處理器都應該這樣設計,它們本身不應包含業務邏輯,而只是一個簡單的封裝,去調用用例里的業務邏輯。那麼可不可以直接在用例里定義Handle()函數,這樣用例就變成了事件處理器?這樣的設計確實可行,但我覺得把事件處理器做成一個單獨的文件,這樣邏輯上更清晰。因為修改訂單付款功能你是一定要有的,但事件處理器只有在事件驅動模式下才有,它們是屬於兩個不同層面的東西,只有分開放置才層次清晰。
源程式:
完整的源程式鏈接:
索引:
3 “CNCF”
5 “NATS”