Iterator-based tx reader sketch

This commit is contained in:
Grisha Kruglov 2018-02-03 16:42:04 -05:00
parent 94ed876cfa
commit 0f957f242b
4 changed files with 66 additions and 51 deletions

View file

@ -17,7 +17,7 @@ use mentat::conn::Conn;
use mentat::new_connection; use mentat::new_connection;
use mentat_tolstoy::tx_client::{ use mentat_tolstoy::tx_client::{
TxReader, TxReader,
TxClient TxClient,
}; };
use mentat_core::{ use mentat_core::{
ValueType, ValueType,

View file

@ -12,6 +12,7 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
lazy_static = "0.2" lazy_static = "0.2"
itertools = "0.6.5"
uuid = { version = "0.5", features = ["v4", "serde"] } uuid = { version = "0.5", features = ["v4", "serde"] }
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }

View file

@ -23,6 +23,7 @@ extern crate mentat_db;
extern crate mentat_core; extern crate mentat_core;
extern crate rusqlite; extern crate rusqlite;
extern crate uuid; extern crate uuid;
extern crate itertools;
pub mod schema; pub mod schema;
pub mod metadata; pub mod metadata;

View file

@ -8,8 +8,8 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the // CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License. // specific language governing permissions and limitations under the License.
use std::collections::BTreeMap; // use itertools::structs::Batching;
use std::collections::btree_map::Entry; use itertools::Itertools;
use rusqlite; use rusqlite;
@ -27,7 +27,7 @@ use mentat_core::{
Entid, Entid,
}; };
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct TxPart { pub struct TxPart {
pub e: Entid, pub e: Entid,
pub a: Entid, pub a: Entid,
@ -49,7 +49,7 @@ pub struct TxPart {
// - A structure that doesn't require pulling the entire tx into memory. This might be a cursor, // - A structure that doesn't require pulling the entire tx into memory. This might be a cursor,
// a rowid range, or something else that's scoped to the lifetime of a particular database transaction, // a rowid range, or something else that's scoped to the lifetime of a particular database transaction,
// in order to preserve isolation. // in order to preserve isolation.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Tx { pub struct Tx {
pub tx: Entid, pub tx: Entid,
pub tx_instant: TypedValue, pub tx_instant: TypedValue,
@ -64,6 +64,8 @@ struct RawDatom {
added: bool, added: bool,
} }
// type TxIter<I, F> = Batching<I, F>;
pub trait TxReader { pub trait TxReader {
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>>; fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>>;
} }
@ -71,11 +73,14 @@ pub trait TxReader {
pub struct TxClient {} pub struct TxClient {}
impl TxReader for TxClient { impl TxReader for TxClient {
// TODO what should a type signature look like for this to return the
// batching iterator?
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> { fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
let mut stmt = sqlite.prepare( let mut stmt = sqlite.prepare(
"SELECT e, a, v, tx, added, value_type_tag FROM transactions" "SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?; )?;
let datoms: Vec<Result<RawDatom>> = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
let row_iterator = stmt.query_and_then(&[], |row| -> Result<RawDatom> {
Ok(RawDatom { Ok(RawDatom {
e: row.get(0), e: row.get(0),
a: row.get(1), a: row.get(1),
@ -83,54 +88,62 @@ impl TxReader for TxClient {
tx: row.get(3), tx: row.get(3),
added: row.get(4), added: row.get(4),
}) })
})?.collect(); })?;
// It's convenient to have a consistently ordered set of results, let txes_iterator = row_iterator.batching(|rows| {
// so we use a sorting map. let mut has_next_tx = false;
let mut txes_by_tx: BTreeMap<Entid, Tx> = BTreeMap::new(); let mut next_tx: Option<Box<Tx>> = None;
// On first pass, build our Txes and TxParts for each. // Our rows are partitioned into batches that represent transactions,
for datom_result in datoms { // thanks to ORDER BY clause above. We come up with a transaction by
let datom = datom_result?; // iterating through rows until we have its full representation.
// TODO place limits to safeguard against bad data?
// Datom represents a transaction. loop {
if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e { if let Some(datom) = rows.next() {
// Does the Tx already exist in the map? That means we've inserted it let datom = match datom {
// with an incomplete tx_instant; update it. Ok(d) => d,
if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) { Err(_) => break // TODO propagate error from from_sql_value_pair above
tx.get_mut().tx_instant = datom.v; };
continue; let part = TxPart {
e: datom.e,
a: datom.a,
v: datom.v.clone(),
added: datom.added,
};
if !has_next_tx {
next_tx = Some(Box::new(Tx {
tx: datom.tx,
tx_instant: datom.v.clone(),
parts: vec![part],
}));
has_next_tx = true;
} else {
// Datom represents a transaction, we're done with this chunk of rows.
if datom.a == entids::DB_TX_INSTANT && datom.tx == datom.e {
match next_tx {
Some(ref mut t) => {t.tx_instant = part.v;},
None => break // TODO bad state
}
break;
// Datom represents a transaction part - take a note of it, continue iterating.
} else {
match next_tx {
Some(ref mut t) => {t.parts.push(part);},
None => break // TODO bad state
}
}
}
} else {
break;
} }
// Otherwise, insert brand new Tx into our map.
txes_by_tx.insert(datom.tx, Tx {
tx: datom.tx,
tx_instant: datom.v,
parts: Vec::new(),
});
// Datom represents a transaction part.
} else {
let part = TxPart {
e: datom.e,
a: datom.a,
v: datom.v,
added: datom.added,
};
// Does the Tx for this part already exist in the map?
// Append this part to the parts list.
if let Entry::Occupied(mut tx) = txes_by_tx.entry(datom.tx) {
tx.get_mut().parts.push(part);
continue;
}
// Otherwise, insert the Tx with the current part in its parts list.
txes_by_tx.insert(datom.tx, Tx {
tx: datom.tx,
tx_instant: TypedValue::Long(0), // to be set as we iterate
parts: vec![part],
});
} }
}
// Finally, consume the Tx map into a Vec of its values. // TODO due to TODOs above, this is ambiguous:
Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect()) // either there's no transaction, or something went wrong!
next_tx
}).map(|t| *t);
// TODO just return the iterator...
Ok(txes_iterator.collect())
} }
} }