iterating process first pass with tests

This commit is contained in:
Grisha Kruglov 2018-02-05 17:45:56 -05:00
parent 0f957f242b
commit deb6ba29a5
4 changed files with 255 additions and 172 deletions

View file

@ -12,28 +12,97 @@ extern crate mentat;
extern crate mentat_core; extern crate mentat_core;
extern crate mentat_tolstoy; extern crate mentat_tolstoy;
use std::collections::BTreeMap;
use mentat::conn::Conn; use mentat::conn::Conn;
use mentat::new_connection; use mentat::new_connection;
use mentat_tolstoy::tx_client::{ use mentat_tolstoy::tx_processor::{
TxReader, TxReceiver,
TxClient, DatomsIterator,
Processor,
TxPart,
}; };
use mentat_tolstoy::errors::Result;
use mentat_core::{ use mentat_core::{
ValueType, ValueType,
TypedValue TypedValue,
Entid,
}; };
struct TxCountingReceiver {
pub tx_count: u32,
pub is_done: bool,
}
impl TxCountingReceiver {
fn new() -> TxCountingReceiver {
TxCountingReceiver {
tx_count: 0,
is_done: false
}
}
}
impl TxReceiver for TxCountingReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()> {
self.tx_count = self.tx_count + 1;
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
Ok(())
}
}
#[derive(Debug)]
struct TestingReceiver {
pub txes: BTreeMap<Entid, Vec<TxPart>>,
pub is_done: bool,
}
impl TestingReceiver {
fn new() -> TestingReceiver {
TestingReceiver {
txes: BTreeMap::new(),
is_done: false,
}
}
}
impl TxReceiver for TestingReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()> {
let datoms = self.txes.entry(tx_id).or_insert(vec![]);
for datom in d {
datoms.push(datom?);
}
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
Ok(())
}
}
fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
let tx = receiver.txes.keys().nth(tx_num).expect("first tx");
let datoms = receiver.txes.get(tx).expect("datoms");
assert_eq!(expected_datoms, datoms.len());
}
#[test] #[test]
fn test_reader() { fn test_reader() {
let mut c = new_connection("").expect("Couldn't open conn."); let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let txes = TxClient::all(&c).expect("bootstrap transactions");
// Don't inspect the bootstrap transaction, but we'd like to see it's there. // Don't inspect the bootstrap transaction, but we'd like to see it's there.
assert_eq!(1, txes.len()); let mut receiver = TxCountingReceiver::new();
assert_eq!(94, txes[0].parts.len()); assert_eq!(false, receiver.is_done);
Processor::process(&c, &mut receiver).expect("processor");
assert_eq!(true, receiver.is_done);
assert_eq!(1, receiver.tx_count);
let ids = conn.transact(&mut c, r#"[ let ids = conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/numba] [:db/add "s" :db/ident :foo/numba]
@ -42,34 +111,33 @@ fn test_reader() {
]"#).expect("successful transaction").tempids; ]"#).expect("successful transaction").tempids;
let numba_entity_id = ids.get("s").unwrap(); let numba_entity_id = ids.get("s").unwrap();
let txes = TxClient::all(&c).expect("got transactions"); // Expect to see one more transaction of four parts (one for tx datom itself).
let mut receiver = TestingReceiver::new();
Processor::process(&c, &mut receiver).expect("processor");
// Expect to see one more transaction of three parts. assert_eq!(2, receiver.txes.keys().count());
assert_eq!(2, txes.len()); assert_tx_datoms_count(&receiver, 1, 4);
assert_eq!(3, txes[1].parts.len());
println!("{:?}", txes[1]);
let ids = conn.transact(&mut c, r#"[ let ids = conn.transact(&mut c, r#"[
[:db/add "b" :foo/numba 123] [:db/add "b" :foo/numba 123]
]"#).expect("successful transaction").tempids; ]"#).expect("successful transaction").tempids;
let asserted_e = ids.get("b").unwrap(); let asserted_e = ids.get("b").unwrap();
let txes = TxClient::all(&c).expect("got transactions"); // Expect to see a single two part transaction
let mut receiver = TestingReceiver::new();
Processor::process(&c, &mut receiver).expect("processor");
// Expect to see a single part transactions assert_eq!(3, receiver.txes.keys().count());
// TODO verify that tx itself looks sane assert_tx_datoms_count(&receiver, 2, 2);
assert_eq!(3, txes.len());
assert_eq!(1, txes[2].parts.len());
// Inspect the transaction part. // Inspect the transaction part.
let part = &txes[2].parts[0]; let tx_id = receiver.txes.keys().nth(2).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = &datoms[0];
assert_eq!(asserted_e, &part.e); assert_eq!(asserted_e, &part.e);
assert_eq!(numba_entity_id, &part.a); assert_eq!(numba_entity_id, &part.a);
assert!(part.v.matches_type(ValueType::Long)); assert!(part.v.matches_type(ValueType::Long));
assert_eq!(TypedValue::Long(123), part.v); assert_eq!(TypedValue::Long(123), part.v);
assert_eq!(true, part.added); assert_eq!(true, part.added);
// TODO retractions
} }

View file

@ -27,5 +27,5 @@ extern crate itertools;
pub mod schema; pub mod schema;
pub mod metadata; pub mod metadata;
pub mod tx_client; pub mod tx_processor;
pub mod errors; pub mod errors;

View file

@ -1,149 +0,0 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
// use itertools::structs::Batching;
use itertools::Itertools;
use rusqlite;
use errors::{
Result,
};
use mentat_db::{
entids,
TypedSQLValue,
};
use mentat_core::{
TypedValue,
Entid,
};
#[derive(Debug, Clone)]
pub struct TxPart {
pub e: Entid,
pub a: Entid,
pub v: TypedValue,
pub added: bool,
}
// Notes on 'parts' representation:
// Currently it's suitable for uses which necessitate pulling in the entire tx into memory,
// and don't require efficient querying/monitoring by attributes of parts.
//
// Example: streaming transactions to/from the server.
//
// In the future, consider:
// - A structure that makes typical tx-listener questions — "does this transaction mention
// an attribute or entity I care about?" — efficient. That might be a trie, it might be a
// bunch of extra data structures (e.g., a set of mentioned attributes), or something else.
// With an unsorted Vec<TxPart>, looking for a mentioned attribute requires linear search of the entire vector.
// - A structure that doesn't require pulling the entire tx into memory. This might be a cursor,
// a rowid range, or something else that's scoped to the lifetime of a particular database transaction,
// in order to preserve isolation.
#[derive(Debug, Clone)]
pub struct Tx {
pub tx: Entid,
pub tx_instant: TypedValue,
pub parts: Vec<TxPart>,
}
struct RawDatom {
e: Entid,
a: Entid,
v: TypedValue, // composite of 'v' and 'value_type_tag'
tx: Entid,
added: bool,
}
// type TxIter<I, F> = Batching<I, F>;
pub trait TxReader {
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>>;
}
pub struct TxClient {}
impl TxReader for TxClient {
// TODO what should a type signature look like for this to return the
// batching iterator?
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
let mut stmt = sqlite.prepare(
"SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?;
let row_iterator = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
Ok(RawDatom {
e: row.get(0),
a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
tx: row.get(3),
added: row.get(4),
})
})?;
let txes_iterator = row_iterator.batching(|rows| {
let mut has_next_tx = false;
let mut next_tx: Option<Box<Tx>> = None;
// Our rows are partitioned into batches that represent transactions,
// thanks to ORDER BY clause above. We come up with a transaction by
// iterating through rows until we have its full representation.
// TODO place limits to safeguard against bad data?
loop {
if let Some(datom) = rows.next() {
let datom = match datom {
Ok(d) => d,
Err(_) => break // TODO propagate error from from_sql_value_pair above
};
let part = TxPart {
e: datom.e,
a: datom.a,
v: datom.v.clone(),
added: datom.added,
};
if !has_next_tx {
next_tx = Some(Box::new(Tx {
tx: datom.tx,
tx_instant: datom.v.clone(),
parts: vec![part],
}));
has_next_tx = true;
} else {
// Datom represents a transaction, we're done with this chunk of rows.
if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e {
match next_tx {
Some(ref mut t) => {t.tx_instant = part.v;},
None => break // TODO bad state
}
break;
// Datom represents a transaction part - take a note of it, continue iterating.
} else {
match next_tx {
Some(ref mut t) => {t.parts.push(part);},
None => break // TODO bad state
}
}
}
} else {
break;
}
}
// TODO due to TODOs above, this is ambiguous:
// either there's no transaction, or something went wrong!
next_tx
}).map(|t| *t);
// TODO just return the iterator...
Ok(txes_iterator.collect())
}
}

