akka-typed(3) – PersistentActor has EventSourcedBehavior

   akka-typed中已經沒有PersistentActor了。取而代之的是帶有EventSourcedBehavior的actor,也就是一種專門支援EventSource模式的actor。EventSource的原理和作用在之前的部落格裡已經有了比較詳細的介紹,這裡就不再重複了。本篇直接從EventSourcedBehavior actor的具體應用開始介紹。支援EventSource應用的基本數據類型包括指令Command、事件Event、狀態State。EventSourcing其實就是一個有限狀態機(FSM, finite-state machine):執行Command,產生Event,改變State,周而復始。下面是一個簡單的EventSource類型定義:

/**
  * Marker trait with no members. Types that extend it can be matched by a
  * serialization binding (the sample application.conf binds
  * "com.learn.akka.CborSerializable" to jackson-cbor).
  */
trait CborSerializable
/**
  * Shopping-cart example: the command/event/state types plus the two handlers
  * that are wired into an EventSourcedBehavior by `apply()`.
  */
object Cart {
  /** A cart entry: item name and its price. */
  case class Item(name: String, price: Double)

  // Commands carry the CborSerializable marker; events do not.
  sealed trait Command extends CborSerializable
  sealed trait Event

  // ---- commands ----
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command

  // ---- events ----
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  // ---- state ----
  /** Cart content; starts out empty. */
  case class CartLoad(load: Set[Item] = Set.empty)

  /**
    * Turns each command into exactly one persisted event. The current state
    * is not consulted.
    */
  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = (_, command) =>
    command match {
      case PayCart        => Effect.persist(CartPaid)
      case AddItem(added) => Effect.persist(ItemAdded(added))
    }

  /**
    * Computes the next state for an event and prints it.
    * ItemAdded prints the whole CartLoad, CartPaid prints only its `load` set.
    */
  val eventHandler: (CartLoad,Event) => CartLoad = (current, event) =>
    event match {
      case CartPaid =>
        val emptied = CartLoad()
        println(s"current cart loading: ${emptied.load}")
        emptied
      case ItemAdded(added) =>
        val updated = CartLoad(current.load + added)
        println(s"current cart loading: $updated")
        updated
    }

  /** Builds the event-sourced behavior with a fixed persistence id and an empty cart. */
  def apply(): Behavior[Command] =
    EventSourcedBehavior[Command,Event,CartLoad](
      PersistenceId("10","1012"),
      CartLoad(),
      commandHandler,
      eventHandler)

}

/**
  * Demo entry point: starts an actor system whose guardian is the Cart
  * behavior, sends two AddItem commands, waits for a line on stdin and then
  * terminates the system.
  */
object EventSource extends App {
  import Cart._

  val cart = ActorSystem(Cart(),"shopping-cart")

  // Sample items, sent in this order.
  Seq(Item("banana",11.20), Item("watermelon",4.70))
    .foreach(item => cart ! AddItem(item))

  // Keep the system alive until the user presses Enter.
  scala.io.StdIn.readLine()

  cart.terminate()

}

首先要搞清楚幾件事:EventSourcedBehavior定義了一個actor。從Behavior[Command]這個結果類型來看,這個actor可以接收並處理Command類型的消息。既然是個actor那麼應該具備了receiveMessage,receiveSignal這兩項基本能力,但我們又不用自己來定義這些功能。怎麼回事呢?看看EventSourcedBehavior的源程式碼吧:

// Excerpt of the EventSourcedBehavior companion object; `...` marks elided source.
object EventSourcedBehavior {
...
  /**
   * Creates an event-sourced behavior from the four user-supplied parts.
   *
   * @param persistenceId  identifier passed through to the implementation
   * @param emptyState     initial state value
   * @param commandHandler function from (current state, command) to an [[Effect]]
   * @param eventHandler   function from (current state, event) to the next state
   * @return an [[EventSourcedBehaviorImpl]] built from the arguments plus a detected logger class
   */
  def apply[Command, Event, State](
      persistenceId: PersistenceId,
      emptyState: State,
      commandHandler: (State, Command) => Effect[Event, State],
      eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
    // Logger class is derived from the call stack; logPrefixSkipList is defined in the elided part.
    val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
    EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
  }
...
}

