Added search procedure.

This commit is contained in:
Michael Whittaker 2021-01-31 12:31:22 -08:00
parent a42bccaf0c
commit dced6388af
5 changed files with 388 additions and 6 deletions

View file

@ -1,5 +1,6 @@
from .expr import Node, choose, majority
from .quorum_system import QuorumSystem, Strategy
from .quorum_system import NoStrategyFoundError, QuorumSystem, Strategy
from .search import NoQuorumSystemFoundError, search
from .viz import (
plot_node_load,
plot_node_load_on,

View file

@ -246,6 +246,12 @@ class Choose(Expr[T]):
def choose(k: int, es: List[Expr[T]]) -> Expr[T]:
if len(es) == 0:
raise ValueError('no expressions provided')
if not (1 <= k <= len(es)):
raise ValueError('k must be in the range [1, len(es)]')
if k == 1:
return Or(es)
elif k == len(es):
@ -255,4 +261,7 @@ def choose(k: int, es: List[Expr[T]]) -> Expr[T]:
def majority(es: List[Expr[T]]) -> Expr[T]:
    """Return an expression that requires a strict majority of `es`.

    Raises:
        ValueError: if `es` is empty.
    """
    if not es:
        raise ValueError('no expressions provided')
    # A strict majority of n expressions is floor(n / 2) + 1 of them.
    threshold = len(es) // 2 + 1
    return choose(threshold, es)

View file

@ -20,6 +20,11 @@ LATENCY = 'latency'
# TODO(mwhittaker): Add some other non-optimal strategies.
# TODO(mwhittaker): Make it easy to make arbitrary strategies.
class NoStrategyFoundError(ValueError):
    """Raised when no strategy satisfies the requested constraints.

    QuorumSystem raises this when there are no f-resilient read or write
    quorums, or when the strategy linear program has no optimal solution.
    """
    pass
class QuorumSystem(Generic[T]):
def __init__(self, reads: Optional[Expr[T]] = None,
writes: Optional[Expr[T]] = None) -> None:
@ -167,9 +172,11 @@ class QuorumSystem(Generic[T]):
read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
write_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
if len(read_quorums) == 0:
raise ValueError(f'There are no {f}-resilient read quorums')
raise NoStrategyFoundError(
f'There are no {f}-resilient read quorums')
if len(write_quorums) == 0:
raise ValueError(f'There are no {f}-resilient write quorums')
raise NoStrategyFoundError(
f'There are no {f}-resilient write quorums')
read_quorums = self._minimize(read_quorums)
write_quorums = self._minimize(write_quorums)
@ -219,9 +226,11 @@ class QuorumSystem(Generic[T]):
read_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
write_quorums = list(self._f_resilient_quorums(f, xs, self.reads))
if len(read_quorums) == 0:
raise ValueError(f'There are no {f}-resilient read quorums')
raise NoStrategyFoundError(
f'There are no {f}-resilient read quorums')
if len(write_quorums) == 0:
raise ValueError(f'There are no {f}-resilient write quorums')
raise NoStrategyFoundError(
f'There are no {f}-resilient write quorums')
return self._load_optimal_strategy(
read_quorums,
write_quorums,
@ -541,7 +550,8 @@ class QuorumSystem(Generic[T]):
# Solve the linear program.
problem.solve(pulp.apis.PULP_CBC_CMD(msg=False))
if problem.status != pulp.LpStatusOptimal:
raise ValueError('no strategy satisfies the given constraints')
raise NoStrategyFoundError(
'no strategy satisfies the given constraints')
# Prune out any quorums with 0 probability.
sigma_r = {

135
quorums/search.py Normal file
View file

@ -0,0 +1,135 @@
from .distribution import Distribution
from .expr import choose, Expr, Node
from .quorum_system import (LATENCY, LOAD, NETWORK, NoStrategyFoundError,
QuorumSystem, Strategy)
from typing import Iterator, List, Optional, TypeVar
import datetime
import itertools
T = TypeVar('T')
class NoQuorumSystemFoundError(ValueError):
    """Raised when a search fails to find any quorum system that satisfies
    the given constraints."""
    pass
def _partitionings(xs: List[T]) -> Iterator[List[List[T]]]:
"""
_partitionings(xs) yields all possible partitionings of xs. For example,
_partitionings([1, 2, 3]) yields
[[1], [2], [3]]
[[1, 2], [3]]
[[1, 3], [2]]
[[2, 3], [1]]
[[1, 2, 3]]
"""
if len(xs) == 0:
return
def helper(xs: List[T]) -> Iterator[List[List[T]]]:
if len(xs) == 0:
yield []
return
x = xs[0]
for partition in helper(xs[1:]):
yield [[x]] + partition
for i in range(len(partition)):
yield partition[:i] + [[x] + partition[i]] + partition[i+1:]
yield from helper(xs)
def _dup_free_exprs(nodes: List[Node[T]],
                    max_height: int = 0) -> Iterator[Expr[T]]:
    """
    _dup_free_exprs(nodes) yields all possible duplicate free expressions over
    `nodes` with height at most `max_height`. If `max_height` is not positive,
    there is no height limit. Note that an expression might be yielded more
    than once.
    """
    assert len(nodes) > 0

    # A single node admits exactly one expression: the node itself.
    if len(nodes) == 1:
        yield nodes[0]
        return

    # With only one level of height remaining, every expression must be a
    # flat choose over the nodes themselves.
    if max_height == 1:
        for threshold in range(1, len(nodes) + 1):
            yield choose(threshold, nodes)  # type: ignore
        return

    for partitioning in _partitionings(nodes):
        # Skip the partitioning that lumps every node into a single
        # partition; recursing on it would not shrink the problem.
        if len(partitioning) == 1:
            continue
        subexpr_iters = [_dup_free_exprs(partition, max_height - 1)
                         for partition in partitioning]
        for combination in itertools.product(*subexpr_iters):
            for threshold in range(1, len(combination) + 1):
                yield choose(threshold, list(combination))
def search(nodes: List[Node[T]],
           read_fraction: Optional[Distribution] = None,
           write_fraction: Optional[Distribution] = None,
           optimize: str = LOAD,
           resilience: int = 0,
           load_limit: Optional[float] = None,
           network_limit: Optional[float] = None,
           latency_limit: Optional[datetime.timedelta] = None,
           f: int = 0,
           timeout: datetime.timedelta = datetime.timedelta(seconds=0)) \
           -> QuorumSystem[T]:
    """Search for a quorum system over `nodes` optimizing the given metric.

    The search first sweeps all duplicate-free expressions of height at most
    2, then all duplicate-free expressions of unbounded height, keeping the
    quorum system whose optimal strategy has the best metric (`optimize` is
    one of LOAD, NETWORK, or LATENCY). Candidate systems must be at least
    `resilience`-resilient, and strategies are found subject to
    `load_limit`, `network_limit`, `latency_limit`, and `f`. A `timeout` of
    zero seconds means no timeout.

    Raises:
        NoQuorumSystemFoundError: if no quorum system satisfying the
            constraints was found (or none was found before the timeout).
    """
    start_time = datetime.datetime.now()

    def timed_out() -> bool:
        # A timeout of zero seconds means "search exhaustively".
        return (timeout != datetime.timedelta(seconds=0) and
                datetime.datetime.now() - start_time >= timeout)

    def metric(sigma: Strategy[T]) -> float:
        if optimize == LOAD:
            return sigma.load(read_fraction, write_fraction)
        elif optimize == NETWORK:
            return sigma.network_load(read_fraction, write_fraction)
        else:
            return sigma.latency(read_fraction, write_fraction).total_seconds()

    opt_qs: Optional[QuorumSystem[T]] = None
    opt_metric: Optional[float] = None

    def do_search(exprs: Iterator[Expr[T]]) -> None:
        nonlocal opt_qs
        nonlocal opt_metric

        for reads in exprs:
            qs = QuorumSystem(reads=reads)
            if qs.resilience() < resilience:
                continue
            try:
                sigma = qs.strategy(optimize = optimize,
                                    load_limit = load_limit,
                                    network_limit = network_limit,
                                    latency_limit = latency_limit,
                                    read_fraction = read_fraction,
                                    write_fraction = write_fraction,
                                    f = f)
                sigma_metric = metric(sigma)
                if opt_metric is None or sigma_metric < opt_metric:
                    opt_qs = qs
                    opt_metric = sigma_metric
            except NoStrategyFoundError:
                # This candidate has no strategy meeting the constraints;
                # keep searching.
                pass

            if timed_out():
                return

    # Cheap shallow pass first, then the exhaustive pass (if time remains).
    do_search(_dup_free_exprs(nodes, max_height=2))
    if not timed_out():
        do_search(_dup_free_exprs(nodes))

    if opt_qs is None:
        # Raise the dedicated exception (exported alongside `search`) rather
        # than a bare ValueError so callers can catch it specifically. It
        # subclasses ValueError, so existing callers keep working.
        raise NoQuorumSystemFoundError('no quorum system found')
    else:
        return opt_qs

227
tests/test_search.py Normal file
View file

@ -0,0 +1,227 @@
from quorums import *
from quorums.expr import Expr
from quorums.search import _dup_free_exprs, _partitionings
from typing import Any, FrozenSet, List
import datetime
import unittest
class TestSearch(unittest.TestCase):
def test_partitions(self):
def setify(partitions: List[List[List[Any]]]) \
-> FrozenSet[FrozenSet[FrozenSet[Any]]]:
return frozenset(frozenset(frozenset(s) for s in partition)
for partition in partitions)
def assert_equal(x: List[List[Any]], y: List[List[Any]]):
self.assertEqual(setify(x), setify(y))
assert_equal(list(_partitionings([])), [])
assert_equal(list(_partitionings([1])), [[[1]]])
assert_equal(list(_partitionings([1, 2])), [
[[1], [2]],
[[1, 2]],
])
assert_equal(list(_partitionings([1, 2, 3])), [
[[1], [2], [3]],
[[1, 2], [3]],
[[1, 3], [2]],
[[2, 3], [1]],
[[1, 2, 3]],
])
assert_equal(list(_partitionings([1, 2, 3, 4])), [
[[1], [2], [3], [4]],
[[1, 2], [3], [4]],
[[1, 3], [2], [4]],
[[1, 4], [2], [3]],
[[2, 3], [1], [4]],
[[2, 4], [1], [3]],
[[3, 4], [1], [2]],
[[1, 2], [3, 4]],
[[1, 3], [2, 4]],
[[1, 4], [2, 3]],
[[2, 3, 4], [1]],
[[1, 3, 4], [2]],
[[1, 2, 4], [3]],
[[1, 2, 3], [4]],
[[1, 2, 3, 4]],
])
    def test_dup_free_exprs(self):
        # Two expressions are treated as equal if they have the same set of
        # quorums, regardless of how they are written.
        def quorums(e: Expr) -> FrozenSet[FrozenSet[Any]]:
            return frozenset(frozenset(q) for q in e.quorums())

        def assert_equal(xs: List[Expr], ys: List[Expr]) -> None:
            self.assertEqual(frozenset(quorums(x) for x in xs),
                             frozenset(quorums(y) for y in ys))

        a = Node('a')
        b = Node('b')
        c = Node('c')
        d = Node('d')

        # A single node admits only the trivial expression.
        assert_equal(list(_dup_free_exprs([a])), [a])
        # Two nodes: only "or" and "and".
        assert_equal(list(_dup_free_exprs([a, b])), [
            a + b,
            a * b,
        ])
        # Three nodes with no height limit: the flat chooses plus every way
        # of combining a pair with the remaining node.
        assert_equal(list(_dup_free_exprs([a, b, c])), [
            a + b + c,
            choose(2, [a, b, c]),
            a * b * c,
            (a + b) + c,
            (a + b) * c,
            (a * b) + c,
            (a * b) * c,
            (a + c) + b,
            (a + c) * b,
            (a * c) + b,
            (a * c) * b,
            (b + c) + a,
            (b + c) * a,
            (b * c) + a,
            (b * c) * a,
        ])
        # With max_height=1, only flat chooses over the nodes are possible.
        assert_equal(list(_dup_free_exprs([a, b, c], max_height=1)), [
            a + b + c,
            choose(2, [a, b, c]),
            a * b * c,
        ])
        assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=1)), [
            a + b + c + d,
            choose(2, [a, b, c, d]),
            choose(3, [a, b, c, d]),
            a * b * c * d,
        ])
        # Four nodes with max_height=2: flat chooses, one pair combined with
        # two singletons, two pairs combined, and one triple combined with a
        # singleton.
        assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=2)), [
            a + b + c + d,
            choose(2, [a, b, c, d]),
            choose(3, [a, b, c, d]),
            a * b * c * d,
            (a + b) + c + d,
            (a + b) * c * d,
            (a * b) + c + d,
            (a * b) * c * d,
            choose(2, [a + b, c, d]),
            choose(2, [a * b, c, d]),
            (a + c) + b + d,
            (a + c) * b * d,
            (a * c) + b + d,
            (a * c) * b * d,
            choose(2, [a + c, b, d]),
            choose(2, [a * c, b, d]),
            (a + d) + b + c,
            (a + d) * b * c,
            (a * d) + b + c,
            (a * d) * b * c,
            choose(2, [a + d, b, c]),
            choose(2, [a * d, b, c]),
            (b + c) + a + d,
            (b + c) * a * d,
            (b * c) + a + d,
            (b * c) * a * d,
            choose(2, [b + c, a, d]),
            choose(2, [b * c, a, d]),
            (b + d) + a + c,
            (b + d) * a * c,
            (b * d) + a + c,
            (b * d) * a * c,
            choose(2, [b + d, a, c]),
            choose(2, [b * d, a, c]),
            (c + d) + a + b,
            (c + d) * a * b,
            (c * d) + a + b,
            (c * d) * a * b,
            choose(2, [c + d, a, b]),
            choose(2, [c * d, a, b]),
            (a + b) + (c + d),
            (a + b) + (c * d),
            (a + b) * (c + d),
            (a + b) * (c * d),
            (a * b) + (c + d),
            (a * b) + (c * d),
            (a * b) * (c + d),
            (a * b) * (c * d),
            (a + c) + (b + d),
            (a + c) + (b * d),
            (a + c) * (b + d),
            (a + c) * (b * d),
            (a * c) + (b + d),
            (a * c) + (b * d),
            (a * c) * (b + d),
            (a * c) * (b * d),
            (a + d) + (b + c),
            (a + d) + (b * c),
            (a + d) * (b + c),
            (a + d) * (b * c),
            (a * d) + (b + c),
            (a * d) + (b * c),
            (a * d) * (b + c),
            (a * d) * (b * c),
            a + (b + c + d),
            a + (b * c * d),
            a * (b + c + d),
            a * (b * c * d),
            a + choose(2, [b, c, d]),
            a * choose(2, [b, c, d]),
            b + (a + c + d),
            b + (a * c * d),
            b * (a + c + d),
            b * (a * c * d),
            b + choose(2, [a, c, d]),
            b * choose(2, [a, c, d]),
            c + (a + b + d),
            c + (a * b * d),
            c * (a + b + d),
            c * (a * b * d),
            c + choose(2, [a, b, d]),
            c * choose(2, [a, b, d]),
            d + (a + b + c),
            d + (a * b * c),
            d * (a + b + c),
            d * (a * b * c),
            d + choose(2, [a, b, c]),
            d * choose(2, [a, b, c]),
        ])