164
tolstoy/src/tx_processor.rs Normal file
View file

@ -0,0 +1,164 @@
// // Copyright 2016 Mozilla
// //
// // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// // this file except in compliance with the License. You may obtain a copy of the
// // License at http://www.apache.org/licenses/LICENSE-2.0
// // Unless required by applicable law or agreed to in writing, software distributed
// // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// // CONDITIONS OF ANY KIND, either express or implied. See the License for the
// // specific language governing permissions and limitations under the License.
use rusqlite;
use errors::{
Result,
};
use mentat_db::{
entids,
TypedSQLValue,
};
use mentat_core::{
TypedValue,
Entid,
};
#[derive(Debug, Clone)]
pub struct TxPart {
pub e: Entid,
pub a: Entid,
pub v: TypedValue,
pub added: bool,
}
#[derive(Debug, Clone)]
pub struct Tx {
pub tx: Entid,
pub tx_instant: TypedValue,
}
pub trait TxReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()>;
fn done(&mut self) -> Result<()>;
}
pub struct Processor {}
struct RawDatom {
e: Entid,
a: Entid,
v: TypedValue, // composite of 'v' and 'value_type_tag'
tx: Entid,
added: bool,
}
pub struct DatomsIterator<'conn> {
at_first: bool,
at_last: bool,
first: &'conn RawDatom,
rows: &'conn mut Iterator<Item=Result<RawDatom>>,
}
impl<'conn> DatomsIterator<'conn> {
fn new(first: &'conn RawDatom, rows: &'conn mut Iterator<Item=Result<RawDatom>>) -> DatomsIterator<'conn> {
DatomsIterator {
at_first: true,
at_last: false,
first: first,
rows: rows
}
}
}
impl<'conn> Iterator for DatomsIterator<'conn> {
type Item = Result<TxPart>;
fn next(&mut self) -> Option<Self::Item> {
if self.at_last {
return None;
}
if self.at_first {
self.at_first = false;
return Some(Ok(TxPart {
e: self.first.e,
a: self.first.a,
v: self.first.v.clone(),
added: self.first.added,
}));
}
if let Some(row) = self.rows.next() {
let datom = match row {
Ok(r) => r,
Err(e) => {
self.at_last = true;
return Some(Err(e));
}
};
if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e {
self.at_last = true;
}
return Some(Ok(TxPart {
e: datom.e,
a: datom.a,
v: datom.v.clone(),
added: datom.added,
}));
} else {
self.at_last = true;
return None;
}
}
}
impl Processor {
pub fn process(sqlite: &rusqlite::Connection, receiver: &mut TxReceiver) -> Result<()> {
let mut stmt = sqlite.prepare(
"SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?;
let mut rows = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
Ok(RawDatom {
e: row.get(0),
a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
tx: row.get(3),
added: row.get(4),
})
})?;
let mut current_tx = None;
while let Some(row) = rows.next() {
let datom = row?;
match current_tx {
Some(tx) => {
if tx == datom.tx {
continue;
} else {
current_tx = Some(datom.tx);
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
)?;
continue;
}
},
None => {
current_tx = Some(datom.tx);
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
)?;
}
}
}
receiver.done()?;
Ok(())
}
}