From 1b84ec313408834ee6b76ad3756e6d1203cf19ee Mon Sep 17 00:00:00 2001 From: Michael Whittaker Date: Wed, 20 Jan 2021 21:52:19 -0800 Subject: [PATCH] Added read and write capacity handling. This stuff is weird because it's obvious how to adjust the LP, but understanding what things mean intuitively is weird. I settled on the new load being the inverse of the peak throughput possible. With default load, we assume capacity at every node is 1. --- quorums/quorums.py | 203 +++++++++++++++++++++++++++++++-------------- 1 file changed, 142 insertions(+), 61 deletions(-) diff --git a/quorums/quorums.py b/quorums/quorums.py index 05c8dcf..5a27c50 100644 --- a/quorums/quorums.py +++ b/quorums/quorums.py @@ -1,3 +1,6 @@ +# TODO(mwhittaker): We can define a set of read quorums that are not minimal. +# Does this mess things up? + from typing import (Dict, Iterator, Generic, List, Optional, Set, Tuple, TypeVar, Union) import collections @@ -10,6 +13,9 @@ T = TypeVar('T') class Expr(Generic[T]): + def nodes(self) -> Set['Node[T]']: + raise NotImplementedError + def quorums(self) -> Iterator[Set[T]]: raise NotImplementedError @@ -27,15 +33,43 @@ class Expr(Generic[T]): class Node(Expr[T]): - def __init__(self, x: T) -> None: + def __init__(self, + x: T, + capacity: Optional[float] = None, + read_capacity: Optional[float] = None, + write_capacity: Optional[float] = None) -> None: self.x = x + # A user either specifies capacity or (read_capacity and + # write_capacity), but not both. + if (capacity is None and + read_capacity is None and + write_capacity is None): + self.read_capacity = 1.0 + self.write_capacity = 1.0 + elif (capacity is not None and + read_capacity is None and + write_capacity is None): + self.read_capacity = capacity + self.write_capacity = capacity + elif (capacity is None and + read_capacity is not None and + write_capacity is not None): + self.read_capacity = read_capacity + self.write_capacity = write_capacity + else: + raise ValueError('You must specify capacity or (read_capacity ' + 'and write_capacity)') + def __str__(self) -> str: return str(self.x) def __repr__(self) -> str: return f'Node({self.x})' + def nodes(self) -> Set['Node[T]']: + return {self} + def quorums(self) -> Iterator[Set[T]]: yield {self.x} @@ -45,6 +79,12 @@ class Node(Expr[T]): def dual(self) -> Expr: return self + def _read_capacities(self) -> Dict[T, float]: + return {self.x: self.read_capacity} + + def _write_capacities(self) -> Dict[T, float]: + return {self.x: self.write_capacity} + class Or(Expr[T]): def __init__(self, es: List[Expr[T]]) -> None: @@ -59,6 +99,9 @@ class Or(Expr[T]): def __repr__(self) -> str: return f'Or({self.es})' + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + def quorums(self) -> Iterator[Set[T]]: for e in self.es: yield from e.quorums() @@ -83,6 +126,9 @@ class And(Expr[T]): def __repr__(self) -> str: return f'And({self.es})' + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + def quorums(self) -> Iterator[Set[T]]: for subquorums in itertools.product(*[e.quorums() for e in self.es]): yield set.union(*subquorums) @@ -108,6 +154,9 @@ class Choose(Expr[T]): def __repr__(self) -> str: return f'Chose({self.k}, {self.es})' + def nodes(self) -> Set[Node[T]]: + return set.union(*[e.nodes() for e in self.es]) + def quorums(self) -> Iterator[Set[T]]: for combo in itertools.combinations(self.es, self.k): for subquorums in itertools.product(*[e.quorums() for e in combo]): @@ -230,25 +279,34 @@ class QuorumSystem(Generic[T]): def _load_optimal_strategy(self, read_fraction: Dict[float, float]) -> \ 'Strategy[T]': + # TODO(mwhittaker): Explain f_r calculation. fr = sum(f * weight for (f, weight) in read_fraction.items()) - reads = list(self.read_quorums()) - writes = list(self.write_quorums()) - read_load: Dict[T, List[pulp.LpVariable]] = collections.defaultdict(list) - read_weights: List[pulp.LpVariable] = [] - for (i, r) in enumerate(reads): + read_quorums = list(self.read_quorums()) + write_quorums = list(self.write_quorums()) + + nodes = self.reads.nodes() | self.writes.nodes() + read_capacity = {node.x: node.read_capacity for node in nodes} + write_capacity = {node.x: node.write_capacity for node in nodes} + + read_quorum_vars: List[pulp.LpVariable] = [] + x_to_read_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ + collections.defaultdict(list) + + for (i, read_quorum) in enumerate(read_quorums): v = pulp.LpVariable(f'r{i}', 0, 1) - read_weights.append(v) - for node in r: - read_load[node].append(v) + read_quorum_vars.append(v) + for x in read_quorum: + x_to_read_quorum_vars[x].append(v) - write_load: Dict[T, List[pulp.LpVariable]] = collections.defaultdict(list) - write_weights: List[pulp.LpVariable] = [] - for (i, r) in enumerate(writes): + write_quorum_vars: List[pulp.LpVariable] = [] + x_to_write_quorum_vars: Dict[T, List[pulp.LpVariable]] = \ + collections.defaultdict(list) + for (i, write_quorum) in enumerate(write_quorums): v = pulp.LpVariable(f'w{i}', 0, 1) - write_weights.append(v) - for node in r: - write_load[node].append(v) + write_quorum_vars.append(v) + for x in write_quorum: + x_to_write_quorum_vars[x].append(v) # Form the linear program to find the load. problem = pulp.LpProblem("load", pulp.LpMinimize) @@ -258,20 +316,25 @@ class QuorumSystem(Generic[T]): # write probabilities. l = pulp.LpVariable('l', 0, 1) problem += l - problem += (sum(read_weights) == 1, 'valid read strategy') - problem += (sum(write_weights) == 1, 'valid write strategy') - for node in read_load.keys() | write_load.keys(): - node_load: pulp.LpAffineExpression = 0 - if node in read_load: - node_load += fr * sum(read_load[node]) - if node in write_load: - node_load += (1 - fr) * sum(write_load[node]) - problem += (node_load <= l, node) + problem += (sum(read_quorum_vars) == 1, 'valid read strategy') + problem += (sum(write_quorum_vars) == 1, 'valid write strategy') + for node in nodes: + x = node.x + x_load: pulp.LpAffineExpression = 0 + if x in x_to_read_quorum_vars: + x_load += fr * sum(x_to_read_quorum_vars[x]) / read_capacity[x] + if x in x_to_write_quorum_vars: + x_load += ((1 - fr) * sum(x_to_write_quorum_vars[x]) / + write_capacity[x]) + problem += (x_load <= l, x) # print(problem) problem.solve(pulp.apis.PULP_CBC_CMD(msg=False)) - return ExplicitStrategy(reads, [v.varValue for v in read_weights], - writes, [v.varValue for v in write_weights]) + return ExplicitStrategy(nodes, + read_quorums, + [v.varValue for v in read_quorum_vars], + write_quorums, + [v.varValue for v in write_quorum_vars]) # for v in read_weights + write_weights: # print(f'{v.name} = {v.varValue}') # return l.varValue @@ -290,10 +353,14 @@ class Strategy(Generic[T]): class ExplicitStrategy(Strategy[T]): def __init__(self, + nodes: Set[Node[T]], reads: List[Set[T]], read_weights: List[float], writes: List[Set[T]], write_weights: List[float]) -> None: + self.nodes = nodes + self.read_capacity = {node.x: node.read_capacity for node in nodes} + self.write_capacity = {node.x: node.write_capacity for node in nodes} self.reads = reads self.read_weights = read_weights self.writes = writes @@ -310,7 +377,8 @@ class ExplicitStrategy(Strategy[T]): f'writes={non_zero_writes})') def __repr__(self) -> str: - return (f'ExplicitStrategy(reads={self.reads}, ' + + return (f'ExplicitStrategy(nodes={self.nodes}, '+ + f'reads={self.reads}, ' + f'read_weights={self.read_weights},' + f'writes={self.writes}, ' + f'write_weights={self.write_weights})') @@ -322,25 +390,28 @@ class ExplicitStrategy(Strategy[T]): fr = sum(f * weight for (f, weight) in d.items()) read_load: Dict[T, float] = collections.defaultdict(float) - for (r, p) in zip(self.reads, self.read_weights): - for node in r: - read_load[node] += p + for (read_quorum, weight) in zip(self.reads, self.read_weights): + for x in read_quorum: + read_load[x] += weight write_load: Dict[T, float] = collections.defaultdict(float) - for (w, p) in zip(self.writes, self.write_weights): - for node in w: - write_load[node] += p + for (write_quorum, weight) in zip(self.writes, self.write_weights): + for x in write_quorum: + write_load[x] += weight - node_loads: List[float] = [] - for node in read_load.keys() | write_load.keys(): - node_load = 0.0 - if node in read_load: - node_load += fr * read_load[node] - if node in write_load: - node_load += (1 - fr) * write_load[node] - node_loads.append(node_load) + loads: List[float] = [] + for node in self.nodes: + x = node.x + load = 0.0 + if x in read_load: + load += fr * read_load[x] / self.read_capacity[x] + if x in write_load: + load += (1 - fr) * write_load[x] / self.write_capacity[x] + loads.append(load) - return max(node_loads) + return max(loads) + + # TODO(mwhittaker): Add read/write load and capacity and read/write cap. def get_read_quorum(self) -> Set[T]: return np.random.choice(self.reads, p=self.read_weights) @@ -349,30 +420,40 @@ class ExplicitStrategy(Strategy[T]): return np.random.choice(self.writes, p=self.write_weights) -a = Node('a') -b = Node('b') -c = Node('c') -d = Node('d') -e = Node('e') -f = Node('f') -g = Node('g') -h = Node('h') -i = Node('i') +a = Node('a', write_capacity=200, read_capacity=400) +b = Node('b', write_capacity=100, read_capacity=200) +c = Node('c', write_capacity=50, read_capacity=100) + +qs = QuorumSystem(reads = a*b + a*c) +print(list(qs.read_quorums())) +sigma = qs.strategy(read_fraction=0.5) +print(list(qs.write_quorums())) +print(sigma) +print(1 / sigma.load(read_fraction=0.5)) + +# d = Node('d') +# e = Node('e') +# f = Node('f') +# g = Node('g') +# h = Node('h') +# i = Node('i') # grid = QuorumSystem(reads=a*b*c + d*e*f + g*h*i) # sigma = grid.strategy(0.1) # print(grid) # print(sigma) -wpaxos = QuorumSystem(reads=majority([majority([a, b, c]), - majority([d, e, f]), - majority([g, h, i])])) -sigma_1 = wpaxos.strategy(read_fraction=0.1) -sigma_5 = wpaxos.strategy(read_fraction=0.5) -sigma_9 = wpaxos.strategy(read_fraction=0.9) -sigma_even = wpaxos.strategy(read_fraction={0.1: 2, 0.5: 2, 0.9: 1}) -for sigma in [sigma_1, sigma_5, sigma_9, sigma_even]: - frs = [0.1, 0.5, 0.9, {0.1: 2, 0.5: 2, 0.9: 1}] - print([sigma.load(fr) for fr in frs]) + + +# wpaxos = QuorumSystem(reads=majority([majority([a, b, c]), +# majority([d, e, f]), +# majority([g, h, i])])) +# sigma_1 = wpaxos.strategy(read_fraction=0.1) +# sigma_5 = wpaxos.strategy(read_fraction=0.5) +# sigma_9 = wpaxos.strategy(read_fraction=0.9) +# sigma_even = wpaxos.strategy(read_fraction={0.1: 2, 0.5: 2, 0.9: 1}) +# for sigma in [sigma_1, sigma_5, sigma_9, sigma_even]: +# frs = [0.1, 0.5, 0.9, {0.1: 2, 0.5: 2, 0.9: 1}] +# print([sigma.load(fr) for fr in frs]) # - num_quorums # - has dups?