diff --git a/quorums/quorums.py b/quorums/quorums.py index f373b6f..05c8dcf 100644 --- a/quorums/quorums.py +++ b/quorums/quorums.py @@ -1,5 +1,6 @@ from typing import (Dict, Iterator, Generic, List, Optional, Set, Tuple, TypeVar, Union) +import collections import itertools import numpy as np import pulp @@ -212,11 +213,10 @@ class QuorumSystem(Generic[T]): return f'QuorumSystem(reads={self.reads}, writes={self.writes})' def strategy(self, read_fraction: Distribution) -> 'Strategy[T]': - # TODO(mwhittaker): Implement. - reads = list(self.read_quorums()) - writes = list(self.write_quorums()) - return ExplicitStrategy(reads, [1 / len(reads)] * len(reads), - writes, [1 / len(writes)] * len(writes)) + # TODO(mwhittaker): Allow read_fraction or write_fraction. + # TODO(mwhittaker): Implement independent strategy. + return self._load_optimal_strategy( + _canonicalize_distribution(read_fraction)) def is_read_quorum(self, xs: Set[T]) -> bool: return self.reads.is_quorum(xs) @@ -227,9 +227,58 @@ class QuorumSystem(Generic[T]): def write_quorums(self) -> Iterator[Set[T]]: return self.writes.quorums() + def _load_optimal_strategy(self, + read_fraction: Dict[float, float]) -> \ + 'Strategy[T]': + fr = sum(f * weight for (f, weight) in read_fraction.items()) + reads = list(self.read_quorums()) + writes = list(self.write_quorums()) + + read_load: Dict[T, List[pulp.LpVariable]] = collections.defaultdict(list) + read_weights: List[pulp.LpVariable] = [] + for (i, r) in enumerate(reads): + v = pulp.LpVariable(f'r{i}', 0, 1) + read_weights.append(v) + for node in r: + read_load[node].append(v) + + write_load: Dict[T, List[pulp.LpVariable]] = collections.defaultdict(list) + write_weights: List[pulp.LpVariable] = [] + for (i, r) in enumerate(writes): + v = pulp.LpVariable(f'w{i}', 0, 1) + write_weights.append(v) + for node in r: + write_load[node].append(v) + + # Form the linear program to find the load. + problem = pulp.LpProblem("load", pulp.LpMinimize) + + # If we're trying to balance the strategy, then we want to minimize the + # pairwise absolute differences between the read probabilities and the + # write probabilities. + l = pulp.LpVariable('l', 0, 1) + problem += l + problem += (sum(read_weights) == 1, 'valid read strategy') + problem += (sum(write_weights) == 1, 'valid write strategy') + for node in read_load.keys() | write_load.keys(): + node_load: pulp.LpAffineExpression = 0 + if node in read_load: + node_load += fr * sum(read_load[node]) + if node in write_load: + node_load += (1 - fr) * sum(write_load[node]) + problem += (node_load <= l, node) + + # print(problem) + problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) + return ExplicitStrategy(reads, [v.varValue for v in read_weights], + writes, [v.varValue for v in write_weights]) + # for v in read_weights + write_weights: + # print(f'{v.name} = {v.varValue}') + # return l.varValue + class Strategy(Generic[T]): - def load(self, read_fraction: Distribution) -> int: + def load(self, read_fraction: Distribution) -> float: raise NotImplementedError def get_read_quorum(self) -> Set[T]: @@ -250,10 +299,48 @@ class ExplicitStrategy(Strategy[T]): self.writes = writes self.write_weights = write_weights + def __str__(self) -> str: + non_zero_reads = {tuple(r): p + for (r, p) in zip(self.reads, self.read_weights) + if p > 0} + non_zero_writes = {tuple(w): p + for (w, p) in zip(self.writes, self.write_weights) + if p > 0} + return (f'ExplicitStrategy(reads={non_zero_reads}, ' + + f'writes={non_zero_writes})') + + def __repr__(self) -> str: + return (f'ExplicitStrategy(reads={self.reads}, ' + + f'read_weights={self.read_weights},' + + f'writes={self.writes}, ' + + f'write_weights={self.write_weights})') + # TODO(mwhittaker): Implement __str__ and __repr__. - def load(self, read_fraction: Distribution) -> int: - raise NotImplementedError + def load(self, read_fraction: Distribution) -> float: + d = _canonicalize_distribution(read_fraction) + fr = sum(f * weight for (f, weight) in d.items()) + + read_load: Dict[T, float] = collections.defaultdict(float) + for (r, p) in zip(self.reads, self.read_weights): + for node in r: + read_load[node] += p + + write_load: Dict[T, float] = collections.defaultdict(float) + for (w, p) in zip(self.writes, self.write_weights): + for node in w: + write_load[node] += p + + node_loads: List[float] = [] + for node in read_load.keys() | write_load.keys(): + node_load = 0.0 + if node in read_load: + node_load += fr * read_load[node] + if node in write_load: + node_load += (1 - fr) * write_load[node] + node_loads.append(node_load) + + return max(node_loads) def get_read_quorum(self) -> Set[T]: return np.random.choice(self.reads, p=self.read_weights) @@ -271,10 +358,21 @@ f = Node('f') g = Node('g') h = Node('h') i = Node('i') -grid = QuorumSystem(reads=a*b*c + d*e*f + g*h*i) -sigma = grid.strategy(0.1) -for _ in range(10): - print(sigma.get_write_quorum()) +# grid = QuorumSystem(reads=a*b*c + d*e*f + g*h*i) +# sigma = grid.strategy(0.1) +# print(grid) +# print(sigma) + +wpaxos = QuorumSystem(reads=majority([majority([a, b, c]), + majority([d, e, f]), + majority([g, h, i])])) +sigma_1 = wpaxos.strategy(read_fraction=0.1) +sigma_5 = wpaxos.strategy(read_fraction=0.5) +sigma_9 = wpaxos.strategy(read_fraction=0.9) +sigma_even = wpaxos.strategy(read_fraction={0.1: 2, 0.5: 2, 0.9: 1}) +for sigma in [sigma_1, sigma_5, sigma_9, sigma_even]: + frs = [0.1, 0.5, 0.9, {0.1: 2, 0.5: 2, 0.9: 1}] + print([sigma.load(fr) for fr in frs]) # - num_quorums # - has dups?