diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index f73c3621..2efbe542 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -1,3 +1,13 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + extern crate mentat; extern crate mentat_core; extern crate mentat_tolstoy; @@ -6,7 +16,6 @@ use mentat::conn::Conn; use mentat::new_connection; use mentat_tolstoy::tx_client::{ - Tx, TxReader, TxClient }; @@ -60,7 +69,7 @@ fn test_reader() { assert_eq!(numba_entity_id, &part.a); assert!(part.v.matches_type(ValueType::Long)); assert_eq!(TypedValue::Long(123), part.v); - assert_eq!(1, part.added); + assert_eq!(true, part.added); // TODO retractions } diff --git a/tolstoy/Cargo.toml b/tolstoy/Cargo.toml index 827bba94..36b83016 100644 --- a/tolstoy/Cargo.toml +++ b/tolstoy/Cargo.toml @@ -22,9 +22,6 @@ path = "../core" [dependencies.mentat_db] path = "../db" -[dependencies.edn] -path = "../edn" - [dependencies.rusqlite] version = "0.12" # System sqlite might be very old. diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs index 850a8638..aea4effe 100644 --- a/tolstoy/src/errors.rs +++ b/tolstoy/src/errors.rs @@ -13,7 +13,7 @@ use std; use hyper; use rusqlite; -use edn; +use uuid; use mentat_db; error_chain! { @@ -25,7 +25,7 @@ error_chain! 
{ IOError(std::io::Error); HttpError(hyper::Error); SqlError(rusqlite::Error); - UuidParseError(edn::UuidParseError); + UuidParseError(uuid::ParseError); } links { diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index a9746d59..05896d64 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -22,7 +22,6 @@ extern crate serde_json; extern crate mentat_db; extern crate mentat_core; extern crate rusqlite; -extern crate edn; extern crate uuid; pub mod schema; diff --git a/tolstoy/src/tx_client.rs b/tolstoy/src/tx_client.rs index aa91871d..66d2d059 100644 --- a/tolstoy/src/tx_client.rs +++ b/tolstoy/src/tx_client.rs @@ -10,48 +10,61 @@ use std::collections::BTreeMap; use std::collections::btree_map::Entry; +use std::collections::HashMap; use rusqlite; use errors::{ Result, - ErrorKind -}; - -use mentat_db::types::{ - Entid + ErrorKind, }; use mentat_db::{ entids, - TypedSQLValue + TypedSQLValue, }; use mentat_core::{ - TypedValue + TypedValue, + Entid, }; #[derive(Debug)] pub struct TxPart { pub e: Entid, - pub a: i64, + pub a: Entid, pub v: TypedValue, - pub added: i32 + pub added: bool, } +// Notes on 'parts' representation: +// Currently it's suitable for uses which necessitate pulling in the entire tx into memory, +// and don't require efficient querying/monitoring by attributes of parts. +// +// Example: streaming transactions to/from the server. +// +// In the future, consider: +// - A structure that makes typical tx-listener questions — "does this transaction mention +// an attribute or entity I care about?" — efficient. That might be a trie, it might be a +// bunch of extra data structures (e.g., a set of mentioned attributes), or something else. +// With an unsorted Vec, looking for a mentioned attribute requires linear search of the entire vector. +// - A structure that doesn't require pulling the entire tx into memory. 
This might be a cursor, +// a rowid range, or something else that's scoped to the lifetime of a particular database transaction, +// in order to preserve isolation. #[derive(Debug)] pub struct Tx { pub tx: Entid, pub tx_instant: TypedValue, - pub parts: Vec + pub parts: Vec, } -struct RawTx { +struct RawDatom { e: Entid, - a: i64, - v: TypedValue, + a: Entid, + v: TypedValue, // composite of 'v' and 'value_type_tag' tx: Entid, - added: i32 + added: bool, + is_transaction: bool } pub trait TxReader { @@ -62,52 +75,61 @@ pub struct TxClient {} impl TxReader for TxClient { fn all(sqlite: &rusqlite::Connection) -> Result> { - // Make sure a=txInstant rows are first, so that we process - // all transactions before we process any transaction parts. let mut stmt = sqlite.prepare( "SELECT e, a, v, tx, added, value_type_tag, CASE a WHEN :txInstant THEN 1 ELSE 0 END is_transaction - FROM transactions ORDER BY is_transaction DESC, tx ASC" + FROM transactions" )?; - let rows: Vec> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result { - Ok(RawTx { + let datoms: Vec> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result { + Ok(RawDatom { e: row.get(0), a: row.get(1), v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?, tx: row.get(3), - added: row.get(4) + added: row.get(4), + is_transaction: row.get(6), }) })?.collect(); // It's convenient to have a consistently ordered set of results, // so we use a sorting map. let mut txes_by_tx = BTreeMap::new(); - for row_result in rows { - let row = row_result?; - // Row represents a transaction. - if row.a == entids::DB_TX_INSTANT { - txes_by_tx.insert(row.tx, Tx { - tx: row.tx, - tx_instant: row.v, - parts: Vec::new() + let mut tx_parts_by_tx = HashMap::new(); + + // On first pass, build our Txes and TxParts for each. + for datom_result in datoms { + let datom = datom_result?; + // Datom represents a transaction. 
+ if datom.is_transaction { + txes_by_tx.insert(datom.tx, Tx { + tx: datom.tx, + tx_instant: datom.v, + parts: Vec::new(), }); - // Row represents part of a transaction. Our query statement above guarantees - // that we've already processed corresponding transaction at this point. + // Datom represents part of a transaction. Rows arrive in no guaranteed order, so the part + // is buffered by tx here and attached to its transaction in the second pass below. } else { - if let Entry::Occupied(mut t) = txes_by_tx.entry(row.tx) { - t.get_mut().parts.push(TxPart { - e: row.e, - a: row.a, - v: row.v, - added: row.added - }); - } else { - bail!(ErrorKind::UnexpectedState(format!("Encountered transaction part before transaction {:?}", row.tx))) - } + let parts = tx_parts_by_tx.entry(datom.tx).or_insert(Vec::new()); + parts.push(TxPart { + e: datom.e, + a: datom.a, + v: datom.v, + added: datom.added, + }); } } + // On second pass, consume TxParts map and associate parts with corresponding Txes. + for (e, tx_parts) in tx_parts_by_tx.into_iter() { + if let Entry::Occupied(mut tx) = txes_by_tx.entry(e) { + tx.get_mut().parts = tx_parts; + } else { + bail!(ErrorKind::UnexpectedState(format!("Missing transactions datoms for tx={:?}", e))); + } + } + // Finally, consume the Tx map and collect a Vec of its values. Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect()) } }