From d848d954cf0d890b0771b45642f80b7450788515 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Tue, 30 Jan 2018 19:32:29 -0500 Subject: [PATCH] Issue 508 - Iterating transcation processor r=rnewman Review comments --- db/src/lib.rs | 2 +- tests/tolstoy.rs | 144 ++++++++++++++++++++++++++++++ tolstoy/Cargo.toml | 6 +- tolstoy/src/errors.rs | 41 +++++++++ tolstoy/src/lib.rs | 17 +--- tolstoy/src/metadata.rs | 2 +- tolstoy/src/schema.rs | 2 +- tolstoy/src/tx_processor.rs | 169 ++++++++++++++++++++++++++++++++++++ 8 files changed, 363 insertions(+), 20 deletions(-) create mode 100644 tests/tolstoy.rs create mode 100644 tolstoy/src/errors.rs create mode 100644 tolstoy/src/tx_processor.rs diff --git a/db/src/lib.rs b/db/src/lib.rs index 7af9ed34..a5927978 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -37,7 +37,7 @@ pub mod db; mod bootstrap; pub mod debug; mod add_retract_alter_set; -mod entids; +pub mod entids; pub mod errors; mod metadata; mod schema; diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs new file mode 100644 index 00000000..67179b50 --- /dev/null +++ b/tests/tolstoy.rs @@ -0,0 +1,144 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +extern crate mentat; +extern crate mentat_core; +extern crate mentat_tolstoy; + +use std::collections::BTreeMap; + +use mentat::conn::Conn; + +use mentat::new_connection; +use mentat_tolstoy::tx_processor::{ + Processor, + TxReceiver, + TxPart, +}; +use mentat_tolstoy::errors::Result; +use mentat_core::{ + Entid, + TypedValue, + ValueType, +}; + +struct TxCountingReceiver { + pub tx_count: usize, + pub is_done: bool, +} + +impl TxCountingReceiver { + fn new() -> TxCountingReceiver { + TxCountingReceiver { + tx_count: 0, + is_done: false, + } + } +} + +impl TxReceiver for TxCountingReceiver { + fn tx(&mut self, _tx_id: Entid, _d: &mut T) -> Result<()> + where T: Iterator { + self.tx_count = self.tx_count + 1; + Ok(()) + } + + fn done(&mut self) -> Result<()> { + self.is_done = true; + Ok(()) + } +} + +#[derive(Debug)] +struct TestingReceiver { + pub txes: BTreeMap>, + pub is_done: bool, +} + +impl TestingReceiver { + fn new() -> TestingReceiver { + TestingReceiver { + txes: BTreeMap::new(), + is_done: false, + } + } +} + +impl TxReceiver for TestingReceiver { + fn tx(&mut self, tx_id: Entid, d: &mut T) -> Result<()> + where T: Iterator { + let datoms = self.txes.entry(tx_id).or_insert(vec![]); + datoms.extend(d); + Ok(()) + } + + fn done(&mut self) -> Result<()> { + self.is_done = true; + Ok(()) + } +} + +fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) { + let tx = receiver.txes.keys().nth(tx_num).expect("first tx"); + let datoms = receiver.txes.get(tx).expect("datoms"); + assert_eq!(expected_datoms, datoms.len()); +} + +#[test] +fn test_reader() { + let mut c = new_connection("").expect("Couldn't open conn."); + let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + + // Don't inspect the bootstrap transaction, but we'd like to see it's there. + let mut receiver = TxCountingReceiver::new(); + assert_eq!(false, receiver.is_done); + Processor::process(&c, &mut receiver).expect("processor"); + assert_eq!(true, receiver.is_done); + assert_eq!(1, receiver.tx_count); + + let ids = conn.transact(&mut c, r#"[ + [:db/add "s" :db/ident :foo/numba] + [:db/add "s" :db/valueType :db.type/long] + [:db/add "s" :db/cardinality :db.cardinality/one] + ]"#).expect("successful transaction").tempids; + let numba_entity_id = ids.get("s").unwrap(); + + // Expect to see one more transaction of four parts (one for tx datom itself). + let mut receiver = TestingReceiver::new(); + Processor::process(&c, &mut receiver).expect("processor"); + + println!("{:#?}", receiver); + + assert_eq!(2, receiver.txes.keys().count()); + assert_tx_datoms_count(&receiver, 1, 4); + + let ids = conn.transact(&mut c, r#"[ + [:db/add "b" :foo/numba 123] + ]"#).expect("successful transaction").tempids; + let asserted_e = ids.get("b").unwrap(); + + // Expect to see a single two part transaction + let mut receiver = TestingReceiver::new(); + Processor::process(&c, &mut receiver).expect("processor"); + + assert_eq!(3, receiver.txes.keys().count()); + assert_tx_datoms_count(&receiver, 2, 2); + + // Inspect the transaction part. + let tx_id = receiver.txes.keys().nth(2).expect("tx"); + let datoms = receiver.txes.get(tx_id).expect("datoms"); + let part = &datoms[0]; + + assert_eq!(asserted_e, &part.e); + assert_eq!(numba_entity_id, &part.a); + assert!(part.v.matches_type(ValueType::Long)); + assert_eq!(TypedValue::Long(123), part.v); + assert_eq!(true, part.added); +} diff --git a/tolstoy/Cargo.toml b/tolstoy/Cargo.toml index 03f57bef..36b83016 100644 --- a/tolstoy/Cargo.toml +++ b/tolstoy/Cargo.toml @@ -16,12 +16,12 @@ uuid = { version = "0.5", features = ["v4", "serde"] } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } +[dependencies.mentat_core] +path = "../core" + [dependencies.mentat_db] path = "../db" -[dependencies.edn] -path = "../edn" - [dependencies.rusqlite] version = "0.12" # System sqlite might be very old. diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs new file mode 100644 index 00000000..aea4effe --- /dev/null +++ b/tolstoy/src/errors.rs @@ -0,0 +1,41 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use std; +use hyper; +use rusqlite; +use uuid; +use mentat_db; + +error_chain! { + types { + Error, ErrorKind, ResultExt, Result; + } + + foreign_links { + IOError(std::io::Error); + HttpError(hyper::Error); + SqlError(rusqlite::Error); + UuidParseError(uuid::ParseError); + } + + links { + DbError(mentat_db::Error, mentat_db::ErrorKind); + } + + errors { + UnexpectedState(t: String) { + description("encountered unexpected state") + display("encountered unexpected state: {}", t) + } + } +} diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index f7b70e18..396c233e 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -20,22 +20,11 @@ extern crate futures; extern crate serde; extern crate serde_json; extern crate mentat_db; +extern crate mentat_core; extern crate rusqlite; -extern crate edn; extern crate uuid; pub mod schema; pub mod metadata; - -error_chain! { - types { - Error, ErrorKind, ResultExt, Result; - } - - foreign_links { - IOError(std::io::Error); - HttpError(hyper::Error); - SqlError(rusqlite::Error); - UuidParseError(edn::UuidParseError); - } -} +pub mod tx_processor; +pub mod errors; diff --git a/tolstoy/src/metadata.rs b/tolstoy/src/metadata.rs index 2dc26121..b3539a0c 100644 --- a/tolstoy/src/metadata.rs +++ b/tolstoy/src/metadata.rs @@ -14,7 +14,7 @@ use rusqlite; use uuid::Uuid; use schema; -use Result; +use errors::Result; trait HeadTrackable { fn remote_head(&self) -> Result; diff --git a/tolstoy/src/schema.rs b/tolstoy/src/schema.rs index 1a58e06e..c6075b3f 100644 --- a/tolstoy/src/schema.rs +++ b/tolstoy/src/schema.rs @@ -9,7 +9,7 @@ // specific language governing permissions and limitations under the License. use rusqlite; -use Result; +use errors::Result; pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#; diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs new file mode 100644 index 00000000..5065c674 --- /dev/null +++ b/tolstoy/src/tx_processor.rs @@ -0,0 +1,169 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +use std::iter::Peekable; + +use rusqlite; + +use errors::{ + Result, +}; + +use mentat_db::{ + TypedSQLValue, +}; + +use mentat_core::{ + Entid, + TypedValue, +}; + +#[derive(Debug, Clone)] +pub struct TxPart { + pub e: Entid, + pub a: Entid, + pub v: TypedValue, + pub tx: Entid, + pub added: bool, +} + +#[derive(Debug, Clone)] +pub struct Tx { + pub tx: Entid, + pub tx_instant: TypedValue, +} + +pub trait TxReceiver { + fn tx(&mut self, tx_id: Entid, d: &mut T) -> Result<()> + where T: Iterator; + fn done(&mut self) -> Result<()>; +} + +pub struct Processor {} + +pub struct DatomsIterator<'conn, 't, T> +where T: Sized + Iterator> + 't { + at_first: bool, + at_last: bool, + first: &'conn TxPart, + rows: &'t mut Peekable, +} + +impl<'conn, 't, T> DatomsIterator<'conn, 't, T> +where T: Sized + Iterator> + 't { + fn new(first: &'conn TxPart, rows: &'t mut Peekable) -> DatomsIterator<'conn, 't, T> + { + DatomsIterator { + at_first: true, + at_last: false, + first: first, + rows: rows, + } + } +} + +impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T> +where T: Sized + Iterator> + 't { + type Item = TxPart; + + fn next(&mut self) -> Option { + if self.at_last { + return None; + } + + if self.at_first { + self.at_first = false; + return Some(self.first.clone()); + } + + // Look ahead to see if we're about to cross into + // the next partition. + { + let next_option = self.rows.peek(); + match next_option { + Some(&Ok(ref next)) => { + if next.tx != self.first.tx { + self.at_last = true; + return None; + } + }, + // Empty, or error. Either way, this iterator's done. + _ => { + self.at_last = true; + return None; + } + } + } + + // We're in the correct partition, return a TxPart. + if let Some(result) = self.rows.next() { + match result { + Err(_) => None, + Ok(datom) => { + Some(TxPart { + e: datom.e, + a: datom.a, + v: datom.v.clone(), + tx: datom.tx, + added: datom.added, + }) + }, + } + } else { + self.at_last = true; + None + } + } +} + +fn to_tx_part(row: &rusqlite::Row) -> Result { + Ok(TxPart { + e: row.get(0), + a: row.get(1), + v: TypedValue::from_sql_value_pair(row.get(2), row.get(3))?, + tx: row.get(4), + added: row.get(5), + }) +} + +impl Processor { + pub fn process(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()> + where R: TxReceiver { + let mut stmt = sqlite.prepare( + "SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx" + )?; + + let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable(); + let mut current_tx = None; + while let Some(row) = rows.next() { + let datom = row?; + + match current_tx { + Some(tx) => { + if tx != datom.tx { + current_tx = Some(datom.tx); + receiver.tx( + datom.tx, + &mut DatomsIterator::new(&datom, &mut rows) + )?; + } + }, + None => { + current_tx = Some(datom.tx); + receiver.tx( + datom.tx, + &mut DatomsIterator::new(&datom, &mut rows) + )?; + } + } + } + receiver.done()?; + Ok(()) + } +}