diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index b9fbc973..42576c96 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -17,7 +17,7 @@ use mentat::conn::Conn; use mentat::new_connection; use mentat_tolstoy::tx_client::{ TxReader, - TxClient + TxClient, }; use mentat_core::{ ValueType, diff --git a/tolstoy/Cargo.toml b/tolstoy/Cargo.toml index 36b83016..f2afd43f 100644 --- a/tolstoy/Cargo.toml +++ b/tolstoy/Cargo.toml @@ -12,6 +12,7 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" lazy_static = "0.2" +itertools = "0.6.5" uuid = { version = "0.5", features = ["v4", "serde"] } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index 05896d64..ee5ab559 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -23,6 +23,7 @@ extern crate mentat_db; extern crate mentat_core; extern crate rusqlite; extern crate uuid; +extern crate itertools; pub mod schema; pub mod metadata; diff --git a/tolstoy/src/tx_client.rs b/tolstoy/src/tx_client.rs index a9d3e5a0..8338f02b 100644 --- a/tolstoy/src/tx_client.rs +++ b/tolstoy/src/tx_client.rs @@ -8,8 +8,8 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use std::collections::BTreeMap; -use std::collections::btree_map::Entry; +// use itertools::structs::Batching; +use itertools::Itertools; use rusqlite; @@ -27,7 +27,7 @@ use mentat_core::{ Entid, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TxPart { pub e: Entid, pub a: Entid, @@ -49,7 +49,7 @@ pub struct TxPart { // - A structure that doesn't require pulling the entire tx into memory. This might be a cursor, // a rowid range, or something else that's scoped to the lifetime of a particular database transaction, // in order to preserve isolation. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Tx { pub tx: Entid, pub tx_instant: TypedValue, @@ -64,6 +64,8 @@ struct RawDatom { added: bool, } +// type TxIter = Batching; + pub trait TxReader { fn all(sqlite: &rusqlite::Connection) -> Result>; } @@ -71,11 +73,14 @@ pub trait TxReader { pub struct TxClient {} impl TxReader for TxClient { + // TODO what should a type signature look like for this to return the + // batching iterator? fn all(sqlite: &rusqlite::Connection) -> Result> { let mut stmt = sqlite.prepare( - "SELECT e, a, v, tx, added, value_type_tag FROM transactions" + "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx" )?; - let datoms: Vec> = stmt.query_and_then(&[], |row| -> Result { + + let row_iterator = stmt.query_and_then(&[], |row| -> Result { Ok(RawDatom { e: row.get(0), a: row.get(1), @@ -83,54 +88,62 @@ impl TxReader for TxClient { tx: row.get(3), added: row.get(4), }) - })?.collect(); + })?; - // It's convenient to have a consistently ordered set of results, - // so we use a sorting map. - let mut txes_by_tx: BTreeMap = BTreeMap::new(); + let txes_iterator = row_iterator.batching(|rows| { + let mut has_next_tx = false; + let mut next_tx: Option> = None; - // On first pass, build our Txes and TxParts for each. - for datom_result in datoms { - let datom = datom_result?; - - // Datom represents a transaction. - if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e { - // Does the Tx already exist in the map? That means we've inserted it - // with an incomplete tx_instant; update it. - if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) { - tx.get_mut().tx_instant = datom.v; - continue; + // Our rows are partitioned into batches that represent transactions, + // thanks to ORDER BY clause above. We come up with a transaction by + // iterating through rows until we have its full representation. + // TODO place limits to safeguard against bad data? + loop { + if let Some(datom) = rows.next() { + let datom = match datom { + Ok(d) => d, + Err(_) => break // TODO propagate error from from_sql_value_pair above + }; + let part = TxPart { + e: datom.e, + a: datom.a, + v: datom.v.clone(), + added: datom.added, + }; + if !has_next_tx { + next_tx = Some(Box::new(Tx { + tx: datom.tx, + tx_instant: datom.v.clone(), + parts: vec![part], + })); + has_next_tx = true; + } else { + // Datom represents a transaction, we're done with this chunk of rows. + if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e { + match next_tx { + Some(ref mut t) => {t.tx_instant = part.v;}, + None => break // TODO bad state + } + break; + // Datom represents a transaction part - take a note of it, continue iterating. + } else { + match next_tx { + Some(ref mut t) => {t.parts.push(part);}, + None => break // TODO bad state + } + } + } + } else { + break; } - // Otherwise, insert brand new Tx into our map. - txes_by_tx.insert(datom.tx, Tx { - tx: datom.tx, - tx_instant: datom.v, - parts: Vec::new(), - }); - // Datom represents a transaction part. - } else { - let part = TxPart { - e: datom.e, - a: datom.a, - v: datom.v, - added: datom.added, - }; - // Does the Tx for this part already exist in the map? - // Append this part to the parts list. - if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) { - tx.get_mut().parts.push(part); - continue; - } - // Otherwise, insert the Tx with the current part in its parts list. - txes_by_tx.insert(datom.tx, Tx { - tx: datom.tx, - tx_instant: TypedValue::Long(0), // to be set as we iterate - parts: vec![part], - }); } - } - // Finally, consume the Tx map into a Vec of its values. - Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect()) + // TODO due to TODOs above, this is ambiguous: + // either there's no transaction, or something went wrong! + next_tx + }).map(|t| *t); + + // TODO just return the iterator... + Ok(txes_iterator.collect()) } }