業務可視化-讓你的流程圖”Run”起來(3.分支選擇&跨語言分散式運行節點)

  • 2022 年 7 月 25 日
  • 筆記

前言

首先,感謝大家對上一篇文章[業務可視化-讓你的流程圖”Run”起來(2.問題與改進)]的支援。

分享一下近期我對這個項目的一些改進。

1. 增加了分支選擇工程,可以根據節點的運行結果決定執行哪一個節點。

2. 增加了分散式運行節點功能,可以將流程節點部署到任意伺服器,通過隊列來調度節點,也就是說節點的運行將不在局限於Java語言。

1. 如何讓流程圖「Run」起來

首先我們回顧一下前兩篇文章的知識,如何讓流程圖「Run」起來:

工程目錄[ html/network.html ]里,提供了一個圖形化介面的流程設計器,可以通過拖拽的方式設計流程並生成Json文件。

反之,也可以將Json文件轉化為流程圖並進行編輯。

 

1.1 創建流程

創建流程過程如下圖所示:

節點和邊的ID自動生成,可以自己定義節點名和每個邊對應的節點返回值。

點擊[ update json ]後,即可生成/更新流程圖對應的Json文件。

 

1.2 節點與被執行的Java方法綁定

我們需要寫一個Java類,繼承自FlowRunner。

然後在裡面寫每個節點對應的方法,用@Node注釋來實現與流程中節點的綁定。

同時將1.1中生成的Json文件放到和Java類相同的目錄下。

TestFlow1.java

public class TestFlow1 extends FlowRunner {

	@Node(label = "a")
	public int process_a() {
		System.out.println("processing a");
		return 1;
	}

	@Node(label = "b")
	public void process_b() {
		System.out.println("processing b");
	}

	@Node(label = "c")
	public void process_c() {
		System.out.println("processing c");
	}

	@Node(label = "d")
	public void process_d() {
		System.out.println("processing d");
	}
}

TestFlow1.json

{
	"flowId": "your flow id",
	"nodes": [
		{
			"id": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"label": "a",
			"readyCheck": 0
		},
		{
			"id": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"label": "b",
			"readyCheck": 0
		},
		{
			"id": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"label": "c",
			"readyCheck": 0
		},
		{
			"id": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"label": "d",
			"readyCheck": 1
		}
	],
	"edges": [
		{
			"id": "36bdc526-f6ae-45de-9bb7-34c293b34006",
			"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"to": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"condition": "1",
			"arrows": "to"
		},
		{
			"id": "652b871d-338d-45f5-91a9-3a488ed9b6f4",
			"from": "e21eb7b6-2f23-4264-a50f-e42321dd295b",
			"to": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"condition": "2",
			"arrows": "to"
		},
		{
			"id": "2691b6fe-ede9-4d1c-8b49-82d2a4ef014a",
			"from": "f2a76819-b6a8-49db-af25-fab8274550f3",
			"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"arrows": "to"
		},
		{
			"id": "d8026555-7609-4d27-8689-fd3dbcfe11d7",
			"from": "73f8bd68-8454-4b02-9098-c0c7bb6ffdb2",
			"to": "3553d1f7-e4c3-4e4b-a9ef-80b94ebbb8af",
			"arrows": "to"
		}
	]
}

Test1.java

public class Test1 {

	public static void main(String[] args) {
		
		TestFlow1 testFlow = new TestFlow1();
		testFlow.startFlow(true);
		
	}
}

 

1.3 啟動流程

調用1.2中寫好的Java類的startFlow方法,即可啟動流程。

同步啟動

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);

非同步啟動

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(false);

1.4 關閉流程執行器

流程執行器會在第一個流程啟動的時候自動啟動,在整個系統關閉的時候,我們需要將流程執行器關閉,如下。

FlowStarter.shutdown();

1.5 流程執行結果確認

在流程執行完畢後,日誌會輸出執行結果的json文件,我們可以將這個文件粘貼到1.1介紹的工具里,生成圖形化的執行結果來確認節點的運行狀況。

運行成功日誌

Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
execute:29bc32c7-acd8-4893-9410-e9895da38b2e
node name:d
processing d
Complete success.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#36AE7C"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#36AE7C"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}

流程執行結束後,會輸出執行結果和運行後的流程圖狀態。
可以直接將json貼到下面的位置,查看看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。

運行失敗日誌

