broken peekable

This commit is contained in:
Grisha Kruglov 2018-02-05 19:46:03 -05:00
parent deb6ba29a5
commit 53248c5840
2 changed files with 44 additions and 22 deletions

View file

@ -115,6 +115,8 @@ fn test_reader() {
let mut receiver = TestingReceiver::new(); let mut receiver = TestingReceiver::new();
Processor::process(&c, &mut receiver).expect("processor"); Processor::process(&c, &mut receiver).expect("processor");
println!("{:#?}", receiver);
assert_eq!(2, receiver.txes.keys().count()); assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 4); assert_tx_datoms_count(&receiver, 1, 4);

View file

@ -7,6 +7,9 @@
// // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// // CONDITIONS OF ANY KIND, either express or implied. See the License for the // // CONDITIONS OF ANY KIND, either express or implied. See the License for the
// // specific language governing permissions and limitations under the License. // // specific language governing permissions and limitations under the License.
extern crate core;
use self::core::iter::Peekable;
use rusqlite; use rusqlite;
@ -29,6 +32,7 @@ pub struct TxPart {
pub e: Entid, pub e: Entid,
pub a: Entid, pub a: Entid,
pub v: TypedValue, pub v: TypedValue,
pub tx: Entid,
pub added: bool, pub added: bool,
} }
@ -45,28 +49,25 @@ pub trait TxReceiver {
pub struct Processor {} pub struct Processor {}
struct RawDatom { type DemRows<'s> = Peekable<rusqlite::AndThenRows<'s, Result<TxPart>>>;
e: Entid,
a: Entid,
v: TypedValue, // composite of 'v' and 'value_type_tag'
tx: Entid,
added: bool,
}
pub struct DatomsIterator<'conn> { pub struct DatomsIterator<'conn> {
at_first: bool, at_first: bool,
at_last: bool, at_last: bool,
first: &'conn RawDatom, first: &'conn TxPart,
rows: &'conn mut Iterator<Item=Result<RawDatom>>, rows: DemRows<'conn>,
} }
impl<'conn> DatomsIterator<'conn> { impl<'conn> DatomsIterator<'conn> {
fn new(first: &'conn RawDatom, rows: &'conn mut Iterator<Item=Result<RawDatom>>) -> DatomsIterator<'conn> { fn new<I>(first: &'conn TxPart, rows: &'conn mut DemRows<'conn>) -> DatomsIterator<'conn>
where
I: Iterator<Item=Result<TxPart>>
{
DatomsIterator { DatomsIterator {
at_first: true, at_first: true,
at_last: false, at_last: false,
first: first, first: first,
rows: rows rows: Box::new(rows),
} }
} }
} }
@ -81,14 +82,30 @@ impl<'conn> Iterator for DatomsIterator<'conn> {
if self.at_first { if self.at_first {
self.at_first = false; self.at_first = false;
return Some(Ok(TxPart { return Some(Ok(self.first.clone()));
e: self.first.e,
a: self.first.a,
v: self.first.v.clone(),
added: self.first.added,
}));
} }
// Look ahead to see if we're about to cross into
// the next partition.
{
let next_option = self.rows.peek();
match next_option {
Some(next_result) => {
match *next_result {
Ok(ref next) => {
if next.tx != self.first.tx {
self.at_last = true;
return None;
}
},
_ => ()
}
},
_ => ()
}
}
// We're in the correct partition.
if let Some(row) = self.rows.next() { if let Some(row) = self.rows.next() {
let datom = match row { let datom = match row {
Ok(r) => r, Ok(r) => r,
@ -106,6 +123,7 @@ impl<'conn> Iterator for DatomsIterator<'conn> {
e: datom.e, e: datom.e,
a: datom.a, a: datom.a,
v: datom.v.clone(), v: datom.v.clone(),
tx: datom.tx,
added: datom.added, added: datom.added,
})); }));
} else { } else {
@ -121,8 +139,8 @@ impl Processor {
"SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx" "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?; )?;
let mut rows = stmt.query_and_then(&[], |row| -> Result<RawDatom> { let mut rows = stmt.query_and_then(&[], |row| -> Result<TxPart> {
Ok(RawDatom { Ok(TxPart {
e: row.get(0), e: row.get(0),
a: row.get(1), a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?, v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
@ -133,9 +151,11 @@ impl Processor {
let mut current_tx = None; let mut current_tx = None;
while let Some(row) = rows.next() { let mut peekable = rows.peekable();
while let Some(row) = peekable.next() {
let datom = row?; let datom = row?;
// if current_tx == Some(row.tx)
match current_tx { match current_tx {
Some(tx) => { Some(tx) => {
if tx == datom.tx { if tx == datom.tx {
@ -144,7 +164,7 @@ impl Processor {
current_tx = Some(datom.tx); current_tx = Some(datom.tx);
receiver.tx( receiver.tx(
datom.tx, datom.tx,
&mut DatomsIterator::new(&datom, &mut rows) &mut DatomsIterator::new(&datom, &mut peekable)
)?; )?;
continue; continue;
} }
@ -153,7 +173,7 @@ impl Processor {
current_tx = Some(datom.tx); current_tx = Some(datom.tx);
receiver.tx( receiver.tx(
datom.tx, datom.tx,
&mut DatomsIterator::new(&datom, &mut rows) &mut DatomsIterator::new(&datom, &mut peekable)
)?; )?;
} }
} }