From 63b88c38d585e330e516b25e88280c9913ff8fb4 Mon Sep 17 00:00:00 2001 From: Michael Whittaker Date: Fri, 29 Jan 2021 18:09:50 -0800 Subject: [PATCH] First full stab at multiple objectives. --- quorums/quorum_system.py | 144 +++++++++++++++++++++++++++++++++++++-- quorums/strategy.py | 137 ------------------------------------- quorums/viz.py | 13 ++-- 3 files changed, 144 insertions(+), 150 deletions(-) delete mode 100644 quorums/strategy.py diff --git a/quorums/quorum_system.py b/quorums/quorum_system.py index 1e10da5..0646a99 100644 --- a/quorums/quorum_system.py +++ b/quorums/quorum_system.py @@ -2,14 +2,15 @@ # Does this mess things up? from . import distribution +from . import geometry from .distribution import Distribution from .expr import Expr, Node -from .strategy import Strategy -from typing import (Callable, Dict, Iterator, Generic, List, Optional, Set, - TypeVar) +from .geometry import Point, Segment +from typing import * import collections import datetime import itertools +import numpy as np import pulp @@ -23,7 +24,6 @@ LATENCY = 'latency' # TODO(mwhittaker): Add some other non-optimal strategies. # TODO(mwhittaker): Make it easy to make arbitrary strategies. - class QuorumSystem(Generic[T]): def __init__(self, reads: Optional[Expr[T]] = None, writes: Optional[Expr[T]] = None) -> None: @@ -46,6 +46,8 @@ class QuorumSystem(Generic[T]): raise ValueError('A QuorumSystem must be instantiated with a set ' 'of read quorums or a set of write quorums') + self.x_to_node = {node.x: node for node in self.nodes()} + def __repr__(self) -> str: return f'QuorumSystem(reads={self.reads}, writes={self.writes})' @@ -302,7 +304,7 @@ class QuorumSystem(Generic[T]): for (rq, v) in zip(read_quorums, read_quorum_vars) for quorum in [{x_to_node[x] for x in rq}] ) - write_latency = (1 - fr) * sum( + write_latency = (1. - fr) * sum( v * self._write_quorum_latency(quorum).total_seconds() for (wq, v) in zip(write_quorums, write_quorum_vars) for quorum in [{x_to_node[x] for x in wq}] @@ -377,8 +379,138 @@ class QuorumSystem(Generic[T]): (wq, v.varValue) for (wq, v) in zip(write_quorums, write_quorum_vars) if v.varValue != 0] - return Strategy(nodes, + return Strategy(self, [rq for (rq, _) in non_zero_read_quorums], [weight for (_, weight) in non_zero_read_quorums], [wq for (wq, _) in non_zero_write_quorums], [weight for (_, weight) in non_zero_write_quorums]) + + +class Strategy(Generic[T]): + def __init__(self, + qs: QuorumSystem[T], + reads: List[Set[T]], + read_weights: List[float], + writes: List[Set[T]], + write_weights: List[float]) -> None: + self.qs = qs + self.reads = reads + self.read_weights = read_weights + self.writes = writes + self.write_weights = write_weights + + self.unweighted_read_load: Dict[T, float] = \ + collections.defaultdict(float) + for (read_quorum, weight) in zip(self.reads, self.read_weights): + for x in read_quorum: + self.unweighted_read_load[x] += weight + + self.unweighted_write_load: Dict[T, float] = \ + collections.defaultdict(float) + for (write_quorum, weight) in zip(self.writes, self.write_weights): + for x in write_quorum: + self.unweighted_write_load[x] += weight + + def __str__(self) -> str: + non_zero_reads = {tuple(r): p + for (r, p) in zip(self.reads, self.read_weights) + if p > 0} + non_zero_writes = {tuple(w): p + for (w, p) in zip(self.writes, self.write_weights) + if p > 0} + return f'Strategy(reads={non_zero_reads}, writes={non_zero_writes})' + + def get_read_quorum(self) -> Set[T]: + return np.random.choice(self.reads, p=self.read_weights) + + def get_write_quorum(self) -> Set[T]: + return np.random.choice(self.writes, p=self.write_weights) + + def load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + return sum(weight * self._load(fr) + for (fr, weight) in d.items()) + + # TODO(mwhittaker): Rename throughput. + def capacity(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + return 1 / self.load(read_fraction, write_fraction) + + def network_load(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + fr = sum(weight * f for (f, weight) in d.items()) + read_network_load = fr * sum( + len(rq) * p + for (rq, p) in zip(self.reads, self.read_weights) + ) + write_network_load = (1 - fr) * sum( + len(wq) * p + for (wq, p) in zip(self.writes, self.write_weights) + ) + return read_network_load + write_network_load + + def latency(self, + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> datetime.timedelta: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + fr = sum(weight * f for (f, weight) in d.items()) + + read_latency = fr * sum(( + self.qs._read_quorum_latency(quorum) * p # type: ignore + for (rq, p) in zip(self.reads, self.read_weights) + for quorum in [{self.qs.x_to_node[x] for x in rq}] + ), datetime.timedelta(seconds=0)) # type: ignore + write_latency = (1 - fr) * sum(( + self.qs._write_quorum_latency(quorum) * p # type: ignore + for (wq, p) in zip(self.writes, self.write_weights) + for quorum in [{self.qs.x_to_node[x] for x in wq}] + ), datetime.timedelta(seconds=0)) # type:ignore + return read_latency + write_latency # type: ignore + + def node_load(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + d = distribution.canonicalize_rw(read_fraction, write_fraction) + return sum(weight * self._node_load(node.x, fr) + for (fr, weight) in d.items()) + + def node_utilization(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + # TODO(mwhittaker): Implement. + return 0.0 + + def node_throghput(self, + node: Node[T], + read_fraction: Optional[Distribution] = None, + write_fraction: Optional[Distribution] = None) \ + -> float: + # TODO(mwhittaker): Implement. + return 0.0 + + def _node_load(self, x: T, fr: float) -> float: + """ + _node_load returns the load on x given a fixed read fraction fr. + """ + fw = 1 - fr + node = self.qs.x_to_node[x] + return (fr * self.unweighted_read_load[x] / node.read_capacity + + fw * self.unweighted_write_load[x] / node.write_capacity) + + def _load(self, fr: float) -> float: + """ + _load returns the load given a fixed read fraction fr. + """ + return max(self._node_load(node.x, fr) for node in self.qs.nodes()) diff --git a/quorums/strategy.py b/quorums/strategy.py deleted file mode 100644 index df6682e..0000000 --- a/quorums/strategy.py +++ /dev/null @@ -1,137 +0,0 @@ -from . import distribution -from . import geometry -from .distribution import Distribution -from .expr import Node -from .geometry import Point, Segment -from typing import Dict, Generic, List, Optional, Set, Tuple, TypeVar -import collections -import itertools -import numpy as np - - -T = TypeVar('T') - - -class Strategy(Generic[T]): - def __init__(self, - nodes: Set[Node[T]], - reads: List[Set[T]], - read_weights: List[float], - writes: List[Set[T]], - write_weights: List[float]) -> None: - self.nodes = nodes - self.read_capacity = {node.x: node.read_capacity for node in nodes} - self.write_capacity = {node.x: node.write_capacity for node in nodes} - self.reads = reads - self.read_weights = read_weights - self.writes = writes - self.write_weights = write_weights - - self.unweighted_read_load: Dict[T, float] = \ - collections.defaultdict(float) - for (read_quorum, weight) in zip(self.reads, self.read_weights): - for x in read_quorum: - self.unweighted_read_load[x] += weight - - self.unweighted_write_load: Dict[T, float] = \ - collections.defaultdict(float) - for (write_quorum, weight) in zip(self.writes, self.write_weights): - for x in write_quorum: - self.unweighted_write_load[x] += weight - - def __str__(self) -> str: - non_zero_reads = {tuple(r): p - for (r, p) in zip(self.reads, self.read_weights) - if p > 0} - non_zero_writes = {tuple(w): p - for (w, p) in zip(self.writes, self.write_weights) - if p > 0} - return f'Strategy(reads={non_zero_reads}, writes={non_zero_writes})' - - def __repr__(self) -> str: - return (f'Strategy(nodes={self.nodes}, '+ - f'reads={self.reads}, ' + - f'read_weights={self.read_weights},' + - f'writes={self.writes}, ' + - f'write_weights={self.write_weights})') - - def get_read_quorum(self) -> Set[T]: - return np.random.choice(self.reads, p=self.read_weights) - - def get_write_quorum(self) -> Set[T]: - return np.random.choice(self.writes, p=self.write_weights) - - def load(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - d = distribution.canonicalize_rw(read_fraction, write_fraction) - return sum(weight * self._load(fr) - for (fr, weight) in d.items()) - - # TODO(mwhittaker): Rename throughput. - def capacity(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - return 1 / self.load(read_fraction, write_fraction) - - def network_load(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) -> float: - d = distribution.canonicalize_rw(read_fraction, write_fraction) - fr = sum(weight * f for (f, weight) in d.items()) - read_network_load = fr * sum( - len(rq) * p - for (rq, p) in zip(self.reads, self.read_weights) - ) - write_network_load = (1 - fr) * sum( - len(wq) * p - for (wq, p) in zip(self.writes, self.write_weights) - ) - return read_network_load + write_network_load - - def latency(self, - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) -> float: - # TODO(mwhittaker): Implement. - return 0 - - def node_load(self, - node: Node[T], - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - d = distribution.canonicalize_rw(read_fraction, write_fraction) - return sum(weight * self._node_load(node.x, fr) - for (fr, weight) in d.items()) - - def node_utilization(self, - node: Node[T], - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - # TODO(mwhittaker): Implement. - return 0.0 - - def node_throghput(self, - node: Node[T], - read_fraction: Optional[Distribution] = None, - write_fraction: Optional[Distribution] = None) \ - -> float: - # TODO(mwhittaker): Implement. - return 0.0 - - def _node_load(self, x: T, fr: float) -> float: - """ - _node_load returns the load on x given a fixed read fraction fr. - """ - fw = 1 - fr - return (fr * self.unweighted_read_load[x] / self.read_capacity[x] + - fw * self.unweighted_write_load[x] / self.write_capacity[x]) - - def _load(self, fr: float) -> float: - """ - _load returns the load given a fixed read fraction fr. - """ - return max(self._node_load(node.x, fr) for node in self.nodes) diff --git a/quorums/viz.py b/quorums/viz.py index 153617a..2a9a6c8 100644 --- a/quorums/viz.py +++ b/quorums/viz.py @@ -3,7 +3,7 @@ from . import geometry from .distribution import Distribution from .expr import Node from .geometry import Point, Segment -from .strategy import Strategy +from .quorum_system import Strategy from typing import Dict, List, Optional, Set, Tuple, TypeVar import collections import matplotlib @@ -34,7 +34,7 @@ def plot_node_load_on(ax: plt.Axes, write_fraction: Optional[Distribution] = None): _plot_node_load_on(ax, strategy, - nodes or list(strategy.nodes), + nodes or list(strategy.qs.nodes()), scale=1, scale_by_node_capacity=True, read_fraction=read_fraction, @@ -61,7 +61,7 @@ def plot_node_utilization_on(ax: plt.Axes, write_fraction: Optional[Distribution] = None): _plot_node_load_on(ax, strategy, - nodes or list(strategy.nodes), + nodes or list(strategy.qs.nodes()), scale=strategy.capacity(read_fraction, write_fraction), scale_by_node_capacity=True, read_fraction=read_fraction, @@ -88,7 +88,7 @@ def plot_node_throughput_on(ax: plt.Axes, write_fraction: Optional[Distribution] = None): _plot_node_load_on(ax, strategy, - nodes or list(strategy.nodes), + nodes or list(strategy.qs.nodes()), scale=strategy.capacity(read_fraction, write_fraction), scale_by_node_capacity=False, read_fraction=read_fraction, @@ -131,8 +131,7 @@ def _plot_node_load_on(ax: plt.Axes, edgecolor='white', width=0.8) for j, (bar_height, bottom) in enumerate(zip(bar_heights, bottoms)): - # TODO(mwhittaker): Fix the unhappy typechecker. - text = ''.join(str(x) for x in sorted(list(quorum))) + text = ''.join(str(x) for x in sorted(list(quorum))) # type: ignore if bar_height != 0: ax.text(x_ticks[j], bottom + bar_height / 2, text, ha='center', va='center') @@ -173,7 +172,7 @@ def _group(segments: Dict[T, Segment]) -> Dict[Segment, List[T]]: def plot_load_distribution_on(ax: plt.Axes, strategy: Strategy[T], nodes: Optional[List[Node[T]]] = None): - nodes = nodes or list(strategy.nodes) + nodes = nodes or list(strategy.qs.nodes()) # We want to plot every node's load distribution. Multiple nodes might # have the same load distribution, so we group the nodes by their