Ready queue thread started.
Complete queue thread started.
json:
{"flowId":"123","nodes":[{"id":"1","label":"a"},{"id":"2","label":"b"},{"id":"0b5ba9df-b6c7-4752-94e2-debb6104015c","label":"c"},{"id":"29bc32c7-acd8-4893-9410-e9895da38b2e","label":"d"}],"edges":[{"id":"1","from":"1","to":"2","arrows":"to"},{"id":"078ffa82-5eff-4d33-974b-53890f2c9a18","from":"1","to":"0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows":"to"},{"id":"90663193-7077-4aca-9011-55bc8745403f","from":"2","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"},{"id":"a6882e25-c07a-4abd-907e-e269d4eda0ec","from":"0b5ba9df-b6c7-4752-94e2-debb6104015c","to":"29bc32c7-acd8-4893-9410-e9895da38b2e","arrows":"to"}]}
execute:1
node name:a
processing a
execute:2
node name:b
processing b
execute:0b5ba9df-b6c7-4752-94e2-debb6104015c
node name:c
processing c
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
	at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
	at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
	... 11 more
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at io.github.nobuglady.network.fw.FlowRunner.execute(FlowRunner.java:49)
	at io.github.nobuglady.network.fw.executor.NodeRunner.run(NodeRunner.java:93)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: test
	at io.github.nobuglady.network.MyFlow1.process_b(MyFlow1.java:16)
	... 11 more
Complete error.
json:
{"nodes":[{"id": "1","label": "a" ,"color": "#36AE7C"},{"id": "2","label": "b" ,"color": "#EB5353"},{"id": "0b5ba9df-b6c7-4752-94e2-debb6104015c","label": "c" ,"color": "#36AE7C"},{"id": "29bc32c7-acd8-4893-9410-e9895da38b2e","label": "d" ,"color": "#E8F9FD"}],"edges":[{"id": "1","from": "1","to": "2","arrows": "to"},{"id": "078ffa82-5eff-4d33-974b-53890f2c9a18","from": "1","to": "0b5ba9df-b6c7-4752-94e2-debb6104015c","arrows": "to"},{"id": "90663193-7077-4aca-9011-55bc8745403f","from": "2","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"},{"id": "a6882e25-c07a-4abd-907e-e269d4eda0ec","from": "0b5ba9df-b6c7-4752-94e2-debb6104015c","to": "29bc32c7-acd8-4893-9410-e9895da38b2e","arrows": "to"}]}

流程執行結束後,會輸出執行結果和運行後的流程圖狀態。
可以直接將json貼到下面的位置,查看看結果(綠色表示正常結束,紅色表示異常結束,白色表示等待執行)。

2. 分支選擇的改進

2.1 分支選擇定義

我們可以在圖形介面中定義每條邊的值,流程運行時,對節點的返回值與後續邊的值進行比對,比對結果一直則執行該條邊對應的後續節點。

2.2 節點啟動條件定義

對於多條邊Join到一個節點的情況,我們需要定義該節點啟動的條件,如下:

1. 指向該節點的任意一條邊通過檢查,則啟動該節點

2. 指想該節點的所有邊都通過檢車後,啟動該節點

上圖表示節點b和節點c 任意一個節點完成後,執行節點d

 

2.3 節點返回值綁定

節點的返回值與Java的方法返回值自動綁定,流程執行後,

對於有返回值的方法,則會調用該返回值的toString方法作為該節點的返回值。

對於無返回值的方法,則默認空文字列為返回值。

比如,返回int值,則用返回的int值與後續邊的條件做對比。

@Node(label = "a")
	public int process_a() {
		System.out.println("processing a");
		return 1;
	}

返回String值,則用返回的String值與後續邊的條件做對比。

@Node(label = "a")
	public String process_a() {
		System.out.println("processing a");
		return "1";
	}

返回自定義Object等,則用返回的Ojbect值的toString()方法生成的字元串與後續邊的條件做對比。

	@Node(label = "a")
	public MyObj process_a() {
		System.out.println("processing a");
		return new MyObj();
	}

2.4 節點間參數傳遞

目前還沒有對節點間參數傳遞做特別的處理,

可以通過類變數等方式進行節點間參數的傳遞。

3. 分散式運行的改進

3.1 系統結構

把單體的工程改進成分散式的工程,首先要明確系統結構和改進點。

目前的系統結構如下圖所示(黃色部分可以配置成分散式運行)

系統通過兩個隊列來進行節點間控制資訊的流轉。

1. 待啟動隊列

2. 完成隊列

3.1.1 待啟動隊列

生產者:流程管理器(FlowManager),流程啟動後,流程管理器將初始節點放入[待啟動隊列]中,等待消費。

