diff --git a/quorums/__init__.py b/quorums/__init__.py index 3c10e52..ac42cf1 100644 --- a/quorums/__init__.py +++ b/quorums/__init__.py @@ -1,5 +1,5 @@ from .expr import Node, choose, majority -from .quorum_system import QuorumSystem +from .quorum_system import QuorumSystem, Strategy from .viz import ( plot_node_load, plot_node_load_on, diff --git a/quorums/geometry.py b/quorums/geometry.py index 41eef4e..246dc48 100644 --- a/quorums/geometry.py +++ b/quorums/geometry.py @@ -1,6 +1,5 @@ from typing import Any, Callable, List, NamedTuple, Optional, Tuple import math -import unittest class Point(NamedTuple): @@ -90,165 +89,3 @@ def max_of_segments(segments: List[Segment]) -> List[Tuple[float, float]]: xs.append(p.x) xs.sort() return [(x, max(segments, key=lambda s: s(x))(x)) for x in xs] - - -class TestGeometry(unittest.TestCase): - def test_eq(self): - l = Point(0, 1) - r = Point(1, 1) - m = Point(0.5, 0.5) - self.assertEqual(Segment(l, r), Segment(l, r)) - self.assertNotEqual(Segment(l, r), Segment(l, m)) - - def test_compatible(self): - s1 = Segment(Point(0, 1), Point(1, 2)) - s2 = Segment(Point(0, 2), Point(1, 1)) - s3 = Segment(Point(0.5, 2), Point(1, 1)) - self.assertTrue(s1.compatible(s2)) - self.assertTrue(s2.compatible(s1)) - self.assertFalse(s1.compatible(s3)) - self.assertFalse(s3.compatible(s1)) - self.assertFalse(s2.compatible(s3)) - self.assertFalse(s3.compatible(s2)) - - def test_call(self): - segment = Segment(Point(0, 0), Point(1, 1)) - for x in [0.0, 0.25, 0.5, 0.75, 1.0]: - self.assertEqual(segment(x), x) - - segment = Segment(Point(0, 0), Point(1, 2)) - for x in [0.0, 0.25, 0.5, 0.75, 1.0]: - self.assertEqual(segment(x), 2*x) - - segment = Segment(Point(1, 2), Point(3, 6)) - for x in [1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0]: - self.assertEqual(segment(x), 2*x) - - segment = Segment(Point(0, 1), Point(1, 0)) - for x in [0.0, 0.25, 0.5, 0.75, 1.0]: - self.assertEqual(segment(x), 1 - x) - - def test_slope(self): - self.assertEqual(Segment(Point(0, 0), Point(1, 1)).slope(), 1.0) - self.assertEqual(Segment(Point(0, 1), Point(1, 2)).slope(), 1.0) - self.assertEqual(Segment(Point(1, 1), Point(2, 2)).slope(), 1.0) - self.assertEqual(Segment(Point(1, 1), Point(2, 3)).slope(), 2.0) - self.assertEqual(Segment(Point(1, 1), Point(2, 0)).slope(), -1.0) - - def test_above(self): - s1 = Segment(Point(0, 0), Point(1, 0.5)) - s2 = Segment(Point(0, 0.5), Point(1, 2)) - s3 = Segment(Point(0, 1.5), Point(1, 0.5)) - - self.assertFalse(s1.above(s1)) - self.assertFalse(s1.above(s2)) - self.assertFalse(s1.above(s3)) - - self.assertTrue(s2.above(s1)) - self.assertFalse(s2.above(s2)) - self.assertFalse(s2.above(s3)) - - self.assertTrue(s3.above(s1)) - self.assertFalse(s3.above(s2)) - self.assertFalse(s3.above(s3)) - - def test_above_eq(self): - s1 = Segment(Point(0, 0), Point(1, 0.5)) - s2 = Segment(Point(0, 0.5), Point(1, 2)) - s3 = Segment(Point(0, 1.5), Point(1, 0.5)) - - self.assertTrue(s1.above_eq(s1)) - self.assertFalse(s1.above_eq(s2)) - self.assertFalse(s1.above_eq(s3)) - - self.assertTrue(s2.above_eq(s1)) - self.assertTrue(s2.above_eq(s2)) - self.assertFalse(s2.above_eq(s3)) - - self.assertTrue(s3.above_eq(s1)) - self.assertFalse(s3.above_eq(s2)) - self.assertTrue(s3.above_eq(s3)) - - def test_intersects(self): - s1 = Segment(Point(0, 0), Point(1, 0.5)) - s2 = Segment(Point(0, 0.5), Point(1, 2)) - s3 = Segment(Point(0, 1.5), Point(1, 0.5)) - - self.assertTrue(s1.intersects(s1)) - self.assertFalse(s1.intersects(s2)) - self.assertTrue(s1.intersects(s3)) - - 
self.assertFalse(s2.intersects(s1)) - self.assertTrue(s2.intersects(s2)) - self.assertTrue(s2.intersects(s3)) - - self.assertTrue(s3.intersects(s1)) - self.assertTrue(s3.intersects(s2)) - self.assertTrue(s3.intersects(s3)) - - def test_intersection(self): - s1 = Segment(Point(0, 0), Point(1, 1)) - s2 = Segment(Point(0, 1), Point(1, 0)) - s3 = Segment(Point(0, 1), Point(1, 1)) - s4 = Segment(Point(0, 0.25), Point(1, 0.25)) - - self.assertEqual(s1.intersection(s1), None) - self.assertEqual(s1.intersection(s2), Point(0.5, 0.5)) - self.assertEqual(s1.intersection(s3), Point(1, 1)) - self.assertEqual(s1.intersection(s4), Point(0.25, 0.25)) - - self.assertEqual(s2.intersection(s1), Point(0.5, 0.5)) - self.assertEqual(s2.intersection(s2), None) - self.assertEqual(s2.intersection(s3), Point(0, 1)) - self.assertEqual(s2.intersection(s4), Point(0.75, 0.25)) - - self.assertEqual(s3.intersection(s1), Point(1, 1)) - self.assertEqual(s3.intersection(s2), Point(0, 1)) - self.assertEqual(s3.intersection(s3), None) - self.assertEqual(s3.intersection(s4), None) - - self.assertEqual(s4.intersection(s1), Point(0.25, 0.25)) - self.assertEqual(s4.intersection(s2), Point(0.75, 0.25)) - self.assertEqual(s4.intersection(s3), None) - self.assertEqual(s4.intersection(s4), None) - - def test_max_one_segment(self): - s1 = Segment(Point(0, 0), Point(1, 1)) - s2 = Segment(Point(0, 1), Point(1, 0)) - s3 = Segment(Point(0, 1), Point(1, 1)) - s4 = Segment(Point(0, 0.25), Point(1, 0.25)) - s5 = Segment(Point(0, 0.75), Point(1, 0.75)) - - def is_subset(xs: List[Any], ys: List[Any]) -> bool: - return all(x in ys for x in xs) - - for s in [s1, s2, s3, s4, s5]: - self.assertEqual(max_of_segments([s]), [s.l, s.r]) - - expected = [ - ([s1, s1], [(0, 0), (1, 1)]), - ([s1, s2], [(0, 1), (0.5, 0.5), (1, 1)]), - ([s1, s3], [(0, 1), (1, 1)]), - ([s1, s4], [(0, 0.25), (0.25, 0.25), (1, 1)]), - ([s1, s5], [(0, 0.75), (0.75, 0.75), (1, 1)]), - ([s2, s2], [(0, 1), (1, 0)]), - ([s2, s3], [(0, 1), (1, 1)]), - ([s2, s4], [(0, 1), (0.75, 0.25), (1, 0.25)]), - ([s2, s5], [(0, 1), (0.25, 0.75), (1, 0.75)]), - ([s3, s3], [(0, 1), (1, 1)]), - ([s3, s4], [(0, 1), (1, 1)]), - ([s3, s5], [(0, 1), (1, 1)]), - ([s4, s4], [(0, 0.25), (1, 0.25)]), - ([s4, s5], [(0, 0.75), (1, 0.75)]), - ([s5, s5], [(0, 0.75), (1, 0.75)]), - - ([s1, s2, s4], [(0, 1), (0.5, 0.5), (1, 1)]), - ([s1, s2, s5], [(0, 1), (0.25, 0.75), (0.75, 0.75), (1, 1)]), - ] - for segments, path in expected: - self.assertTrue(is_subset(path, max_of_segments(segments))) - self.assertTrue(is_subset(path, max_of_segments(segments[::-1]))) - - -if __name__ == '__main__': - unittest.main() diff --git a/quorums/quorum_system.py b/quorums/quorum_system.py index 0646a99..cbaac8b 100644 --- a/quorums/quorum_system.py +++ b/quorums/quorum_system.py @@ -1,6 +1,3 @@ -# TODO(mwhittaker): We can define a set of read quorums that are not minimal. -# Does this mess things up? - from . import distribution from . 
import geometry from .distribution import Distribution @@ -16,7 +13,6 @@ import pulp T = TypeVar('T') - LOAD = 'load' NETWORK = 'network' LATENCY = 'latency' @@ -29,8 +25,7 @@ class QuorumSystem(Generic[T]): writes: Optional[Expr[T]] = None) -> None: if reads is not None and writes is not None: optimal_writes = reads.dual() - if not all(optimal_writes.is_quorum(write_quorum) - for write_quorum in writes.quorums()): + if not all(optimal_writes.is_quorum(wq) for wq in writes.quorums()): raise ValueError( 'Not all read quorums intersect all write quorums') @@ -63,9 +58,15 @@ class QuorumSystem(Generic[T]): def is_write_quorum(self, xs: Set[T]) -> bool: return self.writes.is_quorum(xs) + def node(self, x: T) -> Node[T]: + return self.x_to_node[x] + def nodes(self) -> Set[Node[T]]: return self.reads.nodes() | self.writes.nodes() + def elements(self) -> Set[T]: + return {node.x for node in self.nodes()} + def resilience(self) -> int: return min(self.read_resilience(), self.write_resilience()) @@ -75,6 +76,107 @@ class QuorumSystem(Generic[T]): def write_resilience(self) -> int: return self.writes.resilience() + def dup_free(self) -> bool: + return self.reads.dup_free() and self.writes.dup_free() + + def load(self, + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) -> float: + return self.strategy( + optimize, + load_limit, + network_limit, + latency_limit, + read_fraction, + write_fraction, + f + ).load(read_fraction, write_fraction) + + def capacity(self, + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) -> float: + return self.strategy( + optimize, + load_limit, + network_limit, + latency_limit, + read_fraction, + write_fraction, + f + ).capacity(read_fraction, write_fraction) + + def network_load(self, + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) -> float: + return self.strategy( + optimize, + load_limit, + network_limit, + latency_limit, + read_fraction, + write_fraction, + f + ).network_load(read_fraction, write_fraction) + + def latency(self, + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) -> float: + return self.strategy( + optimize, + load_limit, + network_limit, + latency_limit, + read_fraction, + write_fraction, + f + ).latency(read_fraction, write_fraction) + + def uniform_strategy(self, f: int = 0) -> 'Strategy[T]': + """ + uniform_strategy(f) returns a uniform strategy over the minimal + f-resilient quorums. That is, every minimal f-resilient quorum is + equally likely to be chosen. 
+        """
+        if f < 0:
+            raise ValueError('f must be >= 0')
+        elif f == 0:
+            read_quorums = list(self.read_quorums())
+            write_quorums = list(self.write_quorums())
+        else:
+            xs = list(self.elements())
+            read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
+            write_quorums = list(self._f_resilient_quorums(f, xs, self.writes))
+            if len(read_quorums) == 0:
+                raise ValueError(f'There are no {f}-resilient read quorums')
+            if len(write_quorums) == 0:
+                raise ValueError(f'There are no {f}-resilient write quorums')
+
+        read_quorums = self._minimize(read_quorums)
+        write_quorums = self._minimize(write_quorums)
+        sigma_r = {frozenset(rq): 1 / len(read_quorums) for rq in read_quorums}
+        sigma_w = {frozenset(wq): 1 / len(write_quorums) for wq in write_quorums}
+        return Strategy(self, sigma_r, sigma_w)
+
     def strategy(self,
                  optimize: str = LOAD,
                  load_limit: Optional[float] = None,
@@ -82,10 +184,10 @@ class QuorumSystem(Generic[T]):
                  latency_limit: Optional[datetime.timedelta] = None,
                  read_fraction: Optional[Distribution] = None,
                  write_fraction: Optional[Distribution] = None,
-                 f: int = 0) \
-            -> 'Strategy[T]':
-        if f < 0:
-            raise ValueError('f must be >= 0')
+                 f: int = 0) -> 'Strategy[T]':
+        if optimize not in {LOAD, NETWORK, LATENCY}:
+            raise ValueError(
+                f'optimize must be one of {LOAD}, {NETWORK}, or {LATENCY}')
 
         if optimize == LOAD and load_limit is not None:
             raise ValueError(
                 'a load limit cannot be set when optimizing for load')
@@ -99,6 +201,9 @@ class QuorumSystem(Generic[T]):
             raise ValueError(
                 'a latency limit cannot be set when optimizing for latency')
 
+        if f < 0:
+            raise ValueError('f must be >= 0')
+
         d = distribution.canonicalize_rw(read_fraction, write_fraction)
         if f == 0:
             return self._load_optimal_strategy(
@@ -110,7 +215,7 @@ class QuorumSystem(Generic[T]):
                 network_limit=network_limit,
                 latency_limit=latency_limit)
         else:
-            xs = [node.x for node in self.nodes()]
+            xs = list(self.elements())
             read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
             write_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
             if len(read_quorums) == 0:
@@ -126,13 +231,23 @@ class QuorumSystem(Generic[T]):
                 network_limit=network_limit,
                 latency_limit=latency_limit)
 
-    def dup_free(self) -> bool:
-        return self.reads.dup_free() and self.writes.dup_free()
+    def _minimize(self, sets: List[Set[T]]) -> List[Set[T]]:
+        sets = sorted(sets, key=lambda s: len(s))
+        minimal_elements: List[Set[T]] = []
+        for x in sets:
+            if not any(x >= y for y in minimal_elements):
+                minimal_elements.append(x)
+        return minimal_elements
 
     def _f_resilient_quorums(self,
                              f: int,
                              xs: List[T],
                              e: Expr) -> Iterator[Set[T]]:
+        """
+        Consider a set X of elements in xs. We say X is f-resilient if, after
+        removing any f elements from X, the remaining elements still form a
+        quorum in e. _f_resilient_quorums yields all f-resilient quorums.
+        """
         assert f >= 1
 
         def helper(s: Set[T], i: int) -> Iterator[Set[T]]:
@@ -148,25 +263,6 @@ class QuorumSystem(Generic[T]):
 
         return helper(set(), 0)
 
-    def load(self,
-             read_fraction: Optional[Distribution] = None,
-             write_fraction: Optional[Distribution] = None,
-             f: int = 0) \
-            -> float:
-        return 0
-        # TODO(mwhittaker): Remove.
-        # sigma = self.strategy(read_fraction, write_fraction, f)
-        # return sigma.load(read_fraction, write_fraction)
-
-    def capacity(self,
-                 read_fraction: Optional[Distribution] = None,
-                 write_fraction: Optional[Distribution] = None,
-                 f: int = 0) \
-            -> float:
-        return 0
-        # TODO(mwhittaker): Remove.
-        # return 1 / self.load(read_fraction, write_fraction, f)
-
     def _read_quorum_latency(self, quorum: Set[Node[T]]) -> datetime.timedelta:
         return self._quorum_latency(quorum, self.is_read_quorum)
 
@@ -205,54 +301,132 @@ class QuorumSystem(Generic[T]):
             read_quorums = [{a, b}, {c, d}]
             write_quorums = [{a, c}, {a, d}, {b, c}, {b, d}]
 
-        We can form a linear program to compute the optimal load of this quorum
-        system for some fixed read fraction fr as follows. First, we create a
-        variable ri for every read quorum i and a variable wi for every write
-        quorum i. ri represents the probabilty of selecting the ith read
-        quorum, and wi represents the probabilty of selecting the ith write
-        quorum. We introduce an additional variable l that represents the load
-        and solve the following linear program.
+        We want to find the strategy that is optimal with respect to load,
+        network load, or latency and that satisfies the provided load, network
+        load, and latency constraints.
 
-        min L subject to
-        r0 + r1 + r2 = 1
-        w0 + w1 = 1
-        fr (r0) + (1 - fr) (w0 + w1) <= L  # a's load
-        fr (r0) + (1 - fr) (w2 + w3) <= L  # b's load
-        fr (r1) + (1 - fr) (w0 + w2) <= L  # c's load
-        fr (r1) + (1 - fr) (w1 + w3) <= L  # d's load
+        We can find the optimal strategy using linear programming. First, we
+        create a variable ri for every read quorum i and a variable wi for
+        every write quorum i. ri represents the probability of selecting the
+        ith read quorum, and wi represents the probability of selecting the
+        ith write quorum.
 
-        If we assume every element x has read capacity rcap_x and write
-        capacity wcap_x, then we adjust the linear program like this.
+        We now explain how to represent load, network load, and latency as
+        linear expressions.
 
-        min L subject to
-        r0 + r1 + r2 = 1
-        w0 + w1 = 1
-        fr/rcap_a (r0) + (1 - fr)/wcap_a (w0 + w1) <= L  # a's load
-        fr/rcap_b (r0) + (1 - fr)/wcap_b (w2 + w3) <= L  # b's load
-        fr/rcap_c (r1) + (1 - fr)/wcap_c (w0 + w2) <= L  # c's load
-        fr/rcap_d (r1) + (1 - fr)/wcap_d (w1 + w3) <= L  # d's load
+        Load
+        ====
+        Assume a read fraction fr and write fraction fw. The load of a node a is
 
-        Assume we have fr = 0.9 with 80% probabilty and fr = 0.5 with 20%. Then
-        we adjust the linear program as follows to find the strategy that
-        minimzes the average load.
+            load(a) = (fr * rprob(a) / rcap(a)) + (fw * wprob(a) / wcap(a))
 
-        min 0.8 * L_0.9 + 0.2 * L_0.5 subject to
-        r0 + r1 + r2 = 1
-        w0 + w1 = 1
-        0.9/rcap_a (r0) + 0.1/wcap_a (w0 + w1) <= L_0.9  # a's load
-        0.9/rcap_b (r0) + 0.1/wcap_b (w2 + w3) <= L_0.9  # b's load
-        0.9/rcap_c (r1) + 0.1/wcap_c (w0 + w2) <= L_0.9  # c's load
-        0.9/rcap_d (r1) + 0.1/wcap_d (w1 + w3) <= L_0.9  # d's load
-        0.5/rcap_a (r0) + 0.5/wcap_a (w0 + w1) <= L_0.5  # a's load
-        0.5/rcap_b (r0) + 0.5/wcap_b (w2 + w3) <= L_0.5  # b's load
-        0.5/rcap_c (r1) + 0.5/wcap_c (w0 + w2) <= L_0.5  # c's load
-        0.5/rcap_d (r1) + 0.5/wcap_d (w1 + w3) <= L_0.5  # d's load
+        where rprob(a) and wprob(a) are the probabilities that a is selected
+        as part of a read or write quorum respectively, and rcap(a) and wcap(a)
+        are the read and write capacities of a. We can express rprob(a) and
+        wprob(a) as follows:
+
+            rprob(a) = sum({ri | a is in read quorum i})
+            wprob(a) = sum({wi | a is in write quorum i})
+
+        Using the example grid quorum above, we have:
+
+            rprob(a) = r0        wprob(a) = w0 + w1
+            rprob(b) = r0        wprob(b) = w2 + w3
+            rprob(c) = r1        wprob(c) = w0 + w2
+            rprob(d) = r1        wprob(d) = w1 + w3
+
+        The load of a strategy is the maximum load on any node. We can compute
+        this by minimizing a new variable l and constraining the load of every
+        node to be less than l. Using the example above, we have
+
+            min l subject to
+            fr * r0 / rcap(a) + fw * (w0 + w1) / wcap(a) <= l
+            fr * r0 / rcap(b) + fw * (w2 + w3) / wcap(b) <= l
+            fr * r1 / rcap(c) + fw * (w0 + w2) / wcap(c) <= l
+            fr * r1 / rcap(d) + fw * (w1 + w3) / wcap(d) <= l
+
+        To compute the load of a strategy with respect to a distribution of
+        read fractions, we compute the load for every value of fr and weight
+        according to the distribution. For example, imagine fr is 0.9 80% of
+        the time and 0.5 20% of the time. We have:
+
+            min 0.8 * l0.9 + 0.2 * l0.5 subject to
+            0.9 * r0 / rcap(a) + 0.1 * (w0 + w1) / wcap(a) <= l0.9
+            0.9 * r0 / rcap(b) + 0.1 * (w2 + w3) / wcap(b) <= l0.9
+            0.9 * r1 / rcap(c) + 0.1 * (w0 + w2) / wcap(c) <= l0.9
+            0.9 * r1 / rcap(d) + 0.1 * (w1 + w3) / wcap(d) <= l0.9
+            0.5 * r0 / rcap(a) + 0.5 * (w0 + w1) / wcap(a) <= l0.5
+            0.5 * r0 / rcap(b) + 0.5 * (w2 + w3) / wcap(b) <= l0.5
+            0.5 * r1 / rcap(c) + 0.5 * (w0 + w2) / wcap(c) <= l0.5
+            0.5 * r1 / rcap(d) + 0.5 * (w1 + w3) / wcap(d) <= l0.5
+
+        Let the expression for load be LOAD.
+
+        Network
+        =======
+        The network load of a strategy is the expected size of a quorum. For a
+        fixed fr, we can compute the network load as:
+
+            fr * sum_i(size(read quorum i) * ri) +
+            fw * sum_i(size(write quorum i) * wi)
+
+        Using the example above:
+
+            fr * (2*r0 + 2*r1) + fw * (2*w0 + 2*w1 + 2*w2 + 2*w3)
+
+        For a distribution of read fractions, we compute the weighted average.
+        Let the expression for network load be NETWORK.
+
+        Latency
+        =======
+        The latency of a strategy is the expected latency of a quorum. We can
+        compute the latency as:
+
+            fr * sum_i(latency(read quorum i) * ri) +
+            fw * sum_i(latency(write quorum i) * wi)
+
+        Using the example above (assuming every node has a latency of 1):
+
+            fr * (1*r0 + 1*r1) + fw * (1*w0 + 1*w1 + 1*w2 + 1*w3)
+
+        For a distribution of read fractions, we compute the weighted average.
+        Let the expression for latency be LATENCY.
+
+        Linear Program
+        ==============
+        To find an optimal strategy, we use a linear program. The objective
+        specified by the user is minimized, and any provided constraints are
+        added as constraints to the program. For example, imagine the user
+        wants a load-optimal strategy with network load <= 2 and latency <= 3.
+        We form the program:
+
+            min LOAD subject to
+            sum_i(ri) = 1  # ensure we have a valid distribution on read quorums
+            sum_i(wi) = 1  # ensure we have a valid distribution on write quorums
+            NETWORK <= 2
+            LATENCY <= 3
+
+        Using the example above and assuming a fixed fr, we have:
+
+            min l subject to
+            fr * r0 / rcap(a) + fw * (w0 + w1) / wcap(a) <= l
+            fr * r0 / rcap(b) + fw * (w2 + w3) / wcap(b) <= l
+            fr * r1 / rcap(c) + fw * (w0 + w2) / wcap(c) <= l
+            fr * r1 / rcap(d) + fw * (w1 + w3) / wcap(d) <= l
+            fr * (2*r0 + 2*r1) + fw * (2*w0 + 2*w1 + 2*w2 + 2*w3) <= 2
+            fr * (1*r0 + 1*r1) + fw * (1*w0 + 1*w1 + 1*w2 + 1*w3) <= 3
+
+        If we instead wanted to minimize network load with load <= 4 and
+        latency <= 5, we would have the following program:
+
+            min fr * (2*r0 + 2*r1) +
+                fw * (2*w0 + 2*w1 + 2*w2 + 2*w3) subject to
+            fr * r0 / rcap(a) + fw * (w0 + w1) / wcap(a) <= 4
+            fr * r0 / rcap(b) + fw * (w2 + w3) / wcap(b) <= 4
+            fr * r1 / rcap(c) + fw * (w0 + w2) / wcap(c) <= 4
+            fr * r1 / rcap(d) + fw * (w1 + w3) / wcap(d) <= 4
+            fr * (1*r0 + 1*r1) + fw * (1*w0 + 1*w1 + 1*w2 + 1*w3) <= 5
         """
-        nodes = self.nodes()
-        x_to_node = {node.x: node for node in nodes}
-        read_capacity = {node.x: node.read_capacity for node in nodes}
-        write_capacity = {node.x: node.write_capacity for node in nodes}
-
         # Create a variable for every read quorum and every write quorum. While
         # we do this, map each element x to the read and write quorums that
         # it's in. For example, image we have the following read and write
@@ -285,46 +459,46 @@ class QuorumSystem(Generic[T]):
             for x in write_quorum:
                 x_to_write_quorum_vars[x].append(v)
 
-        fr = sum(weight * f for (f, weight) in read_fraction.items())
+        fr = sum(p * fr for (fr, p) in read_fraction.items())
 
         def network() -> pulp.LpAffineExpression:
-            read_network = fr * sum(
+            reads = fr * sum(
                 v * len(rq)
                 for (rq, v) in zip(read_quorums, read_quorum_vars)
             )
-            write_network = (1 - fr) * sum(
+            writes = (1 - fr) * sum(
                 v * len(wq)
                 for (wq, v) in zip(write_quorums, write_quorum_vars)
             )
-            return read_network + write_network
+            return reads + writes
 
         def latency() -> pulp.LpAffineExpression:
-            read_latency = fr * sum(
+            reads = fr * sum(
                 v * self._read_quorum_latency(quorum).total_seconds()
                 for (rq, v) in zip(read_quorums, read_quorum_vars)
-                for quorum in [{x_to_node[x] for x in rq}]
+                for quorum in [{self.node(x) for x in rq}]
             )
-            write_latency = (1.
- fr) * sum( + writes = (1 - fr) * sum( v * self._write_quorum_latency(quorum).total_seconds() for (wq, v) in zip(write_quorums, write_quorum_vars) - for quorum in [{x_to_node[x] for x in wq}] + for quorum in [{self.node(x) for x in wq}] ) - return read_latency + write_latency + return reads + writes def fr_load(problem: pulp.LpProblem, fr: float) -> pulp.LpAffineExpression: l = pulp.LpVariable(f'l_{fr}', 0, 1) - for node in nodes: + for node in self.nodes(): x = node.x x_load: pulp.LpAffineExpression = 0 if x in x_to_read_quorum_vars: vs = x_to_read_quorum_vars[x] - x_load += fr * sum(vs) / read_capacity[x] + x_load += fr * sum(vs) / self.node(x).read_capacity if x in x_to_write_quorum_vars: vs = x_to_write_quorum_vars[x] - x_load += (1 - fr) * sum(vs) / write_capacity[x] + x_load += (1 - fr) * sum(vs) / self.node(x).write_capacity problem += (x_load <= l, f'{x}{fr}') @@ -332,11 +506,11 @@ class QuorumSystem(Generic[T]): def load(problem: pulp.LpProblem, read_fraction: Dict[float, float]) -> pulp.LpAffineExpression: - return sum(weight * fr_load(problem, fr) - for (fr, weight) in read_fraction.items()) + return sum(p * fr_load(problem, fr) + for (fr, p) in read_fraction.items()) - # Form the linear program to find the load. - problem = pulp.LpProblem("load", pulp.LpMinimize) + # Form the linear program. + problem = pulp.LpProblem("optimal_strategy", pulp.LpMinimize) # We add these constraints to make sure that the probabilities we # select form valid probabilty distributions. @@ -365,152 +539,147 @@ class QuorumSystem(Generic[T]): 'latency limit') # Solve the linear program. - print(problem) problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) if problem.status != pulp.LpStatusOptimal: raise ValueError('no strategy satisfies the given constraints') # Prune out any quorums with 0 probability. - non_zero_read_quorums = [ - (rq, v.varValue) + sigma_r = { + frozenset(rq): v.varValue for (rq, v) in zip(read_quorums, read_quorum_vars) - if v.varValue != 0] - non_zero_write_quorums = [ - (wq, v.varValue) + if v.varValue != 0 + } + sigma_w = { + frozenset(wq): v.varValue for (wq, v) in zip(write_quorums, write_quorum_vars) - if v.varValue != 0] - return Strategy(self, - [rq for (rq, _) in non_zero_read_quorums], - [weight for (_, weight) in non_zero_read_quorums], - [wq for (wq, _) in non_zero_write_quorums], - [weight for (_, weight) in non_zero_write_quorums]) + if v.varValue != 0 + } + + return Strategy(self, sigma_r, sigma_w) class Strategy(Generic[T]): def __init__(self, qs: QuorumSystem[T], - reads: List[Set[T]], - read_weights: List[float], - writes: List[Set[T]], - write_weights: List[float]) -> None: + sigma_r: Dict[FrozenSet[T], float], + sigma_w: Dict[FrozenSet[T], float]) -> None: self.qs = qs - self.reads = reads - self.read_weights = read_weights - self.writes = writes - self.write_weights = write_weights + self.sigma_r = sigma_r + self.sigma_w = sigma_w - self.unweighted_read_load: Dict[T, float] = \ - collections.defaultdict(float) - for (read_quorum, weight) in zip(self.reads, self.read_weights): + # The probability that x is chosen as part of a read quorum. 
+ self.x_read_probability: Dict[T, float] = collections.defaultdict(float) + for (read_quorum, p) in self.sigma_r.items(): for x in read_quorum: - self.unweighted_read_load[x] += weight + self.x_read_probability[x] += p - self.unweighted_write_load: Dict[T, float] = \ - collections.defaultdict(float) - for (write_quorum, weight) in zip(self.writes, self.write_weights): + # The probability that x is chosen as part of a write quorum. + self.x_write_probability: Dict[T, float] = collections.defaultdict(float) + for (write_quorum, weight) in self.sigma_w.items(): for x in write_quorum: - self.unweighted_write_load[x] += weight + self.x_write_probability[x] += weight + @no_type_check def __str__(self) -> str: - non_zero_reads = {tuple(r): p - for (r, p) in zip(self.reads, self.read_weights) - if p > 0} - non_zero_writes = {tuple(w): p - for (w, p) in zip(self.writes, self.write_weights) - if p > 0} - return f'Strategy(reads={non_zero_reads}, writes={non_zero_writes})' + # T may not comparable, so mypy complains about this sort. + reads = {tuple(sorted(rq)): p for (rq, p) in self.sigma_r.items()} + writes = {tuple(sorted(wq)): p for (wq, p) in self.sigma_w.items()} + return f'Strategy(reads={reads}, writes={writes})' + + def quorum_system(self) -> QuorumSystem[T]: + return self.qs + + def node(self, x: T) -> Node[T]: + return self.qs.node(x) + + def nodes(self) -> Set[Node[T]]: + return self.qs.nodes() def get_read_quorum(self) -> Set[T]: - return np.random.choice(self.reads, p=self.read_weights) + return set(np.random.choice(list(self.sigma_r.keys()), + p=list(self.sigma_r.values()))) def get_write_quorum(self) -> Set[T]: - return np.random.choice(self.writes, p=self.write_weights) + return set(np.random.choice(list(self.sigma_w.keys()), + p=list(self.sigma_w.values()))) def load(self, read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: + write_fraction: Optional[Distribution] = None) -> float: d = distribution.canonicalize_rw(read_fraction, write_fraction) - return sum(weight * self._load(fr) - for (fr, weight) in d.items()) + return sum(p * self._load(fr) for (fr, p) in d.items()) - # TODO(mwhittaker): Rename throughput. def capacity(self, read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: + write_fraction: Optional[Distribution] = None) -> float: return 1 / self.load(read_fraction, write_fraction) def network_load(self, read_fraction: Optional[Distribution] = None, write_fraction: Optional[Distribution] = None) -> float: d = distribution.canonicalize_rw(read_fraction, write_fraction) - fr = sum(weight * f for (f, weight) in d.items()) - read_network_load = fr * sum( - len(rq) * p - for (rq, p) in zip(self.reads, self.read_weights) - ) - write_network_load = (1 - fr) * sum( - len(wq) * p - for (wq, p) in zip(self.writes, self.write_weights) - ) - return read_network_load + write_network_load + fr = sum(p * fr for (fr, p) in d.items()) + reads = fr * sum(p * len(rq) for (rq, p) in self.sigma_r.items()) + writes = (1 - fr) * sum(p * len(wq) for (wq, p) in self.sigma_w.items()) + return reads + writes + # mypy doesn't like calling sum with timedeltas. 
+ @no_type_check def latency(self, read_fraction: Optional[Distribution] = None, write_fraction: Optional[Distribution] = None) \ -> datetime.timedelta: d = distribution.canonicalize_rw(read_fraction, write_fraction) - fr = sum(weight * f for (f, weight) in d.items()) + fr = sum(p * fr for (fr, p) in d.items()) - read_latency = fr * sum(( - self.qs._read_quorum_latency(quorum) * p # type: ignore - for (rq, p) in zip(self.reads, self.read_weights) - for quorum in [{self.qs.x_to_node[x] for x in rq}] - ), datetime.timedelta(seconds=0)) # type: ignore - write_latency = (1 - fr) * sum(( - self.qs._write_quorum_latency(quorum) * p # type: ignore - for (wq, p) in zip(self.writes, self.write_weights) - for quorum in [{self.qs.x_to_node[x] for x in wq}] - ), datetime.timedelta(seconds=0)) # type:ignore - return read_latency + write_latency # type: ignore + reads = fr * sum(( + p * self.qs._read_quorum_latency({self.node(x) for x in rq}) + for (rq, p) in self.sigma_r.items() + ), datetime.timedelta(seconds=0)) + + writes = (1 - fr) * sum(( + p * self.qs._write_quorum_latency({self.node(x) for x in wq}) + for (wq, p) in self.sigma_w.items() + ), datetime.timedelta(seconds=0)) + + return reads + writes def node_load(self, node: Node[T], read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: + write_fraction: Optional[Distribution] = None) -> float: d = distribution.canonicalize_rw(read_fraction, write_fraction) - return sum(weight * self._node_load(node.x, fr) - for (fr, weight) in d.items()) + return sum(p * self._node_load(node, fr) for (fr, p) in d.items()) def node_utilization(self, node: Node[T], read_fraction: Optional[Distribution] = None, write_fraction: Optional[Distribution] = None) \ -> float: - # TODO(mwhittaker): Implement. - return 0.0 + d = distribution.canonicalize_rw(read_fraction, write_fraction) + return sum(p * self._node_utilization(node, fr) + for (fr, p) in d.items()) - def node_throghput(self, - node: Node[T], - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - # TODO(mwhittaker): Implement. - return 0.0 - - def _node_load(self, x: T, fr: float) -> float: - """ - _node_load returns the load on x given a fixed read fraction fr. - """ - fw = 1 - fr - node = self.qs.x_to_node[x] - return (fr * self.unweighted_read_load[x] / node.read_capacity + - fw * self.unweighted_write_load[x] / node.write_capacity) + def node_throughput(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + return sum(p * self._node_throughput(node, fr) for (fr, p) in d.items()) def _load(self, fr: float) -> float: - """ - _load returns the load given a fixed read fraction fr. 
- """ - return max(self._node_load(node.x, fr) for node in self.qs.nodes()) + return max(self._node_load(node, fr) for node in self.nodes()) + + def _node_load(self, node: Node[T], fr: float) -> float: + fw = 1 - fr + return (fr * self.x_read_probability[node.x] / node.read_capacity + + fw * self.x_write_probability[node.x] / node.write_capacity) + + def _node_utilization(self, node: Node[T], fr: float) -> float: + return self._node_load(node, fr) / self._load(fr) + + def _node_throughput(self, node: Node[T], fr: float) -> float: + cap = 1 / self._load(fr) + fw = 1 - fr + return cap * (fr * self.x_read_probability[node.x] + + fw * self.x_write_probability[node.x]) diff --git a/quorums/viz.py b/quorums/viz.py index 2a9a6c8..bebdaeb 100644 --- a/quorums/viz.py +++ b/quorums/viz.py @@ -4,7 +4,7 @@ from .distribution import Distribution from .expr import Node from .geometry import Point, Segment from .quorum_system import Strategy -from typing import Dict, List, Optional, Set, Tuple, TypeVar +from typing import Dict, FrozenSet, List, Optional, Set, Tuple, TypeVar import collections import matplotlib import matplotlib.pyplot as plt @@ -107,19 +107,18 @@ def _plot_node_load_on(ax: plt.Axes, x_index = {x: i for (i, x) in enumerate(x_list)} x_ticks = list(range(len(x_list))) - def one_hot(quorum: Set[T]) -> np.array: + def one_hot(quorum: FrozenSet[T]) -> np.array: bar_heights = np.zeros(len(x_list)) for x in quorum: bar_heights[x_index[x]] = 1 return bar_heights - def plot_quorums(quorums: List[Set[T]], - weights: List[float], + def plot_quorums(sigma: Dict[FrozenSet[T], float], fraction: float, bottoms: np.array, capacities: np.array, cmap: matplotlib.colors.Colormap): - for (i, (quorum, weight)) in enumerate(zip(quorums, weights)): + for (i, (quorum, weight)) in enumerate(sigma.items()): bar_heights = scale * fraction * weight * one_hot(quorum) if scale_by_node_capacity: bar_heights /= capacities @@ -127,7 +126,7 @@ def _plot_node_load_on(ax: plt.Axes, ax.bar(x_ticks, bar_heights, bottom=bottoms, - color=cmap(0.75 - i * 0.5 / len(quorums)), + color=cmap(0.75 - i * 0.5 / len(sigma)), edgecolor='white', width=0.8) for j, (bar_height, bottom) in enumerate(zip(bar_heights, bottoms)): @@ -142,9 +141,9 @@ def _plot_node_load_on(ax: plt.Axes, read_capacities = np.array([node.read_capacity for node in nodes]) write_capacities = np.array([node.write_capacity for node in nodes]) bottoms = np.zeros(len(x_list)) - plot_quorums(sigma.reads, sigma.read_weights, fr, bottoms, read_capacities, + plot_quorums(sigma.sigma_r, fr, bottoms, read_capacities, matplotlib.cm.get_cmap('Reds')) - plot_quorums(sigma.writes, sigma.write_weights, fw, bottoms, + plot_quorums(sigma.sigma_w, fw, bottoms, write_capacities, matplotlib.cm.get_cmap('Blues')) ax.set_xticks(x_ticks) ax.set_xticklabels(str(x) for x in x_list) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_geometry.py b/tests/test_geometry.py new file mode 100644 index 0000000..8abfc9e --- /dev/null +++ b/tests/test_geometry.py @@ -0,0 +1,162 @@ +from quorums import * +from quorums.geometry import * +from typing import Any, Callable, List, NamedTuple, Optional, Tuple +import unittest + + +class TestGeometry(unittest.TestCase): + def test_eq(self): + l = Point(0, 1) + r = Point(1, 1) + m = Point(0.5, 0.5) + self.assertEqual(Segment(l, r), Segment(l, r)) + self.assertNotEqual(Segment(l, r), Segment(l, m)) + + def test_compatible(self): + s1 = Segment(Point(0, 1), Point(1, 2)) + s2 = Segment(Point(0, 
2), Point(1, 1)) + s3 = Segment(Point(0.5, 2), Point(1, 1)) + self.assertTrue(s1.compatible(s2)) + self.assertTrue(s2.compatible(s1)) + self.assertFalse(s1.compatible(s3)) + self.assertFalse(s3.compatible(s1)) + self.assertFalse(s2.compatible(s3)) + self.assertFalse(s3.compatible(s2)) + + def test_call(self): + segment = Segment(Point(0, 0), Point(1, 1)) + for x in [0.0, 0.25, 0.5, 0.75, 1.0]: + self.assertEqual(segment(x), x) + + segment = Segment(Point(0, 0), Point(1, 2)) + for x in [0.0, 0.25, 0.5, 0.75, 1.0]: + self.assertEqual(segment(x), 2*x) + + segment = Segment(Point(1, 2), Point(3, 6)) + for x in [1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0]: + self.assertEqual(segment(x), 2*x) + + segment = Segment(Point(0, 1), Point(1, 0)) + for x in [0.0, 0.25, 0.5, 0.75, 1.0]: + self.assertEqual(segment(x), 1 - x) + + def test_slope(self): + self.assertEqual(Segment(Point(0, 0), Point(1, 1)).slope(), 1.0) + self.assertEqual(Segment(Point(0, 1), Point(1, 2)).slope(), 1.0) + self.assertEqual(Segment(Point(1, 1), Point(2, 2)).slope(), 1.0) + self.assertEqual(Segment(Point(1, 1), Point(2, 3)).slope(), 2.0) + self.assertEqual(Segment(Point(1, 1), Point(2, 0)).slope(), -1.0) + + def test_above(self): + s1 = Segment(Point(0, 0), Point(1, 0.5)) + s2 = Segment(Point(0, 0.5), Point(1, 2)) + s3 = Segment(Point(0, 1.5), Point(1, 0.5)) + + self.assertFalse(s1.above(s1)) + self.assertFalse(s1.above(s2)) + self.assertFalse(s1.above(s3)) + + self.assertTrue(s2.above(s1)) + self.assertFalse(s2.above(s2)) + self.assertFalse(s2.above(s3)) + + self.assertTrue(s3.above(s1)) + self.assertFalse(s3.above(s2)) + self.assertFalse(s3.above(s3)) + + def test_above_eq(self): + s1 = Segment(Point(0, 0), Point(1, 0.5)) + s2 = Segment(Point(0, 0.5), Point(1, 2)) + s3 = Segment(Point(0, 1.5), Point(1, 0.5)) + + self.assertTrue(s1.above_eq(s1)) + self.assertFalse(s1.above_eq(s2)) + self.assertFalse(s1.above_eq(s3)) + + self.assertTrue(s2.above_eq(s1)) + self.assertTrue(s2.above_eq(s2)) + self.assertFalse(s2.above_eq(s3)) + + self.assertTrue(s3.above_eq(s1)) + self.assertFalse(s3.above_eq(s2)) + self.assertTrue(s3.above_eq(s3)) + + def test_intersects(self): + s1 = Segment(Point(0, 0), Point(1, 0.5)) + s2 = Segment(Point(0, 0.5), Point(1, 2)) + s3 = Segment(Point(0, 1.5), Point(1, 0.5)) + + self.assertTrue(s1.intersects(s1)) + self.assertFalse(s1.intersects(s2)) + self.assertTrue(s1.intersects(s3)) + + self.assertFalse(s2.intersects(s1)) + self.assertTrue(s2.intersects(s2)) + self.assertTrue(s2.intersects(s3)) + + self.assertTrue(s3.intersects(s1)) + self.assertTrue(s3.intersects(s2)) + self.assertTrue(s3.intersects(s3)) + + def test_intersection(self): + s1 = Segment(Point(0, 0), Point(1, 1)) + s2 = Segment(Point(0, 1), Point(1, 0)) + s3 = Segment(Point(0, 1), Point(1, 1)) + s4 = Segment(Point(0, 0.25), Point(1, 0.25)) + + self.assertEqual(s1.intersection(s1), None) + self.assertEqual(s1.intersection(s2), Point(0.5, 0.5)) + self.assertEqual(s1.intersection(s3), Point(1, 1)) + self.assertEqual(s1.intersection(s4), Point(0.25, 0.25)) + + self.assertEqual(s2.intersection(s1), Point(0.5, 0.5)) + self.assertEqual(s2.intersection(s2), None) + self.assertEqual(s2.intersection(s3), Point(0, 1)) + self.assertEqual(s2.intersection(s4), Point(0.75, 0.25)) + + self.assertEqual(s3.intersection(s1), Point(1, 1)) + self.assertEqual(s3.intersection(s2), Point(0, 1)) + self.assertEqual(s3.intersection(s3), None) + self.assertEqual(s3.intersection(s4), None) + + self.assertEqual(s4.intersection(s1), Point(0.25, 0.25)) + 
self.assertEqual(s4.intersection(s2), Point(0.75, 0.25)) + self.assertEqual(s4.intersection(s3), None) + self.assertEqual(s4.intersection(s4), None) + + def test_max_one_segment(self): + s1 = Segment(Point(0, 0), Point(1, 1)) + s2 = Segment(Point(0, 1), Point(1, 0)) + s3 = Segment(Point(0, 1), Point(1, 1)) + s4 = Segment(Point(0, 0.25), Point(1, 0.25)) + s5 = Segment(Point(0, 0.75), Point(1, 0.75)) + + def is_subset(xs: List[Any], ys: List[Any]) -> bool: + return all(x in ys for x in xs) + + for s in [s1, s2, s3, s4, s5]: + self.assertEqual(max_of_segments([s]), [s.l, s.r]) + + expected = [ + ([s1, s1], [(0, 0), (1, 1)]), + ([s1, s2], [(0, 1), (0.5, 0.5), (1, 1)]), + ([s1, s3], [(0, 1), (1, 1)]), + ([s1, s4], [(0, 0.25), (0.25, 0.25), (1, 1)]), + ([s1, s5], [(0, 0.75), (0.75, 0.75), (1, 1)]), + ([s2, s2], [(0, 1), (1, 0)]), + ([s2, s3], [(0, 1), (1, 1)]), + ([s2, s4], [(0, 1), (0.75, 0.25), (1, 0.25)]), + ([s2, s5], [(0, 1), (0.25, 0.75), (1, 0.75)]), + ([s3, s3], [(0, 1), (1, 1)]), + ([s3, s4], [(0, 1), (1, 1)]), + ([s3, s5], [(0, 1), (1, 1)]), + ([s4, s4], [(0, 0.25), (1, 0.25)]), + ([s4, s5], [(0, 0.75), (1, 0.75)]), + ([s5, s5], [(0, 0.75), (1, 0.75)]), + + ([s1, s2, s4], [(0, 1), (0.5, 0.5), (1, 1)]), + ([s1, s2, s5], [(0, 1), (0.25, 0.75), (0.75, 0.75), (1, 1)]), + ] + for segments, path in expected: + self.assertTrue(is_subset(path, max_of_segments(segments))) + self.assertTrue(is_subset(path, max_of_segments(segments[::-1])))
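Usage sketch (illustrative only, not part of the patch). The snippet below drives the Strategy API introduced in quorums/quorum_system.py. It assumes the Node and majority helpers exported from quorums behave with their existing defaults (unit capacities), and that the QuorumSystem constructor keeps defaulting writes to the dual of reads when they are omitted; the node names and read fractions are made up for the example.

    # Hypothetical driver for the new Strategy-returning API.
    from quorums import Node, QuorumSystem, majority

    a, b, c = Node('a'), Node('b'), Node('c')

    # Read quorums are any majority of {a, b, c}; writes are omitted and
    # assumed to default to the dual expression.
    qs = QuorumSystem(reads=majority([a, b, c]))

    # A load-optimal strategy for a workload that is 75% reads.
    sigma = qs.strategy(read_fraction=0.75)
    print(sigma)
    print(sigma.load(read_fraction=0.75))      # maximum load on any node
    print(sigma.capacity(read_fraction=0.75))  # 1 / load
    print(sigma.get_read_quorum())             # sample a read quorum

    # Still load-optimal, but constrained to an expected quorum size of at
    # most 2 ('load' is the value of the module-level LOAD constant).
    sigma = qs.strategy(optimize='load', network_limit=2.0, read_fraction=0.75)

    # A uniform strategy over the minimal 1-resilient quorums.
    print(qs.uniform_strategy(f=1))

The same strategy() call also accepts load_limit and latency_limit; the only rejected combination is constraining the objective that is itself being optimized.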