akka-typed(6) – cluster:group router, cluster-load-balancing
- June 11, 2020
- Notes
- akka, akka-cluster, Cassandra, event-sourcing, scala, programming languages
Let's start with router actors in akka-typed. Routers come in two kinds: pool routers and group routers. First, an example of a pool router:
val pool = Routers.pool(poolSize = 4)(
  // make sure the workers are restarted if they fail
  Behaviors.supervise(WorkerRoutee()).onFailure[Exception](SupervisorStrategy.restart))
val router = ctx.spawn(pool, "worker-pool")

(0 to 10).foreach { n =>
  router ! WorkerRoutee.DoLog(s"msg $n")
}
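In the example above, pool is a pool router, that is, a pool of 4 routees. Every routee is built from WorkerRoutee(), so the pool contains only one kind of actor. A pool router spawns all of its routees locally (in the same JVM) from the given behavior, which means every routee is a child actor of the router.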
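As a rough illustration of how a pool of identical routees shares work, the following plain-Scala sketch models a fixed-size pool with round-robin dispatch (the default strategy of an Akka pool router). `PoolSketch`, `Pool`, `route`, and `dispatch` are names made up for this example; the code does not use or reproduce Akka's implementation.

```scala
object PoolSketch {
  // Hypothetical stand-in for a pool router: a fixed number of identical
  // routees, with messages handed out in round-robin order.
  final class Pool(poolSize: Int) {
    require(poolSize > 0, "poolSize must be positive")
    private var next = 0

    // Returns the index of the routee that receives the next message.
    def route(): Int = {
      val target = next
      next = (next + 1) % poolSize
      target
    }
  }

  // Dispatches the given messages to a pool and groups them by routee index.
  def dispatch(poolSize: Int, messages: Range): Map[Int, List[Int]] = {
    val pool = new Pool(poolSize)
    messages.toList.map(n => pool.route() -> n).groupMap(_._1)(_._2)
  }

  def main(args: Array[String]): Unit =
    dispatch(4, 0 to 10).toList.sortBy(_._1).foreach { case (routee, msgs) =>
      println(s"routee $routee <- ${msgs.mkString(", ")}")
    }
}
```

With 4 routees and messages 0 to 10, routee 0 receives 0, 4, 8 and routee 3 receives only 3 and 7, so the load differs by at most one message per routee.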
Now an example of a group router:
val serviceKey = ServiceKey[WorkerRoutee.Command]("log-worker")

// this would likely happen elsewhere - if we create it locally we
// can just as well use a pool
val workerRoutee = ctx.spawn(WorkerRoutee(), "worker-route")
ctx.system.receptionist ! Receptionist.Register(serviceKey, workerRoutee)

val group = Routers.group(serviceKey)
val router = ctx.spawn(group, "worker-group")

// the group router will stash messages until it sees the first listing of registered
// services from the receptionist, so it is safe to send messages right away
(0 to 10).foreach { n =>
  router ! WorkerRoutee.DoLog(s"msg $n")
}
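A group router differs from a pool router in several ways:
1. Routees are created outside the router. The router uses a key to obtain, through the Receptionist, the list of actors registered under that key, and treats that list as its routee group.
2. The Receptionist is cluster-wide. An actor on any node can register with it by sending a registration message.
3. There is no size limit. Any actor becomes a routee as soon as it is registered with the Receptionist, and the router then routes messages to it.
If the goal is to spread computation across the nodes of a cluster and run it in parallel for a load-balancing effect, a group router is the most suitable choice. The user does have to decide how many routees each kind of task needs, though. This is less convenient than in akka-classic, where routing could adapt automatically to node load through cluster-metrics.
Receptionist: since it has come up, let's look at it a little more closely. As mentioned above, the Receptionist is cluster-wide: actors on any node can register with it, forming a list of actors that live on different nodes of the cluster. If the Receptionist hands this list to a consumer, the consumer can distribute computation tasks across the nodes, which gives a form of distributed computing. To use it, register an ActorRef by sending a message to the local node's Receptionist, then pick up the latest ActorRef list from the listing-change messages the Receptionist publishes: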
val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")
ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)
ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
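Registration and listing retrieval are tied together by the ServiceKey. The list obtained therefore contains only one type of actor. Their addresses may span nodes, but they all perform the same kind of computation. Put another way, one task is computed in parallel by actors spread across different nodes.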
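The register/subscribe relationship can be modeled in a few lines of plain Scala. This is only a single-process sketch under simplifying assumptions: `RegistrySketch`, `Registry`, and `demo` are hypothetical names, string keys stand in for ServiceKey, and plain strings stand in for ActorRefs. The real Receptionist replicates its registry across cluster nodes, which this model does not attempt.

```scala
object RegistrySketch {
  // Hypothetical, single-process stand-in for the Receptionist:
  // it maps a service key to the set of registered instances and
  // pushes the full, updated listing to every subscriber of that key.
  final class Registry[Ref] {
    private var services = Map.empty[String, Set[Ref]]
    private var subscribers = Map.empty[String, List[Set[Ref] => Unit]]

    def register(key: String, ref: Ref): Unit = {
      val updated = services.getOrElse(key, Set.empty[Ref]) + ref
      services = services.updated(key, updated)
      subscribers.getOrElse(key, Nil).foreach(callback => callback(updated))
    }

    def subscribe(key: String)(onListing: Set[Ref] => Unit): Unit = {
      subscribers = subscribers.updated(key, onListing :: subscribers.getOrElse(key, Nil))
      // A new subscriber immediately receives the current listing.
      onListing(services.getOrElse(key, Set.empty[Ref]))
    }
  }

  // Records every listing a subscriber of "Worker" sees.
  def demo(): List[Set[String]] = {
    val registry = new Registry[String]
    var seen = List.empty[Set[String]]
    registry.subscribe("Worker")(listing => seen = seen :+ listing)
    registry.register("Worker", "worker@node1")
    registry.register("Worker", "worker@node2")
    registry.register("Other", "unrelated@node3") // different key: no notification
    seen
  }
}
```

The subscriber sees three listings: the initial empty one, then one per registration under its own key. Registrations under another key never reach it, which is the sense in which the key ties registration and lookup together.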
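As mentioned in the previous post: when publish-subscribe takes place between two actors, both have to work within an agreed message protocol. We must pay attention to message types and provide the necessary message type conversion. Here is a Receptionist registration example: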
object Worker {

  val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")

  sealed trait Command
  final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
  final case class TextTransformed(text: String) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.setup { ctx =>
      // each worker registers themselves with the receptionist
      ctx.log.info("Registering myself with receptionist")
      ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)

      Behaviors.receiveMessage {
        case TransformText(text, replyTo) =>
          replyTo ! TextTransformed(text.toUpperCase)
          Behaviors.same
      }
    }
}
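Registering with the Receptionist is straightforward: the registering actor does not need a reply from the Receptionist, so it simply passes ctx.self as the ActorRef to register. Note the replyTo: ActorRef[TextTransformed] in TransformText: it means the sender is an actor that can handle TextTransformed messages. In practice, the sender side uses ctx.ask to supply the conversion from TextTransformed.
Receptionist.Subscribe needs the Receptionist to send back a list of actors, so it follows the request/response pattern. The replyTo in the message sent to the Receptionist must therefore be of a type the sender can handle, as follows: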
def apply(): Behavior[Event] = Behaviors.setup { ctx =>
  Behaviors.withTimers { timers =>
    // subscribe to available workers
    val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
      case Worker.WorkerServiceKey.Listing(workers) =>
        WorkersUpdated(workers)
    }
    ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
    ...
  }
}
ctx.messageAdapter registers a conversion from the Receptionist.Listing reply type to WorkersUpdated: the Listing sent back by the Receptionist is converted into a WorkersUpdated message, which is then handled as follows:
...
Behaviors.receiveMessage {
  case WorkersUpdated(newWorkers) =>
    ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
    ...
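The idea behind a message adapter can be shown without Akka: a function wraps a foreign message type into the subscriber's own protocol before the message is delivered. In the sketch below, `AdapterSketch`, `Mailbox`, `adapter`, and the simplified `Listing` are assumptions made for illustration; they are not Akka types.

```scala
object AdapterSketch {
  // Stand-in for Receptionist.Listing: a message type owned by another actor.
  final case class Listing(key: String, instances: Set[String])

  // The subscriber's own protocol.
  sealed trait Event
  final case class WorkersUpdated(workers: Set[String]) extends Event

  // A typed "mailbox" that only accepts messages of type T.
  final class Mailbox[T] {
    private val buf = scala.collection.mutable.ArrayBuffer.empty[T]
    def tell(msg: T): Unit = buf += msg
    def messages: List[T] = buf.toList
  }

  // Model of a message adapter: returns a reference that accepts A,
  // converts it with f, and delivers the result to the target mailbox.
  def adapter[A, T](target: Mailbox[T])(f: A => T): A => Unit =
    a => target.tell(f(a))

  def demo(): List[Event] = {
    val inbox = new Mailbox[Event]
    val listingRef: Listing => Unit =
      adapter(inbox)((l: Listing) => WorkersUpdated(l.instances))
    // The "Receptionist" only knows listingRef and sends its own message type.
    listingRef(Listing("Worker", Set("worker@node1", "worker@node2")))
    inbox.messages
  }
}
```

The sender never needs to know the Event protocol, and the subscriber's behavior only ever matches on its own message types.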
The TextTransformed conversion mentioned above looks like this:
ctx.ask[Worker.TransformText, Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
  // argument order follows TransformCompleted(originalText, transformedText)
  case Success(transformedText) => TransformCompleted(text, transformedText.text)
  case Failure(ex) => JobFailed("Processing timed out", text)
}
ctx.ask converts the TextTransformed reply into TransformCompleted. The complete Behavior definition is:
object Frontend {

  sealed trait Event
  private case object Tick extends Event
  private final case class WorkersUpdated(newWorkers: Set[ActorRef[Worker.TransformText]]) extends Event
  private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
  private final case class JobFailed(why: String, text: String) extends Event

  def apply(): Behavior[Event] = Behaviors.setup { ctx =>
    Behaviors.withTimers { timers =>
      // subscribe to available workers
      val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
        case Worker.WorkerServiceKey.Listing(workers) =>
          WorkersUpdated(workers)
      }
      ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

      timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)

      running(ctx, IndexedSeq.empty, jobCounter = 0)
    }
  }

  private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
    Behaviors.receiveMessage {
      case WorkersUpdated(newWorkers) =>
        ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
        running(ctx, newWorkers.toIndexedSeq, jobCounter)
      case Tick =>
        if (workers.isEmpty) {
          ctx.log.warn("Got tick request but no workers available, not sending any work")
          Behaviors.same
        } else {
          // how much time can pass before we consider a request failed
          implicit val timeout: Timeout = 5.seconds
          // pick the next worker in round-robin order
          val selectedWorker = workers(jobCounter % workers.size)
          ctx.log.info("Sending work for processing to {}", selectedWorker)
          val text = s"hello-$jobCounter"
          ctx.ask[Worker.TransformText, Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
            // argument order follows TransformCompleted(originalText, transformedText)
            case Success(transformedText) => TransformCompleted(text, transformedText.text)
            case Failure(ex) => JobFailed("Processing timed out", text)
          }
          running(ctx, workers, jobCounter + 1)
        }
      case TransformCompleted(originalText, transformedText) =>
        ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
        Behaviors.same
      case JobFailed(why, text) =>
        ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
        Behaviors.same
    }
}
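The conversion that the ask pattern applies to a reply can be isolated as a pure function from Try to the requester's own message type. The sketch below is plain Scala with simplified copies of the message classes; `AskSketch` and `toEvent` are hypothetical names, and the arguments are passed in the field order of TransformCompleted (original text first).

```scala
import scala.util.{Failure, Success, Try}

object AskSketch {
  // Simplified copy of the worker's reply type.
  final case class TextTransformed(text: String)

  // Simplified copy of the requester's own protocol.
  sealed trait Event
  final case class TransformCompleted(originalText: String, transformedText: String) extends Event
  final case class JobFailed(why: String, text: String) extends Event

  // Maps the outcome of a request (a reply, or a failure such as a timeout)
  // into a message of the requester's own protocol.
  def toEvent(text: String)(outcome: Try[TextTransformed]): Event = outcome match {
    case Success(reply) => TransformCompleted(text, reply.text)
    case Failure(_)     => JobFailed("Processing timed out", text)
  }
}
```

Because both the success and the failure branch produce an Event, the requester handles a missing reply in the same receiveMessage block as a successful one.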
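Now we can demonstrate a form of cross-node distributed computing with a group router. A group router manages its routees through the Receptionist, and the Receptionist is cluster-wide, so creating routees on each node and registering them with the Receptionist produces a routee ActorRef list that spans nodes. Dispatching tasks to the routees on this list should balance the load across cluster nodes. The load balancer below demonstrates this. The flow is simple: an entry point (the Service actor) creates workersRouter, then spawns 3 WorkerRoutee actors and registers them with the Receptionist. Each task it receives is split into subtasks, which are sent one by one to workersRouter. Each WorkerRoutee sends its result to an Aggregator; once the Aggregator has received the results of all subtasks, it returns the combined result to the Service actor. First, the Service actor: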
object Service {

  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable
  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }
  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************", text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ),
      "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job @ ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }
}
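The group router and the routees are all built in apply(), which forwards incoming tasks to singletonActor. singletonActor is an actor built around serviceBehavior. serviceBehavior splits each task it receives and sends the pieces to workersRouter. Note that serviceBehavior expects a reply from the Aggregator. The two communicate in request/response style, so a conversion from Aggregator.Response to WrappedResult is needed. Also, subtasks reach the individual WorkerRoutee actors through workersRouter, and each WorkerRoutee has to return its result to the Aggregator, so the message sent to workersRouter carries the Aggregator's ActorRef: workersRouter ! WorkerRoutee.Count(word, aggregator).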
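Stripped of actors, the computation is a split / per-word step / combine pipeline. The following plain-Scala sketch runs the three steps sequentially in one process; `ScatterGatherSketch` and its function names are made up for illustration, and nothing here is distributed.

```scala
object ScatterGatherSketch {
  // What each WorkerRoutee contributes for one word.
  def wordLength(word: String): Int = word.length

  // Aggregation step: mean of all reported lengths.
  def meanLength(lengths: Seq[Int]): Double = lengths.sum.toDouble / lengths.size

  // Split the text, process every word independently, then combine.
  def meanWordLength(text: String): Double = {
    require(text.nonEmpty, "text must not be empty")
    val words = text.split(' ').toIndexedSeq
    meanLength(words.map(wordLength))
  }
}
```

For the text used later in this post, "this is the text that will be analyzed", the 8 words have a total length of 31, so meanWordLength returns 3.875. Since each call to wordLength is independent of the others, the per-word step is the part that can be handed to routees on different nodes.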
Aggregator is a persistent actor (an EventSourcedBehavior), as follows:
object Aggregator {
  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State, Command) => Effect[Event, State] = (st, cmd) => {
    cmd match {
      case CountText(cnt, ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word, len) =>
        Effect.persist(LengthMarked(word, len))
    }
  }

  def eventHandler: (State, Event) => State = (st, ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word, len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }

  val takeSnapShot: (State, Event, Long) => Boolean = (st, ev, seq) => {
    if (st.lens.isEmpty) {
      if (ev.isInstanceOf[LengthMarked])
        true
      else
        false
    } else
      false
  }

  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33", "2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************", state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
        case (state, RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************", err.getMessage)
        case (state, SnapshotFailed(meta, err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************", err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}
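Note the takeSnapShot function: it is passed to EventSourcedBehavior.snapshotWhen(takeSnapShot). Its arguments are (State, Event, sequenceNr); we examine the current State and Event and return true to request a snapshot. Here it returns true only when a LengthMarked event leaves lens empty, that is, right after a job has completed and the state has been reset.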
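To see when the predicate fires, the state transitions can be replayed in plain Scala. The sketch below copies State and the two events, uses a pure variant of the event handler that sends no reply, and assumes the predicate is evaluated against the state after the event has been applied. `SnapshotSketch`, `applyEvent`, `shouldSnapshot`, and `trace` are names invented for this illustration.

```scala
object SnapshotSketch {
  sealed trait Event
  final case class TextCounted(cnt: Int) extends Event
  final case class LengthMarked(word: String, len: Int) extends Event

  final case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  // Pure variant of the event handler: it only computes the next state.
  def applyEvent(st: State, ev: Event): State = ev match {
    case TextCounted(cnt) => st.copy(expectedNum = cnt, lens = Nil)
    case LengthMarked(_, len) =>
      val next = st.copy(lens = len :: st.lens)
      // All expected lengths collected: reset for the next job.
      if (next.lens.size >= next.expectedNum) State() else next
  }

  // Same decision as takeSnapShot, written as a single expression.
  def shouldSnapshot(st: State, ev: Event): Boolean =
    st.lens.isEmpty && ev.isInstanceOf[LengthMarked]

  // Replays the events and records, per event, whether a snapshot
  // would be requested (assumption: predicate sees the updated state).
  def trace(events: List[Event]): List[Boolean] =
    events.scanLeft((State(), false)) { case ((st, _), ev) =>
      val next = applyEvent(st, ev)
      (next, shouldSnapshot(next, ev))
    }.tail.map(_._2)
}
```

For a job of two words, the trace over TextCounted(2), LengthMarked("ab", 2), LengthMarked("abc", 3) is false, false, true: only the event that completes the job triggers a snapshot. Keeping the handler pure also matters because event handlers are run again when events are replayed during recovery.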
Part of the output shows that the tasks were distributed to routees on several nodes:
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.WorkerRoutee$ - ************** processing [this] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [text] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-36] INFO com.learn.akka.WorkerRoutee$ - ************** processing [be] on akka://ClusterSystem@127.0.0.1:51182 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.learn.akka.WorkerRoutee$ - ************** processing [will] on akka://ClusterSystem@127.0.0.1:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-26] INFO com.learn.akka.WorkerRoutee$ - ************** processing [is] on akka://ClusterSystem@127.0.0.1:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-13] INFO com.learn.akka.WorkerRoutee$ - ************** processing [the] on akka://ClusterSystem@127.0.0.1:51173 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [that] on akka://ClusterSystem@127.0.0.1:25251 ***********
20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [analyzed] on akka://ClusterSystem@127.0.0.1:25251 ***********
The source code for this example:
package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._
import akka.actor.typed.receptionist._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl._
import akka.cluster.typed.Cluster
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
import akka.cluster.typed.SingletonActor
import com.typesafe.config.ConfigFactory

// CborSerializable is not shown in this listing; it is assumed to be a
// marker trait defined elsewhere in com.learn.akka (see serialization-bindings).

object WorkerRoutee {
  sealed trait Command extends CborSerializable
  case class Count(word: String, replyTo: ActorRef[Aggregator.Command]) extends Command

  def apply(nodeAddress: String): Behavior[Command] = Behaviors.setup { ctx =>
    Behaviors.receiveMessage[Command] {
      case Count(word, replyTo) =>
        ctx.log.info("************** processing [{}] on {} ***********", word, nodeAddress)
        replyTo ! Aggregator.MarkLength(word, word.length)
        Behaviors.same
    }
  }
}

object Aggregator {
  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
  case class MarkLength(word: String, len: Int) extends Command
  case class TextCounted(cnt: Int) extends Event
  case class LengthMarked(word: String, len: Int) extends Event
  case class Result(meanWordLength: Double) extends Response

  case class State(expectedNum: Int = 0, lens: List[Int] = Nil)

  var replyTo: ActorRef[Response] = _

  def commandHandler: (State, Command) => Effect[Event, State] = (st, cmd) => {
    cmd match {
      case CountText(cnt, ref) =>
        replyTo = ref
        Effect.persist(TextCounted(cnt))
      case MarkLength(word, len) =>
        Effect.persist(LengthMarked(word, len))
    }
  }

  def eventHandler: (State, Event) => State = (st, ev) => {
    ev match {
      case TextCounted(cnt) =>
        st.copy(expectedNum = cnt, lens = Nil)
      case LengthMarked(word, len) =>
        val state = st.copy(lens = len :: st.lens)
        if (state.lens.size >= state.expectedNum) {
          val meanWordLength = state.lens.sum.toDouble / state.lens.size
          replyTo ! Result(meanWordLength)
          State()
        } else state
    }
  }

  val takeSnapShot: (State, Event, Long) => Boolean = (st, ev, seq) => {
    if (st.lens.isEmpty) {
      if (ev.isInstanceOf[LengthMarked])
        true
      else
        false
    } else
      false
  }

  def apply(): Behavior[Command] = Behaviors.supervise(
    Behaviors.setup[Command] { ctx =>
      EventSourcedBehavior(
        persistenceId = PersistenceId("33", "2333"),
        emptyState = State(),
        commandHandler = commandHandler,
        eventHandler = eventHandler
      ).onPersistFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ).receiveSignal {
        case (state, RecoveryCompleted) =>
          ctx.log.info("**************Recovery Completed with state: {}***************", state)
        case (state, SnapshotCompleted(meta)) =>
          ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************", state, meta.persistenceId, meta.sequenceNr)
        case (state, RecoveryFailed(err)) =>
          ctx.log.error("*************recovery failed with: {}***************", err.getMessage)
        case (state, SnapshotFailed(meta, err)) =>
          ctx.log.error("***************snapshoting failed with: {}*************", err.getMessage)
      }.snapshotWhen(takeSnapShot)
    }
  ).onFailure(
    SupervisorStrategy
      .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
      .withMaxRestarts(3)
      .withResetBackoffAfter(10.seconds)
  )
}

object Service {

  val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")

  sealed trait Command extends CborSerializable
  case class ProcessText(text: String) extends Command {
    require(text.nonEmpty)
  }
  case class WrappedResult(res: Aggregator.Response) extends Command

  def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val aggregator = ctx.spawn(Aggregator(), "aggregator")
    val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
    Behaviors.receiveMessage {
      case ProcessText(text) =>
        ctx.log.info("******************** received ProcessText command: {} ****************", text)
        val words = text.split(' ').toIndexedSeq
        aggregator ! Aggregator.CountText(words.size, aggregatorRef)
        words.foreach { word =>
          workersRouter ! WorkerRoutee.Count(word, aggregator)
        }
        Behaviors.same
      case WrappedResult(msg) =>
        msg match {
          case Aggregator.Result(res) =>
            ctx.log.info("************** mean length of words = {} **********", res)
        }
        Behaviors.same
    }
  }

  def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
    val singletonSettings = ClusterSingletonSettings(ctx.system)
      .withRole("front")
    SingletonActor(
      Behaviors.supervise(
        serviceBehavior(workersRouter)
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      ),
      "singletonActor"
    ).withSettings(singletonSettings)
  }

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val cluster = Cluster(ctx.system)
    val workersRouter = ctx.spawn(
      Routers.group(routerServiceKey)
        .withRoundRobinRouting(),
      "workersRouter"
    )
    (0 until 3).foreach { n =>
      val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
      ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
    }
    val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
    Behaviors.receiveMessage {
      case job @ ProcessText(text) =>
        singletonActor ! job
        Behaviors.same
    }
  }
}

object LoadBalance {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup("compute", 25251)
      startup("compute", 25252)
      startup("compute", 25253)
      startup("front", 25254)
    } else {
      require(args.size == 2, "Usage: role port")
      startup(args(0), args(1).toInt)
    }
  }

  def startup(role: String, port: Int): Unit = {
    // Override the configuration of the port when specified as program argument
    val config = ConfigFactory
      .parseString(s"""
      akka.remote.artery.canonical.port=$port
      akka.cluster.roles = [$role]
      """)
      .withFallback(ConfigFactory.load("cluster-persistence"))

    val frontEnd = ActorSystem[Service.Command](Service(), "ClusterSystem", config)
    if (role == "front") {
      println("*************** sending ProcessText command ************")
      frontEnd ! Service.ProcessText("this is the text that will be analyzed")
    }
  }
}
cluster-persistence.conf
akka.actor.allow-java-serialization = on
akka {
  loglevel = INFO
  actor {
    provider = cluster
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:25251",
      "akka://ClusterSystem@127.0.0.1:25252"]
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }
}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}
datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}
build.sbt
name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)