From 7f1a1a39345f83c5dceeb7cbb45590dec8eca900 Mon Sep 17 00:00:00 2001 From: Michael Whittaker Date: Fri, 22 Jan 2021 14:47:07 -0800 Subject: [PATCH] Split up code into separate modules. --- examples.py | 2 +- quorums/__init__.py | 2 + quorums/distribution.py | 57 ++++ quorums/expr.py | 250 +++++++++++++++++ quorums/quorum_system.py | 174 ++++++++++++ quorums/quorums.py | 561 --------------------------------------- quorums/strategy.py | 93 +++++++ 7 files changed, 577 insertions(+), 562 deletions(-) create mode 100644 quorums/distribution.py create mode 100644 quorums/expr.py create mode 100644 quorums/quorum_system.py create mode 100644 quorums/strategy.py diff --git a/examples.py b/examples.py index 631c883..b6dd8ea 100644 --- a/examples.py +++ b/examples.py @@ -1,4 +1,4 @@ -from quorums.quorums import * +from quorums import * a = Node('a') b = Node('b') diff --git a/quorums/__init__.py b/quorums/__init__.py index e69de29..14493b2 100644 --- a/quorums/__init__.py +++ b/quorums/__init__.py @@ -0,0 +1,2 @@ +from .expr import Node, choose, majority +from .quorum_system import QuorumSystem diff --git a/quorums/distribution.py b/quorums/distribution.py new file mode 100644 index 0000000..2198835 --- /dev/null +++ b/quorums/distribution.py @@ -0,0 +1,57 @@ +from typing import Dict, Optional, Union + +Fraction = float +Weight = float +Probability = float +Distribution = Union[ + # For example, 1 means 100% reads. + int, + # For example, 0.25 means 25% reads. + float, + # For example, {0.25: 1, 0.8: 2} means 25% reads one third of the time and + # 80% reads two thirds of the time. + Dict[Fraction, Weight], +] + +def canonicalize(d: Distribution) -> Dict[Fraction, Probability]: + if isinstance(d, int): + if d < 0 or d > 1: + raise ValueError('distribution must be in the range [0, 1]') + return {float(d): 1.} + elif isinstance(d, float): + if d < 0 or d > 1: + raise ValueError('distribution must be in the range [0, 1]') + return {d: 1.} + elif isinstance(d, dict): + if len(d) == 0: + raise ValueError('distribution cannot empty') + + if any(weight < 0 for weight in d.values()): + raise ValueError('distribution cannot have negative weights') + + total_weight = sum(d.values()) + if total_weight == 0: + raise ValueError('distribution cannot have zero weight') + + return {float(f): weight / total_weight + for (f, weight) in d.items() + if weight > 0} + else: + raise ValueError('distribution must be an int, a float, a Dict[float, ' + 'float] or a List[Tuple[float, float]]') + + +def canonicalize_rw(read_fraction: Optional[Distribution], + write_fraction: Optional[Distribution]) \ + -> Dict[Fraction, Probability]: + if read_fraction is None and write_fraction is None: + raise ValueError('Either read_fraction or write_fraction must be given') + elif read_fraction is not None and write_fraction is not None: + raise ValueError('Only one of read_fraction or write_fraction can be ' + 'given') + elif read_fraction is not None: + return canonicalize(read_fraction) + else: + assert write_fraction is not None + return {1 - f: weight + for (f, weight) in canonicalize(write_fraction).items()} diff --git a/quorums/expr.py b/quorums/expr.py new file mode 100644 index 0000000..ae08b01 --- /dev/null +++ b/quorums/expr.py @@ -0,0 +1,250 @@ +from typing import Dict, Iterator, Generic, List, Optional, Set, TypeVar +import itertools +import pulp + + +T = TypeVar('T') + + +def _min_hitting_set(sets: Iterator[Set[T]]) -> int: + x_vars: Dict[T, pulp.LpVariable] = dict() + next_id = itertools.count() + + problem = pulp.LpProblem("min_hitting_set", pulp.LpMinimize) + for (i, xs) in enumerate(sets): + for x in xs: + if x not in x_vars: + id = next(next_id) + x_vars[x] = pulp.LpVariable(f'x{id}', cat=pulp.LpBinary) + problem += sum(x_vars[x] for x in xs) >= 1 + + problem += sum(x_vars.values()) + problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) + return int(sum(v.varValue for v in x_vars.values())) + + +class Expr(Generic[T]): + def __add__(self, rhs: 'Expr[T]') -> 'Expr[T]': + def _or(lhs: Expr[T], rhs: Expr[T]) -> 'Or[T]': + if isinstance(lhs, Or) and isinstance(rhs, Or): + return Or(lhs.es + rhs.es) + elif isinstance(lhs, Or): + return Or(lhs.es + [rhs]) + elif isinstance(rhs, Or): + return Or([lhs] + rhs.es) + else: + return Or([lhs, rhs]) + + + return _or(self, rhs) + + def __mul__(self, rhs: 'Expr[T]') -> 'Expr[T]': + def _and(lhs: Expr[T], rhs: Expr[T]) -> 'And[T]': + if isinstance(lhs, And) and isinstance(rhs, And): + return And(lhs.es + rhs.es) + elif isinstance(lhs, And): + return And(lhs.es + [rhs]) + elif isinstance(rhs, And): + return And([lhs] + rhs.es) + else: + return And([lhs, rhs]) + + return _and(self, rhs) + + def quorums(self) -> Iterator[Set[T]]: + raise NotImplementedError + + def is_quorum(self, xs: Set[T]) -> bool: + raise NotImplementedError + + def elements(self) -> Set[T]: + return {node.x for node in self.nodes()} + + def nodes(self) -> Set['Node[T]']: + raise NotImplementedError + + def resilience(self) -> int: + if self.dup_free(): + return self._dup_free_min_failures() - 1 + else: + return _min_hitting_set(self.quorums()) - 1 + + def dual(self) -> 'Expr[T]': + raise NotImplementedError + + def dup_free(self) -> bool: + return len(self.nodes()) == self._num_leaves() + + def _num_leaves(self) -> int: + raise NotImplementedError + + def _dup_free_min_failures(self) -> int: + raise NotImplementedError + + +class Node(Expr[T]): + def __init__(self, + x: T, + capacity: Optional[float] = None, + read_capacity: Optional[float] = None, + write_capacity: Optional[float] = None) -> None: + self.x = x + + # A user either specifies capacity or (read_capacity and + # write_capacity), but not both. + if (capacity is None and + read_capacity is None and + write_capacity is None): + self.read_capacity = 1.0 + self.write_capacity = 1.0 + elif (capacity is not None and + read_capacity is None and + write_capacity is None): + self.read_capacity = capacity + self.write_capacity = capacity + elif (capacity is None and + read_capacity is not None and + write_capacity is not None): + self.read_capacity = read_capacity + self.write_capacity = write_capacity + else: + raise ValueError('You must specify capacity or (read_capacity ' + 'and write_capacity)') + + def __str__(self) -> str: + return str(self.x) + + def __repr__(self) -> str: + return f'Node({self.x})' + + def quorums(self) -> Iterator[Set[T]]: + yield {self.x} + + def is_quorum(self, xs: Set[T]) -> bool: + return self.x in xs + + def nodes(self) -> Set['Node[T]']: + return {self} + + def dual(self) -> Expr: + return self + + def _num_leaves(self) -> int: + return 1 + + def _dup_free_min_failures(self) -> int: + return 1 + + +class Or(Expr[T]): + def __init__(self, es: List[Expr[T]]) -> None: + if len(es) == 0: + raise ValueError(f'Or cannot be constructed with an empty list') + + self.es = es + + def __str__(self) -> str: + return '(' + ' + '.join(str(e) for e in self.es) + ')' + + def __repr__(self) -> str: + return f'Or({self.es})' + + def quorums(self) -> Iterator[Set[T]]: + for e in self.es: + yield from e.quorums() + + def is_quorum(self, xs: Set[T]) -> bool: + return any(e.is_quorum(xs) for e in self.es) + + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + + def dual(self) -> Expr: + return And([e.dual() for e in self.es]) + + def _num_leaves(self) -> int: + return sum(e._num_leaves() for e in self.es) + + def _dup_free_min_failures(self) -> int: + return sum(e._dup_free_min_failures() for e in self.es) + + +class And(Expr[T]): + def __init__(self, es: List[Expr[T]]) -> None: + if len(es) == 0: + raise ValueError(f'And cannot be constructed with an empty list') + + self.es = es + + def __str__(self) -> str: + return '(' + ' * '.join(str(e) for e in self.es) + ')' + + def __repr__(self) -> str: + return f'And({self.es})' + + def quorums(self) -> Iterator[Set[T]]: + for subquorums in itertools.product(*[e.quorums() for e in self.es]): + yield set.union(*subquorums) + + def is_quorum(self, xs: Set[T]) -> bool: + return all(e.is_quorum(xs) for e in self.es) + + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + + def dual(self) -> Expr: + return Or([e.dual() for e in self.es]) + + def _num_leaves(self) -> int: + return sum(e._num_leaves() for e in self.es) + + def _dup_free_min_failures(self) -> int: + return min(e._dup_free_min_failures() for e in self.es) + +class Choose(Expr[T]): + def __init__(self, k: int, es: List[Expr[T]]) -> None: + if k <= 0 or k > len(es): + raise ValueError(f'k must be in the range [1, {len(es)}]') + + self.k = k + self.es = es + + def __str__(self) -> str: + return f'choose{self.k}(' + ', '.join(str(e) for e in self.es) + ')' + + def __repr__(self) -> str: + return f'Chose({self.k}, {self.es})' + + def quorums(self) -> Iterator[Set[T]]: + for combo in itertools.combinations(self.es, self.k): + for subquorums in itertools.product(*[e.quorums() for e in combo]): + yield set.union(*subquorums) + + def is_quorum(self, xs: Set[T]) -> bool: + return sum(1 if e.is_quorum(xs) else 0 for e in self.es) >= self.k + + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + + def dual(self) -> Expr: + # TODO(mwhittaker): Prove that this is in fact the dual. + return Choose(len(self.es) - self.k + 1, [e.dual() for e in self.es]) + + def _num_leaves(self) -> int: + return sum(e._num_leaves() for e in self.es) + + def _dup_free_min_failures(self) -> int: + return sum(sorted(e._dup_free_min_failures() for e in self.es)[:self.k]) + + +def choose(k: int, es: List[Expr[T]]) -> Expr[T]: + if k == 1: + return Or(es) + elif k == len(es): + return And(es) + else: + return Choose(k, es) + + +def majority(es: List[Expr[T]]) -> Expr[T]: + return choose(len(es) // 2 + 1, es) diff --git a/quorums/quorum_system.py b/quorums/quorum_system.py new file mode 100644 index 0000000..43a81f9 --- /dev/null +++ b/quorums/quorum_system.py @@ -0,0 +1,174 @@ +# TODO(mwhittaker): We can define a set of read quorums that are not minimal. +# Does this mess things up? + +from . import distribution +from .distribution import Distribution +from .expr import Expr, Node +from .strategy import ExplicitStrategy, Strategy +from typing import Dict, Iterator, Generic, List, Optional, Set, TypeVar +import collections +import itertools +import pulp + + +T = TypeVar('T') + + +class QuorumSystem(Generic[T]): + def __init__(self, reads: Optional[Expr[T]] = None, + writes: Optional[Expr[T]] = None) -> None: + if reads is not None and writes is not None: + # TODO(mwhittaker): Think of ways to make this more efficient. + assert all(len(r & w) > 0 + for (r, w) in itertools.product(reads.quorums(), + writes.quorums())) + self.reads = reads + self.writes = writes + elif reads is not None and writes is None: + self.reads = reads + self.writes = reads.dual() + elif reads is None and writes is not None: + self.reads = writes.dual() + self.writes = writes + else: + raise ValueError('A QuorumSystem must be instantiated with a set ' + 'of read quorums or a set of write quorums') + + def __repr__(self) -> str: + return f'QuorumSystem(reads={self.reads}, writes={self.writes})' + + def read_quorums(self) -> Iterator[Set[T]]: + return self.reads.quorums() + + def write_quorums(self) -> Iterator[Set[T]]: + return self.writes.quorums() + + def is_read_quorum(self, xs: Set[T]) -> bool: + return self.reads.is_quorum(xs) + + def is_write_quorum(self, xs: Set[T]) -> bool: + return self.writes.is_quorum(xs) + + def nodes(self) -> Set[Node[T]]: + return self.reads.nodes() | self.writes.nodes() + + def resilience(self) -> int: + return min(self.read_resilience(), self.write_resilience()) + + def read_resilience(self) -> int: + return self.reads.resilience() + + def write_resilience(self) -> int: + return self.writes.resilience() + + def strategy(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) \ + -> 'Strategy[T]': + if f < 0: + raise ValueError('f must be >= 0') + + d = distribution.canonicalize_rw(read_fraction, write_fraction) + if f == 0: + return self._load_optimal_strategy( + list(self.read_quorums()), + list(self.write_quorums()), + d) + else: + xs = [node.x for node in self.nodes()] + read_quorums = list(self._f_resilient_quorums(f, xs, self.reads)) + write_quorums = list(self._f_resilient_quorums(f, xs, self.reads)) + if len(read_quorums) == 0: + raise ValueError(f'There are no {f}-resilient read quorums') + if len(write_quorums) == 0: + raise ValueError(f'There are no {f}-resilient write quorums') + return self._load_optimal_strategy(read_quorums, write_quorums, d) + + def dup_free(self) -> bool: + return self.reads.dup_free() and self.writes.dup_free() + + def _f_resilient_quorums(self, + f: int, + xs: List[T], + e: Expr) -> Iterator[Set[T]]: + assert f >= 1 + + def helper(s: Set[T], i: int) -> Iterator[Set[T]]: + if all(e.is_quorum(s - set(failure)) + for failure in itertools.combinations(s, min(f, len(s)))): + yield set(s) + return + + for j in range(i, len(xs)): + s.add(xs[j]) + yield from helper(s, j + 1) + s.remove(xs[j]) + + return helper(set(), 0) + + def load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None, + f: int = 0) \ + -> float: + sigma = self.strategy(read_fraction, write_fraction, f) + return sigma.load(read_fraction, write_fraction) + + def _load_optimal_strategy(self, + read_quorums: List[Set[T]], + write_quorums: List[Set[T]], + read_fraction: Dict[float, float]) \ + -> 'Strategy[T]': + # TODO(mwhittaker): Explain f_r calculation. + fr = sum(f * weight for (f, weight) in read_fraction.items()) + + nodes = self.reads.nodes() | self.writes.nodes() + read_capacity = {node.x: node.read_capacity for node in nodes} + write_capacity = {node.x: node.write_capacity for node in nodes} + + read_quorum_vars: List[pulp.LpVariable] = [] + x_to_read_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ + collections.defaultdict(list) + + for (i, read_quorum) in enumerate(read_quorums): + v = pulp.LpVariable(f'r{i}', 0, 1) + read_quorum_vars.append(v) + for x in read_quorum: + x_to_read_quorum_vars[x].append(v) + + write_quorum_vars: List[pulp.LpVariable] = [] + x_to_write_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ + collections.defaultdict(list) + for (i, write_quorum) in enumerate(write_quorums): + v = pulp.LpVariable(f'w{i}', 0, 1) + write_quorum_vars.append(v) + for x in write_quorum: + x_to_write_quorum_vars[x].append(v) + + # Form the linear program to find the load. + problem = pulp.LpProblem("load", pulp.LpMinimize) + + # If we're trying to balance the strategy, then we want to minimize the + # pairwise absolute differences between the read probabilities and the + # write probabilities. + l = pulp.LpVariable('l', 0, 1) + problem += l + problem += (sum(read_quorum_vars) == 1, 'valid read strategy') + problem += (sum(write_quorum_vars) == 1, 'valid write strategy') + for node in nodes: + x = node.x + x_load: pulp.LpAffineExpression = 0 + if x in x_to_read_quorum_vars: + x_load += fr * sum(x_to_read_quorum_vars[x]) / read_capacity[x] + if x in x_to_write_quorum_vars: + x_load += ((1 - fr) * sum(x_to_write_quorum_vars[x]) / + write_capacity[x]) + problem += (x_load <= l, x) + + problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) + return ExplicitStrategy(nodes, + read_quorums, + [v.varValue for v in read_quorum_vars], + write_quorums, + [v.varValue for v in write_quorum_vars]) diff --git a/quorums/quorums.py b/quorums/quorums.py index d3e1ca5..ba32ae2 100644 --- a/quorums/quorums.py +++ b/quorums/quorums.py @@ -1,567 +1,6 @@ # TODO(mwhittaker): We can define a set of read quorums that are not minimal. # Does this mess things up? -from typing import (Dict, Iterator, Generic, List, Optional, Set, Tuple, - TypeVar, Union) -import collections -import itertools -import numpy as np -import pulp - - -T = TypeVar('T') - - -def _min_hitting_set(sets: Iterator[Set[T]]) -> int: - x_vars: Dict[T, pulp.LpVariable] = dict() - next_id = itertools.count() - - problem = pulp.LpProblem("min_hitting_set", pulp.LpMinimize) - for (i, xs) in enumerate(sets): - for x in xs: - if x not in x_vars: - id = next(next_id) - x_vars[x] = pulp.LpVariable(f'x{id}', cat=pulp.LpBinary) - problem += sum(x_vars[x] for x in xs) >= 1 - - problem += sum(x_vars.values()) - problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) - return int(sum(v.varValue for v in x_vars.values())) - - -class Expr(Generic[T]): - def __add__(self, rhs: 'Expr[T]') -> 'Expr[T]': - return _or(self, rhs) - - def __mul__(self, rhs: 'Expr[T]') -> 'Expr[T]': - return _and(self, rhs) - - def quorums(self) -> Iterator[Set[T]]: - raise NotImplementedError - - def is_quorum(self, xs: Set[T]) -> bool: - raise NotImplementedError - - def elements(self) -> Set[T]: - return {node.x for node in self.nodes()} - - def nodes(self) -> Set['Node[T]']: - raise NotImplementedError - - def resilience(self) -> int: - if self.dup_free(): - return self._dup_free_min_failures() - 1 - else: - return _min_hitting_set(self.quorums()) - 1 - - def dual(self) -> 'Expr[T]': - raise NotImplementedError - - def dup_free(self) -> bool: - return len(self.nodes()) == self._num_leaves() - - def _num_leaves(self) -> int: - raise NotImplementedError - - def _dup_free_min_failures(self) -> int: - raise NotImplementedError - - -class Node(Expr[T]): - def __init__(self, - x: T, - capacity: Optional[float] = None, - read_capacity: Optional[float] = None, - write_capacity: Optional[float] = None) -> None: - self.x = x - - # A user either specifies capacity or (read_capacity and - # write_capacity), but not both. - if (capacity is None and - read_capacity is None and - write_capacity is None): - self.read_capacity = 1.0 - self.write_capacity = 1.0 - elif (capacity is not None and - read_capacity is None and - write_capacity is None): - self.read_capacity = capacity - self.write_capacity = capacity - elif (capacity is None and - read_capacity is not None and - write_capacity is not None): - self.read_capacity = read_capacity - self.write_capacity = write_capacity - else: - raise ValueError('You must specify capacity or (read_capacity ' - 'and write_capacity)') - - def __str__(self) -> str: - return str(self.x) - - def __repr__(self) -> str: - return f'Node({self.x})' - - def quorums(self) -> Iterator[Set[T]]: - yield {self.x} - - def is_quorum(self, xs: Set[T]) -> bool: - return self.x in xs - - def nodes(self) -> Set['Node[T]']: - return {self} - - def dual(self) -> Expr: - return self - - def _num_leaves(self) -> int: - return 1 - - def _dup_free_min_failures(self) -> int: - return 1 - - -class Or(Expr[T]): - def __init__(self, es: List[Expr[T]]) -> None: - if len(es) == 0: - raise ValueError(f'Or cannot be constructed with an empty list') - - self.es = es - - def __str__(self) -> str: - return '(' + ' + '.join(str(e) for e in self.es) + ')' - - def __repr__(self) -> str: - return f'Or({self.es})' - - def quorums(self) -> Iterator[Set[T]]: - for e in self.es: - yield from e.quorums() - - def is_quorum(self, xs: Set[T]) -> bool: - return any(e.is_quorum(xs) for e in self.es) - - def nodes(self) -> Set[Node[T]]: - return set.union(*[e.nodes() for e in self.es]) - - def dual(self) -> Expr: - return And([e.dual() for e in self.es]) - - def _num_leaves(self) -> int: - return sum(e._num_leaves() for e in self.es) - - def _dup_free_min_failures(self) -> int: - return sum(e._dup_free_min_failures() for e in self.es) - - -class And(Expr[T]): - def __init__(self, es: List[Expr[T]]) -> None: - if len(es) == 0: - raise ValueError(f'And cannot be constructed with an empty list') - - self.es = es - - def __str__(self) -> str: - return '(' + ' * '.join(str(e) for e in self.es) + ')' - - def __repr__(self) -> str: - return f'And({self.es})' - - def quorums(self) -> Iterator[Set[T]]: - for subquorums in itertools.product(*[e.quorums() for e in self.es]): - yield set.union(*subquorums) - - def is_quorum(self, xs: Set[T]) -> bool: - return all(e.is_quorum(xs) for e in self.es) - - def nodes(self) -> Set[Node[T]]: - return set.union(*[e.nodes() for e in self.es]) - - def dual(self) -> Expr: - return Or([e.dual() for e in self.es]) - - def _num_leaves(self) -> int: - return sum(e._num_leaves() for e in self.es) - - def _dup_free_min_failures(self) -> int: - return min(e._dup_free_min_failures() for e in self.es) - -class Choose(Expr[T]): - def __init__(self, k: int, es: List[Expr[T]]) -> None: - if k <= 0 or k > len(es): - raise ValueError(f'k must be in the range [1, {len(es)}]') - - self.k = k - self.es = es - - def __str__(self) -> str: - return f'choose{self.k}(' + ', '.join(str(e) for e in self.es) + ')' - - def __repr__(self) -> str: - return f'Chose({self.k}, {self.es})' - - def quorums(self) -> Iterator[Set[T]]: - for combo in itertools.combinations(self.es, self.k): - for subquorums in itertools.product(*[e.quorums() for e in combo]): - yield set.union(*subquorums) - - def is_quorum(self, xs: Set[T]) -> bool: - return sum(1 if e.is_quorum(xs) else 0 for e in self.es) >= self.k - - def nodes(self) -> Set[Node[T]]: - return set.union(*[e.nodes() for e in self.es]) - - def dual(self) -> Expr: - # TODO(mwhittaker): Prove that this is in fact the dual. - return Choose(len(self.es) - self.k + 1, [e.dual() for e in self.es]) - - def _num_leaves(self) -> int: - return sum(e._num_leaves() for e in self.es) - - def _dup_free_min_failures(self) -> int: - return sum(sorted(e._dup_free_min_failures() for e in self.es)[:self.k]) - - -def _and(lhs: Expr[T], rhs: Expr[T]) -> 'And[T]': - if isinstance(lhs, And) and isinstance(rhs, And): - return And(lhs.es + rhs.es) - elif isinstance(lhs, And): - return And(lhs.es + [rhs]) - elif isinstance(rhs, And): - return And([lhs] + rhs.es) - else: - return And([lhs, rhs]) - - -def _or(lhs: Expr[T], rhs: Expr[T]) -> 'Or[T]': - if isinstance(lhs, Or) and isinstance(rhs, Or): - return Or(lhs.es + rhs.es) - elif isinstance(lhs, Or): - return Or(lhs.es + [rhs]) - elif isinstance(rhs, Or): - return Or([lhs] + rhs.es) - else: - return Or([lhs, rhs]) - - -def choose(k: int, es: List[Expr[T]]) -> Expr[T]: - if k == 1: - return Or(es) - elif k == len(es): - return And(es) - else: - return Choose(k, es) - - -def majority(es: List[Expr[T]]) -> Expr[T]: - return choose(len(es) // 2 + 1, es) - - -ReadFraction = float -ReadWriteFraction = float -Weight = float -Probability = float -Distribution = Union[ - # For example, 1 means 100% reads. - int, - # For example, 0.25 means 25% reads. - float, - # For example, {0.25: 1, 0.8: 2} means 25% reads one third of the time and - # 80% reads two thirds of the time. - Dict[ReadWriteFraction, Weight], -] - - -def _canonicalize_distribution(d: Distribution) \ - -> Dict[ReadWriteFraction, Probability]: - if isinstance(d, int): - if d < 0 or d > 1: - raise ValueError('distribution must be in the range [0, 1]') - return {float(d): 1.} - elif isinstance(d, float): - if d < 0 or d > 1: - raise ValueError('distribution must be in the range [0, 1]') - return {d: 1.} - elif isinstance(d, dict): - if len(d) == 0: - raise ValueError('distribution cannot empty') - - if any(weight < 0 for weight in d.values()): - raise ValueError('distribution cannot have negative weights') - - total_weight = sum(d.values()) - if total_weight == 0: - raise ValueError('distribution cannot have zero weight') - - return {float(f): weight / total_weight - for (f, weight) in d.items() - if weight > 0} - else: - raise ValueError('distribution must be an int, a float, a Dict[float, ' - 'float] or a List[Tuple[float, float]]') - - -def _canonicalize_rw_distribution(read_fraction: Optional[Distribution], - write_fraction: Optional[Distribution]) \ - -> Dict[ReadFraction, Probability]: - if read_fraction is None and write_fraction is None: - raise ValueError('Either read_fraction or write_fraction must be given') - elif read_fraction is not None and write_fraction is not None: - raise ValueError('Only one of read_fraction or write_fraction can be ' - 'given') - elif read_fraction is not None: - return _canonicalize_distribution(read_fraction) - else: - assert write_fraction is not None - return {1 - f: weight - for (f, weight) in - _canonicalize_distribution(write_fraction).items()} - - -class QuorumSystem(Generic[T]): - def __init__(self, reads: Optional[Expr[T]] = None, - writes: Optional[Expr[T]] = None) -> None: - if reads is not None and writes is not None: - # TODO(mwhittaker): Think of ways to make this more efficient. - assert all(len(r & w) > 0 - for (r, w) in itertools.product(reads.quorums(), - writes.quorums())) - self.reads = reads - self.writes = writes - elif reads is not None and writes is None: - self.reads = reads - self.writes = reads.dual() - elif reads is None and writes is not None: - self.reads = writes.dual() - self.writes = writes - else: - raise ValueError('A QuorumSystem must be instantiated with a set ' - 'of read quorums or a set of write quorums') - - def __repr__(self) -> str: - return f'QuorumSystem(reads={self.reads}, writes={self.writes})' - - def read_quorums(self) -> Iterator[Set[T]]: - return self.reads.quorums() - - def write_quorums(self) -> Iterator[Set[T]]: - return self.writes.quorums() - - def is_read_quorum(self, xs: Set[T]) -> bool: - return self.reads.is_quorum(xs) - - def is_write_quorum(self, xs: Set[T]) -> bool: - return self.writes.is_quorum(xs) - - def nodes(self) -> Set[Node[T]]: - return self.reads.nodes() | self.writes.nodes() - - def resilience(self) -> int: - return min(self.read_resilience(), self.write_resilience()) - - def read_resilience(self) -> int: - return self.reads.resilience() - - def write_resilience(self) -> int: - return self.writes.resilience() - - def strategy(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None, - f: int = 0) \ - -> 'Strategy[T]': - if f < 0: - raise ValueError('f must be >= 0') - - d = _canonicalize_rw_distribution(read_fraction, write_fraction) - if f == 0: - return self._load_optimal_strategy( - list(self.read_quorums()), - list(self.write_quorums()), - d) - else: - xs = [node.x for node in self.nodes()] - read_quorums = list(self._f_resilient_quorums(f, xs, self.reads)) - write_quorums = list(self._f_resilient_quorums(f, xs, self.reads)) - if len(read_quorums) == 0: - raise ValueError(f'There are no {f}-resilient read quorums') - if len(write_quorums) == 0: - raise ValueError(f'There are no {f}-resilient write quorums') - return self._load_optimal_strategy(read_quorums, write_quorums, d) - - def dup_free(self) -> bool: - return self.reads.dup_free() and self.writes.dup_free() - - def _f_resilient_quorums(self, - f: int, - xs: List[T], - e: Expr) -> Iterator[Set[T]]: - assert f >= 1 - - def helper(s: Set[T], i: int) -> Iterator[Set[T]]: - if all(e.is_quorum(s - set(failure)) - for failure in itertools.combinations(s, min(f, len(s)))): - yield set(s) - return - - for j in range(i, len(xs)): - s.add(xs[j]) - yield from helper(s, j + 1) - s.remove(xs[j]) - - return helper(set(), 0) - - def load(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None, - f: int = 0) \ - -> float: - sigma = self.strategy(read_fraction, write_fraction, f) - return sigma.load(read_fraction, write_fraction) - - def _load_optimal_strategy(self, - read_quorums: List[Set[T]], - write_quorums: List[Set[T]], - read_fraction: Dict[float, float]) \ - -> 'Strategy[T]': - # TODO(mwhittaker): Explain f_r calculation. - fr = sum(f * weight for (f, weight) in read_fraction.items()) - - nodes = self.reads.nodes() | self.writes.nodes() - read_capacity = {node.x: node.read_capacity for node in nodes} - write_capacity = {node.x: node.write_capacity for node in nodes} - - read_quorum_vars: List[pulp.LpVariable] = [] - x_to_read_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ - collections.defaultdict(list) - - for (i, read_quorum) in enumerate(read_quorums): - v = pulp.LpVariable(f'r{i}', 0, 1) - read_quorum_vars.append(v) - for x in read_quorum: - x_to_read_quorum_vars[x].append(v) - - write_quorum_vars: List[pulp.LpVariable] = [] - x_to_write_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ - collections.defaultdict(list) - for (i, write_quorum) in enumerate(write_quorums): - v = pulp.LpVariable(f'w{i}', 0, 1) - write_quorum_vars.append(v) - for x in write_quorum: - x_to_write_quorum_vars[x].append(v) - - # Form the linear program to find the load. - problem = pulp.LpProblem("load", pulp.LpMinimize) - - # If we're trying to balance the strategy, then we want to minimize the - # pairwise absolute differences between the read probabilities and the - # write probabilities. - l = pulp.LpVariable('l', 0, 1) - problem += l - problem += (sum(read_quorum_vars) == 1, 'valid read strategy') - problem += (sum(write_quorum_vars) == 1, 'valid write strategy') - for node in nodes: - x = node.x - x_load: pulp.LpAffineExpression = 0 - if x in x_to_read_quorum_vars: - x_load += fr * sum(x_to_read_quorum_vars[x]) / read_capacity[x] - if x in x_to_write_quorum_vars: - x_load += ((1 - fr) * sum(x_to_write_quorum_vars[x]) / - write_capacity[x]) - problem += (x_load <= l, x) - - problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) - return ExplicitStrategy(nodes, - read_quorums, - [v.varValue for v in read_quorum_vars], - write_quorums, - [v.varValue for v in write_quorum_vars]) - - -class Strategy(Generic[T]): - def load(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - raise NotImplementedError - - def get_read_quorum(self) -> Set[T]: - raise NotImplementedError - - def get_write_quorum(self) -> Set[T]: - raise NotImplementedError - - -class ExplicitStrategy(Strategy[T]): - def __init__(self, - nodes: Set[Node[T]], - reads: List[Set[T]], - read_weights: List[float], - writes: List[Set[T]], - write_weights: List[float]) -> None: - self.nodes = nodes - self.read_capacity = {node.x: node.read_capacity for node in nodes} - self.write_capacity = {node.x: node.write_capacity for node in nodes} - self.reads = reads - self.read_weights = read_weights - self.writes = writes - self.write_weights = write_weights - - def __str__(self) -> str: - non_zero_reads = {tuple(r): p - for (r, p) in zip(self.reads, self.read_weights) - if p > 0} - non_zero_writes = {tuple(w): p - for (w, p) in zip(self.writes, self.write_weights) - if p > 0} - return (f'ExplicitStrategy(reads={non_zero_reads}, ' + - f'writes={non_zero_writes})') - - def __repr__(self) -> str: - return (f'ExplicitStrategy(nodes={self.nodes}, '+ - f'reads={self.reads}, ' + - f'read_weights={self.read_weights},' + - f'writes={self.writes}, ' + - f'write_weights={self.write_weights})') - - def load(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - d = _canonicalize_rw_distribution(read_fraction, write_fraction) - fr = sum(f * weight for (f, weight) in d.items()) - - read_load: Dict[T, float] = collections.defaultdict(float) - for (read_quorum, weight) in zip(self.reads, self.read_weights): - for x in read_quorum: - read_load[x] += weight - - write_load: Dict[T, float] = collections.defaultdict(float) - for (write_quorum, weight) in zip(self.writes, self.write_weights): - for x in write_quorum: - write_load[x] += weight - - loads: List[float] = [] - for node in self.nodes: - x = node.x - load = 0.0 - if x in read_load: - load += fr * read_load[x] / self.read_capacity[x] - if x in write_load: - load += (1 - fr) * write_load[x] / self.write_capacity[x] - loads.append(load) - - return max(loads) - - # TODO(mwhittaker): Add read/write load and capacity and read/write cap. - - def get_read_quorum(self) -> Set[T]: - return np.random.choice(self.reads, p=self.read_weights) - - def get_write_quorum(self) -> Set[T]: - return np.random.choice(self.writes, p=self.write_weights) - - # a = Node('a') diff --git a/quorums/strategy.py b/quorums/strategy.py new file mode 100644 index 0000000..cab5109 --- /dev/null +++ b/quorums/strategy.py @@ -0,0 +1,93 @@ +from . import distribution +from .distribution import Distribution +from .expr import Node +from typing import Dict, Generic, List, Optional, Set, TypeVar +import collections +import numpy as np + + +T = TypeVar('T') + + +class Strategy(Generic[T]): + def load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + raise NotImplementedError + + def get_read_quorum(self) -> Set[T]: + raise NotImplementedError + + def get_write_quorum(self) -> Set[T]: + raise NotImplementedError + + +class ExplicitStrategy(Strategy[T]): + def __init__(self, + nodes: Set[Node[T]], + reads: List[Set[T]], + read_weights: List[float], + writes: List[Set[T]], + write_weights: List[float]) -> None: + self.nodes = nodes + self.read_capacity = {node.x: node.read_capacity for node in nodes} + self.write_capacity = {node.x: node.write_capacity for node in nodes} + self.reads = reads + self.read_weights = read_weights + self.writes = writes + self.write_weights = write_weights + + def __str__(self) -> str: + non_zero_reads = {tuple(r): p + for (r, p) in zip(self.reads, self.read_weights) + if p > 0} + non_zero_writes = {tuple(w): p + for (w, p) in zip(self.writes, self.write_weights) + if p > 0} + return (f'ExplicitStrategy(reads={non_zero_reads}, ' + + f'writes={non_zero_writes})') + + def __repr__(self) -> str: + return (f'ExplicitStrategy(nodes={self.nodes}, '+ + f'reads={self.reads}, ' + + f'read_weights={self.read_weights},' + + f'writes={self.writes}, ' + + f'write_weights={self.write_weights})') + + def load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + fr = sum(f * weight for (f, weight) in d.items()) + + read_load: Dict[T, float] = collections.defaultdict(float) + for (read_quorum, weight) in zip(self.reads, self.read_weights): + for x in read_quorum: + read_load[x] += weight + + write_load: Dict[T, float] = collections.defaultdict(float) + for (write_quorum, weight) in zip(self.writes, self.write_weights): + for x in write_quorum: + write_load[x] += weight + + loads: List[float] = [] + for node in self.nodes: + x = node.x + load = 0.0 + if x in read_load: + load += fr * read_load[x] / self.read_capacity[x] + if x in write_load: + load += (1 - fr) * write_load[x] / self.write_capacity[x] + loads.append(load) + + return max(loads) + + # TODO(mwhittaker): Add read/write load and capacity and read/write cap. + + def get_read_quorum(self) -> Set[T]: + return np.random.choice(self.reads, p=self.read_weights) + + def get_write_quorum(self) -> Set[T]: + return np.random.choice(self.writes, p=self.write_weights)