.NET 開源工作流: Slickflow流程引擎高級開發(七)–消息隊列(RabbitMQ)的集成使用

前言:工作流流程過程中,除了正常的人工審批類型的節點外,事件類型的節點處理也尤為重要。比如比較常見的事件類型的節點有:Timer/Message/Signal等。本文重點闡述消息類型的節點處理,以及實現消息驅動流程過程中對消息隊列(RabbitMQ)的集成使用方式。

1. 節點間消息傳遞

1.1 MessageThrow

    消息拋出節點,當執行到這個節點時,特定消息主題的消息記錄將會添加到消息隊列,然後等待被訂閱的消息消費者來激活消息處理服務。

1.2 MessageCatch

    消息接收節點,消息接收節點上定義了消息主題,並且在消息隊列中訂閱了該主題。當有特定主題的消息被發布時候,消息消費者會捕獲到消息主題,同時消息處理服務中,將會定位到流程的該節點位置,然後通過流程服務來判定下一步的流轉流轉。

1.3 單一流程內的節點消息傳遞

    該流程只進行消息的發布或者接收,而消息的另外一方則可能是業務系統。兩者之間的關聯是靠消息主題來識別。如下圖所示:

 

 

1.4 跨流程間的節點消息傳遞

    通常可以用泳道流程來表示跨流程間的消息通知。泳道流程中,有多個流程,其中一個是泳道主流程,其它的則為泳道附屬流程。將會在下面的章節中具體描述。

2. 跨流程消息傳遞

2.1 泳道流程

    泳道流程中,使用多個泳道來區分不同的流程,流程之間可以通過消息來傳遞資訊,如下圖所示:

 

2.2 消息節點類型

    消息節點在流程中的位置主要有:開始節點(Start)、中間節點(Internmediate)和結束節點(End)

3. 消息隊列(RabbitMQ)集成使用

3.1 消息隊列介紹

     RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟體(亦稱面向消息的中間件)。RabbitMQ伺服器是用Erlang語言編寫的,而群集和故障轉移是構建在開放電信平台框架上的。所有主要的程式語言均有與代理介面通訊的客戶端庫。

    來源://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin

3.2 消息隊列集成引擎

    Slickflow引擎集成RabbitMQ消息隊列,其特點是:已經經過大量用戶案例檢驗,證明性能能夠滿足日常業務需求,同時文檔豐富,便於開發人員學習上手。

4. 泳道流程消息隊列集成案例

    下圖是泳道流程的具體示例,包含了一個主流程,和一個泳道附屬流程,兩個流程間的觸發處理,是靠消息來傳遞。

4.1 流程協作說明

    1) 泳道附屬流程的啟動

      泳道附屬流程的開始節點(StartCatch)是一個消息接收節點,其接收內容來自主流程的中間消息節點(IntermediateThrow)。當訂單主流程的「同步訂單」節點完成後,將會發布消息,此時,生產附屬流程因為訂閱了該消息主題,所以根據開始節點的類型為消息觸發類型,生產附屬流程將會被啟動,生成新的流程實例。

    2) 中間消息節點接收消息

    流程運行到到中間節點位置後,需要等待特殊主體的消息記錄後,然後才能繼續流轉。常見的場景:比如電子商務系統接入第三方支付系統時,當商城客戶下單支付後,從第三方回傳支付成功的消息後,當前的訂購流程財主繼續向下流轉。這種消息等待接收的處理,就可以使用消息隊列來完成。

    在泳道流程中,當”生產計劃”流程完成後,將會發布消息給主流程,泳道主流程接收到消息後,可以繼續處理當前的節點,並且向下進行流程流轉到”客戶回饋”節點。

4.2 消息驅動過程說明

    1) 消息發布程式碼示例

        /// <summary>
        /// 消息發布
        /// </summary>
        /// <param name="topic">主題</param>
        /// <param name="line">內容</param>
        public void Publish(string topic, string line)
        {
            var channel = MQClientFactory.CreatePublishChannel();
            channel.QueueDeclare(queue: topic,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

            var body = Encoding.UTF8.GetBytes(line);
            channel.BasicPublish(exchange: "",
                                 routingKey: topic,
                                 basicProperties: null,
                                 body: body);
        }

  

    2) 消息訂閱程式碼示例

        /// <summary>
        /// 消息訂閱
        /// </summary>
        /// <param name="topic">主題</param>
        /// <returns></returns>
        public void Subscribe(string topic)
        {
            var channel = MQClientFactory.CreateRecieveChannel();
            channel.QueueDeclare(queue: topic,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var line = Encoding.UTF8.GetString(body);
                var msgMediator = new MessageMediator();
                msgMediator.InvokeFromMessage(ConsumeMessageFunction, topic, line);
            };
            channel.BasicConsume(queue: topic,
                                 autoAck: true,
                                 consumer: consumer);
        }

5. 消息隊列數據監控

    RabbitMQ帶有後檯面板監控,可以看到消息隊列中的數據情況,可以幫助開發人員調試消息處理程式的正常工作。

 

 

6. 總結

   流程引擎集成消息隊列RabbitMQ的方式,可以保障業務系統跟流程關聯繫統集成關聯的可靠性。存在消息隊列中的消息主題,可以被消息訂閱方在後期處理,使得系統的部署更加靈活,降低了系統之間的耦合性。其次,跨流程直接的消息通訊比較常見,通過集成消息隊列來存儲消息和分發消息,可以使得業務系統處理的能力加強。從單一流程的流轉過度到多流程直接的集體協作模式。