Make things work.

This commit is contained in:
Richard Newman 2018-02-05 17:11:19 -08:00
parent 53248c5840
commit 44168680f5
2 changed files with 39 additions and 48 deletions

View file

@ -45,7 +45,8 @@ impl TxCountingReceiver {
} }
impl TxReceiver for TxCountingReceiver { impl TxReceiver for TxCountingReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()> { fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
self.tx_count = self.tx_count + 1; self.tx_count = self.tx_count + 1;
Ok(()) Ok(())
} }
@ -72,10 +73,11 @@ impl TestingReceiver {
} }
impl TxReceiver for TestingReceiver { impl TxReceiver for TestingReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()> { fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
let datoms = self.txes.entry(tx_id).or_insert(vec![]); let datoms = self.txes.entry(tx_id).or_insert(vec![]);
for datom in d { for datom in d {
datoms.push(datom?); datoms.push(datom);
} }
Ok(()) Ok(())
} }

View file

@ -43,37 +43,37 @@ pub struct Tx {
} }
pub trait TxReceiver { pub trait TxReceiver {
fn tx(&mut self, tx_id: Entid, d: &mut DatomsIterator) -> Result<()>; fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart>;
fn done(&mut self) -> Result<()>; fn done(&mut self) -> Result<()>;
} }
pub struct Processor {} pub struct Processor {}
type DemRows<'s> = Peekable<rusqlite::AndThenRows<'s, Result<TxPart>>>; pub struct DatomsIterator<'conn, 't, T>
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
pub struct DatomsIterator<'conn> {
at_first: bool, at_first: bool,
at_last: bool, at_last: bool,
first: &'conn TxPart, first: &'conn TxPart,
rows: DemRows<'conn>, rows: &'t mut Peekable<T>,
} }
impl<'conn> DatomsIterator<'conn> { impl<'conn, 't, T> DatomsIterator<'conn, 't, T>
fn new<I>(first: &'conn TxPart, rows: &'conn mut DemRows<'conn>) -> DatomsIterator<'conn> where T: Sized + Iterator<Item=Result<TxPart>> + 't {
where fn new(first: &'conn TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'conn, 't, T>
I: Iterator<Item=Result<TxPart>>
{ {
DatomsIterator { DatomsIterator {
at_first: true, at_first: true,
at_last: false, at_last: false,
first: first, first: first,
rows: Box::new(rows), rows: rows,
} }
} }
} }
impl<'conn> Iterator for DatomsIterator<'conn> { impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T>
type Item = Result<TxPart>; where T: Sized + Iterator<Item=Result<TxPart>> + 't {
type Item = TxPart;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if self.at_last { if self.at_last {
@ -82,7 +82,7 @@ impl<'conn> Iterator for DatomsIterator<'conn> {
if self.at_first { if self.at_first {
self.at_first = false; self.at_first = false;
return Some(Ok(self.first.clone())); return Some(self.first.clone());
} }
// Look ahead to see if we're about to cross into // Look ahead to see if we're about to cross into
@ -90,51 +90,40 @@ impl<'conn> Iterator for DatomsIterator<'conn> {
{ {
let next_option = self.rows.peek(); let next_option = self.rows.peek();
match next_option { match next_option {
Some(next_result) => { Some(&Ok(ref next)) => {
match *next_result { if next.tx != self.first.tx {
Ok(ref next) => { self.at_last = true;
if next.tx != self.first.tx { return None;
self.at_last = true;
return None;
}
},
_ => ()
} }
}, },
_ => () _ => ()
} }
} }
// We're in the correct partition. // We're in the correct partition.
if let Some(row) = self.rows.next() { if let Some(result) = self.rows.next() {
let datom = match row { match result {
Ok(r) => r, Err(_) => None,
Err(e) => { Ok(datom) => {
self.at_last = true; Some(TxPart {
return Some(Err(e)); e: datom.e,
} a: datom.a,
}; v: datom.v.clone(),
tx: datom.tx,
if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e { added: datom.added,
self.at_last = true; })
},
} }
return Some(Ok(TxPart {
e: datom.e,
a: datom.a,
v: datom.v.clone(),
tx: datom.tx,
added: datom.added,
}));
} else { } else {
self.at_last = true; self.at_last = true;
return None; None
} }
} }
} }
impl Processor { impl Processor {
pub fn process(sqlite: &rusqlite::Connection, receiver: &mut TxReceiver) -> Result<()> { pub fn process<R>(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()>
where R: TxReceiver {
let mut stmt = sqlite.prepare( let mut stmt = sqlite.prepare(
"SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx" "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?; )?;
@ -154,7 +143,7 @@ impl Processor {
let mut peekable = rows.peekable(); let mut peekable = rows.peekable();
while let Some(row) = peekable.next() { while let Some(row) = peekable.next() {
let datom = row?; let datom = row?;
// if current_tx == Some(row.tx) // if current_tx == Some(row.tx)
match current_tx { match current_tx {
Some(tx) => { Some(tx) => {