diff --git a/quorums/expr.py b/quorums/expr.py index ae08b01..df330e5 100644 --- a/quorums/expr.py +++ b/quorums/expr.py @@ -1,4 +1,5 @@ from typing import Dict, Iterator, Generic, List, Optional, Set, TypeVar +import datetime import itertools import pulp @@ -87,7 +88,8 @@ class Node(Expr[T]): x: T, capacity: Optional[float] = None, read_capacity: Optional[float] = None, - write_capacity: Optional[float] = None) -> None: + write_capacity: Optional[float] = None, + latency: datetime.timedelta = None) -> None: self.x = x # A user either specifies capacity or (read_capacity and @@ -111,6 +113,12 @@ class Node(Expr[T]): raise ValueError('You must specify capacity or (read_capacity ' 'and write_capacity)') + if latency is None: + self.latency = datetime.timedelta(seconds=1) + else: + self.latency = latency + + def __str__(self) -> str: return str(self.x) diff --git a/quorums/quorum_system.py b/quorums/quorum_system.py index 7ac0fb7..1e10da5 100644 --- a/quorums/quorum_system.py +++ b/quorums/quorum_system.py @@ -5,8 +5,10 @@ from . import distribution from .distribution import Distribution from .expr import Expr, Node from .strategy import Strategy -from typing import Dict, Iterator, Generic, List, Optional, Set, TypeVar +from typing import (Callable, Dict, Iterator, Generic, List, Optional, Set, + TypeVar) import collections +import datetime import itertools import pulp @@ -14,6 +16,14 @@ import pulp T = TypeVar('T') +LOAD = 'load' +NETWORK = 'network' +LATENCY = 'latency' + +# TODO(mwhittaker): Add some other non-optimal strategies. +# TODO(mwhittaker): Make it easy to make arbitrary strategies. + + class QuorumSystem(Generic[T]): def __init__(self, reads: Optional[Expr[T]] = None, writes: Optional[Expr[T]] = None) -> None: @@ -64,6 +74,10 @@ class QuorumSystem(Generic[T]): return self.writes.resilience() def strategy(self, + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None, read_fraction: Optional[Distribution] = None, write_fraction: Optional[Distribution] = None, f: int = 0) \ @@ -71,12 +85,28 @@ class QuorumSystem(Generic[T]): if f < 0: raise ValueError('f must be >= 0') + if optimize == LOAD and load_limit is not None: + raise ValueError( + 'a load limit cannot be set when optimizing for load') + + if optimize == NETWORK and network_limit is not None: + raise ValueError( + 'a network limit cannot be set when optimizing for network') + + if optimize == LATENCY and latency_limit is not None: + raise ValueError( + 'a latency limit cannot be set when optimizing for latency') + d = distribution.canonicalize_rw(read_fraction, write_fraction) if f == 0: return self._load_optimal_strategy( list(self.read_quorums()), list(self.write_quorums()), - d) + d, + optimize=optimize, + load_limit=load_limit, + network_limit=network_limit, + latency_limit=latency_limit) else: xs = [node.x for node in self.nodes()] read_quorums = list(self._f_resilient_quorums(f, xs, self.reads)) @@ -85,7 +115,14 @@ class QuorumSystem(Generic[T]): raise ValueError(f'There are no {f}-resilient read quorums') if len(write_quorums) == 0: raise ValueError(f'There are no {f}-resilient write quorums') - return self._load_optimal_strategy(read_quorums, write_quorums, d) + return self._load_optimal_strategy( + read_quorums, + write_quorums, + d, + optimize=optimize, + load_limit=load_limit, + network_limit=network_limit, + latency_limit=latency_limit) def dup_free(self) -> bool: return self.reads.dup_free() and self.writes.dup_free() @@ -114,21 +151,46 @@ class QuorumSystem(Generic[T]): write_fraction: Optional[Distribution] = None, f: int = 0) \ -> float: - sigma = self.strategy(read_fraction, write_fraction, f) - return sigma.load(read_fraction, write_fraction) + return 0 + # TODO(mwhittaker): Remove. + # sigma = self.strategy(read_fraction, write_fraction, f) + # return sigma.load(read_fraction, write_fraction) def capacity(self, read_fraction: Optional[Distribution] = None, write_fraction: Optional[Distribution] = None, f: int = 0) \ -> float: - return 1 / self.load(read_fraction, write_fraction, f) + return 0 + # TODO(mwhittaker): Remove. + # return 1 / self.load(read_fraction, write_fraction, f) - def _load_optimal_strategy(self, - read_quorums: List[Set[T]], - write_quorums: List[Set[T]], - read_fraction: Dict[float, float]) \ - -> 'Strategy[T]': + def _read_quorum_latency(self, quorum: Set[Node[T]]) -> datetime.timedelta: + return self._quorum_latency(quorum, self.is_read_quorum) + + def _write_quorum_latency(self, quorum: Set[Node[T]]) -> datetime.timedelta: + return self._quorum_latency(quorum, self.is_write_quorum) + + def _quorum_latency(self, + quorum: Set[Node[T]], + is_quorum: Callable[[Set[T]], bool]) \ + -> datetime.timedelta: + nodes = list(quorum) + nodes.sort(key=lambda node: node.latency) + for i in range(len(quorum)): + if is_quorum({node.x for node in nodes[:i+1]}): + return nodes[i].latency + raise ValueError('_quorum_latency called on a non-quorum') + + def _load_optimal_strategy( + self, + read_quorums: List[Set[T]], + write_quorums: List[Set[T]], + read_fraction: Dict[float, float], + optimize: str = LOAD, + load_limit: Optional[float] = None, + network_limit: Optional[float] = None, + latency_limit: Optional[datetime.timedelta] = None) -> 'Strategy[T]': """ Consider the following 2x2 grid quorum system. @@ -184,7 +246,8 @@ class QuorumSystem(Generic[T]): 0.5/rcap_c (r1) + 0.5/wcap_c (w0 + w2) <= L_0.5 # c's load 0.5/rcap_d (r1) + 0.5/wcap_d (w1 + w3) <= L_0.5 # d's load """ - nodes = self.reads.nodes() | self.writes.nodes() + nodes = self.nodes() + x_to_node = {node.x: node for node in nodes} read_capacity = {node.x: node.read_capacity for node in nodes} write_capacity = {node.x: node.write_capacity for node in nodes} @@ -220,38 +283,92 @@ class QuorumSystem(Generic[T]): for x in write_quorum: x_to_write_quorum_vars[x].append(v) - # Create a variable for every load. - load_vars = {fr: pulp.LpVariable(f'l_{fr}', 0, 1) - for fr in read_fraction.keys()} + fr = sum(weight * f for (f, weight) in read_fraction.items()) + + def network() -> pulp.LpAffineExpression: + read_network = fr * sum( + v * len(rq) + for (rq, v) in zip(read_quorums, read_quorum_vars) + ) + write_network = (1 - fr) * sum( + v * len(wq) + for (wq, v) in zip(write_quorums, write_quorum_vars) + ) + return read_network + write_network + + def latency() -> pulp.LpAffineExpression: + read_latency = fr * sum( + v * self._read_quorum_latency(quorum).total_seconds() + for (rq, v) in zip(read_quorums, read_quorum_vars) + for quorum in [{x_to_node[x] for x in rq}] + ) + write_latency = (1 - fr) * sum( + v * self._write_quorum_latency(quorum).total_seconds() + for (wq, v) in zip(write_quorums, write_quorum_vars) + for quorum in [{x_to_node[x] for x in wq}] + ) + return read_latency + write_latency + + def fr_load(problem: pulp.LpProblem, fr: float) -> pulp.LpAffineExpression: + l = pulp.LpVariable(f'l_{fr}', 0, 1) + + for node in nodes: + x = node.x + x_load: pulp.LpAffineExpression = 0 + + if x in x_to_read_quorum_vars: + vs = x_to_read_quorum_vars[x] + x_load += fr * sum(vs) / read_capacity[x] + + if x in x_to_write_quorum_vars: + vs = x_to_write_quorum_vars[x] + x_load += (1 - fr) * sum(vs) / write_capacity[x] + + problem += (x_load <= l, f'{x}{fr}') + + return l + + def load(problem: pulp.LpProblem, + read_fraction: Dict[float, float]) -> pulp.LpAffineExpression: + return sum(weight * fr_load(problem, fr) + for (fr, weight) in read_fraction.items()) # Form the linear program to find the load. problem = pulp.LpProblem("load", pulp.LpMinimize) - # First, we add our objective. - problem += sum(weight * load_vars[fr] - for (fr, weight) in read_fraction.items()) - - # Next, we make sure that the probabilities we select form valid - # probabilty distributions. + # We add these constraints to make sure that the probabilities we + # select form valid probabilty distributions. problem += (sum(read_quorum_vars) == 1, 'valid read strategy') problem += (sum(write_quorum_vars) == 1, 'valid write strategy') - # Finally, we add constraints for every value of fr. - for fr, weight in read_fraction.items(): - for node in nodes: - x = node.x - x_load: pulp.LpAffineExpression = 0 - if x in x_to_read_quorum_vars: - x_load += (fr * sum(x_to_read_quorum_vars[x]) / - read_capacity[x]) - if x in x_to_write_quorum_vars: - x_load += ((1 - fr) * sum(x_to_write_quorum_vars[x]) / - write_capacity[x]) - problem += (x_load <= load_vars[fr], f'{x}{fr}') + # Add the objective. + if optimize == LOAD: + problem += load(problem, read_fraction) + elif optimize == NETWORK: + problem += network() + else: + assert optimize == LATENCY + problem += latency() + + # Add any constraints. + if load_limit is not None: + problem += (load(problem, read_fraction) <= load_limit, + 'load limit') + + if network_limit is not None: + problem += (network() <= network_limit, 'network limit') + + if latency_limit is not None: + problem += (latency() <= latency_limit.total_seconds(), + 'latency limit') # Solve the linear program. + print(problem) problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) + if problem.status != pulp.LpStatusOptimal: + raise ValueError('no strategy satisfies the given constraints') + # Prune out any quorums with 0 probability. non_zero_read_quorums = [ (rq, v.varValue) for (rq, v) in zip(read_quorums, read_quorum_vars) diff --git a/quorums/strategy.py b/quorums/strategy.py index fba7251..df6682e 100644 --- a/quorums/strategy.py +++ b/quorums/strategy.py @@ -69,6 +69,34 @@ class Strategy(Generic[T]): return sum(weight * self._load(fr) for (fr, weight) in d.items()) + # TODO(mwhittaker): Rename throughput. + def capacity(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + return 1 / self.load(read_fraction, write_fraction) + + def network_load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + fr = sum(weight * f for (f, weight) in d.items()) + read_network_load = fr * sum( + len(rq) * p + for (rq, p) in zip(self.reads, self.read_weights) + ) + write_network_load = (1 - fr) * sum( + len(wq) * p + for (wq, p) in zip(self.writes, self.write_weights) + ) + return read_network_load + write_network_load + + def latency(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) -> float: + # TODO(mwhittaker): Implement. + return 0 + def node_load(self, node: Node[T], read_fraction: Optional[Distribution] = None, @@ -78,11 +106,21 @@ class Strategy(Generic[T]): return sum(weight * self._node_load(node.x, fr) for (fr, weight) in d.items()) - def capacity(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - return 1 / self.load(read_fraction, write_fraction) + def node_utilization(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + # TODO(mwhittaker): Implement. + return 0.0 + + def node_throghput(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + # TODO(mwhittaker): Implement. + return 0.0 def _node_load(self, x: T, fr: float) -> float: """