消費者:流程執行器(NodeExecutor),流程執行器監聽[待啟動隊列],得到消息後,根據節點資訊運行該節點,運行完成後,將節點的運行結果放入[完成隊列]中,等待消費。

3.1.2 完成隊列

生產者:流程執行器(NodeExecutor),流程執行器監聽[待啟動隊列],得到消息後,根據節點資訊運行該節點,運行完成後,將節點的運行結果放入[完成隊列]中,等待消費。

消費者:流程管理器(FlowManager),流程管理器監聽[完成隊列],得到消息後,根據完成節點的資訊,更新流程圖,然後將後續待啟動的節點放入[待啟動隊列]中,等待消費。

3.2 分散式系統改進

基於3.1介紹的系統結構,可以明顯的發現隊列是單機系統改進為分散式系統的改進點。

所以,把隊列變成可配置的隊列後,系統將可以通過配置文件選擇單機部署,或者分散式部署。

配置文件如下

node.executor.remote=false
queue.ready.manager=io.github.nobuglady.network.fw.queue.ready.ReadyQueueManager
queue.complete.manager=io.github.nobuglady.network.fw.queue.complete.CompleteQueueManager
node.executor=io.github.nobuglady.network.fw.executor.NodePool

 

node.executor

對節點的執行器的配置,系統默認提供了本地的執行器,可以通過Annotation對節點綁定的方法進行調用。

您可以配置自己的節點執行器,需要實現介面INodeExecutor

  onNodeReady:節點準備運行的時候,會調用這個方法

  這個方法里需要寫節點執行的具體方法,並且在節點執行完畢後,將節點運行結果放入[完成隊列]。

  shutdown:系統關閉的時候,會調用這個方法

node.executor.remote

false:本地執行

true:遠程執行

本地執行時,會調用node.executor中配置的執行器,來執行節點的運行。

遠程執行時,則系統不會啟動 流程執行器(NodeExecutor)。也就是不會消費[待啟動隊列]中的消息。

遠程執行時,目標系統監聽[待啟動隊列]的消息,得到消息後,根據節點資訊運行該節點,運行完成後,將節點的運行結果放入[完成隊列]中,等待消費。

所以[待啟動隊列]和[完成隊列]必須配置成遠程系統可以訪問的隊列。

 

queue.ready.manager

待啟動隊列管理器

需要提供隊列的消費和生產的方法

註:配置成遠程執行節點時,系統不會調用此隊列的消費方法。(由遠程系統消費此隊列資訊)

 

queue.complete.manager

完成隊列管理器

需要提供隊列的消費和生產的方法

註:配置成遠程執行節點時,系統不會調用此隊列的生產方法。(由遠程系統生產此隊列資訊)

 

5. 本地運行和分散式運行配置例

下面介紹以RabbiMQ作為遠程隊列,進行分散式調用的配置,選擇其他的遠程隊列可以酌情修改。

工程里的test1-6分別對應如下6種啟動方式,Test1-6.java為啟動類。

每次啟動之前,需要修改ladybugflow1-6.properties為ladybugflow.properties

1.  默認配置:通過流程類啟動流程

啟動程式碼

TestFlow1 testFlow = new TestFlow1();
testFlow.startFlow(true);
FlowStarter.shutdown();

2. 本地節點:自定義【待啟動隊列】和【完成隊列】

啟動程式碼

TestFlow2 testFlow = new TestFlow2();
testFlow.startFlow(true);
FlowStarter.shutdown();

3. 本地節點:自定義【節點執行器】

啟動程式碼

TestFlow3 testFlow = new TestFlow3();
testFlow.startFlow(true);
FlowStarter.shutdown();

4. 遠程節點:通過流程類啟動流程

啟動程式碼

TestFlow4 testFlow = new TestFlow4();
testFlow.startFlow(true);
FlowStarter.shutdown();

5. 默認配置:通過指定Json文件來啟動流程

啟動程式碼

FlowRunner flowRunner = new FlowRunner(new TestFlow5());
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();

6. 遠程節點:通過指定Json文件來啟動流程

啟動程式碼

FlowRunner flowRunner = new FlowRunner();
flowRunner.startFlowFromJson("io/github/nobuglady/network/demo/test5/TestFlow5.json", true);
FlowStarter.shutdown();

感謝您看文章讀到這裡。

最後

源碼://github.com/nobuglady/ladybugflow

運行例源碼://github.com/nobuglady/ladybugflow-demo

運行例源碼(遠程節點)://github.com/nobuglady/ladybugflow-demo-remote-app