Single pass

This commit is contained in:
Grisha Kruglov 2018-02-01 19:06:03 -05:00
parent 14e28de2f5
commit 8ae1ddf03e

View file

@ -64,7 +64,6 @@ struct RawDatom {
v: TypedValue, // composite of 'v' and 'value_type_tag' v: TypedValue, // composite of 'v' and 'value_type_tag'
tx: Entid, tx: Entid,
added: bool, added: bool,
is_transaction: bool
} }
pub trait TxReader { pub trait TxReader {
@ -76,59 +75,63 @@ pub struct TxClient {}
impl TxReader for TxClient { impl TxReader for TxClient {
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> { fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
let mut stmt = sqlite.prepare( let mut stmt = sqlite.prepare(
"SELECT "SELECT e, a, v, tx, added, value_type_tag FROM transactions"
e, a, v, tx, added, value_type_tag,
CASE a WHEN :txInstant THEN 1 ELSE 0 END is_transaction
FROM transactions"
)?; )?;
let datoms: Vec<Result<RawDatom>> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result<RawDatom> { let datoms: Vec<Result<RawDatom>> = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
Ok(RawDatom { Ok(RawDatom {
e: row.get(0), e: row.get(0),
a: row.get(1), a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?, v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
tx: row.get(3), tx: row.get(3),
added: row.get(4), added: row.get(4),
is_transaction: row.get(6),
}) })
})?.collect(); })?.collect();
// It's convenient to have a consistently ordered set of results, // It's convenient to have a consistently ordered set of results,
// so we use a sorting map. // so we use a sorting map.
let mut txes_by_tx = BTreeMap::new(); let mut txes_by_tx: BTreeMap<Entid, Tx> = BTreeMap::new();
let mut tx_parts_by_tx = HashMap::new();
// On first pass, build our Txes and TxParts for each. // On first pass, build our Txes and TxParts for each.
for datom_result in datoms { for datom_result in datoms {
let datom = datom_result?; let datom = datom_result?;
// Datom represents a transaction. // Datom represents a transaction.
if datom.is_transaction { if datom.a == entids::DB_TX_INSTANT {
// Does the Tx already exist in the map? That means we've inserted it
// with an incomplete tx_instant; update it.
if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) {
tx.get_mut().tx_instant = datom.v;
continue;
}
// Otherwise, insert brand new Tx into our map.
txes_by_tx.insert(datom.tx, Tx { txes_by_tx.insert(datom.tx, Tx {
tx: datom.tx, tx: datom.tx,
tx_instant: datom.v, tx_instant: datom.v,
parts: Vec::new(), parts: Vec::new(),
}); });
// Datom represents part of a transaction. Our query statement above guarantees // Datom represents a transaction part.
// that we've already processed corresponding transaction at this point.
} else { } else {
let parts = tx_parts_by_tx.entry(datom.tx).or_insert(Vec::new()); // Does the Tx for this part already exist in the map?
parts.push(TxPart { // Append this part to the parts list.
let part = TxPart {
e: datom.e, e: datom.e,
a: datom.a, a: datom.a,
v: datom.v, v: datom.v,
added: datom.added, added: datom.added,
};
if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) {
tx.get_mut().parts.push(part);
continue;
}
// Otherwise, insert the Tx with the current part in its parts list.
txes_by_tx.insert(datom.tx, Tx {
tx: datom.tx,
tx_instant: TypedValue::Long(0), // to be set as we iterate
parts: vec![part],
}); });
} }
} }
// On second pass, consume TxParts map and associate parts with corresponding Txes.
for (e, tx_parts) in tx_parts_by_tx.into_iter() {
if let Entry::Occupied(mut tx) = txes_by_tx.entry(e) {
tx.get_mut().parts = tx_parts;
} else {
bail!(ErrorKind::UnexpectedState(format!("Missing transactions datoms for tx={:?}", e)));
}
}
// Finally, consume the Tx map and a Vec of its values. // Finally, consume the Tx map and a Vec of its values.
Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect()) Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect())
} }