# TODO(mwhittaker): We can define a set of read quorums that are not minimal.
# Does this mess things up?

from . import distribution
from .distribution import Distribution
from .expr import Expr, Node
from .strategy import Strategy
from typing import (Callable, Dict, Iterator, Generic, List, Optional, Set,
                    TypeVar)
import collections
import datetime
import itertools
import pulp


T = TypeVar('T')

# The objectives that `QuorumSystem.strategy` can optimize for.
LOAD = 'load'
NETWORK = 'network'
LATENCY = 'latency'


# TODO(mwhittaker): Add some other non-optimal strategies.
# TODO(mwhittaker): Make it easy to make arbitrary strategies.
class QuorumSystem(Generic[T]):
    """A read-write quorum system described by two quorum expressions.

    `self.reads` describes the read quorums and `self.writes` describes the
    write quorums. Either one may be omitted at construction time, in which
    case it is taken to be the dual of the other.
    """

    def __init__(self,
                 reads: Optional[Expr[T]] = None,
                 writes: Optional[Expr[T]] = None) -> None:
        """Build a quorum system from read and/or write quorum expressions.

        Args:
            reads: Expression describing the read quorums, or None to use
                `writes.dual()`.
            writes: Expression describing the write quorums, or None to use
                `reads.dual()`.

        Raises:
            ValueError: If both arguments are None, or if both are given and
                some quorum of `writes` is not a quorum of `reads.dual()`.
        """
        if reads is not None and writes is not None:
            # Every write quorum must be accepted by the dual of the read
            # expression; otherwise reads and writes do not all intersect.
            optimal_writes = reads.dual()
            if not all(optimal_writes.is_quorum(write_quorum)
                       for write_quorum in writes.quorums()):
                raise ValueError(
                    'Not all read quorums intersect all write quorums')

            self.reads = reads
            self.writes = writes
        elif reads is not None and writes is None:
            self.reads = reads
            self.writes = reads.dual()
        elif reads is None and writes is not None:
            self.reads = writes.dual()
            self.writes = writes
        else:
            raise ValueError('A QuorumSystem must be instantiated with a set '
                             'of read quorums or a set of write quorums')

    def __repr__(self) -> str:
        return f'QuorumSystem(reads={self.reads}, writes={self.writes})'

    def read_quorums(self) -> Iterator[Set[T]]:
        """Iterate over the quorums of the read expression."""
        return self.reads.quorums()

    def write_quorums(self) -> Iterator[Set[T]]:
        """Iterate over the quorums of the write expression."""
        return self.writes.quorums()

    def is_read_quorum(self, xs: Set[T]) -> bool:
        """Return whether `xs` is a quorum of the read expression."""
        return self.reads.is_quorum(xs)

    def is_write_quorum(self, xs: Set[T]) -> bool:
        """Return whether `xs` is a quorum of the write expression."""
        return self.writes.is_quorum(xs)

    def nodes(self) -> Set[Node[T]]:
        """Return the union of the nodes of the read and write expressions."""
        return self.reads.nodes() | self.writes.nodes()

    def resilience(self) -> int:
        """Return the smaller of the read and the write resilience."""
        return min(self.read_resilience(), self.write_resilience())

    def read_resilience(self) -> int:
        """Return the resilience of the read expression."""
        return self.reads.resilience()

    def write_resilience(self) -> int:
        """Return the resilience of the write expression."""
        return self.writes.resilience()

    def strategy(self,
                 optimize: str = LOAD,
                 load_limit: Optional[float] = None,
                 network_limit: Optional[float] = None,
                 latency_limit: Optional[datetime.timedelta] = None,
                 read_fraction: Optional[Distribution] = None,
                 write_fraction: Optional[Distribution] = None,
                 f: int = 0) -> 'Strategy[T]':
        """Compute a strategy that is optimal for the chosen objective.

        Args:
            optimize: The objective to minimize: LOAD, NETWORK or LATENCY.
            load_limit: Optional upper bound on the load. Not allowed when
                optimizing for load.
            network_limit: Optional upper bound on the network cost. Not
                allowed when optimizing for network.
            latency_limit: Optional upper bound on the latency. Not allowed
                when optimizing for latency.
            read_fraction: Workload description, passed together with
                `write_fraction` to `distribution.canonicalize_rw`.
            write_fraction: See `read_fraction`.
            f: If positive, only quorums that remain quorums after any `f`
                of their members are removed are considered.

        Returns:
            The strategy found by the linear program.

        Raises:
            ValueError: If `f` is negative, if a limit is given for the
                objective being optimized, if `optimize` is not a known
                objective, if there are no `f`-resilient read or write
                quorums, or if no strategy satisfies the given limits.
        """
        if f < 0:
            raise ValueError('f must be >= 0')

        if optimize == LOAD and load_limit is not None:
            raise ValueError(
                'a load limit cannot be set when optimizing for load')

        if optimize == NETWORK and network_limit is not None:
            raise ValueError(
                'a network limit cannot be set when optimizing for network')

        if optimize == LATENCY and latency_limit is not None:
            raise ValueError(
                'a latency limit cannot be set when optimizing for latency')

        d = distribution.canonicalize_rw(read_fraction, write_fraction)
        if f == 0:
            return self._load_optimal_strategy(
                list(self.read_quorums()),
                list(self.write_quorums()),
                d,
                optimize=optimize,
                load_limit=load_limit,
                network_limit=network_limit,
                latency_limit=latency_limit)
        else:
            xs = [node.x for node in self.nodes()]
            read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
            # The write quorums must be derived from the write expression,
            # not the read expression.
            write_quorums = list(
                self._f_resilient_quorums(f, xs, self.writes))
            if len(read_quorums) == 0:
                raise ValueError(f'There are no {f}-resilient read quorums')
            if len(write_quorums) == 0:
                raise ValueError(f'There are no {f}-resilient write quorums')
            return self._load_optimal_strategy(
                read_quorums,
                write_quorums,
                d,
                optimize=optimize,
                load_limit=load_limit,
                network_limit=network_limit,
                latency_limit=latency_limit)

    def dup_free(self) -> bool:
        """Return whether both the read and write expressions are dup free."""
        return self.reads.dup_free() and self.writes.dup_free()

    def _f_resilient_quorums(self,
                             f: int,
                             xs: List[T],
                             e: Expr) -> Iterator[Set[T]]:
        """Enumerate f-resilient quorums of `e` drawn from `xs`.

        A set `s` is yielded when removing any `min(f, len(s))` of its
        elements still leaves a quorum of `e`. Sets are grown by adding
        elements of `xs` in index order, and a set is not extended any
        further once it has been yielded.

        Args:
            f: Number of failures to tolerate; must be at least 1.
            xs: The candidate elements.
            e: The expression whose `is_quorum` decides membership.
        """
        assert f >= 1

        def helper(s: Set[T], i: int) -> Iterator[Set[T]]:
            if all(e.is_quorum(s - set(failure))
                   for failure in itertools.combinations(s, min(f, len(s)))):
                # Yield a copy: `s` is mutated as the search backtracks.
                yield set(s)
                return

            for j in range(i, len(xs)):
                s.add(xs[j])
                yield from helper(s, j + 1)
                s.remove(xs[j])

        return helper(set(), 0)

    def load(self,
             read_fraction: Optional[Distribution] = None,
             write_fraction: Optional[Distribution] = None,
             f: int = 0) -> float:
        """Placeholder: currently ignores its arguments and returns 0."""
        return 0
        # TODO(mwhittaker): Remove.
        # NOTE(review): the call below passes its arguments positionally,
        # which no longer matches the parameter order of `strategy` (whose
        # first parameter is `optimize`); use keyword arguments if restored.
        # sigma = self.strategy(read_fraction, write_fraction, f)
        # return sigma.load(read_fraction, write_fraction)

    def capacity(self,
                 read_fraction: Optional[Distribution] = None,
                 write_fraction: Optional[Distribution] = None,
                 f: int = 0) -> float:
        """Placeholder: currently ignores its arguments and returns 0."""
        return 0
        # TODO(mwhittaker): Remove.
        # return 1 / self.load(read_fraction, write_fraction, f)

    def _read_quorum_latency(self, quorum: Set[Node[T]]) -> datetime.timedelta:
        """Return the latency of `quorum` when used as a read quorum."""
        return self._quorum_latency(quorum, self.is_read_quorum)

    def _write_quorum_latency(self,
                              quorum: Set[Node[T]]) -> datetime.timedelta:
        """Return the latency of `quorum` when used as a write quorum."""
        return self._quorum_latency(quorum, self.is_write_quorum)

    def _quorum_latency(self,
                        quorum: Set[Node[T]],
                        is_quorum: Callable[[Set[T]], bool]) \
                        -> datetime.timedelta:
        """Return the latency at which `quorum` first forms a quorum.

        The nodes are ordered by increasing latency. The result is the
        latency of the first node whose inclusion makes the prefix of nodes
        up to and including it satisfy `is_quorum`.

        Raises:
            ValueError: If no prefix of the nodes satisfies `is_quorum`.
        """
        by_latency = sorted(quorum, key=lambda node: node.latency)
        for (i, node) in enumerate(by_latency):
            if is_quorum({n.x for n in by_latency[:i + 1]}):
                return node.latency
        raise ValueError('_quorum_latency called on a non-quorum')

    def _load_optimal_strategy(
            self,
            read_quorums: List[Set[T]],
            write_quorums: List[Set[T]],
            read_fraction: Dict[float, float],
            optimize: str = LOAD,
            load_limit: Optional[float] = None,
            network_limit: Optional[float] = None,
            latency_limit: Optional[datetime.timedelta] = None) \
            -> 'Strategy[T]':
        """
        Solve a linear program for the best strategy over the given quorums.

        `read_fraction` maps a read fraction to the weight (probability) of
        that read fraction. `optimize` selects the objective (LOAD, NETWORK
        or LATENCY) and the `*_limit` arguments, when not None, add upper
        bound constraints. A ValueError is raised if `optimize` is not a
        known objective or if the solver does not report an optimal
        solution.

        Consider the following 2x2 grid quorum system.

            a   b

            c   d

        with

            read_quorums = [{a, b}, {c, d}]
            write_quorums = [{a, c}, {a, d}, {b, c}, {b, d}]

        We can form a linear program to compute the optimal load of this
        quorum system for some fixed read fraction fr as follows. First, we
        create a variable ri for every read quorum i and a variable wi for
        every write quorum i. ri represents the probability of selecting the
        ith read quorum, and wi represents the probability of selecting the
        ith write quorum. We introduce an additional variable L that
        represents the load and solve the following linear program.

            min L subject to
            r0 + r1 = 1
            w0 + w1 + w2 + w3 = 1
            fr (r0) + (1 - fr) (w0 + w1) <= L # a's load
            fr (r0) + (1 - fr) (w2 + w3) <= L # b's load
            fr (r1) + (1 - fr) (w0 + w2) <= L # c's load
            fr (r1) + (1 - fr) (w1 + w3) <= L # d's load

        If we assume every element x has read capacity rcap_x and write
        capacity wcap_x, then we adjust the linear program like this.

            min L subject to
            r0 + r1 = 1
            w0 + w1 + w2 + w3 = 1
            fr/rcap_a (r0) + (1 - fr)/wcap_a (w0 + w1) <= L # a's load
            fr/rcap_b (r0) + (1 - fr)/wcap_b (w2 + w3) <= L # b's load
            fr/rcap_c (r1) + (1 - fr)/wcap_c (w0 + w2) <= L # c's load
            fr/rcap_d (r1) + (1 - fr)/wcap_d (w1 + w3) <= L # d's load

        Assume we have fr = 0.9 with 80% probability and fr = 0.5 with 20%.
        Then we adjust the linear program as follows to find the strategy
        that minimizes the average load.

            min 0.8 * L_0.9 + 0.2 * L_0.5 subject to
            r0 + r1 = 1
            w0 + w1 + w2 + w3 = 1
            0.9/rcap_a (r0) + 0.1/wcap_a (w0 + w1) <= L_0.9 # a's load
            0.9/rcap_b (r0) + 0.1/wcap_b (w2 + w3) <= L_0.9 # b's load
            0.9/rcap_c (r1) + 0.1/wcap_c (w0 + w2) <= L_0.9 # c's load
            0.9/rcap_d (r1) + 0.1/wcap_d (w1 + w3) <= L_0.9 # d's load
            0.5/rcap_a (r0) + 0.5/wcap_a (w0 + w1) <= L_0.5 # a's load
            0.5/rcap_b (r0) + 0.5/wcap_b (w2 + w3) <= L_0.5 # b's load
            0.5/rcap_c (r1) + 0.5/wcap_c (w0 + w2) <= L_0.5 # c's load
            0.5/rcap_d (r1) + 0.5/wcap_d (w1 + w3) <= L_0.5 # d's load
        """
        nodes = self.nodes()
        x_to_node = {node.x: node for node in nodes}
        read_capacity = {node.x: node.read_capacity for node in nodes}
        write_capacity = {node.x: node.write_capacity for node in nodes}

        # Create a variable for every read quorum and every write quorum.
        # While we do this, map each element x to the read and write quorums
        # that it's in. For example, imagine we have the following read and
        # write quorums:
        #
        #     read_quorums = [{a}, {a, b}, {a, c}]
        #     write_quorums = [{a, b}, {a, b, c}]
        #
        # Then, we'd have
        #
        #     read_quorum_vars = [r0, r1, r2]
        #     write_quorum_vars = [w0, w1]
        #     x_to_read_quorum_vars = {a: [r0, r1, r2], b: [r1], c: [r2]}
        #     x_to_write_quorum_vars = {a: [w0, w1], b: [w0, w1], c: [w1]}
        read_quorum_vars: List[pulp.LpVariable] = []
        x_to_read_quorum_vars: Dict[T, List[pulp.LpVariable]] = \
            collections.defaultdict(list)
        for (i, read_quorum) in enumerate(read_quorums):
            v = pulp.LpVariable(f'r{i}', 0, 1)
            read_quorum_vars.append(v)
            for x in read_quorum:
                x_to_read_quorum_vars[x].append(v)

        write_quorum_vars: List[pulp.LpVariable] = []
        x_to_write_quorum_vars: Dict[T, List[pulp.LpVariable]] = \
            collections.defaultdict(list)
        for (i, write_quorum) in enumerate(write_quorums):
            v = pulp.LpVariable(f'w{i}', 0, 1)
            write_quorum_vars.append(v)
            for x in write_quorum:
                x_to_write_quorum_vars[x].append(v)

        # The expected read fraction, used by the network and latency
        # expressions below.
        fr = sum(weight * f for (f, weight) in read_fraction.items())

        def network() -> pulp.LpAffineExpression:
            # Each quorum contributes its size, weighted by its probability.
            read_network = fr * sum(
                v * len(rq)
                for (rq, v) in zip(read_quorums, read_quorum_vars)
            )
            write_network = (1 - fr) * sum(
                v * len(wq)
                for (wq, v) in zip(write_quorums, write_quorum_vars)
            )
            return read_network + write_network

        def latency() -> pulp.LpAffineExpression:
            # The single-element `for quorum in [...]` clauses bind the set
            # of Node objects corresponding to each quorum of elements.
            read_latency = fr * sum(
                v * self._read_quorum_latency(quorum).total_seconds()
                for (rq, v) in zip(read_quorums, read_quorum_vars)
                for quorum in [{x_to_node[x] for x in rq}]
            )
            write_latency = (1 - fr) * sum(
                v * self._write_quorum_latency(quorum).total_seconds()
                for (wq, v) in zip(write_quorums, write_quorum_vars)
                for quorum in [{x_to_node[x] for x in wq}]
            )
            return read_latency + write_latency

        def fr_load(problem: pulp.LpProblem,
                    fr: float) -> pulp.LpAffineExpression:
            # Adds one constraint per node to `problem` and returns the load
            # variable for this particular read fraction.
            l = pulp.LpVariable(f'l_{fr}', 0, 1)

            for node in nodes:
                x = node.x
                x_load: pulp.LpAffineExpression = 0

                if x in x_to_read_quorum_vars:
                    vs = x_to_read_quorum_vars[x]
                    x_load += fr * sum(vs) / read_capacity[x]

                if x in x_to_write_quorum_vars:
                    vs = x_to_write_quorum_vars[x]
                    x_load += (1 - fr) * sum(vs) / write_capacity[x]

                problem += (x_load <= l, f'{x}{fr}')

            return l

        def load(problem: pulp.LpProblem,
                 read_fraction: Dict[float, float]) \
                 -> pulp.LpAffineExpression:
            # The weighted average of the per-read-fraction loads.
            return sum(weight * fr_load(problem, fr)
                       for (fr, weight) in read_fraction.items())

        # Form the linear program to find the load.
        problem = pulp.LpProblem("load", pulp.LpMinimize)

        # We add these constraints to make sure that the probabilities we
        # select form valid probability distributions.
        problem += (sum(read_quorum_vars) == 1, 'valid read strategy')
        problem += (sum(write_quorum_vars) == 1, 'valid write strategy')

        # Add the objective. An explicit error (rather than an assert, which
        # is stripped under `python -O`) guards against unknown objectives.
        if optimize == LOAD:
            problem += load(problem, read_fraction)
        elif optimize == NETWORK:
            problem += network()
        elif optimize == LATENCY:
            problem += latency()
        else:
            raise ValueError(
                f'unknown optimization objective {optimize!r}; expected '
                f'{LOAD!r}, {NETWORK!r} or {LATENCY!r}')

        # Add any constraints.
        if load_limit is not None:
            problem += (load(problem, read_fraction) <= load_limit,
                        'load limit')

        if network_limit is not None:
            problem += (network() <= network_limit, 'network limit')

        if latency_limit is not None:
            problem += (latency() <= latency_limit.total_seconds(),
                        'latency limit')

        # Solve the linear program.
        problem.solve(pulp.apis.PULP_CBC_CMD(msg=False))
        if problem.status != pulp.LpStatusOptimal:
            raise ValueError('no strategy satisfies the given constraints')

        # Prune out any quorums with 0 probability.
        non_zero_read_quorums = [
            (rq, v.varValue)
            for (rq, v) in zip(read_quorums, read_quorum_vars)
            if v.varValue != 0]
        non_zero_write_quorums = [
            (wq, v.varValue)
            for (wq, v) in zip(write_quorums, write_quorum_vars)
            if v.varValue != 0]
        return Strategy(nodes,
                        [rq for (rq, _) in non_zero_read_quorums],
                        [weight for (_, weight) in non_zero_read_quorums],
                        [wq for (wq, _) in non_zero_write_quorums],
                        [weight for (_, weight) in non_zero_write_quorums])