Greg Burd 2019-11-20 12:35:51 -05:00
commit 25e01687d0
23 changed files with 1156 additions and 0 deletions

121
COPYING Normal file
View file

@ -0,0 +1,121 @@
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.

10
LICENSE Normal file
View file

@ -0,0 +1,10 @@
Akka sample by Lightbend
Licensed under Public Domain (CC0)
To the extent possible under law, the person who associated CC0 with
this Template has waived all copyright and related or neighboring
rights to this Template.
You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

159
README.md Normal file
View file

@ -0,0 +1,159 @@
This tutorial contains 3 samples illustrating different [Akka cluster](https://doc.akka.io/docs/akka/2.6/typed/cluster.html) features.
- Subscribe to cluster membership events
- Sending messages to actors running on nodes in the cluster
- Cluster aware routers
## A Simple Cluster Example
Open [application.conf](src/main/resources/application.conf)
To enable cluster capabilities in your Akka project you should, at a minimum, add the remote settings, and use `cluster` as the `akka.actor.provider`. The `akka.cluster.seed-nodes` should normally also be added to your `application.conf` file.
The seed nodes are configured contact points which newly started nodes will try to connect with in order to join the cluster.
Note that if you are going to start the nodes on different machines you need to specify the ip-addresses or host names of the machines in `application.conf` instead of `127.0.0.1`.
Open [SimpleClusterApp.scala](src/main/scala/sample/cluster/simple/App.scala).
The small program together with its configuration starts an ActorSystem with the Cluster enabled. It joins the cluster and starts an actor that logs some membership events. Take a look at the [SimpleClusterListener.scala](src/main/scala/sample/cluster/simple/ClusterListener.scala) actor.
You can read more about the cluster concepts in the [documentation](https://doc.akka.io/docs/akka/2.6/typed/cluster.html).
To run this sample, type `sbt "runMain sample.cluster.simple.App"`.
`sample.cluster.simple.App` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows.
In the first terminal window, start the first seed node with the following command:
sbt "runMain sample.cluster.simple.App 25251"
25251 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.
In the second terminal window, start the second seed node with the following command:
sbt "runMain sample.cluster.simple.App 25252"
25252 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'.
Switch over to the first terminal window and see in the log output that the member joined.
Start another node in the third terminal window with the following command:
sbt "runMain sample.cluster.simple.App 0"
Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.
Start even more nodes in the same way, if you like.
Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. It will cause the node to do a graceful leave from the cluster, telling the other nodes in the cluster that it is leaving. It will then be removed from the cluster, which you can see in the log output in the other terminals.
Look at the source code of the actor again. It registers itself as subscriber of certain cluster events. It gets notified a stream of events leading up to the current state. After that it receives events for changes that happen in the cluster.
Now we have seen how to subscribe to cluster membership events. You can read more about it in the [documentation](https://doc.akka.io/docs/akka/2.6/typed/cluster.html#cluster-subscriptions). The membership events show us the state of the cluster but it does not help with accessing actors on other nodes the cluster. To do that we need to use the [Receptionist](https://doc.akka.io/docs/akka/2.6/typed/actor-discovery.html#receptionist).
## Worker registration example
The `Receptionist` is a service registry that will work both when in single JVM apps not using cluster, and in clustered apps.
`ActorRef`s are registered to the receptionist using a `ServiceKey`. The service key is defined with a type of message that actors registered for it will accept and a string identifier.
Let's take a look at an example that illustrates how workers, here only on nodes with the role *backend*, register themselves to the receptionist so that *frontend* nodes will know what workers are available to perform their work. Note that a node could potentially have both roles, since the node roles are a set. The `main` provided only allows one role though.
The example application provides a service to transform text. At a periodic interval the frontend simulates an external request to process a text which it forwards to available workers if there are any.
Since the discovery of workers is dynamic both *backend* and *frontend* nodes can be added to the cluster dynamically.
The backend worker that performs the transformation job is defined in [TransformationBackend.scala](src/main/scala/sample/cluster/transformation/Worker.scala). When starting up a worker registers itself to the receptionist so that it can be discovered through its `ServiceKey` on any node in the cluster.
The frontend that simulates user jobs as well as keeping track of available workers is defined in [Frontend.scala](src/main/scala/sample/cluster/transformation/Frontend.scala). The actor subscribes to the `Receptionist` with the `WorkerServiceKey` to receive updates when the set of available workers in the cluster changes. If a worker dies or its node is removed from the cluster the receptionist will send out an updated listing so the frontend does not need to `watch` the workers.
To run this sample, make sure you have shut down any previously started cluster sample, then type `sbt "runMain sample.cluster.transformation.App"`.
TransformationApp starts 5 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.transformation.App backend 25251"
sbt "runMain sample.cluster.transformation.App backend 25252"
sbt "runMain sample.cluster.transformation.App backend 0"
sbt "runMain sample.cluster.transformation.App frontend 0"
sbt "runMain sample.cluster.transformation.App frontend 0"
There is a component built into Akka that performs the task of subscribing to the receptionist and keeping track of available actors significantly simplifying such interactions: the group router. Let's look into how we can use those in the next section!
## Cluster Aware Routers
The [group routers](https://doc.akka.io/docs/akka/2.6/typed/routers.html#group-router) relies on the `Receptionist` and will therefore route messages to services registered in any node of the cluster.
Let's take a look at a few samples that make use of cluster aware routers.
## Cluster routing example
Let's take a look at two different ways to distribute work across a cluster using routers.
Note that the samples just shows off various parts of Akka Cluster and does not provide a complete structure to build a resilient distributed application with. The [Distributed Workers With Akka](https://developer.lightbend.com/guides/akka-distributed-workers-scala/) sample covers more of the problems you would have to solve to build a resilient distributed processing application.
### Example with Group of routees
The example application provides a service to calculate statistics for a text. When some text is sent to the service it splits it into words, and delegates the task to count number of characters in each word to a separate worker, a routee of a router. The character count for each word is sent back to an aggregator that calculates the average number of characters per word when all results have been collected.
The worker that counts number of characters in each word is defined in [StatsWorker.scala](src/main/scala/sample/cluster/stats/StatsWorker.scala).
The service that receives text from users and splits it up into words, delegates to a pool of workers and aggregates the result is defined in [StatsService.scala](src/main/scala/sample/cluster/stats/StatsService.scala).
Note, nothing cluster specific so far, just plain actors.
Nodes in the cluster can be marked with roles, to perform different tasks, in our case we use `compute` as a role to
designate cluster nodes that should do processing of word statistics.
In [StatsSample.scala](src/main/scala/sample/cluster/stats/App.scala) each `compute` node starts a `StatsService`
that distributes work over N local `StatsWorkers`. The client nodes then message the `StatsService` instances through a `group` router.
The router finds services by subscribing to the cluster receptionist and a service key. Each worker is registered to the receptionist
when started.
With this design a single `compute` node crashing will only lose the ongoing work in that node and have the other nodes
keep on with their work, but there is no single place to ask for a list of the current work in progress.
To run the sample, type `sbt "runMain sample.cluster.stats.App"` if it is not already started.
StatsSample starts 4 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.stats.App compute 25251"
sbt "runMain sample.cluster.stats.App compute 25252"
sbt "runMain sample.cluster.stats.App compute 0"
sbt "runMain sample.cluster.stats.App client 0"
### Router example with Cluster Singleton
[StatsSampleOneMaster.scala](src/main/scala/sample/cluster/stats/AppOneMaster.scala) each `compute` node starts
N workers, that register themselves with the receptionist. The `StatsService` is run in a single instance in the cluster
through the Akka Cluster Singleton. The actual work is performed by workers on all compute nodes though. The workers
are reached through a group router used by the singleton.
With this design it would be possible to query the singleton for current work - it knows all current requests in flight
and could potentially make decisions based on knowing exactly what work is currently in progress.
If the singleton node crashes however, all ongoing work is lost though since the state of the singleton is not persistent, when it is started on a new node the `StatsService` will not know of any previous work. It also means that since all work has to go through the singleton it could be come a bottleneck. If one of the other nodes crash only the ongoing work sent to them is lost, however since each ongoing request could be handled by multiple different workers on different nodes a crash could cause problems to many requests.
To run this sample, type `sbt "runMain sample.cluster.stats.AppOneMaster"` if it is not already started.
StatsSampleOneMaster starts 4 actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and run the following commands in separate terminal windows.
sbt "runMain sample.cluster.stats.AppOneMaster compute 25251"
sbt "runMain sample.cluster.stats.AppOneMaster compute 25252"
sbt "runMain sample.cluster.stats.AppOneMaster compute 0"
sbt "runMain sample.cluster.stats.AppOneMaster client 0"
## Tests
Tests can be found in [src/multi-jvm](src/multi-jvm). You can run them by typing `sbt multi-jvm:test`.

29
build.sbt Normal file
View file

@ -0,0 +1,29 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
val akkaVersion = "2.6.0"
lazy val `kaka` = project
.in(file("."))
.settings(multiJvmSettings: _*)
.settings(
organization := "com.typesafe.akka.samples",
scalaVersion := "2.13.1",
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
run / javaOptions ++= Seq("-Xms128m", "-Xmx1024m", "-Djava.library.path=./target/native"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scalatest" %% "scalatest" % "3.0.8" % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test),
run / fork := false,
Global / cancelable := false,
// disable parallel tests
Test / parallelExecution := false,
licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
)
.configs (MultiJvm)

1
project/build.properties Normal file
View file

@ -0,0 +1 @@
sbt.version=1.3.3

2
project/plugins.sbt Normal file
View file

@ -0,0 +1,2 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")

View file

@ -0,0 +1,21 @@
akka {
actor {
provider = cluster
serialization-bindings {
"sample.cluster.CborSerializable" = jackson-cbor
}
}
remote {
artery {
canonical.hostname = "127.0.0.1"
canonical.port = 0
}
}
cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:25251",
"akka://ClusterSystem@127.0.0.1:25252"]
}
}

View file

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- This is a development logging configuration that logs to standard out, for an example of a production
logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback -->
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
</encoder>
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="STDOUT" />
</appender>
<root level="INFO">
<appender-ref ref="ASYNC"/>
</root>
</configuration>

View file

@ -0,0 +1,5 @@
include "application"
stats-service {
workers-per-node = 4
}

View file

@ -0,0 +1,5 @@
include "application"
transformation {
workers-per-node = 4
}

View file

@ -0,0 +1,8 @@
package sample.cluster
/**
* Marker trait to tell Akka to serialize messages into CBOR using Jackson for sending over the network
* See application.conf where it is bound to a serializer.
* For more details see the docs https://doc.akka.io/docs/akka/2.6/serialization-jackson.html
*/
trait CborSerializable

View file

@ -0,0 +1,38 @@
package sample.cluster.simple
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import com.typesafe.config.ConfigFactory
object App {
object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { context =>
// Create an actor that handles cluster domain events
context.spawn(ClusterListener(), "ClusterListener")
Behaviors.empty
}
}
def main(args: Array[String]): Unit = {
val ports =
if (args.isEmpty)
Seq(25251, 25252, 0)
else
args.toSeq.map(_.toInt)
ports.foreach(startup)
}
def startup(port: Int): Unit = {
// Override the configuration of the port
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
""").withFallback(ConfigFactory.load())
// Create an Akka system
val system = ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}

View file

@ -0,0 +1,52 @@
package sample.cluster.simple
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.ReachabilityEvent
import akka.cluster.ClusterEvent.ReachableMember
import akka.cluster.ClusterEvent.UnreachableMember
import akka.cluster.typed.Cluster
import akka.cluster.typed.Subscribe
object ClusterListener {
sealed trait Event
// internal adapted cluster events only
private final case class ReachabilityChange(reachabilityEvent: ReachabilityEvent) extends Event
private final case class MemberChange(event: MemberEvent) extends Event
def apply(): Behavior[Event] = Behaviors.setup { ctx =>
val memberEventAdapter: ActorRef[MemberEvent] = ctx.messageAdapter(MemberChange)
Cluster(ctx.system).subscriptions ! Subscribe(memberEventAdapter, classOf[MemberEvent])
val reachabilityAdapter = ctx.messageAdapter(ReachabilityChange)
Cluster(ctx.system).subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])
Behaviors.receiveMessage { message =>
message match {
case ReachabilityChange(reachabilityEvent) =>
reachabilityEvent match {
case UnreachableMember(member) =>
ctx.log.info("Member detected as unreachable: {}", member)
case ReachableMember(member) =>
ctx.log.info("Member back to reachable: {}", member)
}
case MemberChange(changeEvent) =>
changeEvent match {
case MemberUp(member) =>
ctx.log.info("Member is Up: {}", member.address)
case MemberRemoved(member, previousStatus) =>
ctx.log.info("Member is Removed: {} after {}",
member.address, previousStatus)
case _: MemberEvent => // ignore
}
}
Behaviors.same
}
}
}

View file

@ -0,0 +1,61 @@
package sample.cluster.stats
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.cluster.typed.Cluster
import com.typesafe.config.ConfigFactory
object App {
val StatsServiceKey = ServiceKey[StatsService.ProcessText]("StatsService")
private object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
if (cluster.selfMember.hasRole("compute")) {
// on every compute node there is one service instance that delegates to N local workers
val numberOfWorkers = ctx.system.settings.config.getInt("stats-service.workers-per-node")
val workers = ctx.spawn(Routers.pool(numberOfWorkers)(StatsWorker()), "WorkerRouter")
val service = ctx.spawn(StatsService(workers),"StatsService")
// published through the receptionist to the other nodes in the cluster
ctx.system.receptionist ! Receptionist.Register(StatsServiceKey, service)
}
if (cluster.selfMember.hasRole(("client"))) {
val serviceRouter = ctx.spawn(Routers.group(App.StatsServiceKey), "ServiceRouter")
ctx.spawn(StatsClient(serviceRouter), "Client")
}
Behaviors.empty[Nothing]
}
}
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
startup("compute", 25251)
startup("compute", 25252)
startup("compute", 0)
startup("client", 0)
} else {
require(args.size == 2, "Usage: role port")
startup(args(0), args(1).toInt)
}
}
private def startup(role: String, port: Int): Unit = {
// Override the configuration of the port when specified as program argument
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [$role]
""")
.withFallback(ConfigFactory.load("stats"))
val system = ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}

View file

@ -0,0 +1,77 @@
package sample.cluster.stats
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.cluster.typed.Cluster
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
import akka.cluster.typed.SingletonActor
import com.typesafe.config.ConfigFactory
object AppOneMaster {
val WorkerServiceKey = ServiceKey[StatsWorker.Process]("Worker")
object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
val singletonSettings = ClusterSingletonSettings(ctx.system)
.withRole("compute")
val serviceSingleton = SingletonActor(
Behaviors.setup[StatsService.Command] { ctx =>
// the service singleton accesses available workers through a group router
val workersRouter = ctx.spawn(Routers.group(WorkerServiceKey), "WorkersRouter")
StatsService(workersRouter)
},
"StatsService"
).withStopMessage(StatsService.Stop)
.withSettings(singletonSettings)
val serviceProxy = ClusterSingleton(ctx.system).init(serviceSingleton)
if (cluster.selfMember.hasRole("compute")) {
// on every compute node N local workers, which a cluster singleton stats service delegates work to
val numberOfWorkers = ctx.system.settings.config.getInt("stats-service.workers-per-node")
ctx.log.info("Starting {} workers", numberOfWorkers)
(0 to numberOfWorkers).foreach { n =>
val worker = ctx.spawn(StatsWorker(), s"StatsWorker$n")
ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, worker)
}
}
if (cluster.selfMember.hasRole("client")) {
ctx.spawn(StatsClient(serviceProxy), "Client")
}
Behaviors.empty
}
}
def main(args: Array[String]): Unit = {
if (args.isEmpty) {
startup("compute", 25251)
startup("compute", 25252)
startup("compute", 0)
startup("client", 0)
} else {
require(args.size == 2, "Usage: role port")
startup(args(0), args(1).toInt)
}
}
def startup(role: String, port: Int): Unit = {
// Override the configuration of the port when specified as program argument
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [compute]
""")
.withFallback(ConfigFactory.load("stats"))
val system = ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}

View file

@ -0,0 +1,37 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package sample.cluster.stats
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
object StatsClient {
sealed trait Event
private case object Tick extends Event
private case class ServiceResponse(result: StatsService.Response) extends Event
def apply(service: ActorRef[StatsService.ProcessText]): Behavior[Event] =
Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
val responseAdapter = ctx.messageAdapter(ServiceResponse)
Behaviors.receiveMessage {
case Tick =>
ctx.log.info("Sending process request")
service ! StatsService.ProcessText("this is the text that will be analyzed", responseAdapter)
Behaviors.same
case ServiceResponse(result) =>
ctx.log.info("Service result: {}", result)
Behaviors.same
}
}
}
}

View file

@ -0,0 +1,77 @@
package sample.cluster.stats
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import sample.cluster.CborSerializable
import scala.concurrent.duration._
//#service
object StatsService {
sealed trait Command extends CborSerializable
final case class ProcessText(text: String, replyTo: ActorRef[Response]) extends Command {
require(text.nonEmpty)
}
case object Stop extends Command
sealed trait Response extends CborSerializable
final case class JobResult(meanWordLength: Double) extends Response
final case class JobFailed(reason: String) extends Response
def apply(workers: ActorRef[StatsWorker.Process]): Behavior[Command] =
Behaviors.setup { ctx =>
// if all workers would crash/stop we want to stop as well
ctx.watch(workers)
Behaviors.receiveMessage {
case ProcessText(text, replyTo) =>
ctx.log.info("Delegating request")
val words = text.split(' ').toIndexedSeq
// create per request actor that collects replies from workers
ctx.spawnAnonymous(StatsAggregator(words, workers, replyTo))
Behaviors.same
case Stop =>
Behaviors.stopped
}
}
}
object StatsAggregator {
sealed trait Event
private case object Timeout extends Event
private case class CalculationComplete(length: Int) extends Event
def apply(words: Seq[String], workers: ActorRef[StatsWorker.Process], replyTo: ActorRef[StatsService.Response]): Behavior[Event] =
Behaviors.setup { ctx =>
ctx.setReceiveTimeout(3.seconds, Timeout)
val responseAdapter = ctx.messageAdapter[StatsWorker.Processed](processed =>
CalculationComplete(processed.length)
)
words.foreach { word =>
workers ! StatsWorker.Process(word, responseAdapter)
}
waiting(replyTo, words.size, Nil)
}
private def waiting(replyTo: ActorRef[StatsService.Response], expectedResponses: Int, results: List[Int]): Behavior[Event] =
Behaviors.receiveMessage {
case CalculationComplete(length) =>
val newResults = results :+ length
if (newResults.size == expectedResponses) {
val meanWordLength = newResults.sum.toDouble / newResults.size
replyTo ! StatsService.JobResult(meanWordLength)
Behaviors.stopped
} else {
waiting(replyTo, expectedResponses, newResults)
}
case Timeout =>
replyTo ! StatsService.JobFailed("Service unavailable, try again later")
Behaviors.stopped
}
}
//#service

View file

@ -0,0 +1,46 @@
package sample.cluster.stats
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import sample.cluster.CborSerializable
import scala.concurrent.duration._
//#worker
object StatsWorker {
trait Command
final case class Process(word: String, replyTo: ActorRef[Processed]) extends Command with CborSerializable
private case object EvictCache extends Command
final case class Processed(word: String, length: Int) extends CborSerializable
def apply(): Behavior[Command] = Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
ctx.log.info("Worker starting up")
timers.startTimerWithFixedDelay(EvictCache, EvictCache, 30.seconds)
withCache(ctx, Map.empty)
}
}
private def withCache(ctx: ActorContext[Command], cache: Map[String, Int]): Behavior[Command] = Behaviors.receiveMessage {
case Process(word, replyTo) =>
ctx.log.info("Worker processing request")
cache.get(word) match {
case Some(length) =>
replyTo ! Processed(word, length)
Behaviors.same
case None =>
val length = word.length
val updatedCache = cache + (word -> length)
replyTo ! Processed(word, length)
withCache(ctx, updatedCache)
}
case EvictCache =>
withCache(ctx, Map.empty)
}
}
//#worker

View file

@ -0,0 +1,53 @@
package sample.cluster.transformation
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.Cluster
import com.typesafe.config.ConfigFactory
object App {
object RootBehavior {
def apply(): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
val cluster = Cluster(ctx.system)
if (cluster.selfMember.hasRole("backend")) {
val workersPerNode = ctx.system.settings.config.getInt("transformation.workers-per-node")
(1 to workersPerNode).foreach { n =>
ctx.spawn(Worker(), s"Worker$n")
}
}
if (cluster.selfMember.hasRole("frontend")) {
ctx.spawn(Frontend(), "Frontend")
}
Behaviors.empty
}
}
def main(args: Array[String]): Unit = {
// starting 2 frontend nodes and 3 backend nodes
if (args.isEmpty) {
startup("backend", 25251)
startup("backend", 25252)
startup("frontend", 0)
startup("frontend", 0)
startup("frontend", 0)
} else {
require(args.length == 2, "Usage: role port")
startup(args(0), args(1).toInt)
}
}
def startup(role: String, port: Int): Unit = {
// Override the configuration of the port and role
val config = ConfigFactory.parseString(s"""
akka.remote.artery.canonical.port=$port
akka.cluster.roles = [frontend]
""")
.withFallback(ConfigFactory.load("transformation"))
val system = ActorSystem[Nothing](RootBehavior(), "ClusterSystem", config)
}
}

View file

@ -0,0 +1,70 @@
package sample.cluster.transformation
import scala.concurrent.duration._
import akka.util.Timeout
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import scala.util.Failure
import scala.util.Success
//#frontend
object Frontend {
sealed trait Event
private case object Tick extends Event
private final case class WorkersUpdated(newWorkers: Set[ActorRef[Worker.TransformText]]) extends Event
private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
private final case class JobFailed(why: String, text: String) extends Event
def apply(): Behavior[Event] = Behaviors.setup { ctx =>
Behaviors.withTimers { timers =>
// subscribe to available workers
val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
case Worker.WorkerServiceKey.Listing(workers) =>
WorkersUpdated(workers)
}
ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
running(ctx, IndexedSeq.empty, jobCounter = 0)
}
}
private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
Behaviors.receiveMessage {
case WorkersUpdated(newWorkers) =>
ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
running(ctx, newWorkers.toIndexedSeq, jobCounter)
case Tick =>
if (workers.isEmpty) {
ctx.log.warn("Got tick request but no workers available, not sending any work")
Behaviors.same
} else {
// how much time can pass before we consider a request failed
implicit val timeout: Timeout = 5.seconds
val selectedWorker = workers(jobCounter % workers.size)
ctx.log.info("Sending work for processing to {}", selectedWorker)
val text = s"hello-$jobCounter"
ctx.ask(selectedWorker, Worker.TransformText(text, _)) {
case Success(transformedText) => TransformCompleted(transformedText.text, text)
case Failure(ex) => JobFailed("Processing timed out", text)
}
running(ctx, workers, jobCounter + 1)
}
case TransformCompleted(originalText, transformedText) =>
ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
Behaviors.same
case JobFailed(why, text) =>
ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
Behaviors.same
}
}
//#frontend

View file

@ -0,0 +1,32 @@
package sample.cluster.transformation
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import sample.cluster.CborSerializable
//#worker
object Worker {
val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")
sealed trait Command
final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
final case class TextTransformed(text: String) extends CborSerializable
def apply(): Behavior[Command] =
Behaviors.setup { ctx =>
// each worker registers themselves with the receptionist
ctx.log.info("Registering myself with receptionist")
ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)
Behaviors.receiveMessage {
case TransformText(text, replyTo) =>
replyTo ! TextTransformed(text.toUpperCase)
Behaviors.same
}
}
}
//#worker

View file

@ -0,0 +1,114 @@
package sample.cluster.stats
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
import akka.cluster.typed.SingletonActor
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.Future
object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
// note that this is not the same thing as cluster node roles
val first = role("first")
val second = role("second")
val third = role("third")
// this configuration will be used for all nodes
// note that no fixed host names and ports are used
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = cluster
akka.cluster.roles = [compute]
""").withFallback(ConfigFactory.load()))
}
// need one concrete test class per node
class StatsSampleSingleMasterSpecMultiJvmNode1 extends StatsSampleSingleMasterSpec
class StatsSampleSingleMasterSpecMultiJvmNode2 extends StatsSampleSingleMasterSpec
class StatsSampleSingleMasterSpecMultiJvmNode3 extends StatsSampleSingleMasterSpec
abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSingleMasterSpecConfig)
with WordSpecLike with Matchers with BeforeAndAfterAll with ImplicitSender {
import StatsSampleSingleMasterSpecConfig._
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
implicit val typedSystem = system.toTyped
var singletonProxy: ActorRef[StatsService.Command] = _
"The stats sample with single master" must {
"illustrate how to startup cluster" in within(15.seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
Cluster(system) join firstAddress
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
Cluster(system).unsubscribe(testActor)
val singletonSettings = ClusterSingletonSettings(typedSystem).withRole("compute")
singletonProxy = ClusterSingleton(typedSystem).init(
SingletonActor(
Behaviors.setup[StatsService.Command] { ctx =>
// just run some local workers for this test
val workersRouter = ctx.spawn(Routers.pool(2)(StatsWorker()), "WorkersRouter")
StatsService(workersRouter)
},
"StatsService",
).withSettings(singletonSettings)
)
testConductor.enter("all-up")
}
"show usage of the statsServiceProxy" in within(20.seconds) {
// eventually the service should be ok,
// service and worker nodes might not be up yet
awaitAssert {
system.log.info("Trying a request")
val probe = TestProbe[StatsService.Response]()
singletonProxy ! StatsService.ProcessText("this is the text that will be analyzed", probe.ref)
val response = probe.expectMessageType[StatsService.JobResult](3.seconds)
response.meanWordLength should be(3.875 +- 0.001)
}
testConductor.enter("done")
}
}
}