這個EventSourcedBehavior就是某種Behavior。它的所有特殊功能看來應該是在EventSourcedBehaviorImpl裡實現的:

/**
 * Excerpt of the implementation behind [[EventSourcedBehavior]]; `...` marks elided source.
 *
 * The visible part builds a supervised behavior that:
 *  - creates a `BehaviorSetup` holding the user handlers and persistence settings,
 *  - installs an interceptor that translates every incoming message into the
 *    `InternalProtocol`, and
 *  - starts in `RequestingRecoveryPermit`.
 *
 * NOTE(review): `ctx`, `actualSignalHandler`, `tagger`, `settings`, `stashState` and
 * `supervisionStrategy` are not defined in the visible lines; presumably they come
 * from the elided parts.
 */
private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
    persistenceId: PersistenceId,
    emptyState: State,
    commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State],
    eventHandler: EventSourcedBehavior.EventHandler[State, Event],
    loggerClass: Class[_],
...
) extends EventSourcedBehavior[Command, Event, State] {
...
    Behaviors
      .supervise {
        Behaviors.setup[Command] { _ =>
          // Bundles everything the internal behaviors need; the context is cast to the internal protocol type.
          val eventSourcedSetup = new BehaviorSetup(
            ctx.asInstanceOf[ActorContext[InternalProtocol]],
            persistenceId,
            emptyState,
            commandHandler,
            eventHandler,
            WriterIdentity.newIdentity(),
            actualSignalHandler,
            tagger,
            eventAdapter,
            snapshotAdapter,
            snapshotWhen,
            recovery,
            retention,
            holdingRecoveryPermit = false,
            settings = settings,
            stashState = stashState)

          // needs to accept Any since we also can get messages from the journal
          // not part of the user facing Command protocol
          def interceptor: BehaviorInterceptor[Any, InternalProtocol] = new BehaviorInterceptor[Any, InternalProtocol] {

            import BehaviorInterceptor._
            // Wraps each incoming message in the matching InternalProtocol case before
            // handing it to the wrapped behavior.
            override def aroundReceive(
                ctx: typed.TypedActorContext[Any],
                msg: Any,
                target: ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = {
              val innerMsg = msg match {
                case res: JournalProtocol.Response           => InternalProtocol.JournalResponse(res)
                case res: SnapshotProtocol.Response          => InternalProtocol.SnapshotterResponse(res)
                case RecoveryPermitter.RecoveryPermitGranted => InternalProtocol.RecoveryPermitGranted
                case internal: InternalProtocol              => internal // such as RecoveryTickEvent
                // Anything else is treated as a user command; the type test is unchecked.
                case cmd: Command @unchecked                 => InternalProtocol.IncomingCommand(cmd)
              }
              target(ctx, innerMsg)
            }

            // On PostStop: cancel the recovery timer and clear the stash, then forward the signal.
            override def aroundSignal(
                ctx: typed.TypedActorContext[Any],
                signal: Signal,
                target: SignalTarget[InternalProtocol]): Behavior[InternalProtocol] = {
              if (signal == PostStop) {
                eventSourcedSetup.cancelRecoveryTimer()
                // clear stash to be GC friendly
                stashState.clearStashBuffers()
              }
              target(ctx, signal)
            }

            override def toString: String = "EventSourcedBehaviorInterceptor"
          }

          // Initial internal behavior is RequestingRecoveryPermit, wrapped by the interceptor
          // and narrowed back to the user-facing Command type.
          Behaviors.intercept(() => interceptor)(RequestingRecoveryPermit(eventSourcedSetup)).narrow
        }

      }
      // The supervision strategy is applied to JournalFailureException only.
      .onFailure[JournalFailureException](supervisionStrategy)
}

EventSourcedBehaviorImpl還是一種Behavior[Command],它又是通過一個BehaviorInterceptor實現的。BehaviorInterceptor.aroundReceive和BehaviorInterceptor.aroundSignal可以代替receiveMessage和receiveSignal的工作,這點從這兩個函數的結果類型可以得到一些驗證:

 /**
 * Excerpt of the interceptor base class; `...` marks elided source. It declares
 * two hooks, `aroundReceive` for messages and `aroundSignal` for signals, each
 * returning the `Behavior[Inner]` to use next.
 *
 * @tparam Outer The outer message type - the type of messages the intercepting behavior will accept
 * @tparam Inner The inner message type - the type of message the wrapped behavior accepts
 *
 * @see [[BehaviorSignalInterceptor]]
 */
abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) {
  import BehaviorInterceptor._
...

 /**
   * Intercept a message sent to the running actor. Pass the message on to the next behavior
   * in the stack by passing it to `target.apply`, return `Behaviors.same` without invoking `target`
   * to filter out the message.
   *
   * @param ctx    the actor context, typed to the outer message type
   * @param msg    the intercepted message
   * @param target the next behavior in the stack, invoked via `target.apply`
   * @return The behavior for next message or signal
   */
  def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner]

  /**
   * Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior
   * in the stack by passing it to `target.apply`.
   *
   * @param ctx    the actor context, typed to the outer message type
   * @param signal the intercepted signal
   * @param target the next behavior in the stack, invoked via `target.apply`
   * @return The behavior for next message or signal
   *
   * @see [[BehaviorSignalInterceptor]]
   */
  def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner]