def test_search(self):
a = Node('a', capacity=1, latency=datetime.timedelta(seconds=2))
b = Node('b', capacity=2, latency=datetime.timedelta(seconds=1))
c = Node('c', capacity=1, latency=datetime.timedelta(seconds=2))
d = Node('d', capacity=2, latency=datetime.timedelta(seconds=1))
e = Node('e', capacity=1, latency=datetime.timedelta(seconds=2))
f = Node('f', capacity=2, latency=datetime.timedelta(seconds=1))
for fr in [0, 0.5, 1]:
search([a, b, c], read_fraction=fr)
search([a, b, c], read_fraction=fr, optimize='network')
search([a, b, c], read_fraction=fr, optimize='latency')
search([a, b, c], read_fraction=fr, resilience=1)
search([a, b, c], read_fraction=fr, f=1)
search([a, b, c],
read_fraction=0.25,
network_limit=3,
latency_limit=datetime.timedelta(seconds=2))
for fr in [0, 0.5]:
t = datetime.timedelta(seconds=0.25)
nodes = [a, b, c, d, e, f]
search(nodes, read_fraction=fr, timeout=t)
search(nodes, read_fraction=fr, timeout=t, optimize='network')
search(nodes, read_fraction=fr, timeout=t, optimize='latency')
search(nodes, read_fraction=fr, timeout=t, resilience=1)
search(nodes, read_fraction=fr, timeout=t, f=1)