Update dependency versions.

This commit is contained in:
Gregory Burd 2020-02-27 14:58:06 -05:00
parent 25e01687d0
commit e22258f12f
11 changed files with 436 additions and 137 deletions

COPYING

@@ -1,121 +0,0 @@
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.

LICENSE

@@ -1,10 +0,0 @@
Akka sample by Lightbend
Licensed under Public Domain (CC0)
To the extent possible under law, the person who associated CC0 with
this Template has waived all copyright and related or neighboring
rights to this Template.
You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

TODO

@@ -0,0 +1,159 @@
Requirements for a new timeseries database (TSDB):
Hardware: an AWS/EC2 c3.8xlarge (Intel Xeon E5-2680 v2) cloud server with 60GB of RAM
* support time series with nanosecond granularity
* 2 billion unique time series identified by a key (string)
* 1 billion data points (time stamp, value, tag set) added per minute
* 50 quadrillion data points per year per server
* ~17 million writes per second (~1B/min)
* support for historic as well as live streaming queries
* more than 50,000 queries per second at peak (per-node)
* 20 million reads per second; reads against the low-latency pool must succeed in ~2 µs (microseconds) at p95
* over 1 year, the ability to store 2.1 trillion data points while supporting 119M queries per second (53M inserts per second) in a four-node cluster
* store data for 26 hours in low-latency pool
* store data for ~146 years in a higher-latency format, either with full precision or optionally degrading precision gracefully to conserve space
* accelerates computation of associative statistics over arbitrary time ranges, using precomputed statistics over power-of-two-aligned time ranges (see the sketch just after this list)
* 2.93x compression on disk format including the precomputed statistical records and tags
* at minimum two in-memory, non-co-located (anti-affinity) replicas (for disaster recovery capacity)
* 100% read-available service whenever any node remains live and network-reachable by a client
* ability to scan over all in-memory data from the past 26 hrs, saturating network bandwidth or cache, and to scan older time frames as needed
* provides copy-on-write semantics, with versioning and efficient changeset calculation
* supports "distillate streams" — streams computed from one or more existing streams — that are efficiently recomputed as data changes or arrives out-of-order
* scales linearly in the number of nodes
* support for deleting data matching a query within a time range
* design must be able to support at least 2x growth per year
* data is stored with ??? format/precision, supporting NaN and missing/null values, plus boolean range information on values (a value must be within [a,b), etc.)
* missing data can be estimated with a confidence
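A minimal Scala sketch of the associative-statistics requirement above (the item marked "see the sketch just after this list"); all names are illustrative, nothing here is settled design:

// Sketch: (min, max, sum, count) summaries are associative, so a statistic
// over an arbitrary time range can be assembled from O(log n) precomputed
// power-of-two-aligned blocks instead of rescanning raw points.
final case class Stats(min: Double, max: Double, sum: Double, count: Long) {
  def merge(o: Stats): Stats =
    Stats(math.min(min, o.min), math.max(max, o.max), sum + o.sum, count + o.count)
  def mean: Double = if (count == 0) Double.NaN else sum / count
}

object Stats {
  def of(v: Double): Stats = Stats(v, v, v, 1L)

  // Summary of the points whose timestamps fall in [start, start + 2^r) ns.
  def alignedBlock(points: Seq[(Long, Double)], start: Long, r: Int): Option[Stats] =
    points.collect { case (t, v) if t >= start && t < start + (1L << r) => of(v) }
      .reduceOption(_ merge _)
}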
Akka:
* VectorClock -> https://github.com/ricardobcl/ServerWideClocks
* Membership -> https://github.com/lalithsuresh/rapid
* Gossip ->
* Failure Detector -> https://github.com/lalithsuresh/rapid
* Consistent Hashing -> https://doc.akka.io/docs/akka/current/cluster-routing.html @slfrichie's work
* https://doc.akka.io/docs/akka/current/multi-node-testing.html
* https://doc.akka.io/docs/akka/current/remoting-artery.html
* Timeseries
* https://www.youtube.com/playlist?list=PLSE8ODhjZXjY0GMWN4X8FIkYNfiu8_Wl9
* https://blog.acolyer.org/2016/05/04/btrdb-optimizing-storage-system-design-for-timeseries-processing/
* https://www.usenix.org/conference/fast16/technical-sessions/presentation/andersen
* http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
* https://www.cockroachlabs.com/blog/time-travel-queries-select-witty_subtitle-the_future/
https://www.cockroachlabs.com/docs/stable/as-of-system-time.html
https://github.com/oxlade39/STorrent - scala/akka implementation of bittorrent
https://github.com/atomashpolskiy/bittorrent - java implementation of bittorrent
https://github.com/humio/hanoidb
https://github.com/cb372/scalacache - scala cache API (cache2k)
https://github.com/smacke/jaydio - direct IO
http://probcomp.csail.mit.edu/blog/programming-and-probability-sampling-from-a-discrete-distribution-over-an-infinite-set/
https://github.com/silt/silt https://www.cs.cmu.edu/~dga/papers/silt-sosp2011.pdf
* BTrDB
https://news.ycombinator.com/item?id=20280135
https://github.com/PingThingsIO/btrdb-explained
https://btrdb.readthedocs.io/en/latest/explained.html
http://btrdb-viz-latest.surge.sh/
https://www.usenix.org/conference/fast16/technical-sessions/presentation/andersen
https://blog.acolyer.org/2016/05/04/btrdb-optimizing-storage-system-design-for-timeseries-processing/
* Trigram Tags for Feed Forward Cuckoo/Neural/Bloom Filters
https://github.com/efficient/ffbf
https://www.cs.cmu.edu/~dga/papers/ffbf-jea2012.pdf
https://blog.acolyer.org/2019/07/19/meta-learning-neural-bloom-filters/
https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf https://brilliant.org/wiki/cuckoo-filter/ https://bdupras.github.io/filter-tutorial/
https://github.com/google/codesearch/blob/master/index/read.go https://github.com/google/codesearch/blob/master/index/write.go
rolling hash
chunk data in blocks defined by a 50% fill-rate of the bloom filter of fixed 8KB size
for every bit of information we can extract from the search string we get a 50% reduction in the number of chunks we need to scan
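A rough Scala sketch of that chunking rule, assuming a single hash function and the fixed 8KB filter from the note above; names are illustrative:

import scala.collection.mutable

// Close a chunk once its fixed-size Bloom filter reaches ~50% fill: a probe
// that misses then rules out the whole chunk, so each bit of information in
// the search string halves the number of chunks left to scan.
final class ChunkFilter(bits: Int = 8 * 8192) { // fixed 8KB filter, one hash
  private val set = mutable.BitSet.empty
  def add(trigram: String): Unit = set += math.floorMod(trigram.hashCode, bits)
  def mightContain(trigram: String): Boolean = set(math.floorMod(trigram.hashCode, bits))
  def fillRate: Double = set.size.toDouble / bits
}

object Chunker {
  def chunkByFillRate(trigrams: Iterator[String]): Vector[ChunkFilter] = {
    val chunks = mutable.ArrayBuffer(new ChunkFilter())
    trigrams.foreach { t =>
      if (chunks.last.fillRate >= 0.5) chunks += new ChunkFilter() // start a new chunk
      chunks.last.add(t)
    }
    chunks.toVector
  }
}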
* HyperLogLog++ for approximate counts
https://github.com/clarkduvall/hyperloglog
https://research.neustar.biz/2013/01/24/hyperloglog-googles-take-on-engineering-hll/
* Log Linear bucketed histograms
merge using a tree algorithm to try to merge similar histograms (e.g. with four, merge 0-1, 2-3, then (0-1)-(2-3))
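A small Scala sketch of that pairwise tree merge, assuming the histograms share identical log-linear bucket boundaries so that merging is bucket-wise addition:

object HistMerge {
  // Merge histograms pairwise, tree-style: with four, merge 0-1 and 2-3, then
  // merge those two results. Assumes a non-empty input and identical buckets.
  @scala.annotation.tailrec
  def treeMerge(hists: Vector[Array[Long]]): Array[Long] =
    if (hists.size == 1) hists.head
    else treeMerge(hists.grouped(2).map {
      case Vector(a, b) => a.zip(b).map { case (x, y) => x + y } // bucket-wise add
      case Vector(a)    => a
    }.toVector)
}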
* Value estimation, anomaly detection
http://probcomp.csail.mit.edu/software/bayesdb/
https://github.com/probcomp/bayeslite
https://github.com/probcomp/BayesDB https://github.com/probcomp/BayesDB/tree/6d7b9207672f39275291fd5897836a1da073fa78
https://news.ycombinator.com/item?id=6864339
https://www.youtube.com/watch?v=7_m7JCLKmTY
https://www.youtube.com/watch?v=Bvjj0Hagzbg
https://www.datainnovation.org/2013/12/5-qs-for-the-creators-of-bayesdb-a-database-built-for-data-science/
* time-partitioned trees
QUERY SYNTAX:
-----------------------------------------------------------------------------------------------------------------------
INFER ... WITH CONFIDENCE 0.95
SIMULATE salary FROM employees_demo GIVEN division='sales' TIMES 100;
API:
-----------------------------------------------------------------------------------------------------------------------
Types:
- AUTH
token representing authentication for access to the system
- UUID
unique value identifying a stream
- BITMAP
an EWAH-compressed bitmap index indicating the presence of tags for values in this stream
- StartTime/EndTime
[begin:end) - includes data that matches the begin time, but not the end time
(begin:end) - excludes data matching the begin or end times, only data strictly within the range; an exclusive begin of 12:00 will only match data 1 ns after 12:00 and later
(begin:end] - excludes data that matches the begin time, but includes the end time
[begin:) - from the begin time until now, and ongoing as a stream
[:end] - from the first recorded value forward until end
[:] - all values recorded in all time, and ongoing as a stream
- AsOfTime - uses the system-recorded time to find the next version closest to the specified time. The time can be
in the past or the future; in the future case, the stream doesn't start until the system time, as known to the node
servicing the query, passes the indicated time.
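A minimal Scala sketch of these range forms (nanosecond timestamps assumed; names illustrative):

// Inclusive/Exclusive mirror the '['/'(' and ']'/')' brackets; Open means
// "no endpoint" (first recorded value, or ongoing as a stream).
sealed trait Bound
final case class Inclusive(ns: Long) extends Bound
final case class Exclusive(ns: Long) extends Bound
case object Open extends Bound

final case class TimeRange(begin: Bound, end: Bound) {
  def contains(t: Long): Boolean = {
    val afterBegin = begin match {
      case Inclusive(b) => t >= b
      case Exclusive(b) => t > b // 12:00 exclusive first matches 12:00 + 1 ns
      case Open         => true  // from the first recorded value
    }
    val beforeEnd = end match {
      case Inclusive(e) => t <= e
      case Exclusive(e) => t < e
      case Open         => true  // ongoing as a stream
    }
    afterBegin && beforeEnd
  }
}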
InsertValues(AUTH, UUID, EWAH, [(time, value)]) → boolean
StreamValues(AUTH, UUID, starting EWAH, [(time, value, optional Δ-EWAH)]): Stream/Socket
Creates a new version of a stream with the given collection of (time,value) pairs inserted. Logically, the stream is
maintained in time order. Most commonly, points are appended to the end of the stream, but this cannot be assumed:
readings from a device may be delivered to the store out of order, duplicates may occur, holes may be back-filled and
corrections may be made to old data. Each insertion of a collection of values creates a new version, leaving the old
version unmodified. This allows new analyses to be performed on old versions of the data.
GetRange(AUTH, UUID, EWAH, StartTime, EndTime, Version|AsOfSystemTime) → (Version, [(time, value)])
StreamRange(AUTH, UUID, EWAH, StartTime, EndTime, Version|AsOfSystemTime) → (Version, [(time, value)])
Retrieves all the data between two times (non-inclusive) in a given version of the stream. The latest version can be indicated,
thereby eliminating a call to GetLatestVersion(UUID) → Version to obtain the latest version for a stream prior to querying a range. The exact version number is returned along with the data to facilitate a repeatable query in the future.
GetStatisticalRange(UUID, StartTime, EndTime, Version, Resolution) → (Version, [(Time, Min, Mean, Max, Count)]) is used to retrieve statistical records between two times at a given temporal resolution. Each record covers 2^resolution nanoseconds. The start time and end time are on 2^resolution boundaries and result records are periodic in that time unit; thus summaries are aligned across streams. Unaligned windows can also be queried, with a marginal decrease in performance.
GetNearestValue(UUID, Time, Version, Direction) → (Version, (Time, Value)) locates the nearest point to a given time, either forwards or backwards. It is commonly used to obtain the current, or most recent to now, value of a stream of interest.
ComputeDiff(UUID, FromVersion, ToVersion, Resolution) → [(StartTime, EndTime)] provides the time ranges that contain differences between the given versions. The size of the changeset returned can be limited by limiting the number of versions between FromVersion and ToVersion, as each version has a maximum size. Each returned time range will be larger than 2^resolution nanoseconds, allowing the caller to optimize for batch size.
As utilities:
DeleteValues(AUTH, UUID, ...
DeleteValues(AUTH, UUID, between versions?
DeleteValues(AUTH, UUID, with tags that match EWAH
DeleteRange(UUID, StartTime, EndTime): create a new version of the stream with the given range deleted. Flush(UUID): ensure the given stream is flushed to replicated storage.
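The calls above, collected as a hedged Scala sketch; the abstract types stand in for AUTH, UUID, the EWAH tag bitmap and versions, and none of the signatures are settled:

// Sketch of the API surface described above; signatures are provisional.
trait TsdbApi {
  type Auth; type Uuid; type Ewah; type Version
  type Point = (Long, Double) // (time in ns, value)

  // New version of the stream with the given points inserted.
  def insertValues(auth: Auth, stream: Uuid, tags: Ewah, points: Seq[Point]): Boolean
  // Raw points between two times in a given (or latest) version.
  def getRange(auth: Auth, stream: Uuid, tags: Ewah,
               start: Long, end: Long, version: Option[Version]): (Version, Seq[Point])
  // (Time, Min, Mean, Max, Count) records at 2^resolution ns granularity.
  def getStatisticalRange(stream: Uuid, start: Long, end: Long, version: Version,
                          resolution: Int): (Version, Seq[(Long, Double, Double, Double, Long)])
  // Nearest point to a time, forwards or backwards.
  def getNearestValue(stream: Uuid, time: Long, version: Version,
                      forwards: Boolean): (Version, Point)
  // Time ranges that differ between two versions.
  def computeDiff(stream: Uuid, from: Version, to: Version,
                  resolution: Int): Seq[(Long, Long)]
  def deleteRange(stream: Uuid, start: Long, end: Long): Version
  def flush(stream: Uuid): Unit
}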
------------------------------------------------------------------------------------------------------------------------------
distillation pipeline
time-partitioning, copy-on-write, version-annotated k-ary tree
To retain historic data, the tree is copy-on-write: each insert into the tree forms an overlay on the previous tree, accessible via a new root node. Providing historic data queries in this way ensures that all versions of the tree require equal effort to query, unlike log-replay mechanisms, which introduce overheads proportional to how much data has changed or how old the queried version is. Using the storage structure as the index ensures that queries to any version of the stream have an index to use, and reduces network round trips.
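A toy Scala version of that structure (binary rather than k-ary for brevity; illustrative names): each insert copies only the root-to-leaf path and publishes a new root, so any historical version stays queryable at equal cost.

// Toy copy-on-write, version-annotated tree. Each insert rebuilds only the
// path from root to the touched leaf and shares every other node, so old
// roots keep serving their version of the stream unchanged.
sealed trait Node
final case class Leaf(points: Vector[(Long, Double)]) extends Node
final case class Branch(split: Long, left: Node, right: Node) extends Node

final class VersionedTree {
  // version number -> root; old roots are never mutated
  private var roots: Vector[Node] = Vector(Leaf(Vector.empty))

  def latestVersion: Int = roots.size - 1

  def insert(time: Long, value: Double): Int = {
    roots = roots :+ insertInto(roots.last, time, value)
    latestVersion
  }

  private def insertInto(node: Node, t: Long, v: Double): Node = node match {
    case Leaf(ps) if ps.size < 4 => Leaf((ps :+ ((t, v))).sortBy(_._1))
    case Leaf(ps) => // split a full leaf; sorting also absorbs out-of-order arrivals
      val sorted = (ps :+ ((t, v))).sortBy(_._1)
      val (lo, hi) = sorted.splitAt(sorted.size / 2)
      Branch(hi.head._1, Leaf(lo), Leaf(hi))
    case Branch(s, l, r) => // copy one child path, share the other subtree
      if (t < s) Branch(s, insertInto(l, t, v), r)
      else Branch(s, l, insertInto(r, t, v))
  }

  // Query any historical version at equal cost: walk from its root.
  def query(version: Int, from: Long, until: Long): Vector[(Long, Double)] = {
    def go(n: Node): Vector[(Long, Double)] = n match {
      case Leaf(ps) => ps.filter { case (t, _) => t >= from && t < until }
      case Branch(s, l, r) =>
        (if (from < s) go(l) else Vector.empty) ++ (if (until > s) go(r) else Vector.empty)
    }
    go(roots(version))
  }
}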
The compression engine compresses the min, mean, max, count, address and version fields in internal nodes, as well as the time and value fields in leaf nodes. It uses a method we call delta-delta coding, followed by Huffman coding using a fixed tree. Typical delta coding works by calculating the difference between every value in the sequence and storing that using variable-length symbols (as the delta is normally smaller than the absolute values [18]). Unfortunately, with high-precision sensor data this process does not work well, because nanosecond timestamps produce very large deltas, and even linearly-changing values produce sequences of large, but similar, delta values. Delta-delta compression replaces run-length encoding and encodes each delta as the difference from the mean of a window of previous delta values. The result is a sequence of only the jitter values. Incidentally, this works well for the addresses and version numbers, as they too are linearly increasing values with some jitter in the deltas. In the course of system development, we found that this algorithm produces better results, with a simpler implementation, than the residual coding in FLAC [13] which was the initial inspiration.
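A minimal Scala sketch of the delta-delta step as described; the Huffman coding of residuals is omitted and the window size is an assumption:

// Sketch of delta-delta coding: store each delta as its difference from the
// mean of a window of previous deltas, leaving only the jitter values, which
// a fixed Huffman tree (omitted here) can then encode compactly.
object DeltaDelta {
  def encode(values: Seq[Long], window: Int = 8): Seq[Long] = {
    val deltas = values.zip(values.drop(1)).map { case (a, b) => b - a }
    deltas.zipWithIndex.map { case (d, i) =>
      val prior = deltas.slice((i - window).max(0), i)
      val mean = if (prior.isEmpty) 0L else prior.sum / prior.size
      d - mean // residual: small jitter even for large, linearly-growing inputs
    }
  }

  // Inverse: rebuild the deltas with the same running window, then prefix-sum.
  def decode(first: Long, residuals: Seq[Long], window: Int = 8): Seq[Long] = {
    val deltas = residuals.foldLeft(Vector.empty[Long]) { (acc, r) =>
      val prior = acc.takeRight(window)
      val mean = if (prior.isEmpty) 0L else prior.sum / prior.size
      acc :+ (r + mean)
    }
    deltas.scanLeft(first)(_ + _)
  }
}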
GOAL:
* segment files
* have a torrent file description
* must reach a replication level where the spread of chunks within these files is sufficient to rebuild them (erasure coding) even if 1/2 of the cluster is missing
* bittorrent leeching/seeding to automatically deliver files within directories to nodes
* hanoidb/silt-like data storage, fast ingest merged to larger space/search efficient segment files
* BTrDB trie
* nanosecond resolution
* tagged datapoints
* computations within data
LB -> cluster -> node -> socket with metrics data on it
-> data parsed, consistent hashed to nodes with similar time ranges for archival

@@ -1,7 +1,10 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
val akkaVersion = "2.6.0"
//https://stackoverflow.com/a/58456468
ThisBuild / useCoursier := false
val akkaVersion = "2.6.3"
lazy val `kaka` = project
.in(file("."))
@@ -14,11 +17,14 @@ lazy val `kaka` = project
run / javaOptions ++= Seq("-Xms128m", "-Xmx1024m", "-Djava.library.path=./target/native"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scalatest" %% "scalatest" % "3.0.8" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3",
"io.kamon" % "sigar-loader" % "1.6.6-rev002",
"org.scalatest" %% "scalatest" % "3.1.1" % Test,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test),
run / fork := false,
Global / cancelable := false,
@@ -27,3 +33,5 @@ lazy val `kaka` = project
licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0")))
)
.configs (MultiJvm)
//dependencyUpdatesFilter -= moduleFilter(organization = "org.scala-lang")

@@ -1 +1 @@
sbt.version=1.3.3
sbt.version=1.3.8

@@ -1,2 +1,25 @@
logLevel := Level.Warn
resolvers ++= Seq(
Classpaths.sbtPluginReleases,
"Local Maven Repository" at Path.userHome.asFile.toURI.toURL + ".m2/repository",
"Local Ivy2 Cache Repository" at Path.userHome.asFile.toURI.toURL + ".ivy2/cache",
Resolver.sonatypeRepo("snapshots"),
Resolver.typesafeRepo("releases"),
Resolver.sonatypeRepo("releases"),
Resolver.bintrayIvyRepo("jetbrains", "sbt-plugins")
)
// Use the Dotty compiler for Scala code.
addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.4.0")
// Makes our code tidy
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.1")
// Native Packager allows us to create standalone jar
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.6.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.0.0")
addSbtPlugin("org.jmotor.sbt" % "sbt-dependency-updates" % "1.2.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

@@ -5,6 +5,24 @@ akka {
serialization-bindings {
"sample.cluster.CborSerializable" = jackson-cbor
}
deployment {
/factorialFrontend/factorialBackendRouter = {
# Router type provided by metrics extension.
router = cluster-metrics-adaptive-group
# Router parameter specific for metrics extension.
# metrics-selector = heap
# metrics-selector = load
# metrics-selector = cpu
metrics-selector = mix
#
routees.paths = ["/user/factorialBackend"]
cluster {
enabled = on
use-roles = ["backend"]
allow-local-routees = off
}
}
}
}
remote {
artery {
@@ -16,6 +34,12 @@ akka {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:25251",
"akka://ClusterSystem@127.0.0.1:25252"]
failure-detector {
implementation-class = "akka.remote.AdaptiveAccrualFailureDetector"
threshold = 0.05
scaling-factor = 0.9
}
}
extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]
}

@@ -0,0 +1,125 @@
package akka.remote
import akka.remote.FailureDetector.Clock
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.event.EventStream
/**
* Implementation of 'A New Adaptive Accrual Failure Detector for Dependable Distributed Systems' by Satzger et al. as defined in their paper:
* [https://pdfs.semanticscholar.org/8805/d522cd6cef723aae55595f918e09914e4316.pdf]
*
* The idea of this failure detector is to predict the arrival time of the next heartbeat based on
* the history of inter-arrival times between heartbeats. The algorithm approximates the cumulative distribution function (CDF)
* of inter-arrival times of the heartbeat messages.
*
* The suspicion value of a failure is calculated as follows:
*
* {{{
* P = |StΔ| / |S|
* }}}
*
* where:
* - S is the list of historical inter-arrival times of heartbeats
* - StΔ is the list of inter-arrival times that are smaller than or equal to tΔ
* - tΔ = current heartbeat timestamp - previous heartbeat timestamp
*
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event
* of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect
* actual crashes
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
* @param scalingFactor A scaling factor to prevent the failure detector to overestimate the probability of failures
* particularly in the case of increasing network latency times
* @param clock The clock, returning current time in milliseconds, but can be faked for testing
* purposes. It is only used for measuring intervals (duration).
*/
class AdaptiveAccrualFailureDetector(
val threshold: Double,
val maxSampleSize: Int,
val scalingFactor: Double,
eventStream: Option[EventStream])(
implicit
clock: Clock) extends FailureDetector {
/**
* Constructor that reads parameters from config.
* Expecting config properties named `threshold`, `max-sample-size`,
* `min-std-deviation`, `acceptable-heartbeat-pause` and
* `heartbeat-interval`.
*/
def this(config: Config, ev: EventStream) =
this(
threshold = config.getDouble("threshold"),
maxSampleSize = config.getInt("max-sample-size"),
scalingFactor = config.getDouble("scaling-factor"),
Some(ev))
require(threshold > 0.0, "failure-detector.threshold must be > 0")
require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0")
require(scalingFactor > 0, "failure-detector.scaling-factor must be > 0")
private val firstHeartbeat: HeartbeatHistory = HeartbeatHistory(maxSampleSize)
/**
* Implement using optimistic lockless concurrency, all state is represented
* by this immutable case class and managed by an AtomicReference.
*/
private /* final */ case class State(history: HeartbeatHistory, freshnessPoint: Option[Long]) // see: https://github.com/scala/bug/issues/4440
private val state = new AtomicReference[State](State(history = firstHeartbeat, freshnessPoint = None))
override def isAvailable: Boolean = isAvailable(clock())
private def isAvailable(timestamp: Long): Boolean = suspicion(timestamp) < threshold
override def isMonitoring: Boolean = state.get.freshnessPoint.nonEmpty
@tailrec
final override def heartbeat(): Unit = {
val timestamp = clock()
val oldState = state.get
val newHistory = oldState.freshnessPoint match {
case None =>
// This is a heartbeat from a new resource; according to the algorithm, do not add any initial history.
firstHeartbeat
case Some(freshnessPoint) =>
// This is a known connection.
val tΔ = timestamp - freshnessPoint
oldState.history :+ tΔ
}
val newState = oldState.copy(history = newHistory, freshnessPoint = Some(timestamp)) // Record new timestamp.
// If we won the race then update else try again.
if (!state.compareAndSet(oldState, newState)) heartbeat() // Recur.
}
/**
* The suspicion level of the accrual failure detector (expressed as probability).
*
* If a connection does not have any records in failure detector then it is considered healthy.
*/
def suspicion: Double = suspicion(clock())
private def suspicion(timestamp: Long): Double = {
val oldState = state.get
val freshnessPoint = oldState.freshnessPoint
if (freshnessPoint.isEmpty || oldState.history.intervals.isEmpty) 0.0 // Treat unmanaged connections, e.g. with zero heartbeats, as healthy connections.
else {
val tΔ = timestamp - freshnessPoint.get
val S = oldState.history.intervals
val SLength = S.length
val StΔLength = S.count(_ <= tΔ * scalingFactor)
StΔLength.toDouble / SLength.toDouble
}
}
}
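A hypothetical smoke test for the detector above (not part of this commit); it leans on the implicit FailureDetector.defaultClock that Akka provides:

// Hypothetical usage sketch (assumes the same package, or `import akka.remote._`).
import akka.remote.FailureDetector.defaultClock

object AdaptiveAccrualSmokeTest extends App {
  val fd = new AdaptiveAccrualFailureDetector(
    threshold = 0.05, maxSampleSize = 1000, scalingFactor = 0.9, eventStream = None)

  fd.heartbeat()          // first arrival: no interval recorded yet, per the algorithm
  Thread.sleep(100)
  fd.heartbeat()          // second arrival establishes an inter-arrival sample
  println(fd.isAvailable) // suspicion is compared against the 0.05 threshold
}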

@@ -15,7 +15,7 @@ import akka.cluster.typed.Subscribe
object ClusterListener {
sealed trait Event
// internal adapted cluster events only
// Internal adapted cluster events only.
private final case class ReachabilityChange(reachabilityEvent: ReachabilityEvent) extends Event
private final case class MemberChange(event: MemberEvent) extends Event

@@ -0,0 +1,59 @@
import akka.actor.{ Actor, ActorLogging, ReceiveTimeout }
import akka.routing.FromConfig
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.pattern.pipe
class FactorialBackend extends Actor with ActorLogging {
import context.dispatcher
def receive = {
case (n: Int) =>
Future(factorial(n))
.map { result =>
(n, result)
}
.pipeTo(sender())
}
def factorial(n: Int): BigInt = {
@tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = {
if (n <= 1) acc
else factorialAcc(acc * n, n - 1)
}
factorialAcc(BigInt(1), n)
}
}
class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging {
val backend = context.actorOf(FromConfig.props(), name = "factorialBackendRouter")
override def preStart(): Unit = {
sendJobs()
if (repeat) {
context.setReceiveTimeout(10.seconds)
}
}
def receive = {
case (n: Int, factorial: BigInt) =>
if (n == upToN) {
log.debug("{}! = {}", n, factorial)
if (repeat) sendJobs()
else context.stop(self)
}
case ReceiveTimeout =>
log.info("Timeout")
sendJobs()
}
def sendJobs(): Unit = {
log.info("Starting batch of factorials up to [{}]", upToN)
(1 to upToN).foreach { backend ! _ }
}
}

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder>
<pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
</encoder>
</appender>
<!--
Logging from tests is silenced by this appender. When there is a test failure
the captured logging events are flushed to the appenders defined for the
akka.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
-->
<appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
<!--
The appenders defined for this CapturingAppenderDelegate logger are used
when there is a test failure and all logging events from the test are
flushed to these appenders.
-->
<logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>