Dapr-Actor構建塊
前言:
前篇-綁定 文章對Dapr的綁定構建塊進行了解,本篇繼續對 Actor 構建塊進行了解學習。
一、Actor簡介:
Actors 為最低級別的「計算單元」。 換句話說,您將程式碼寫入獨立單元 ( 稱為actor) ,該單元接收消息並一次處理消息,而不進行任何類型的並行或執行緒處理。
當程式碼處理一條消息時,它可以向其他參與者發送一條或多條消息,或者創建新的 Actors。 底層 運行時 將管理每個 actor 的運行方式,時機和位置,並在 Actors 之間傳遞消息。
大量 Actors 可以同時執行,而 Actors 可以相互獨立執行。
Dapr 包含專門實現 virtual actors 模式 的運行時。 通過 Dapr 的實現,您可以根據 Actors 模型編寫 Dapr Actor,而 Dapr 利用底層平台提供的可擴展性和可靠性保證。
應用場景:
- 您的問題空間涉及大量(數千或更多) 的獨立和孤立的小單位和邏輯。
- 您想要處理單執行緒對象,這些對象不需要外部組件的大量交互,例如在一組 Actors 之間查詢狀態。
- 您的 actor 實例不會通過發出I/O操作來阻塞調用方。
生命周期:
Dapr Actors 是虛擬的,意思是他們的生命周期與他們的 in – memory 表現不相關。 因此,它們不需要顯式創建或銷毀。 Dapr Actors 運行時在第一次接收到該 actor ID 的請求時自動激活 actor。 如果 actor 在一段時間內未被使用,那麼 Dapr Actors 運行時將回收記憶體對象。 如果以後需要重新啟動,它還將保持對 actor 的一切原有數據。
調用 actor 方法和 reminders 將重置空閑時間,例如,reminders 觸發將使 actor 保持活動狀態。 不論 actor 是否處於活動狀態或不活動狀態 Actor reminders 都會觸發,對不活動 actor ,那麼會首先激活 actor。 Actor timers 不會重置空閑時間,因此 timer 觸發不會使參與者保持活動狀態。 Timer 僅在 actor 活躍時被觸發。
空閑超時和掃描時間間隔 Dapr 運行時用於查看是否可以對 actor 進行垃圾收集。 當 Dapr 運行時調用 actor 服務以獲取受支援的 actor 類型時,可以傳遞此資訊。
Virtual actors 生命周期抽象會將一些警告作為 virtual actors 模型的結果,而事實上, Dapr Actors 實施有時會偏離此模型。
在第一次將消息發送到其 actor 標識時,將自動激活 actor ( 導致構造 actor 對象) 。 在一段時間後,actor 對象將被垃圾回收。 以後,再次使用 actor ID 訪問,將構造新的 actor。 Actor 的狀態比對象的生命周期更久,因為狀態存儲在 Dapr 運行時的配置狀態提供程式中(也就是說Actor即使不在活躍狀態,仍然可以讀取它的狀態)。
二、工作原理:
Dapr Sidecar 提供了用於調用執行組件的 HTTP/gRPC API。 這是 HTTP API 的URL格式:
//localhost:<daprPort>/v1.0/actors/<actorType>/<actorId>/
<daprPort>
: Dapr 偵聽的 HTTP 埠。<actorType>
:執行組件類型。<actorId>
:要調用的特定參與者的 ID。
a)Actor組件放置服務流程:
- 啟動時,Sidecar 調用執行組件服務以獲取註冊的執行組件類型和執行組件的配置設置。
- Sidecar 將註冊的執行組件類型的列表發送到放置服務。
- 放置服務會將更新的分區資訊廣播到所有執行組件服務實例。 每個實例都將保留分區資訊的快取副本,並使用它來調用執行組件。
b)調用Actor組件方法流程:
- 服務在Sidecar 上調用執行組件 API。 請求正文中的 JSON 有效負載包含要發送到執行組件的數據。
- Sidecar 使用位置服務中的本地快取的分區資訊來確定哪個執行組件服務實例 (分區) 負責託管 ID 為的執行組件
3
。 在此示例中,它是 pod 2 中的服務實例。 調用將轉發到相應的Sidecar 。 - Pod 2 中的Sidecar 實例調用服務實例以調用執行組件。 服務實例激活actor(如果它還沒有激活)並執行actor 方法。
三、Actor timers(定時器) 和 reminders(提醒)
可以使用計時器和提醒來計劃自身的調用。 這兩個概念都支援配置截止時間。 不同之處在於回調註冊的生存期:
- 只要激活執行組件,計時器就會保持活動狀態。 計時器 不會 重置空閑計時器,因此它們不能使執行組件處於活動狀態。
- 提醒長於執行組件激活。 如果停用了某個執行組件,則會重新激活該執行組件。 提醒 將 重置空閑計時器。
1、Timers定時器:
Dapr Actor 運行時確保回調方法被順序調用,而非並發調用。 這意味著,在此回調完成執行之前,不會有其他Actor方法或timer/remider回調被執行。
Timer的下一個周期在回調完成執行後開始計算。 這意味著 timer 在回調執行時停止,並在回調完成時啟動。
Dapr Actor 運行時在回調完成時保存對actor的狀態所作的更改。 如果在保存狀態時發生錯誤,那麼將取消激活該actor對象,並且將激活新實例。
當actor作為垃圾回收(GC)的一部分被停用時,所有 timer 都會停止。 在此之後,將不會再調用 timer 的回調。 此外, Dapr Actors 運行時不會保留有關在失活之前運行的 timer 的任何資訊。 也就是說,重新啟動 actor 後將會激活的 timer 完全取決於註冊時登記的 timer。
a) 創建定時器:
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
Timer 的 duetime
和回調函數可以在請求主體中指定。 到期時間(due time)表示註冊後 timer 將首次觸發的時間。 period
表示timer在此之後觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。
以下請求體配置了一個 timer, dueTime
9秒, period
3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
b) 刪除定時器:
DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>
2、Reminders 提醒:
Reminders 是一種在指定時間內觸發 persistent 回調的機制。 它們的功能類似於 timer。 但與 timer 不同,在所有情況下 reminders 都會觸發,直到 actor 顯式取消註冊 reminders 或刪除 actor 。 具體而言, reminders 會在所有 actor 失活和故障時也會觸發觸發,因為Dapr Actors 運行時會將 reminders 資訊持久化到 Dapr Actors 狀態提供者中。
a) 創建Reminders
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
Reminders 的 duetime
和回調函數可以在請求主體中指定。 到期時間(due time)表示註冊後 reminders將首次觸發的時間。 period
表示在此之後 reminders 將觸發的頻率。 到期時間為0表示立即執行。 負 due times 和負 periods 都是無效。 若要註冊僅觸發一次的 reminders ,請將 period 設置為空字元串。
以下請求體配置了一個 reminders, dueTime
9秒, period
3秒。 這意味著它將在9秒後首次觸發,然後每3秒觸發一次。
{ "dueTime":"0h0m9s0ms", "period":"0h0m3s0ms" }
b) 獲取Reminders
GET http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
c) 刪除Reminders
DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>
四、數據持久化:
使用 Dapr 狀態管理構建塊保存執行組件狀態。由於執行組件可以一輪執行多個狀態操作,因此狀態存儲組件必須支援多項事務。
當前狀態管理組件支援事務/Actors支援情況:
Name | CRUD | 事務 | ETag | Actors | 狀態 | 組件版本 | 自從 |
---|---|---|---|---|---|---|---|
Aerospike | ✅ | ❌ | ✅ | ❌ | Alpha | v1 | 1.0 |
Apache Cassandra | ✅ | ❌ | ❌ | ❌ | Alpha | v1 | 1.0 |
Cloudstate | ✅ | ❌ | ✅ | ❌ | Alpha | v1 | 1.0 |
Couchbase | ✅ | ❌ | ✅ | ❌ | Alpha | v1 | 1.0 |
Hashicorp Consul | ✅ | ❌ | ❌ | ❌ | Alpha | v1 | 1.0 |
Hazelcast | ✅ | ❌ | ❌ | ❌ | Alpha | v1 | 1.0 |
Memcached | ✅ | ❌ | ❌ | ❌ | Alpha | v1 | 1.0 |
MongoDB | ✅ | ✅ | ✅ | ✅ | GA | v1 | 1.0 |
MySQL | ✅ | ✅ | ✅ | ✅ | Alpha | v1 | 1.0 |
PostgreSQL | ✅ | ✅ | ✅ | ✅ | Alpha | v1 | 1.0 |
Redis | ✅ | ✅ | ✅ | ✅ | GA | v1 | 1.0 |
RethinkDB | ✅ | ✅ | ✅ | ✅ | Alpha | v1 | 1.0 |
Zookeeper | ✅ | ❌ | ✅ | ❌ | Alpha | v1 | 1.0 |
需要支援Actor狀態存儲需添加以下內容:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
五、.NET Core 實例:
1、添加nuget包引用:Dapr.Actors和Dapr.Actors.AspNetCore。
2、定義IOrderStatusActor介面,需要繼承自IActor
public interface IOrderStatusActor : IActor { Task<string> Paid(string orderId); Task<string> GetStatus(string orderId); }
執行組件方法的返回類型必須為 Task
或 Task<T>
。 此外,執行組件方法最多只能有一個參數。 返回類型和參數都必須可 System.Text.Json
序列化。
3、定義OrderStatusActor實現IOrderStatusActor,並繼承自Actor
public class OrderStatusActor : Actor, IOrderStatusActor { public OrderStatusActor(ActorHost host) : base(host) { } public async Task<string> Paid(string orderId) { // change order status to paid await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid"); return orderId; } public async Task<string> GetStatus(string orderId) { return await StateManager.GetStateAsync<string>(orderId); } }
4、修改Statup.cs文件
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //註冊Actor services.AddActors(option => { option.Actors.RegisterActor<OrderStatusActor>(); }); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); // endpoints.MapActorsHandlers(); }); }
5、添加ActorController操作Actor
[Route("api/[controller]")] [ApiController] public class ActorController : ControllerBase { private readonly IActorProxyFactory _actorProxyFactory; public ActorController(IActorProxyFactory actorProxyFactory) { _actorProxyFactory = actorProxyFactory; } /// <summary> /// 方式一:ActorProxy.Create方式 /// </summary> /// <param name="orderId"></param> /// <returns></returns> [HttpGet("paid/{orderId}")] public async Task<ActionResult> PaidAsync(string orderId) { var actorId = new ActorId("myid-" + orderId); var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor"); var result = await proxy.Paid(orderId); return Ok(result); } /// <summary> /// 方式二:依賴注入方式 /// </summary> /// <param name="orderId"></param> /// <returns></returns> [HttpGet("get/{orderId}")] public async Task<ActionResult> GetAsync(string orderId) { var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>( new ActorId("myid-" + orderId), "OrderStatusActor"); return Ok(await proxy.GetStatus(orderId)); } }
6、Timer應用:使用Actor基類的 RegisterTimerAsync
方法註冊計時器:在OrderStatusActor類中新增方法
#region Timer操作 /// <summary> /// 啟動Timer定時器 /// </summary> /// <param name="name">定時器名稱</param> /// <param name="text">定時器參數</param> /// <returns></returns> public Task StartTimerAsync(string name, string text) { //註冊立即執行的間隔3s執行的定時器 return RegisterTimerAsync( name, nameof(TimerCallbackAsync), Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(3)); } /// <summary> /// 定時器回調 /// </summary> /// <param name="state"></param> /// <returns></returns> public Task TimerCallbackAsync(byte[] state) { var text = Encoding.UTF8.GetString(state); Console.WriteLine($"Timer fired: {text}"); return Task.CompletedTask; } /// <summary> /// 停止定時器 /// </summary> /// <param name="name"></param> /// <returns></returns> public Task StopTimerAsync(string name) { //停止計時器 UnregisterTimerAsync return UnregisterTimerAsync(name); } #endregion
7、Reminder操作:使用Actor基類的 RegisterReminderAsync 方法計劃計時器。在OrderStatusActor類中新增方法
#region Reminder 操作 public Task SetReminderAsync(string text) { return RegisterReminderAsync( "test-reminder", Encoding.UTF8.GetBytes(text), TimeSpan.Zero, TimeSpan.FromSeconds(1)); } /// <summary> /// Reminder觸發處理(實現IRemindable介面處理觸發) /// </summary> /// <param name="reminderName"></param> /// <param name="state"></param> /// <param name="dueTime"></param> /// <param name="period"></param> /// <returns></returns> public Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period) { if (reminderName == "test-reminder") { var text = Encoding.UTF8.GetString(state); Console.WriteLine($"reminder fired: {text}"); } return Task.CompletedTask; } #endregion
8、啟動定時器:
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable { public OrderStatusActor(ActorHost host) : base(host) { //註冊Timer StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult(); //設置Reminder SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult(); } //其他處理邏輯 }
總結:
Dapr 執行組件構建基塊可以更輕鬆地編寫正確的並發系統。 執行組件是狀態和邏輯的小單元。 它們使用基於輪次的訪問模型,無需使用鎖定機制編寫執行緒安全程式碼。 執行組件是隱式創建的,在未執行任何操作時以無提示方式從記憶體中卸載。 重新激活執行組件時,自動持久保存並載入執行組件中存儲的任何狀態。 執行組件模型實現通常是為特定語言或平台創建的。 但是,藉助 Dapr 執行組件構建基塊,可以從任何語言或平台利用執行組件模型。
Actor組件支援計時器和提醒來計劃將來的工作。 計時器不會重置空閑計時器,並且允許執行組件在未執行其他操作時停用。 提醒會重置空閑計時器,並且也會自動保留。 計時器和提醒都遵守基於輪次的訪問模型,確保在處理計時器/提醒事件時無法執行任何其他操作。
使用 Dapr 狀態管理構建基塊 持久保存執行組件狀態。 支援多項事務的任何狀態存儲都可用於存儲執行組件狀態。