diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index 744973f6..d515a3ae 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -115,6 +115,8 @@ fn test_reader() { let mut receiver = TestingReceiver::new(); Processor::process(&c, &mut receiver).expect("processor"); + println!("{:#?}", receiver); + assert_eq!(2, receiver.txes.keys().count()); assert_tx_datoms_count(&receiver, 1, 4); diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs index 89e4293a..05545634 100644 --- a/tolstoy/src/tx_processor.rs +++ b/tolstoy/src/tx_processor.rs @@ -7,6 +7,9 @@ // // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // // CONDITIONS OF ANY KIND, either express or implied. See the License for the // // specific language governing permissions and limitations under the License. +extern crate core; + +use self::core::iter::Peekable; use rusqlite; @@ -29,6 +32,7 @@ pub struct TxPart { pub e: Entid, pub a: Entid, pub v: TypedValue, + pub tx: Entid, pub added: bool, } @@ -45,28 +49,25 @@ pub trait TxReceiver { pub struct Processor {} -struct RawDatom { - e: Entid, - a: Entid, - v: TypedValue, // composite of 'v' and 'value_type_tag' - tx: Entid, - added: bool, -} +type DemRows<'s> = Peekable>>; pub struct DatomsIterator<'conn> { at_first: bool, at_last: bool, - first: &'conn RawDatom, - rows: &'conn mut Iterator>, + first: &'conn TxPart, + rows: DemRows<'conn>, } impl<'conn> DatomsIterator<'conn> { - fn new(first: &'conn RawDatom, rows: &'conn mut Iterator>) -> DatomsIterator<'conn> { + fn new(first: &'conn TxPart, rows: &'conn mut DemRows<'conn>) -> DatomsIterator<'conn> + where + I: Iterator> + { DatomsIterator { at_first: true, at_last: false, first: first, - rows: rows + rows: Box::new(rows), } } } @@ -81,14 +82,30 @@ impl<'conn> Iterator for DatomsIterator<'conn> { if self.at_first { self.at_first = false; - return Some(Ok(TxPart { - e: self.first.e, - a: self.first.a, - v: self.first.v.clone(), - added: self.first.added, - })); + return Some(Ok(self.first.clone())); } + // Look ahead to see if we're about to cross into + // the next partition. + { + let next_option = self.rows.peek(); + match next_option { + Some(next_result) => { + match *next_result { + Ok(ref next) => { + if next.tx != self.first.tx { + self.at_last = true; + return None; + } + }, + _ => () + } + }, + _ => () + } + } + + // We're in the correct partition. if let Some(row) = self.rows.next() { let datom = match row { Ok(r) => r, @@ -106,6 +123,7 @@ impl<'conn> Iterator for DatomsIterator<'conn> { e: datom.e, a: datom.a, v: datom.v.clone(), + tx: datom.tx, added: datom.added, })); } else { @@ -121,8 +139,8 @@ impl Processor { "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx" )?; - let mut rows = stmt.query_and_then(&[], |row| -> Result { - Ok(RawDatom { + let mut rows = stmt.query_and_then(&[], |row| -> Result { + Ok(TxPart { e: row.get(0), a: row.get(1), v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?, @@ -133,9 +151,11 @@ impl Processor { let mut current_tx = None; - while let Some(row) = rows.next() { + let mut peekable = rows.peekable(); + while let Some(row) = peekable.next() { let datom = row?; + // if current_tx == Some(row.tx) match current_tx { Some(tx) => { if tx == datom.tx { @@ -144,7 +164,7 @@ impl Processor { current_tx = Some(datom.tx); receiver.tx( datom.tx, - &mut DatomsIterator::new(&datom, &mut rows) + &mut DatomsIterator::new(&datom, &mut peekable) )?; continue; } @@ -153,7 +173,7 @@ impl Processor { current_tx = Some(datom.tx); receiver.tx( datom.tx, - &mut DatomsIterator::new(&datom, &mut rows) + &mut DatomsIterator::new(&datom, &mut peekable) )?; } }