View file

@ -0,0 +1,118 @@
package sample.cluster.stats
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.remote.testkit.MultiNodeConfig
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.Future
object StatsSampleSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
// note that this is not the same thing as cluster node roles
val first = role("first")
val second = role("second")
val third = role("thrid")
// this configuration will be used for all nodes
// note that no fixed host names and ports are used
commonConfig(ConfigFactory.parseString("""
akka.actor.provider = cluster
akka.cluster.roles = [compute]
""").withFallback(ConfigFactory.load()))
}
// need one concrete test class per node
class StatsSampleSpecMultiJvmNode1 extends StatsSampleSpec
class StatsSampleSpecMultiJvmNode2 extends StatsSampleSpec
class StatsSampleSpecMultiJvmNode3 extends StatsSampleSpec
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
with WordSpecLike with Matchers with BeforeAndAfterAll
with ImplicitSender {
import StatsSampleSpecConfig._
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
implicit val typedSystem = system.toTyped
"The stats sample" must {
"illustrate how to startup cluster" in within(15.seconds) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
Cluster(system).join(firstAddress)
receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
Set(firstAddress, secondAddress, thirdAddress))
Cluster(system).unsubscribe(testActor)
testConductor.enter("all-up")
}
"show usage of the statsService from one node" in within(15.seconds) {
runOn(first, second) {
val worker = system.spawn(StatsWorker(), "StatsWorker")
val service = system.spawn(StatsService(worker), "StatsService")
typedSystem.receptionist ! Receptionist.Register(App.StatsServiceKey, service)
}
runOn(third) {
assertServiceOk()
}
testConductor.enter("done-2")
}
def assertServiceOk(): Unit = {
// eventually the service should be ok,
// first attempts might fail because worker actors not started yet
awaitAssert {
val probe = TestProbe[AnyRef]()
typedSystem.receptionist ! Receptionist.Find(App.StatsServiceKey, probe.ref)
val App.StatsServiceKey.Listing(actors) = probe.expectMessageType[Receptionist.Listing]
actors should not be empty
actors.head ! StatsService.ProcessText("this is the text that will be analyzed", probe.ref)
probe.expectMessageType[StatsService.JobResult].meanWordLength should be(
3.875 +- 0.001)
}
}
"show usage of the statsService from all nodes" in within(15.seconds) {
assertServiceOk()
testConductor.enter("done-3")
}
}
}