...
}

另外,對於EventSourcedBehavior來說,收到Command, 處理Command方式應該是通過外部提供的這個commandHandler來實現才是最值得注意的:

  /**
   * Internal behavior shown while commands are being handled (excerpt; `...` marks
   * elided source). It carries the current [[RunningState]] and dispatches each
   * internal-protocol message to the matching handler.
   *
   * NOTE(review): `setup`, `applyEffects`, `onGetState`, the `on*Response` handlers and
   * the stash predicates are defined outside the visible lines.
   */
  final class HandlingCommands(state: RunningState[S])
      extends AbstractBehavior[InternalProtocol](setup.context)
      with WithSeqNrAccessible {

    /** Routes a message by its InternalProtocol case; anything unmatched is unhandled. */
    def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
      case IncomingCommand(c: C @unchecked) => onCommand(state, c)
      case JournalResponse(r)               => onDeleteEventsJournalResponse(r, state.state)
      case SnapshotterResponse(r)           => onDeleteSnapshotResponse(r, state.state)
      case get: GetState[S @unchecked]      => onGetState(get)
      case _                                => Behaviors.unhandled
    }

    /**
     * PoisonPill stops the actor right away only when the internal stash is empty and no
     * unstash-all is in progress; otherwise the pill is recorded in the state and handling
     * continues. Any other signal is passed to `setup.onSignal`, whose boolean result
     * decides between staying in this behavior and `unhandled`.
     */
    override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
      case PoisonPill =>
        if (isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped
        else new HandlingCommands(state.copy(receivedPoisonPill = true))
      case signal =>
        if (setup.onSignal(state.state, signal, catchAndLog = false)) this
        else Behaviors.unhandled
    }

    /** Runs the user-supplied command handler on the current state, then applies the resulting effect. */
    def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
      val effect = setup.commandHandler(state.state, cmd)
      applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
    }

...
}

上面這段程式碼已經足夠說明了。根據commandHandler和eventHandler的函數類型可以得出EventSourcedBehavior處理流程 (State, Command) => (State, Event) => new State, 最終輸出new State:

// Excerpt of the EventSourcedBehavior companion object; `...` marks elided source.
object EventSourcedBehavior {
  /** Command handler: (current state, command) => the [[Effect]] to apply. */
  type CommandHandler[Command, Event, State] = (State, Command) => Effect[Event, State]
  /** Event handler: (current state, event) => next state. */
  type EventHandler[State, Event] = (State, Event) => State
...
}

commandHandler返回Effect[Event,State]類型結果,也就是說處理Command過程就是產生Event過程,下面是Effect的各種選項:

 

/**
 * Factory methods for the effects a command handler can return: persisting one or
 * more events, persisting nothing, marking the command unhandled, stopping,
 * stashing/unstashing, and replying.
 */
object Effect {

  /**
   * Persist a single event
   *
   * Side effects can be chained with `thenRun`
   */
  def persist[Event, State](event: Event): EffectBuilder[Event, State] = Persist(event)

