akka-typed(4) – EventSourcedBehavior in action

  前面提到過,akka-typed中較重要的改變是加入了EventSourcedBehavior。也就是說增加了一種專門負責EventSource模式的actor, 最終和其它種類的actor一道可以完美實現CQRS。新的actor,我還是把它稱為persistentActor,還是一種能維護和維持運行狀態的actor。即,actor內部狀態可以存放在數據庫里,然後通過一組功能函數來提供對狀態的處理轉變,即持續化處理persistence。當然作為一種具備EventSourcedBehavior的actor, 普遍應有的actor屬性、方法、消息處理協議、監管什麼的都還必須存在。在這篇討論里我們就通過案例和源碼來說明一下EventSourcedBehavior是如何維護內部狀態及作為一種actor又應該怎麼去使用它。

我們把上一篇討論里購物車的例子拿來用,再增加一些消息回復response機制,主要是彙報購物車狀態:

object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
 import ItemInfo._

  sealed trait Command 
  sealed trait Event extends CborSerializable
  sealed trait Response 

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil =>
              replyTo ! CartEmpty
            case listOfItems =>
              replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) =>
         state.copy(load = item :: state.load)
      case CartPaid =>
        state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
    persistenceId = PersistenceId("10","1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  )

}

object Shopper {

  import ItemInfo._

  sealed trait Command extends CborSerializable

  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command

  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }

}


object ShoppingCart extends App {
  import ItemInfo._
  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount
  scala.io.StdIn.readLine()

  shopper.terminate()

}

實際上EventSourcedBehavior里還嵌入了回復機制,完成一項Command處理後必須回復指令方,否則程序無法通過編譯。如下:

private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
  if (acc.canWithdraw(cmd.amount))
    Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
  else
    Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
}

不過這個回復機制是一種副作用。即,串連在Effect產生之後立即實施。這個動作是在eventHandler之前。在這個時段無法回復最新的狀態。

說到side-effect, 如Effect.persist().thenRun(produceSideEffect): 當成功持續化event後可以安心進行一些其它的操作。例如,當影響庫存數的event被persist後可以馬上從賬上扣減庫存。

在上面這個ShoppingCart例子里我們沒有發現狀態轉換代碼如Behaviors.same。這隻能是EventSourcedBehavior屬於更高層次的Behavior,狀態轉換已經嵌入在eventHandler里了,還記着這個函數的款式吧  (State,Event) => State, 這個State就是狀態了。

Events persist在journal里,如果persist操作中journal出現異常,EventSourcedBehavior自備了安全監管策略,如下:

  def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
    persistenceId = PersistenceId("10","1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  ).onPersistFailure(
    SupervisorStrategy
    .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
    .withMaxRestarts(3)
    .withResetBackoffAfter(10.seconds))

值得注意的是:這個策略只適用於onPersistFailure(),從外部用Behaviors.supervisor()包嵌是無法實現處理PersistFailure效果的。但整個actor還是需要一種Backoff策略,因為在EventSourcedBehavior內部commandHandler,eventHandler里可能也會涉及一些數據庫操作。在操作失敗後需要某種Backoff重啟策略。那麼我們可以為actor增加監控策略如下:

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

現在這個MyCart可以說已經是個安全、強韌性的actor了。

既然是一種persistentActor,那麼持久化的管理應該也算是核心功能了。EventSourcedBehavior通過接收信號提供了對持久化過程監控功能,如:

 def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

EventSourcedBehavior.receiveSignal是個偏函數:

  def receiveSignal(signalHandler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State]

下面是一個EventSourcedBehavior Signal 清單:

sealed trait EventSourcedSignal extends Signal

@DoNotInherit sealed abstract class RecoveryCompleted extends EventSourcedSignal
case object RecoveryCompleted extends RecoveryCompleted {
  def instance: RecoveryCompleted = this
}

final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
}

final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {

  def getFailure(): Throwable = failure
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

object SnapshotMetadata {

  /**
   * @param persistenceId id of persistent actor from which the snapshot was taken.
   * @param sequenceNr sequence number at which the snapshot was taken.
   * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
   *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
   */
  def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
    new SnapshotMetadata(persistenceId, sequenceNr, timestamp)
}

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
 */
final class SnapshotMetadata(val persistenceId: String, val sequenceNr: Long, val timestamp: Long) {
  override def toString: String =
    s"SnapshotMetadata($persistenceId,$sequenceNr,$timestamp)"
}

final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
  def getTarget(): DeletionTarget = target
}

final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getTarget(): DeletionTarget = target
}

final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
  def getToSequenceNr(): Long = toSequenceNr
}

final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getToSequenceNr(): Long = toSequenceNr
}

當然,EventSourcedBehavior之所以能具備自我修復能力其中一項是因為它有對持久化的事件重演機制。如果每次啟動都需要對所有歷史事件進行重演的話會很不現實。必須用snapshot來濃縮歷史事件:

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }.snapshotWhen {
          case (state,CartPaid,seqnum) =>
            ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
            true
          case (state,event,seqnum) => false
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

下面是本次示範的源碼:

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"


libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

application.conf

akka.actor.allow-java-serialization = on
akka {
  loglevel = DEBUG
  actor {
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }

}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

ShoppingCart.scala

package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._

object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
 import ItemInfo._

  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil =>
              replyTo ! CartEmpty
            case listOfItems =>
              replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) =>
         state.copy(load = item :: state.load)
      case CartPaid =>
        state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }.snapshotWhen {
          case (state,CartPaid,seqnum) =>
            ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
            true
          case (state,event,seqnum) => false
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )
}

object Shopper {

  import ItemInfo._

  sealed trait Command extends CborSerializable

  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command

  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }

}


object ShoppingCart extends App {
  import ItemInfo._
  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount
  scala.io.StdIn.readLine()

  shopper.terminate()

}