From e22258f12f4ade5b423f1d4dfdcb2f4e1a75685f Mon Sep 17 00:00:00 2001 From: Greg Burd Date: Thu, 27 Feb 2020 14:58:06 -0500 Subject: [PATCH] Update dependency versions. --- COPYING | 121 ------------- LICENSE | 10 -- TODO | 159 ++++++++++++++++++ build.sbt | 14 +- project/build.properties | 2 +- project/plugins.sbt | 25 ++- src/main/resources/application.conf | 24 +++ .../AdaptiveAccrualFailureDetector.scala | 125 ++++++++++++++ .../cluster/simple/ClusterListener.scala | 2 +- .../sample/factorial/FactorialService.scala | 59 +++++++ src/test/resources/logback-test.xml | 32 ++++ 11 files changed, 436 insertions(+), 137 deletions(-) delete mode 100644 COPYING create mode 100644 TODO create mode 100644 src/main/scala/akka/remote/AdaptiveAccrualFailureDetector.scala create mode 100644 src/main/scala/sample/factorial/FactorialService.scala create mode 100644 src/test/resources/logback-test.xml diff --git a/COPYING b/COPYING deleted file mode 100644 index 0e259d4..0000000 --- a/COPYING +++ /dev/null @@ -1,121 +0,0 @@ -Creative Commons Legal Code - -CC0 1.0 Universal - - CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE - LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN - ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS - INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES - REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS - PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM - THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED - HEREUNDER. - -Statement of Purpose - -The laws of most jurisdictions throughout the world automatically confer -exclusive Copyright and Related Rights (defined below) upon the creator -and subsequent owner(s) (each and all, an "owner") of an original work of -authorship and/or a database (each, a "Work"). 
- -Certain owners wish to permanently relinquish those rights to a Work for -the purpose of contributing to a commons of creative, cultural and -scientific works ("Commons") that the public can reliably and without fear -of later claims of infringement build upon, modify, incorporate in other -works, reuse and redistribute as freely as possible in any form whatsoever -and for any purposes, including without limitation commercial purposes. -These owners may contribute to the Commons to promote the ideal of a free -culture and the further production of creative, cultural and scientific -works, or to gain reputation or greater distribution for their Work in -part through the use and efforts of others. - -For these and/or other purposes and motivations, and without any -expectation of additional consideration or compensation, the person -associating CC0 with a Work (the "Affirmer"), to the extent that he or she -is an owner of Copyright and Related Rights in the Work, voluntarily -elects to apply CC0 to the Work and publicly distribute the Work under its -terms, with knowledge of his or her Copyright and Related Rights in the -Work and the meaning and intended legal effect of CC0 on those rights. - -1. Copyright and Related Rights. A Work made available under CC0 may be -protected by copyright and related or neighboring rights ("Copyright and -Related Rights"). Copyright and Related Rights include, but are not -limited to, the following: - - i. the right to reproduce, adapt, distribute, perform, display, - communicate, and translate a Work; - ii. moral rights retained by the original author(s) and/or performer(s); -iii. publicity and privacy rights pertaining to a person's image or - likeness depicted in a Work; - iv. rights protecting against unfair competition in regards to a Work, - subject to the limitations in paragraph 4(a), below; - v. rights protecting the extraction, dissemination, use and reuse of data - in a Work; - vi. 
database rights (such as those arising under Directive 96/9/EC of the - European Parliament and of the Council of 11 March 1996 on the legal - protection of databases, and under any national implementation - thereof, including any amended or successor version of such - directive); and -vii. other similar, equivalent or corresponding rights throughout the - world based on applicable law or treaty, and any national - implementations thereof. - -2. Waiver. To the greatest extent permitted by, but not in contravention -of, applicable law, Affirmer hereby overtly, fully, permanently, -irrevocably and unconditionally waives, abandons, and surrenders all of -Affirmer's Copyright and Related Rights and associated claims and causes -of action, whether now known or unknown (including existing as well as -future claims and causes of action), in the Work (i) in all territories -worldwide, (ii) for the maximum duration provided by applicable law or -treaty (including future time extensions), (iii) in any current or future -medium and for any number of copies, and (iv) for any purpose whatsoever, -including without limitation commercial, advertising or promotional -purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each -member of the public at large and to the detriment of Affirmer's heirs and -successors, fully intending that such Waiver shall not be subject to -revocation, rescission, cancellation, termination, or any other legal or -equitable action to disrupt the quiet enjoyment of the Work by the public -as contemplated by Affirmer's express Statement of Purpose. - -3. Public License Fallback. Should any part of the Waiver for any reason -be judged legally invalid or ineffective under applicable law, then the -Waiver shall be preserved to the maximum extent permitted taking into -account Affirmer's express Statement of Purpose. 
In addition, to the -extent the Waiver is so judged Affirmer hereby grants to each affected -person a royalty-free, non transferable, non sublicensable, non exclusive, -irrevocable and unconditional license to exercise Affirmer's Copyright and -Related Rights in the Work (i) in all territories worldwide, (ii) for the -maximum duration provided by applicable law or treaty (including future -time extensions), (iii) in any current or future medium and for any number -of copies, and (iv) for any purpose whatsoever, including without -limitation commercial, advertising or promotional purposes (the -"License"). The License shall be deemed effective as of the date CC0 was -applied by Affirmer to the Work. Should any part of the License for any -reason be judged legally invalid or ineffective under applicable law, such -partial invalidity or ineffectiveness shall not invalidate the remainder -of the License, and in such case Affirmer hereby affirms that he or she -will not (i) exercise any of his or her remaining Copyright and Related -Rights in the Work or (ii) assert any associated claims and causes of -action with respect to the Work, in either case contrary to Affirmer's -express Statement of Purpose. - -4. Limitations and Disclaimers. - - a. No trademark or patent rights held by Affirmer are waived, abandoned, - surrendered, licensed or otherwise affected by this document. - b. Affirmer offers the Work as-is and makes no representations or - warranties of any kind concerning the Work, express, implied, - statutory or otherwise, including without limitation warranties of - title, merchantability, fitness for a particular purpose, non - infringement, or the absence of latent or other defects, accuracy, or - the present or absence of errors, whether or not discoverable, all to - the greatest extent permissible under applicable law. - c. 
Affirmer disclaims responsibility for clearing rights of other persons - that may apply to the Work or any use thereof, including without - limitation any person's Copyright and Related Rights in the Work. - Further, Affirmer disclaims responsibility for obtaining any necessary - consents, permissions or other rights required for any use of the - Work. - d. Affirmer understands and acknowledges that Creative Commons is not a - party to this document and has no duty or obligation with respect to - this CC0 or use of the Work. diff --git a/LICENSE b/LICENSE index 4239f09..e69de29 100644 --- a/LICENSE +++ b/LICENSE @@ -1,10 +0,0 @@ -Akka sample by Lightbend - -Licensed under Public Domain (CC0) - -To the extent possible under law, the person who associated CC0 with -this Template has waived all copyright and related or neighboring -rights to this Template. - -You should have received a copy of the CC0 legalcode along with this -work. If not, see . diff --git a/TODO b/TODO new file mode 100644 index 0000000..a32fc52 --- /dev/null +++ b/TODO @@ -0,0 +1,159 @@ +Requirements for a new timeseries database (TSDB): + Hardware: a AWS/EC2 ic3.8xlarge (Intel Xeon E5-2680v2) based cloud server with 60GB of RAM + * support time series with nanosecond granularity + * 2 billion unique time series identified by a key (string) + * 1 billion data points (time stamp, value, tag set) added per minute + * 50 quadrillion data points per year per server + * ~17 million writes per second (~1B/min) + * support for historic as well as live streaming queries + * more than 50,000 queries per second at peak (per-node) + * 20 million reads per second, reads must succeed in ~2µ seconds (microseconds) against the low-latency pool in the p95 + * over 1 year ability to store 2.1 trillion data points, and supporting 119M queries per second (53M inserts per second) in a four-node cluster + * store data for 26 hours in low-latency pool + * store data for ~146 years in higher-latency format with full 
precision or which optionally degrades precision gracefully to conserve space + * accelerates computation of associative statistics over arbitrary time ranges, using precomputed statistics over power-of-two-aligned time ranges + * 2.93x compression on disk format including the precomputed statistical records and tags + * at minimum two in-memory, non co-located (-affinity) replicas (for disaster recovery capacity) + * 100% read-available service when any node remains lively and network available to a client + * ability to scan over all in-memory data within the past 26hrs saturating network bandwidth or cache and scan other older time-frames as needed + * provides copy-on-write semantics, with versioning and efficient changeset calculation + * supports "distillate streams" — streams computed from one or more existing streams — that are efficiently recomputed as data changes or arrives out-of-order + * scales linearly in the number of nodes + * support for deleting data matching a query within a time range + * design must be able to support at least 2x growth per year + * data is stored with ??? format/precision and with NaN, missing/null, and is value? boolean range information (value must be within [a,b) etc.) 
+ * missing data can be estimated with a confidence + * + +Akka: +* VectorClock -> https://github.com/ricardobcl/ServerWideClocks +* Membership -> https://github.com/lalithsuresh/rapid +* Gossip -> +* Failure Detector -> https://github.com/lalithsuresh/rapid +* Consistent Hashing -> https://doc.akka.io/docs/akka/current/cluster-routing.html @slfrichie's work +* https://doc.akka.io/docs/akka/current/multi-node-testing.html +* https://doc.akka.io/docs/akka/current/remoting-artery.html + +* Timeseries + * https://www.youtube.com/playlist?list=PLSE8ODhjZXjY0GMWN4X8FIkYNfiu8_Wl9 + * https://blog.acolyer.org/2016/05/04/btrdb-optimizing-storage-system-design-for-timeseries-processing/ + * https://www.usenix.org/conference/fast16/technical-sessions/presentation/andersen + * http://www.vldb.org/pvldb/vol8/p1816-teller.pdf + * https://www.cockroachlabs.com/blog/time-travel-queries-select-witty_subtitle-the_future/ + https://www.cockroachlabs.com/docs/stable/as-of-system-time.html + +https://github.com/oxlade39/STorrent - scala/akka implementation of bittorent +https://github.com/atomashpolskiy/bittorrent - java implementation of bittorrent +https://github.com/humio/hanoidb +https://github.com/cb372/scalacache - scala cache API (cache2k) +https://github.com/smacke/jaydio - direct IO +http://probcomp.csail.mit.edu/blog/programming-and-probability-sampling-from-a-discrete-distribution-over-an-infinite-set/ + + +https://github.com/silt/silt https://www.cs.cmu.edu/~dga/papers/silt-sosp2011.pdf + +* BTrDB + https://news.ycombinator.com/item?id=20280135 + https://github.com/PingThingsIO/btrdb-explained + https://btrdb.readthedocs.io/en/latest/explained.html + http://btrdb-viz-latest.surge.sh/ + https://www.usenix.org/conference/fast16/technical-sessions/presentation/andersen + https://blog.acolyer.org/2016/05/04/btrdb-optimizing-storage-system-design-for-timeseries-processing/ +* Trigram Tags for Feed Forward Cuckoo/Neural/Bloom Filters + https://github.com/efficient/ffbf + 
https://www.cs.cmu.edu/~dga/papers/ffbf-jea2012.pdf + https://blog.acolyer.org/2019/07/19/meta-learning-neural-bloom-filters/ + https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf https://brilliant.org/wiki/cuckoo-filter/ https://bdupras.github.io/filter-tutorial/ + https://github.com/google/codesearch/blob/master/index/read.go https://github.com/google/codesearch/blob/master/index/write.go + rolling hash + chunk data in blocks defined by a 50% fill-rate of the bloom filter of fixed 8KB size + for every bit of information we can extract from the search string we get a 50% reduction in the number of + chunks we need to scan +* HyperLogLog++ for approximate counts + https://github.com/clarkduvall/hyperloglog + https://research.neustar.biz/2013/01/24/hyperloglog-googles-take-on-engineering-hll/ +* Log Linear bucketed histograms + merge using a tree algorithm to try to merge similar histograms (e.g. with four, merge 0-1, 2-3, then (0-1)-(2-3)) +* Value estimation, anomaly detection + http://probcomp.csail.mit.edu/software/bayesdb/ + https://github.com/probcomp/bayeslite + https://github.com/probcomp/BayesDB https://github.com/probcomp/BayesDB/tree/6d7b9207672f39275291fd5897836a1da073fa78 + https://news.ycombinator.com/item?id=6864339 + https://www.youtube.com/watch?v=7_m7JCLKmTY + https://www.youtube.com/watch?v=Bvjj0Hagzbg + https://www.datainnovation.org/2013/12/5-qs-for-the-creators-of-bayesdb-a-database-built-for-data-science/ + +* time-partitioned trees + +QUERY SYNTAX: +----------------------------------------------------------------------------------------------------------------------- + +INFER ... 
WITH CONFIDENCE 0.95 +SIMULATE salary FROM employees_demo GIVEN division='sales' TIMES 100; + +API: +----------------------------------------------------------------------------------------------------------------------- +Types: + - AUTH + token representing authentication for access to the system + - UUID + unique value identifying a stream + - BITMAP + an EWAH-compressed bitmap index indicating presence of tags for values in this stream + - StartTime/EndTime + [begin:end) - include data that matches the begin time, not end, a begin time of 12:00 will only match data +1 ns after 12:00 and later + (begin:end) - doesn't include data that matches the start or end of the range only within + (begin:end] - + [begin:) - from time until now and ongoing as a stream + [:end] - from the first recorded value forward until end + [:] - all values recorded in all time and ongoing as a stream + - AsOfTime - uses system recorded time to find the next version closest to specified time, time can be in the past or + future in which case the stream doesn't start until the system time as known to the node servicing the query passes + the indicated time + +InsertValues(AUTH, UUID, EWAH, [(time, value)]) → boolean +StreamValues(AUTH, UUID, starting EWAH, [(time, value, optional Δ-EWAH)]): Stream/Socket + Creates a new version of a stream with the given collection of (time,value) pairs inserted. Logically, the stream is + maintained in time order. Most commonly, points are appended to the end of the stream, but this cannot be assumed: + readings from a device may be delivered to the store out of order, duplicates may occur, holes may be back-filled and + corrections may be made to old data. Each insertion of a collection of values creates a new version, leaving the old + version unmodified. This allows new analyses to be performed on old versions of the data. 
+ +GetRange(AUTH, UUID, EWAH, StartTime, EndTime, Version|AsOfSystemTime) → (Version, [(time, value)]) +StreamRange(AUTH, UUID, EWAH, StartTime, EndTime, Version|AsOfSystemTime) → (Version, [(time, value)]) + Retrieves all the data between two times (non-inclusive) in a given version of the stream. The ‘latest’ version can be indicated, + thereby eliminating a call to GetLatestVersion(UUID) → Version to obtain the latest version for a stream prior to querying a range. The exact version number is returned along with the data to facilitate a repeatable query in future + +GetStatisticalRange(UUID, StartTime, EndTime, Version, Resolution) → (Version, [(Time, Min, Mean, Max, Count)]) is used to retrieve statistical records between two times at a given temporal resolution. Each record covers 2^resolution nanoseconds. The start time and end time are on 2^resolution boundaries and result records are periodic in that time unit; thus summaries are aligned across streams. Unaligned windows can also be queried, with a marginal decrease in performance. GetNearestValue(UUID, Time, Version, Direction) → (Version, (Time, Value)) locates the nearest point to a given time, either forwards or backwards. It is commonly used to obtain the ‘current’, or most recent to now, value of a stream of interest. + +ComputeDiff(UUID, FromVersion, ToVersion, Resolution) → [(StartTime, EndTime)] provides the time ranges that contain differences between the given versions. The size of the changeset returned can be limited by limiting the number of versions between FromVersion and ToVersion as each version has a maximum size. Each returned time range will be larger than 2^resolution nanoseconds, allowing the caller to optimize for batch size. + +As utilities +DeleteValues(AUTH, UUID, ... +DeleteValues(AUTH, UUID, between versions? 
+DeleteValues(AUTH, UUID, with tags that match EWAH +DeleteRange(UUID, StartTime, EndTime): create a new version of the stream with the given range deleted and Flush(UUID) ensure the given stream is flushed to replicated storage. +------------------------------------------------------------------------------------------------------------------------------ +distillation pipeline +time-partitioning copy-on-write version-annotated k-ary tree + +To retain historic data, the tree is copy on write: each insert into the tree forms an overlay on the previous tree accessible via a new root node. Providing historic data queries in this way ensures that all versions of the tree require equal effort to query – unlike log replay mechanisms which introduce overheads proportional to how much data has changed or how old is the version that is being queried. Using the storage structure as the index ensures that queries to any version of the stream have an index to use, and reduces network round trips. + +The compression engine compresses the min, mean, max, count, address and version fields in internal nodes, as well as the time and value fields in leaf nodes. It uses a method we call delta-delta coding followed by Huffman coding using a fixed tree. Typical delta coding works by calculating the difference between every value in the sequence and storing that using variable-length symbols (as the delta is normally smaller than the absolute values [18]). Unfortunately with high-precision sensor data, this process does not work well because nanosecond timestamps produce very large deltas, and even linearly-changing values produce sequences of large, but similar, delta values. Delta-delta compression replaces run-length encoding and encodes each delta as the difference from the mean of a window of previous delta values. 
The result is a sequence of only the jitter values.Incidentally this works well for the addresses and ver-sion numbers, as they too are linearly increasing valueswith some jitter in the deltas. In the course of system de-velopment, we found that this algorithm produces betterresults, with a simpler implementation, than the residualcoding in FLAC [13] which was the initial inspiration. + +GOAL: +* segment files + * have a torrent file description + * must reach a replication level where the spread of chunks within these file is sufficient to rebuild them (erasure coding) even if 1/2 of the cluster is missing + * bittorrent leaching/seeding to automatically deliver files within directories to nodes + +* hanoidb/silt-like data storage, fast ingest merged to larger space/search efficient segment files +* BTrDB trie +* nanosecond resolution +* tagged datapoints +* computations within data + + +LB -> cluster -> node -> socket with metrics data on it + -> data parsed, consistent hashed to nodes with similar time ranges for archival + diff --git a/build.sbt b/build.sbt index 250019d..1d32a45 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,10 @@ import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm -val akkaVersion = "2.6.0" +//https://stackoverflow.com/a/58456468 +ThisBuild / useCoursier := false + +val akkaVersion = "2.6.3" lazy val `kaka` = project .in(file(".")) @@ -14,11 +17,14 @@ lazy val `kaka` = project run / javaOptions ++= Seq("-Xms128m", "-Xmx1024m", "-Djava.library.path=./target/native"), libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, "com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion, - "ch.qos.logback" % 
"logback-classic" % "1.2.3", - "org.scalatest" %% "scalatest" % "3.0.8" % Test, + "ch.qos.logback" % "logback-classic" % "1.2.3", + "io.kamon" % "sigar-loader" % "1.6.6-rev002", + "org.scalatest" %% "scalatest" % "3.1.1" % Test, "com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test), run / fork := false, Global / cancelable := false, @@ -27,3 +33,5 @@ lazy val `kaka` = project licenses := Seq(("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))) ) .configs (MultiJvm) + +//dependencyUpdatesFilter -= moduleFilter(organization = "org.scala-lang") diff --git a/project/build.properties b/project/build.properties index 6adcdc7..a919a9b 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.3.3 +sbt.version=1.3.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 2d02635..84b479a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,25 @@ +logLevel := Level.Warn + +resolvers ++= Seq( + Classpaths.sbtPluginReleases, + "Local Maven Repository" at Path.userHome.asFile.toURI.toURL + ".m2/repository", + "Local Ivy2 Cache Repository" at Path.userHome.asFile.toURI.toURL + ".ivy2/cache", + Resolver.sonatypeRepo("snapshots"), + Resolver.typesafeRepo("releases"), + Resolver.sonatypeRepo("releases"), + Resolver.bintrayIvyRepo("jetbrains", "sbt-plugins") +) + +// Use the Dotty compiler for Scala code. 
+addSbtPlugin("ch.epfl.lamp" % "sbt-dotty" % "0.4.0") + +// Makes our code tidy +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.1") + +// Native Packager allows us to create standalone jar +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.6.1") + addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0") -addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0") +addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.0.0") +addSbtPlugin("org.jmotor.sbt" % "sbt-dependency-updates" % "1.2.1") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10") diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 552e16c..bd6bf28 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -5,6 +5,24 @@ akka { serialization-bindings { "sample.cluster.CborSerializable" = jackson-cbor } + deployment { + /factorialFrontend/factorialBackendRouter = { + # Router type provided by metrics extension. + router = cluster-metrics-adaptive-group + # Router parameter specific for metrics extension. 
+ # metrics-selector = heap + # metrics-selector = load + # metrics-selector = cpu + metrics-selector = mix + # + routees.paths = ["/user/factorialBackend"] + cluster { + enabled = on + use-roles = ["backend"] + allow-local-routees = off + } + } + } } remote { artery { @@ -16,6 +34,12 @@ akka { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:25251", "akka://ClusterSystem@127.0.0.1:25252"] + failure-detector { + implementation-class = "akka.remote.AdaptiveAccrualFailureDetector" + threshold = 0.05 + scaling-factor = 0.9 + } } + extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ] } diff --git a/src/main/scala/akka/remote/AdaptiveAccrualFailureDetector.scala b/src/main/scala/akka/remote/AdaptiveAccrualFailureDetector.scala new file mode 100644 index 0000000..2c50123 --- /dev/null +++ b/src/main/scala/akka/remote/AdaptiveAccrualFailureDetector.scala @@ -0,0 +1,125 @@ +package akka.remote + +import akka.remote.FailureDetector.Clock +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import com.typesafe.config.Config +import akka.event.EventStream + +/** + * Implementation of 'A New Adaptive Accrual Failure Detector for Dependable Distributed Systems' by Satzger al. as defined in their paper: + * [https://pdfs.semanticscholar.org/8805/d522cd6cef723aae55595f918e09914e4316.pdf] + * + * The idea of this failure detector is to predict the arrival time of the next heartbeat based on + * the history of inter-arrival times between heartbeats. The algorithm approximates the cumulative distribution function (CDF) + * of inter-arrival times of the heartbeat messages. 
+ * + * The suspicion value of a failure is calculated as follows: + * + * {{{ + * P = |StΔ| / |S| + * }}} + * + * where: + * - S is the list of historical inter-arrival times of heartbeats + * - StΔ the list of inter-arrival times that are smaller or equal to tΔ + * - tΔ = previous heartbeat timestamp - current heartbeat timestamp + * + * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick detection in the event + * of a real crash. Conversely, a high threshold generates fewer mistakes but needs more time to detect + * actual crashes + * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of + * inter-arrival times. + * @param scalingFactor A scaling factor to prevent the failure detector to overestimate the probability of failures + * particularly in the case of increasing network latency times + * @param clock The clock, returning current time in milliseconds, but can be faked for testing + * purposes. It is only used for measuring intervals (duration). + */ +class AdaptiveAccrualFailureDetector( + val threshold: Double, + val maxSampleSize: Int, + val scalingFactor: Double, + eventStream: Option[EventStream])( + implicit + clock: Clock) extends FailureDetector { + + /** + * Constructor that reads parameters from config. + * Expecting config properties named `threshold`, `max-sample-size`, + * `min-std-deviation`, `acceptable-heartbeat-pause` and + * `heartbeat-interval`. 
+ */ + def this(config: Config, ev: EventStream) = + this( + threshold = config.getDouble("threshold"), + maxSampleSize = config.getInt("max-sample-size"), + scalingFactor = config.getDouble("scaling-factor"), + Some(ev)) + + require(threshold > 0.0, "failure-detector.threshold must be > 0") + require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0") + require(scalingFactor > 0, "failure-detector.scaling-factor must be > 0") + + private val firstHeartbeat: HeartbeatHistory = HeartbeatHistory(maxSampleSize) + + /** + * Implement using optimistic lockless concurrency, all state is represented + * by this immutable case class and managed by an AtomicReference. + */ + private /* final */ case class State(history: HeartbeatHistory, freshnessPoint: Option[Long]) // see: https://github.com/scala/bug/issues/4440 + + private val state = new AtomicReference[State](State(history = firstHeartbeat, freshnessPoint = None)) + + override def isAvailable: Boolean = isAvailable(clock()) + + private def isAvailable(timestamp: Long): Boolean = suspicion(timestamp) < threshold + + override def isMonitoring: Boolean = state.get.freshnessPoint.nonEmpty + + @tailrec + final override def heartbeat(): Unit = { + + val timestamp = clock() + val oldState = state.get + + val newHistory = oldState.freshnessPoint match { + case None => + // This is heartbeat from a new resource according to the algorithm do not add any initial history. + firstHeartbeat + case Some(freshnessPoint) => + // This is a known connection. + val tΔ = timestamp - freshnessPoint + oldState.history :+ tΔ + } + + val newState = oldState.copy(history = newHistory, freshnessPoint = Some(timestamp)) // Record new timestamp. + + // If we won the race then update else try again. + if (!state.compareAndSet(oldState, newState)) heartbeat() // Recur. + } + + /** + * The suspicion level of the accrual failure detector (expressed as probability). 
+ * + * If a connection does not have any records in failure detector then it is considered healthy. + */ + def suspicion: Double = suspicion(clock()) + + private def suspicion(timestamp: Long): Double = { + val oldState = state.get + val freshnessPoint = oldState.freshnessPoint + + if (freshnessPoint.isEmpty || oldState.history.intervals.isEmpty) 0.0 // Treat unmanaged connections, e.g. with zero heartbeats, as healthy connections. + else { + val tΔ = timestamp - freshnessPoint.get + val S = oldState.history.intervals + val SLength = S.length + val StΔLength = S.count(_ <= tΔ * scalingFactor) + + StΔLength.toDouble / SLength.toDouble + } + } + +} + diff --git a/src/main/scala/sample/cluster/simple/ClusterListener.scala b/src/main/scala/sample/cluster/simple/ClusterListener.scala index ad5918f..b10264a 100644 --- a/src/main/scala/sample/cluster/simple/ClusterListener.scala +++ b/src/main/scala/sample/cluster/simple/ClusterListener.scala @@ -15,7 +15,7 @@ import akka.cluster.typed.Subscribe object ClusterListener { sealed trait Event - // internal adapted cluster events only + // Internal adapted cluster events only. 
private final case class ReachabilityChange(reachabilityEvent: ReachabilityEvent) extends Event private final case class MemberChange(event: MemberEvent) extends Event diff --git a/src/main/scala/sample/factorial/FactorialService.scala b/src/main/scala/sample/factorial/FactorialService.scala new file mode 100644 index 0000000..5981fc0 --- /dev/null +++ b/src/main/scala/sample/factorial/FactorialService.scala @@ -0,0 +1,59 @@ +import akka.actor.{ Actor, ActorLogging, ReceiveTimeout } +import akka.routing.FromConfig + +import scala.annotation.tailrec +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.pattern.pipe + +class FactorialBackend extends Actor with ActorLogging { + + import context.dispatcher + + def receive = { + case (n: Int) => + Future(factorial(n)) + .map { result => + (n, result) + } + .pipeTo(sender()) + } + + def factorial(n: Int): BigInt = { + @tailrec def factorialAcc(acc: BigInt, n: Int): BigInt = { + if (n <= 1) acc + else factorialAcc(acc * n, n - 1) + } + factorialAcc(BigInt(1), n) + } + +} + +class FactorialFrontend(upToN: Int, repeat: Boolean) extends Actor with ActorLogging { + + val backend = context.actorOf(FromConfig.props(), name = "factorialBackendRouter") + + override def preStart(): Unit = { + sendJobs() + if (repeat) { + context.setReceiveTimeout(10.seconds) + } + } + + def receive = { + case (n: Int, factorial: BigInt) => + if (n == upToN) { + log.debug("{}! = {}", n, factorial) + if (repeat) sendJobs() + else context.stop(self) + } + case ReceiveTimeout => + log.info("Timeout") + sendJobs() + } + + def sendJobs(): Unit = { + log.info("Starting batch of factorials up to [{}]", upToN) + (1 to upToN).foreach { backend ! 
_ } + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 0000000..d33c48e --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + INFO + + + [%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n + + + + + + + + + + + + + + +