  /**
   * Persist multiple events
   *
   * Requires at least two events; they are collected into a list and passed to the
   * sequence overload.
   *
   * Side effects can be chained with `thenRun`
   */
  def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): EffectBuilder[Event, State] =
    persist(evt1 :: evt2 :: events.toList)

  /**
   * Persist multiple events
   *
   * Side effects can be chained with `thenRun`
   */
  def persist[Event, State](events: im.Seq[Event]): EffectBuilder[Event, State] =
    PersistAll(events)

  /**
   * Do not persist anything
   *
   * Side effects can be chained with `thenRun`
   */
  // The shared PersistNothing value is cast to the requested type parameters.
  def none[Event, State]: EffectBuilder[Event, State] = PersistNothing.asInstanceOf[EffectBuilder[Event, State]]

  /**
   * This command is not handled, but it is not an error that it isn't.
   *
   * Side effects can be chained with `thenRun`
   */
  // The shared Unhandled value is cast to the requested type parameters.
  def unhandled[Event, State]: EffectBuilder[Event, State] = Unhandled.asInstanceOf[EffectBuilder[Event, State]]

  /**
   * Stop this persistent actor
   *
   * Implemented as `none` followed by `thenStop()`, i.e. nothing is persisted.
   *
   * Side effects can be chained with `thenRun`
   */
  def stop[Event, State](): EffectBuilder[Event, State] =
    none.thenStop()

  /**
   * Stash the current command. Can be unstashed later with [[Effect.unstashAll]].
   *
   * Note that the stashed commands are kept in an in-memory buffer, so in case of a crash they will not be
   * processed. They will also be discarded if the actor is restarted (or stopped) due to that an exception was
   * thrown from processing a command or side effect after persisting. The stash buffer is preserved for persist
   * failures if a backoff supervisor strategy is defined with [[EventSourcedBehavior.onPersistFailure]].
   *
   * Side effects can be chained with `thenRun`
   */
  def stash[Event, State](): ReplyEffect[Event, State] =
    Stash.asInstanceOf[EffectBuilder[Event, State]].thenNoReply()

  /**
   * Unstash the commands that were stashed with [[Effect.stash]].
   *
   * It's allowed to stash messages while unstashing. Those newly added
   * commands will not be processed by this `unstashAll` effect and have to be unstashed
   * by another `unstashAll`.
   *
   * @see [[EffectBuilder.thenUnstashAll]]
   */
  // Built as a composite of the `none` effect and an unstash-all side effect.
  def unstashAll[Event, State](): Effect[Event, State] =
    CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]())

  /**
   * Send a reply message to the command. The type of the
   * reply message must conform to the type specified by the passed replyTo `ActorRef`.
   *
   * This has the same semantics as `cmd.replyTo.tell`.
   *
   * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
   * when the `EventSourcedBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When
   * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
   * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
   * finding mistakes.
   *
   * @param replyTo          the actor to send the reply to
   * @param replyWithMessage the reply message; the state passed to the reply function is ignored
   */
  def reply[ReplyMessage, Event, State](replyTo: ActorRef[ReplyMessage])(
      replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
    none[Event, State].thenReply[ReplyMessage](replyTo)(_ => replyWithMessage)

  /**
   * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
   * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
   * sent for a specific command or the reply will be sent later.
   */
  def noReply[Event, State]: ReplyEffect[Event, State] =
    none.thenNoReply()

}

 

