quoracle/tests/test_search.py
2021-02-04 20:59:10 -08:00

227 lines
7.1 KiB
Python

from quoracle import *
from quoracle.expr import Expr
from quoracle.search import _dup_free_exprs, _partitionings
from typing import Any, FrozenSet, List
import datetime
import unittest
class TestSearch(unittest.TestCase):
def test_partitions(self):
def setify(partitions: List[List[List[Any]]]) \
-> FrozenSet[FrozenSet[FrozenSet[Any]]]:
return frozenset(frozenset(frozenset(s) for s in partition)
for partition in partitions)
def assert_equal(x: List[List[Any]], y: List[List[Any]]):
self.assertEqual(setify(x), setify(y))
assert_equal(list(_partitionings([])), [])
assert_equal(list(_partitionings([1])), [[[1]]])
assert_equal(list(_partitionings([1, 2])), [
[[1], [2]],
[[1, 2]],
])
assert_equal(list(_partitionings([1, 2, 3])), [
[[1], [2], [3]],
[[1, 2], [3]],
[[1, 3], [2]],
[[2, 3], [1]],
[[1, 2, 3]],
])
assert_equal(list(_partitionings([1, 2, 3, 4])), [
[[1], [2], [3], [4]],
[[1, 2], [3], [4]],
[[1, 3], [2], [4]],
[[1, 4], [2], [3]],
[[2, 3], [1], [4]],
[[2, 4], [1], [3]],
[[3, 4], [1], [2]],
[[1, 2], [3, 4]],
[[1, 3], [2, 4]],
[[1, 4], [2, 3]],
[[2, 3, 4], [1]],
[[1, 3, 4], [2]],
[[1, 2, 4], [3]],
[[1, 2, 3], [4]],
[[1, 2, 3, 4]],
])
def test_dup_free_exprs(self):
def quorums(e: Expr) -> FrozenSet[FrozenSet[Any]]:
return frozenset(frozenset(q) for q in e.quorums())
def assert_equal(xs: List[Expr], ys: List[Expr]) -> None:
self.assertEqual(frozenset(quorums(x) for x in xs),
frozenset(quorums(y) for y in ys))
a = Node('a')
b = Node('b')
c = Node('c')
d = Node('d')
assert_equal(list(_dup_free_exprs([a])), [a])
assert_equal(list(_dup_free_exprs([a, b])), [
a + b,
a * b,
])
assert_equal(list(_dup_free_exprs([a, b, c])), [
a + b + c,
choose(2, [a, b, c]),
a * b * c,
(a + b) + c,
(a + b) * c,
(a * b) + c,
(a * b) * c,
(a + c) + b,
(a + c) * b,
(a * c) + b,
(a * c) * b,
(b + c) + a,
(b + c) * a,
(b * c) + a,
(b * c) * a,
])
assert_equal(list(_dup_free_exprs([a, b, c], max_height=1)), [
a + b + c,
choose(2, [a, b, c]),
a * b * c,
])
assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=1)), [
a + b + c + d,
choose(2, [a, b, c, d]),
choose(3, [a, b, c, d]),
a * b * c * d,
])
assert_equal(list(_dup_free_exprs([a, b, c, d], max_height=2)), [
a + b + c + d,
choose(2, [a, b, c, d]),
choose(3, [a, b, c, d]),
a * b * c * d,
(a + b) + c + d,
(a + b) * c * d,
(a * b) + c + d,
(a * b) * c * d,
choose(2, [a + b, c, d]),
choose(2, [a * b, c, d]),
(a + c) + b + d,
(a + c) * b * d,
(a * c) + b + d,
(a * c) * b * d,
choose(2, [a + c, b, d]),
choose(2, [a * c, b, d]),
(a + d) + b + c,
(a + d) * b * c,
(a * d) + b + c,
(a * d) * b * c,
choose(2, [a + d, b, c]),
choose(2, [a * d, b, c]),
(b + c) + a + d,
(b + c) * a * d,
(b * c) + a + d,
(b * c) * a * d,
choose(2, [b + c, a, d]),
choose(2, [b * c, a, d]),
(b + d) + a + c,
(b + d) * a * c,
(b * d) + a + c,
(b * d) * a * c,
choose(2, [b + d, a, c]),
choose(2, [b * d, a, c]),
(c + d) + a + b,
(c + d) * a * b,
(c * d) + a + b,
(c * d) * a * b,
choose(2, [c + d, a, b]),
choose(2, [c * d, a, b]),
(a + b) + (c + d),
(a + b) + (c * d),
(a + b) * (c + d),
(a + b) * (c * d),
(a * b) + (c + d),
(a * b) + (c * d),
(a * b) * (c + d),
(a * b) * (c * d),
(a + c) + (b + d),
(a + c) + (b * d),
(a + c) * (b + d),
(a + c) * (b * d),
(a * c) + (b + d),
(a * c) + (b * d),
(a * c) * (b + d),
(a * c) * (b * d),
(a + d) + (b + c),
(a + d) + (b * c),
(a + d) * (b + c),
(a + d) * (b * c),
(a * d) + (b + c),
(a * d) + (b * c),
(a * d) * (b + c),
(a * d) * (b * c),
a + (b + c + d),
a + (b * c * d),
a * (b + c + d),
a * (b * c * d),
a + choose(2, [b, c, d]),
a * choose(2, [b, c, d]),
b + (a + c + d),
b + (a * c * d),
b * (a + c + d),
b * (a * c * d),
b + choose(2, [a, c, d]),
b * choose(2, [a, c, d]),
c + (a + b + d),
c + (a * b * d),
c * (a + b + d),
c * (a * b * d),
c + choose(2, [a, b, d]),
c * choose(2, [a, b, d]),
d + (a + b + c),
d + (a * b * c),
d * (a + b + c),
d * (a * b * c),
d + choose(2, [a, b, c]),
d * choose(2, [a, b, c]),
])
def test_search(self):
a = Node('a', capacity=1, latency=datetime.timedelta(seconds=2))
b = Node('b', capacity=2, latency=datetime.timedelta(seconds=1))
c = Node('c', capacity=1, latency=datetime.timedelta(seconds=2))
d = Node('d', capacity=2, latency=datetime.timedelta(seconds=1))
e = Node('e', capacity=1, latency=datetime.timedelta(seconds=2))
f = Node('f', capacity=2, latency=datetime.timedelta(seconds=1))
for fr in [0, 0.5, 1]:
search([a, b, c], read_fraction=fr)
search([a, b, c], read_fraction=fr, optimize='network')
search([a, b, c], read_fraction=fr, optimize='latency')
search([a, b, c], read_fraction=fr, resilience=1)
search([a, b, c], read_fraction=fr, f=1)
search([a, b, c],
read_fraction=0.25,
network_limit=3,
latency_limit=datetime.timedelta(seconds=2))
for fr in [0, 0.5]:
t = datetime.timedelta(seconds=0.25)
nodes = [a, b, c, d, e, f]
search(nodes, read_fraction=fr, timeout=t)
search(nodes, read_fraction=fr, timeout=t, optimize='network')
search(nodes, read_fraction=fr, timeout=t, optimize='latency')
search(nodes, read_fraction=fr, timeout=t, resilience=1)
search(nodes, read_fraction=fr, timeout=t, f=1)