diff --git a/tolstoy/src/tx_client.rs b/tolstoy/src/tx_client.rs index 66d2d059..1373146c 100644 --- a/tolstoy/src/tx_client.rs +++ b/tolstoy/src/tx_client.rs @@ -64,7 +64,6 @@ struct RawDatom { v: TypedValue, // composite of 'v' and 'value_type_tag' tx: Entid, added: bool, - is_transaction: bool } pub trait TxReader { @@ -76,59 +75,63 @@ pub struct TxClient {} impl TxReader for TxClient { fn all(sqlite: &rusqlite::Connection) -> Result> { let mut stmt = sqlite.prepare( - "SELECT - e, a, v, tx, added, value_type_tag, - CASE a WHEN :txInstant THEN 1 ELSE 0 END is_transaction - FROM transactions" + "SELECT e, a, v, tx, added, value_type_tag FROM transactions" )?; - let datoms: Vec> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result { + let datoms: Vec> = stmt.query_and_then(&[], |row| -> Result { Ok(RawDatom { e: row.get(0), a: row.get(1), v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?, tx: row.get(3), added: row.get(4), - is_transaction: row.get(6), }) })?.collect(); // It's convenient to have a consistently ordered set of results, // so we use a sorting map. - let mut txes_by_tx = BTreeMap::new(); - let mut tx_parts_by_tx = HashMap::new(); + let mut txes_by_tx: BTreeMap = BTreeMap::new(); // On first pass, build our Txes and TxParts for each. for datom_result in datoms { let datom = datom_result?; + // Datom represents a transaction. - if datom.is_transaction { + if datom.a == entids::DB_TX_INSTANT { + // Does the Tx already exist in the map? That means we've inserted it + // with an incomplete tx_instant; update it. + if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) { + tx.get_mut().tx_instant = datom.v; + continue; + } + // Otherwise, insert brand new Tx into our map. txes_by_tx.insert(datom.tx, Tx { tx: datom.tx, tx_instant: datom.v, parts: Vec::new(), }); - // Datom represents part of a transaction. Our query statement above guarantees - // that we've already processed corresponding transaction at this point. + // Datom represents a transaction part. } else { - let parts = tx_parts_by_tx.entry(datom.tx).or_insert(Vec::new()); - parts.push(TxPart { + // Does the Tx for this part already exist in the map? + // Append this part to the parts list. + let part = TxPart { e: datom.e, a: datom.a, v: datom.v, added: datom.added, + }; + if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) { + tx.get_mut().parts.push(part); + continue; + } + // Otherwise, insert the Tx with the current part in its parts list. + txes_by_tx.insert(datom.tx, Tx { + tx: datom.tx, + tx_instant: TypedValue::Long(0), // to be set as we iterate + parts: vec![part], }); } } - // On second pass, consume TxParts map and associate parts with corresponding Txes. - for (e, tx_parts) in tx_parts_by_tx.into_iter() { - if let Entry::Occupied(mut tx) = txes_by_tx.entry(e) { - tx.get_mut().parts = tx_parts; - } else { - bail!(ErrorKind::UnexpectedState(format!("Missing transactions datoms for tx={:?}", e))); - } - } - // Finally, consume the Tx map and a Vec of its values. Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect()) }