diff --git a/quorums/__init__.py b/quorums/__init__.py
index ac42cf1..f3c87a2 100644
--- a/quorums/__init__.py
+++ b/quorums/__init__.py
@@ -1,5 +1,6 @@
 from .expr import Node, choose, majority
-from .quorum_system import QuorumSystem, Strategy
+from .quorum_system import NoStrategyFoundError, QuorumSystem, Strategy
+from .search import NoQuorumSystemFoundError, search
 from .viz import (
     plot_node_load,
     plot_node_load_on,
diff --git a/quorums/expr.py b/quorums/expr.py
index df330e5..43c99fd 100644
--- a/quorums/expr.py
+++ b/quorums/expr.py
@@ -246,6 +246,12 @@ class Choose(Expr[T]):
 
 
 def choose(k: int, es: List[Expr[T]]) -> Expr[T]:
+    if len(es) == 0:
+        raise ValueError('no expressions provided')
+
+    if not (1 <= k <= len(es)):
+        raise ValueError('k must be in the range [1, len(es)]')
+
     if k == 1:
         return Or(es)
     elif k == len(es):
@@ -255,4 +261,7 @@ def choose(k: int, es: List[Expr[T]]) -> Expr[T]:
 
 
 def majority(es: List[Expr[T]]) -> Expr[T]:
+    if len(es) == 0:
+        raise ValueError('no expressions provided')
+
     return choose(len(es) // 2 + 1, es)
diff --git a/quorums/quorum_system.py b/quorums/quorum_system.py
index cbaac8b..acbec6e 100644
--- a/quorums/quorum_system.py
+++ b/quorums/quorum_system.py
@@ -20,6 +20,11 @@ LATENCY = 'latency'
 
 # TODO(mwhittaker): Add some other non-optimal strategies.
 # TODO(mwhittaker): Make it easy to make arbitrary strategies.
+
+class NoStrategyFoundError(ValueError):
+    pass
+
+
 class QuorumSystem(Generic[T]):
     def __init__(self, reads: Optional[Expr[T]] = None,
                  writes: Optional[Expr[T]] = None) -> None:
@@ -167,9 +172,11 @@ class QuorumSystem(Generic[T]):
         read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
         write_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
         if len(read_quorums) == 0:
-            raise ValueError(f'There are no {f}-resilient read quorums')
+            raise NoStrategyFoundError(
+                f'There are no {f}-resilient read quorums')
         if len(write_quorums) == 0:
-            raise ValueError(f'There are no {f}-resilient write quorums')
+            raise NoStrategyFoundError(
+                f'There are no {f}-resilient write quorums')
 
         read_quorums = self._minimize(read_quorums)
         write_quorums = self._minimize(write_quorums)
@@ -219,9 +226,11 @@ class QuorumSystem(Generic[T]):
         read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
         write_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
         if len(read_quorums) == 0:
-            raise ValueError(f'There are no {f}-resilient read quorums')
+            raise NoStrategyFoundError(
+                f'There are no {f}-resilient read quorums')
         if len(write_quorums) == 0:
-            raise ValueError(f'There are no {f}-resilient write quorums')
+            raise NoStrategyFoundError(
+                f'There are no {f}-resilient write quorums')
         return self._load_optimal_strategy(
             read_quorums,
             write_quorums,
@@ -541,7 +550,8 @@ class QuorumSystem(Generic[T]):
         # Solve the linear program.
         problem.solve(pulp.apis.PULP_CBC_CMD(msg=False))
         if problem.status != pulp.LpStatusOptimal:
-            raise ValueError('no strategy satisfies the given constraints')
+            raise NoStrategyFoundError(
+                'no strategy satisfies the given constraints')
 
         # Prune out any quorums with 0 probability.
         sigma_r = {
diff --git a/quorums/search.py b/quorums/search.py
new file mode 100644
index 0000000..35efbaa
--- /dev/null
+++ b/quorums/search.py
@@ -0,0 +1,135 @@
+from .distribution import Distribution
+from .expr import choose, Expr, Node
+from .quorum_system import (LATENCY, LOAD, NETWORK, NoStrategyFoundError,
+                            QuorumSystem, Strategy)
+from typing import Iterator, List, Optional, TypeVar
+import datetime
+import itertools
+
+
+T = TypeVar('T')
+
+
+class NoQuorumSystemFoundError(ValueError):
+    pass
+
+
+def _partitionings(xs: List[T]) -> Iterator[List[List[T]]]:
+    """
+    _partitionings(xs) yields all possible partitionings of xs. For example,
+    _partitionings([1, 2, 3]) yields
+
+        [[1], [2], [3]]
+        [[1, 2], [3]]
+        [[1, 3], [2]]
+        [[2, 3], [1]]
+        [[1, 2, 3]]
+    """
+    if len(xs) == 0:
+        return
+
+    def helper(xs: List[T]) -> Iterator[List[List[T]]]:
+        if len(xs) == 0:
+            yield []
+            return
+
+        x = xs[0]
+        for partition in helper(xs[1:]):
+            yield [[x]] + partition
+            for i in range(len(partition)):
+                yield partition[:i] + [[x] + partition[i]] + partition[i+1:]
+
+    yield from helper(xs)
+
+
+def _dup_free_exprs(nodes: List[Node[T]],
+                    max_height: int = 0) -> Iterator[Expr[T]]:
+    """
+    _dup_free_exprs(nodes) yields all possible duplicate free expressions over
+    `nodes` with height at most `max_height`. If `max_height` is not positive,
+    there is no height limit. Note that an expression might be yielded more
+    than once.
+    """
+    assert len(nodes) > 0
+
+    if len(nodes) == 1:
+        yield nodes[0]
+        return
+
+    if max_height == 1:
+        for k in range(1, len(nodes) + 1):
+            yield choose(k, nodes)  # type: ignore
+        return
+
+    for partitioning in _partitionings(nodes):
+        # We ignore the partitioning that includes every node in a single
+        # partition.
+        if len(partitioning) == 1:
+            continue
+
+        subiterators = [_dup_free_exprs(p, max_height-1) for p in partitioning]
+        for subexprs in itertools.product(*subiterators):
+            for k in range(1, len(subexprs) + 1):
+                yield choose(k, list(subexprs))
+
+
+def search(nodes: List[Node[T]],
+           read_fraction: Optional[Distribution] = None,
+           write_fraction: Optional[Distribution] = None,
+           optimize: str = LOAD,
+           resilience: int = 0,
+           load_limit: Optional[float] = None,
+           network_limit: Optional[float] = None,
+           latency_limit: Optional[datetime.timedelta] = None,
+           f: int = 0,
+           timeout: datetime.timedelta = datetime.timedelta(seconds=0)) \
+           -> QuorumSystem[T]:
+    start_time = datetime.datetime.now()
+
+    def metric(sigma: Strategy[T]) -> float:
+        if optimize == LOAD:
+            return sigma.load(read_fraction, write_fraction)
+        elif optimize == NETWORK:
+            return sigma.network_load(read_fraction, write_fraction)
+        else:
+            return sigma.latency(read_fraction, write_fraction).total_seconds()
+
+    opt_qs: Optional[QuorumSystem[T]] = None
+    opt_metric: Optional[float] = None
+
+    def do_search(exprs: Iterator[Expr[T]]) -> None:
+        nonlocal opt_qs
+        nonlocal opt_metric
+
+        for reads in exprs:
+            qs = QuorumSystem(reads=reads)
+            if qs.resilience() < resilience:
+                continue
+
+
+            try:
+                sigma = qs.strategy(optimize = optimize,
+                                    load_limit = load_limit,
+                                    network_limit = network_limit,
+                                    latency_limit = latency_limit,
+                                    read_fraction = read_fraction,
+                                    write_fraction = write_fraction,
+                                    f = f)
+                sigma_metric = metric(sigma)
+                if opt_metric is None or sigma_metric < opt_metric:
+                    opt_qs = qs
+                    opt_metric = sigma_metric
+            except NoStrategyFoundError:
+                pass
+
+            if (timeout != datetime.timedelta(seconds=0) and
+                datetime.datetime.now() - start_time >= timeout):
+                return
+
+    do_search(_dup_free_exprs(nodes, max_height=2))
+    do_search(_dup_free_exprs(nodes))
+
+    if opt_qs is None:
+        raise NoQuorumSystemFoundError('no quorum system found')
+    else:
+        return opt_qs
diff --git a/tests/test_search.py b/tests/test_search.py
new file mode 100644
index 0000000..2a9fa3e
--- /dev/null
+++ b/tests/test_search.py
@@ -0,0 +1,227 @@
+from quorums import *
+from quorums.expr import Expr
+from quorums.search import _dup_free_exprs, _partitionings
+from typing import Any, FrozenSet, List
+import datetime
+import unittest
+
+
+class TestSearch(unittest.TestCase):
+    def test_partitions(self):
+        def setify(partitions: List[List[List[Any]]]) \
+                -> FrozenSet[FrozenSet[FrozenSet[Any]]]:
+            return frozenset(frozenset(frozenset(s) for s in partition)
+                             for partition in partitions)
+
+        def assert_equal(x: List[List[Any]], y: List[List[Any]]):
+            self.assertEqual(setify(x), setify(y))
+
+        assert_equal(list(_partitionings([])), [])
+        assert_equal(list(_partitionings([1])), [[[1]]])
+        assert_equal(list(_partitionings([1, 2])), [
+            [[1], [2]],
+            [[1, 2]],
+        ])
+        assert_equal(list(_partitionings([1, 2, 3])), [
+            [[1], [2], [3]],
+            [[1, 2], [3]],
+            [[1, 3], [2]],
+            [[2, 3], [1]],
+            [[1, 2, 3]],
+        ])
+        assert_equal(list(_partitionings([1, 2, 3, 4])), [
+            [[1], [2], [3], [4]],
+            [[1, 2], [3], [4]],
+            [[1, 3], [2], [4]],
+            [[1, 4], [2], [3]],
+            [[2, 3], [1], [4]],
+            [[2, 4], [1], [3]],
+            [[3, 4], [1], [2]],
+            [[1, 2], [3, 4]],
+            [[1, 3], [2, 4]],
+            [[1, 4], [2, 3]],
+            [[2, 3, 4], [1]],
+            [[1, 3, 4], [2]],
+            [[1, 2, 4], [3]],
+            [[1, 2, 3], [4]],
+            [[1, 2, 3, 4]],
+        ])
+
+    def test_dup_free_exprs(self):
+        def quorums(e: Expr) -> FrozenSet[FrozenSet[Any]]:
+            return frozenset(frozenset(q) for q in e.quorums())
+
+        def assert_equal(xs: List[Expr], ys: List[Expr]) -> None:
+            self.assertEqual(frozenset(quorums(x) for x in xs),
+                             frozenset(quorums(y) for y in ys))
+
+        a = Node('a')
+        b = Node('b')
+        c = Node('c')
+        d = Node('d')
+
+        assert_equal(list(_dup_free_exprs([a])), [a])
+        assert_equal(list(_dup_free_exprs([a, b])), [
+            a + b,
+            a * b,
+        ])
+        assert_equal(list(_dup_free_exprs([a, b, c])), [
+            a + b + c,
+            choose(2, [a, b, c]),
+            a * b * c,
+            (a + b) + c,
+            (a + b) * c,
+            (a * b) + c,
+            (a * b) * c,
+            (a + c) + b,
+            (a + c) * b,
+            (a * c) + b,
+            (a * c) * b,
+            (b + c) + a,
+            (b + c) * a,
+            (b * c) + a,
+            (b * c) * a,
+        ])
+        assert_equal(list(_dup_free_exprs([a, b, c], max_height=1)), [
+            a + b + c,
+            choose(2, [a, b, c]),
+            a * b * c,
+        ])
+        assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=1)), [
+            a + b + c + d,
+            choose(2, [a, b, c, d]),
+            choose(3, [a, b, c, d]),
+            a * b * c * d,
+        ])
+        assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=2)), [
+            a + b + c + d,
+            choose(2, [a, b, c, d]),
+            choose(3, [a, b, c, d]),
+            a * b * c * d,
+
+            (a + b) + c + d,
+            (a + b) * c * d,
+            (a * b) + c + d,
+            (a * b) * c * d,
+            choose(2, [a + b, c, d]),
+            choose(2, [a * b, c, d]),
+
+            (a + c) + b + d,
+            (a + c) * b * d,
+            (a * c) + b + d,
+            (a * c) * b * d,
+            choose(2, [a + c, b, d]),
+            choose(2, [a * c, b, d]),
+
+            (a + d) + b + c,
+            (a + d) * b * c,
+            (a * d) + b + c,
+            (a * d) * b * c,
+            choose(2, [a + d, b, c]),
+            choose(2, [a * d, b, c]),
+
+            (b + c) + a + d,
+            (b + c) * a * d,
+            (b * c) + a + d,
+            (b * c) * a * d,
+            choose(2, [b + c, a, d]),
+            choose(2, [b * c, a, d]),
+
+            (b + d) + a + c,
+            (b + d) * a * c,
+            (b * d) + a + c,
+            (b * d) * a * c,
+            choose(2, [b + d, a, c]),
+            choose(2, [b * d, a, c]),
+
+            (c + d) + a + b,
+            (c + d) * a * b,
+            (c * d) + a + b,
+            (c * d) * a * b,
+            choose(2, [c + d, a, b]),
+            choose(2, [c * d, a, b]),
+
+            (a + b) + (c + d),
+            (a + b) + (c * d),
+            (a + b) * (c + d),
+            (a + b) * (c * d),
+            (a * b) + (c + d),
+            (a * b) + (c * d),
+            (a * b) * (c + d),
+            (a * b) * (c * d),
+
+            (a + c) + (b + d),
+            (a + c) + (b * d),
+            (a + c) * (b + d),
+            (a + c) * (b * d),
+            (a * c) + (b + d),
+            (a * c) + (b * d),
+            (a * c) * (b + d),
+            (a * c) * (b * d),
+
+            (a + d) + (b + c),
+            (a + d) + (b * c),
+            (a + d) * (b + c),
+            (a + d) * (b * c),
+            (a * d) + (b + c),
+            (a * d) + (b * c),
+            (a * d) * (b + c),
+            (a * d) * (b * c),
+
+            a + (b + c + d),
+            a + (b * c * d),
+            a * (b + c + d),
+            a * (b * c * d),
+            a + choose(2, [b, c, d]),
+            a * choose(2, [b, c, d]),
+
+            b + (a + c + d),
+            b + (a * c * d),
+            b * (a + c + d),
+            b * (a * c * d),
+            b + choose(2, [a, c, d]),
+            b * choose(2, [a, c, d]),
+
+            c + (a + b + d),
+            c + (a * b * d),
+            c * (a + b + d),
+            c * (a * b * d),
+            c + choose(2, [a, b, d]),
+            c * choose(2, [a, b, d]),
+
+            d + (a + b + c),
+            d + (a * b * c),
+            d * (a + b + c),
+            d * (a * b * c),
+            d + choose(2, [a, b, c]),
+            d * choose(2, [a, b, c]),
+        ])
+
+    def test_search(self):
+        a = Node('a', capacity=1, latency=datetime.timedelta(seconds=2))
+        b = Node('b', capacity=2, latency=datetime.timedelta(seconds=1))
+        c = Node('c', capacity=1, latency=datetime.timedelta(seconds=2))
+        d = Node('d', capacity=2, latency=datetime.timedelta(seconds=1))
+        e = Node('e', capacity=1, latency=datetime.timedelta(seconds=2))
+        f = Node('f', capacity=2, latency=datetime.timedelta(seconds=1))
+
+        for fr in [0, 0.5, 1]:
+            search([a, b, c], read_fraction=fr)
+            search([a, b, c], read_fraction=fr, optimize='network')
+            search([a, b, c], read_fraction=fr, optimize='latency')
+            search([a, b, c], read_fraction=fr, resilience=1)
+            search([a, b, c], read_fraction=fr, f=1)
+
+        search([a, b, c],
+               read_fraction=0.25,
+               network_limit=3,
+               latency_limit=datetime.timedelta(seconds=2))
+
+        for fr in [0, 0.5]:
+            t = datetime.timedelta(seconds=0.25)
+            nodes = [a, b, c, d, e, f]
+            search(nodes, read_fraction=fr, timeout=t)
+            search(nodes, read_fraction=fr, timeout=t, optimize='network')
+            search(nodes, read_fraction=fr, timeout=t, optimize='latency')
+            search(nodes, read_fraction=fr, timeout=t, resilience=1)
+            search(nodes, read_fraction=fr, timeout=t, f=1)
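
For reviewers, a minimal usage sketch of the new search API introduced by this patch. The calls mirror those exercised in tests/test_search.py; the node capacities and the read_fraction, resilience, and timeout values are illustrative only and are not part of the diff.

    from quorums import Node, NoQuorumSystemFoundError, search
    import datetime

    a = Node('a', capacity=1)
    b = Node('b', capacity=2)
    c = Node('c', capacity=1)

    try:
        # Search for a load-optimal quorum system over a, b, and c that
        # tolerates one node failure, stopping the search after ten seconds.
        qs = search([a, b, c],
                    read_fraction=0.75,
                    resilience=1,
                    timeout=datetime.timedelta(seconds=10))
        print(qs)
    except NoQuorumSystemFoundError:
        print('no quorum system satisfies the given constraints')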