diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs
index 42576c96..744973f6 100644
--- a/tests/tolstoy.rs
+++ b/tests/tolstoy.rs
@@ -12,28 +12,101 @@ extern crate mentat;
 extern crate mentat_core;
 extern crate mentat_tolstoy;
 
+use std::collections::BTreeMap;
+
 use mentat::conn::Conn;
 
 use mentat::new_connection;
 
-use mentat_tolstoy::tx_client::{
-    TxReader,
-    TxClient,
+use mentat_tolstoy::tx_processor::{
+    TxReceiver,
+    DatomsIterator,
+    Processor,
+    TxPart,
 };
+use mentat_tolstoy::errors::Result;
 use mentat_core::{
     ValueType,
-    TypedValue
+    TypedValue,
+    Entid,
 };
 
+/// Receiver that only counts the transactions it is handed and never reads their datoms.
+struct TxCountingReceiver {
+    pub tx_count: u32,
+    pub is_done: bool,
+}
+
+impl TxCountingReceiver {
+    fn new() -> TxCountingReceiver {
+        TxCountingReceiver {
+            tx_count: 0,
+            is_done: false
+        }
+    }
+}
+
+impl TxReceiver for TxCountingReceiver {
+    fn tx(&mut self, _tx_id: Entid, _d: &mut DatomsIterator) -> Result<()> {
+        // The datoms are deliberately left unread; only the number of calls matters here.
+        self.tx_count += 1;
+        Ok(())
+    }
+
+    fn done(&mut self) -> Result<()> {
+        self.is_done = true;
+        Ok(())
+    }
+}
+
+/// Receiver that records every datom of every transaction, keyed by transaction id.
+#[derive(Debug)]
+struct TestingReceiver {
+    pub txes: BTreeMap<Entid, Vec<TxPart>>,
+    pub is_done: bool,
+}
+
+impl TestingReceiver {
+    fn new() -> TestingReceiver {
+        TestingReceiver {
+            txes: BTreeMap::new(),
+            is_done: false,
+        }
+    }
+}
+
+impl TxReceiver for TestingReceiver {
+    fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()> {
+        let datoms = self.txes.entry(tx_id).or_insert(vec![]);
+        for datom in d {
+            datoms.push(datom?);
+        }
+        Ok(())
+    }
+
+    fn done(&mut self) -> Result<()> {
+        self.is_done = true;
+        Ok(())
+    }
+}
+
+/// Asserts that the `tx_num`-th recorded transaction (keys in ascending order) holds `expected_datoms` datoms.
+fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
+    let tx = receiver.txes.keys().nth(tx_num).expect("tx at requested index");
+    let datoms = receiver.txes.get(tx).expect("datoms");
+    assert_eq!(expected_datoms, datoms.len());
+}
+
 #[test]
 fn test_reader() {
     let mut c = new_connection("").expect("Couldn't open conn.");
     let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
 
-    let txes = TxClient::all(&c).expect("bootstrap transactions");
-    // Don't inspect the bootstrap transaction, but we'd like to see it's there.
-    assert_eq!(1, txes.len());
-    assert_eq!(94, txes[0].parts.len());
+    let mut receiver = TxCountingReceiver::new();
+    assert_eq!(false, receiver.is_done);
+    Processor::process(&c, &mut receiver).expect("processor");
+    assert_eq!(true, receiver.is_done);
+    assert_eq!(1, receiver.tx_count);
 
     let ids = conn.transact(&mut c, r#"[
         [:db/add "s" :db/ident :foo/numba]
@@ -42,34 +115,33 @@ fn test_reader() {
     ]"#).expect("successful transaction").tempids;
     let numba_entity_id = ids.get("s").unwrap();
 
-    let txes = TxClient::all(&c).expect("got transactions");
+    // Expect to see one more transaction of four parts (one for tx datom itself).
+    let mut receiver = TestingReceiver::new();
+    Processor::process(&c, &mut receiver).expect("processor");
 
-    // Expect to see one more transaction of three parts.
-    assert_eq!(2, txes.len());
-    assert_eq!(3, txes[1].parts.len());
-
-    println!("{:?}", txes[1]);
+    assert_eq!(2, receiver.txes.keys().count());
+    assert_tx_datoms_count(&receiver, 1, 4);
 
     let ids = conn.transact(&mut c, r#"[
         [:db/add "b" :foo/numba 123]
     ]"#).expect("successful transaction").tempids;
     let asserted_e = ids.get("b").unwrap();
 
-    let txes = TxClient::all(&c).expect("got transactions");
+    // Expect to see a single two-part transaction (the asserted datom plus the tx datom itself).
+    let mut receiver = TestingReceiver::new();
+    Processor::process(&c, &mut receiver).expect("processor");
 
-    // Expect to see a single part transactions
-    // TODO verify that tx itself looks sane
-    assert_eq!(3, txes.len());
-    assert_eq!(1, txes[2].parts.len());
+    assert_eq!(3, receiver.txes.keys().count());
+    assert_tx_datoms_count(&receiver, 2, 2);
 
     // Inspect the transaction part.
-    let part = &txes[2].parts[0];
+    let tx_id = receiver.txes.keys().nth(2).expect("tx");
+    let datoms = receiver.txes.get(tx_id).expect("datoms");
+    let part = &datoms[0];
     assert_eq!(asserted_e, &part.e);
     assert_eq!(numba_entity_id, &part.a);
     assert!(part.v.matches_type(ValueType::Long));
     assert_eq!(TypedValue::Long(123), part.v);
     assert_eq!(true, part.added);
-
-    // TODO retractions
 }
diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs
index ee5ab559..9ca6abad 100644
--- a/tolstoy/src/lib.rs
+++ b/tolstoy/src/lib.rs
@@ -27,5 +27,5 @@ extern crate itertools;
 
 pub mod schema;
 pub mod metadata;
-pub mod tx_client;
+pub mod tx_processor;
 pub mod errors;
diff --git a/tolstoy/src/tx_client.rs b/tolstoy/src/tx_client.rs
deleted file mode 100644
index 8338f02b..00000000
--- a/tolstoy/src/tx_client.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2016 Mozilla
-//
-// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
-// this file except in compliance with the License. You may obtain a copy of the
-// License at http://www.apache.org/licenses/LICENSE-2.0
-// Unless required by applicable law or agreed to in writing, software distributed
-// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
-// CONDITIONS OF ANY KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations under the License.
-
-// use itertools::structs::Batching;
-use itertools::Itertools;
-
-use rusqlite;
-
-use errors::{
-    Result,
-};
-
-use mentat_db::{
-    entids,
-    TypedSQLValue,
-};
-
-use mentat_core::{
-    TypedValue,
-    Entid,
-};
-
-#[derive(Debug, Clone)]
-pub struct TxPart {
-    pub e: Entid,
-    pub a: Entid,
-    pub v: TypedValue,
-    pub added: bool,
-}
-
-// Notes on 'parts' representation:
-// Currently it's suitable for uses which necessitate pulling in the entire tx into memory,
-// and don't require efficient querying/monitoring by attributes of parts.
-//
-// Example: streaming transactions to/from the server.
-//
-// In the future, consider:
-// - A structure that makes typical tx-listener questions — "does this transaction mention
-//   an attribute or entity I care about?" — efficient. That might be a trie, it might be a
-//   bunch of extra data structures (e.g., a set of mentioned attributes), or something else.
-//   With an unsorted Vec, looking for a mentioned attribute requires linear search of the entire vector.
-// - A structure that doesn't require pulling the entire tx into memory. This might be a cursor,
-//   a rowid range, or something else that's scoped to the lifetime of a particular database transaction,
-//   in order to preserve isolation.
-#[derive(Debug, Clone)]
-pub struct Tx {
-    pub tx: Entid,
-    pub tx_instant: TypedValue,
-    pub parts: Vec<TxPart>,
-}
-
-struct RawDatom {
-    e: Entid,
-    a: Entid,
-    v: TypedValue, // composite of 'v' and 'value_type_tag'
-    tx: Entid,
-    added: bool,
-}
-
-// type TxIter = Batching;
-
-pub trait TxReader {
-    fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>>;
-}
-
-pub struct TxClient {}
-
-impl TxReader for TxClient {
-    // TODO what should a type signature look like for this to return the
-    // batching iterator?
-    fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
-        let mut stmt = sqlite.prepare(
-            "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
-        )?;
-
-        let row_iterator = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
-            Ok(RawDatom {
-                e: row.get(0),
-                a: row.get(1),
-                v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
-                tx: row.get(3),
-                added: row.get(4),
-            })
-        })?;
-
-        let txes_iterator = row_iterator.batching(|rows| {
-            let mut has_next_tx = false;
-            let mut next_tx: Option<Box<Tx>> = None;
-
-            // Our rows are partitioned into batches that represent transactions,
-            // thanks to ORDER BY clause above. We come up with a transaction by
-            // iterating through rows until we have its full representation.
-            // TODO place limits to safeguard against bad data?
-            loop {
-                if let Some(datom) = rows.next() {
-                    let datom = match datom {
-                        Ok(d) => d,
-                        Err(_) => break // TODO propagate error from from_sql_value_pair above
-                    };
-                    let part = TxPart {
-                        e: datom.e,
-                        a: datom.a,
-                        v: datom.v.clone(),
-                        added: datom.added,
-                    };
-                    if !has_next_tx {
-                        next_tx = Some(Box::new(Tx {
-                            tx: datom.tx,
-                            tx_instant: datom.v.clone(),
-                            parts: vec![part],
-                        }));
-                        has_next_tx = true;
-                    } else {
-                        // Datom represents a transaction, we're done with this chunk of rows.
-                        if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e {
-                            match next_tx {
-                                Some(ref mut t) => {t.tx_instant = part.v;},
-                                None => break // TODO bad state
-                            }
-                            break;
-                        // Datom represents a transaction part - take a note of it, continue iterating.
-                        } else {
-                            match next_tx {
-                                Some(ref mut t) => {t.parts.push(part);},
-                                None => break // TODO bad state
-                            }
-                        }
-                    }
-                } else {
-                    break;
-                }
-            }
-
-            // TODO due to TODOs above, this is ambiguous:
-            // either there's no transaction, or something went wrong!
-            next_tx
-        }).map(|t| *t);
-
-        // TODO just return the iterator...
-        Ok(txes_iterator.collect())
-    }
-}
diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs
new file mode 100644
index 00000000..89e4293a
--- /dev/null
+++ b/tolstoy/src/tx_processor.rs
@@ -0,0 +1,164 @@
+// Copyright 2016 Mozilla
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
+// this file except in compliance with the License. You may obtain a copy of the
+// License at http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software distributed
+// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+// CONDITIONS OF ANY KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations under the License.
+
+use rusqlite;
+
+use errors::{
+    Result,
+};
+
+use mentat_db::{
+    TypedSQLValue,
+};
+
+use mentat_core::{
+    TypedValue,
+    Entid,
+};
+
+/// A single datom of a transaction, as handed to a `TxReceiver`.
+#[derive(Debug, Clone)]
+pub struct TxPart {
+    pub e: Entid,
+    pub a: Entid,
+    pub v: TypedValue,
+    /// Value of the row's `added` column.
+    pub added: bool,
+}
+
+/// Pairs a transaction id with its `tx_instant` value.
+/// Nothing in this module constructs it.
+#[derive(Debug, Clone)]
+pub struct Tx {
+    pub tx: Entid,
+    pub tx_instant: TypedValue,
+}
+
+/// Consumer of the transaction stream produced by `Processor::process`.
+pub trait TxReceiver {
+    /// Called once per transaction, in ascending `tx` order. `d` yields the datoms of that
+    /// transaction only; the receiver may stop reading it early, or not read it at all.
+    fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()>;
+    /// Called once, after the last transaction has been delivered.
+    fn done(&mut self) -> Result<()>;
+}
+
+pub struct Processor {}
+
+/// One row of the `transactions` table.
+struct RawDatom {
+    e: Entid,
+    a: Entid,
+    v: TypedValue, // composite of 'v' and 'value_type_tag'
+    tx: Entid,
+    added: bool,
+}
+
+impl RawDatom {
+    /// Copies the receiver-visible fields out of a borrowed row.
+    fn to_part(&self) -> TxPart {
+        TxPart {
+            e: self.e,
+            a: self.a,
+            v: self.v.clone(),
+            added: self.added,
+        }
+    }
+
+    /// Converts an owned row into a part without cloning its value.
+    fn into_part(self) -> TxPart {
+        TxPart {
+            e: self.e,
+            a: self.a,
+            v: self.v,
+            added: self.added,
+        }
+    }
+}
+
+/// Iterator over the datoms of exactly one transaction.
+///
+/// It yields `first`, then keeps pulling from the shared row stream for as long as the rows
+/// carry the same `tx` as `first`. The first row that belongs to another transaction is not
+/// yielded; it is parked in `pending` so that `Processor::process` can start the next
+/// transaction with it.
+pub struct DatomsIterator<'conn> {
+    at_first: bool,
+    at_last: bool,
+    first: &'conn RawDatom,
+    rows: &'conn mut Iterator<Item=Result<RawDatom>>,
+    pending: &'conn mut Option<RawDatom>,
+}
+
+impl<'conn> DatomsIterator<'conn> {
+    fn new(first: &'conn RawDatom,
+           rows: &'conn mut Iterator<Item=Result<RawDatom>>,
+           pending: &'conn mut Option<RawDatom>) -> DatomsIterator<'conn> {
+        DatomsIterator {
+            at_first: true,
+            at_last: false,
+            first: first,
+            rows: rows,
+            pending: pending,
+        }
+    }
+}
+
+impl<'conn> Iterator for DatomsIterator<'conn> {
+    type Item = Result<TxPart>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.at_last {
+            return None;
+        }
+
+        if self.at_first {
+            self.at_first = false;
+            return Some(Ok(self.first.to_part()));
+        }
+
+        match self.rows.next() {
+            Some(Ok(datom)) => {
+                if datom.tx == self.first.tx {
+                    Some(Ok(datom.into_part()))
+                } else {
+                    // `ORDER BY tx` only groups rows by transaction, it does not order them
+                    // within one. The boundary is therefore the first row with a different
+                    // tx id, not the position of the transaction's own tx-instant datom.
+                    *self.pending = Some(datom);
+                    self.at_last = true;
+                    None
+                }
+            },
+            Some(Err(e)) => {
+                // Surface the error once, then stop.
+                self.at_last = true;
+                Some(Err(e))
+            },
+            None => {
+                self.at_last = true;
+                None
+            },
+        }
+    }
+}
+
+impl Processor {
+    /// Reads the whole `transactions` table in `tx` order and feeds it to `receiver`:
+    /// one `receiver.tx(..)` call per transaction, then a single `receiver.done()`.
+    ///
+    /// Rows that a receiver leaves unread are skipped before the next transaction starts.
+    ///
+    /// Returns the first error raised by preparing or running the query, by converting a
+    /// row, or by the receiver itself.
+    pub fn process(sqlite: &rusqlite::Connection, receiver: &mut TxReceiver) -> Result<()> {
+        let mut stmt = sqlite.prepare(
+            "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
+        )?;
+
+        let mut rows = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
+            Ok(RawDatom {
+                e: row.get(0),
+                a: row.get(1),
+                v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
+                tx: row.get(3),
+                added: row.get(4),
+            })
+        })?;
+
+        // Row read by a DatomsIterator that turned out to open the next transaction.
+        let mut pending: Option<RawDatom> = None;
+        let mut current_tx: Option<Entid> = None;
+
+        loop {
+            let datom = match pending.take() {
+                Some(datom) => datom,
+                None => match rows.next() {
+                    Some(row) => row?,
+                    None => break,
+                },
+            };
+
+            // Leftover row of a transaction whose receiver stopped reading early.
+            if current_tx == Some(datom.tx) {
+                continue;
+            }
+
+            current_tx = Some(datom.tx);
+            receiver.tx(
+                datom.tx,
+                &mut DatomsIterator::new(&datom, &mut rows, &mut pending)
+            )?;
+        }
+
+        receiver.done()
+    }
+}