接著在applyEffects裡通過state.applyEvent,用外部提供的eventHandler根據產生的Event更新State,如下:

 

    /**
     * Interprets the effect returned by the command handler and returns the next
     * internal behavior.
     *
     * Composite effects are unwrapped recursively (hence `@tailrec`) while their side
     * effects are accumulated; the remaining base effect decides whether events are
     * persisted, the message is published as unhandled, or the command is stashed.
     *
     * @param msg         the command that produced the effect
     * @param state       the running state before the effect is applied
     * @param effect      the effect to interpret
     * @param sideEffects side effects accumulated so far, run once the base effect is handled
     */
    @tailrec def applyEffects(
        msg: Any,
        state: RunningState[S],
        effect: Effect[E, S],
        sideEffects: immutable.Seq[SideEffect[S]] = Nil): Behavior[InternalProtocol] = {
      // Log only for non-composite effects, so an unwrapped chain is logged once.
      if (setup.log.isDebugEnabled && !effect.isInstanceOf[CompositeEffect[_, _]])
        setup.log.debugN(
          s"Handled command [{}], resulting effect: [{}], side effects: [{}]",
          msg.getClass.getName,
          effect,
          sideEffects.size)

      effect match {
        case CompositeEffect(eff, currentSideEffects) =>
          // unwrap and accumulate effects
          applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)

        case Persist(event) =>
          // apply the event before persist so that validation exception is handled before persisting
          // the invalid event, in case such validation is implemented in the event handler.
          // also, ensure that there is an event handler for each single event
          val newState = state.applyEvent(setup, event)

          val eventToPersist = adaptEvent(event)
          val eventAdapterManifest = setup.eventAdapter.manifest(event)

          val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)

          // Snapshot decision is made on the state returned by internalPersist.
          val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)

          persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects)

        case PersistAll(events) =>
          if (events.nonEmpty) {
            // apply the event before persist so that validation exception is handled before persisting
            // the invalid event, in case such validation is implemented in the event handler.
            // also, ensure that there is an event handler for each single event
            var seqNr = state.seqNr
            // Fold over the events: apply each to the state and keep the first
            // snapshot decision that is not NoSnapshot.
            val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, NoSnapshot: SnapshotAfterPersist)) {
              case ((currentState, snapshot), event) =>
                seqNr += 1
                val shouldSnapshot =
                  if (snapshot == NoSnapshot) setup.shouldSnapshot(currentState.state, event, seqNr) else snapshot
                (currentState.applyEvent(setup, event), shouldSnapshot)
            }

            // Pair each adapted event with its adapter manifest.
            val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))

            val newState2 = internalPersistAll(setup.context, msg, newState, eventsToPersist)

            persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)

          } else {
            // run side-effects even when no events are emitted
            tryUnstashOne(applySideEffects(sideEffects, state))
          }

        case _: PersistNothing.type =>
          // Nothing to persist: run the side effects, then try to unstash one command.
          tryUnstashOne(applySideEffects(sideEffects, state))

        case _: Unhandled.type =>
          // Publish the message as UnhandledMessage on the classic event stream, then continue as above.
          import akka.actor.typed.scaladsl.adapter._
          setup.context.system.toClassic.eventStream
            .publish(UnhandledMessage(msg, setup.context.system.toClassic.deadLetters, setup.context.self.toClassic))
          tryUnstashOne(applySideEffects(sideEffects, state))

        case _: Stash.type =>
          // Put the command, re-wrapped as IncomingCommand, into the user stash.
          stashUser(IncomingCommand(msg))
          tryUnstashOne(applySideEffects(sideEffects, state))
      }
    }

好了,基本原理都在這了,再挖下去會更骯髒。為上面的例子設了個運行環境,主要是測試persistence-cassandra-plugin的正確設置,如下:

build.sbt

 name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"

// Compiler flags, scoped to the Compile configuration with sbt's unified slash
// syntax (`Compile / key`); the older `key in Compile` form is deprecated.
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

// All Akka core modules share one version; the Cassandra plugin is versioned separately.
val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed"      % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query"      % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson"  % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra"  % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j"                  % AkkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

application.conf

# Java serialization is switched on.
# NOTE(review): the example's Event types do not extend CborSerializable, so presumably
# this is what lets them be persisted - confirm before reusing this setting.
akka.actor.allow-java-serialization = on
akka {
  loglevel = DEBUG
  actor {
    # Types extending this trait are bound to the jackson-cbor serializer.
    # NOTE(review): the fully-qualified name must match the package in which
    # CborSerializable is declared - verify against the source file.
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }

}
akka.persistence.cassandra {
  # don't use autocreate in production
  # Journal and snapshot data use separate keyspaces.
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

# Driver settings: contact point (host:port) and local datacenter name.
# Both values are specific to the author's environment.
datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}