Compare commits

...

13 commits

Author SHA1 Message Date
Richard Newman
877c683089 And some more. 2018-02-05 17:17:30 -08:00
Richard Newman
44168680f5 Make things work. 2018-02-05 17:11:19 -08:00
Grisha Kruglov
53248c5840 broken peekable 2018-02-05 19:46:03 -05:00
Grisha Kruglov
deb6ba29a5 iterating process first pass with tests 2018-02-05 17:45:56 -05:00
Grisha Kruglov
0f957f242b Iterator-based tx reader sketch 2018-02-03 16:42:04 -05:00
Grisha Kruglov
94ed876cfa rebased 2018-02-01 19:52:36 -05:00
Grisha Kruglov
0d5e39a26d Tighter tx check; comments; failing test in CI 2018-02-01 19:17:12 -05:00
Grisha Kruglov
8ae1ddf03e Single pass 2018-02-01 19:17:12 -05:00
Grisha Kruglov
14e28de2f5 Feedback pass 1 2018-02-01 19:17:12 -05:00
Grisha Kruglov
5c2c29bf26 working with tests 2018-02-01 19:17:12 -05:00
Grisha Kruglov
4f243fbc32 third pass, compiles and should work 2018-02-01 19:17:12 -05:00
Grisha Kruglov
96ea7dc86f second pass, broken 2018-02-01 19:17:12 -05:00
Grisha Kruglov
b0c6399a38 first pass on transaction read client 2018-02-01 19:17:12 -05:00
8 changed files with 370 additions and 20 deletions

View file

@ -37,7 +37,7 @@ pub mod db;
mod bootstrap; mod bootstrap;
pub mod debug; pub mod debug;
mod add_retract_alter_set; mod add_retract_alter_set;
mod entids; pub mod entids;
pub mod errors; pub mod errors;
mod metadata; mod metadata;
mod schema; mod schema;

147
tests/tolstoy.rs Normal file
View file

@ -0,0 +1,147 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
extern crate mentat;
extern crate mentat_core;
extern crate mentat_tolstoy;
use std::collections::BTreeMap;
use mentat::conn::Conn;
use mentat::new_connection;
use mentat_tolstoy::tx_processor::{
TxReceiver,
DatomsIterator,
Processor,
TxPart,
};
use mentat_tolstoy::errors::Result;
use mentat_core::{
ValueType,
TypedValue,
Entid,
};
struct TxCountingReceiver {
pub tx_count: u32,
pub is_done: bool,
}
impl TxCountingReceiver {
fn new() -> TxCountingReceiver {
TxCountingReceiver {
tx_count: 0,
is_done: false
}
}
}
impl TxReceiver for TxCountingReceiver {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
self.tx_count = self.tx_count + 1;
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
Ok(())
}
}
#[derive(Debug)]
struct TestingReceiver {
pub txes: BTreeMap<Entid, Vec<TxPart>>,
pub is_done: bool,
}
impl TestingReceiver {
fn new() -> TestingReceiver {
TestingReceiver {
txes: BTreeMap::new(),
is_done: false,
}
}
}
impl TxReceiver for TestingReceiver {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
let datoms = self.txes.entry(tx_id).or_insert(vec![]);
for datom in d {
datoms.push(datom);
}
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
Ok(())
}
}
fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
let tx = receiver.txes.keys().nth(tx_num).expect("first tx");
let datoms = receiver.txes.get(tx).expect("datoms");
assert_eq!(expected_datoms, datoms.len());
}
#[test]
fn test_reader() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
let mut receiver = TxCountingReceiver::new();
assert_eq!(false, receiver.is_done);
Processor::process(&c, &mut receiver).expect("processor");
assert_eq!(true, receiver.is_done);
assert_eq!(1, receiver.tx_count);
let ids = conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/numba]
[:db/add "s" :db/valueType :db.type/long]
[:db/add "s" :db/cardinality :db.cardinality/one]
]"#).expect("successful transaction").tempids;
let numba_entity_id = ids.get("s").unwrap();
// Expect to see one more transaction of four parts (one for tx datom itself).
let mut receiver = TestingReceiver::new();
Processor::process(&c, &mut receiver).expect("processor");
println!("{:#?}", receiver);
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 4);
let ids = conn.transact(&mut c, r#"[
[:db/add "b" :foo/numba 123]
]"#).expect("successful transaction").tempids;
let asserted_e = ids.get("b").unwrap();
// Expect to see a single two part transaction
let mut receiver = TestingReceiver::new();
Processor::process(&c, &mut receiver).expect("processor");
assert_eq!(3, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 2, 2);
// Inspect the transaction part.
let tx_id = receiver.txes.keys().nth(2).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = &datoms[0];
assert_eq!(asserted_e, &part.e);
assert_eq!(numba_entity_id, &part.a);
assert!(part.v.matches_type(ValueType::Long));
assert_eq!(TypedValue::Long(123), part.v);
assert_eq!(true, part.added);
}

View file

@ -12,16 +12,17 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
lazy_static = "0.2" lazy_static = "0.2"
itertools = "0.6.5"
uuid = { version = "0.5", features = ["v4", "serde"] } uuid = { version = "0.5", features = ["v4", "serde"] }
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
[dependencies.mentat_core]
path = "../core"
[dependencies.mentat_db] [dependencies.mentat_db]
path = "../db" path = "../db"
[dependencies.edn]
path = "../edn"
[dependencies.rusqlite] [dependencies.rusqlite]
version = "0.12" version = "0.12"
# System sqlite might be very old. # System sqlite might be very old.

41
tolstoy/src/errors.rs Normal file
View file

@ -0,0 +1,41 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std;
use hyper;
use rusqlite;
use uuid;
use mentat_db;
error_chain! {
types {
Error, ErrorKind, ResultExt, Result;
}
foreign_links {
IOError(std::io::Error);
HttpError(hyper::Error);
SqlError(rusqlite::Error);
UuidParseError(uuid::ParseError);
}
links {
DbError(mentat_db::Error, mentat_db::ErrorKind);
}
errors {
UnexpectedState(t: String) {
description("encountered unexpected state")
display("encountered unexpected state: {}", t)
}
}
}

View file

@ -20,22 +20,12 @@ extern crate futures;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate mentat_db; extern crate mentat_db;
extern crate mentat_core;
extern crate rusqlite; extern crate rusqlite;
extern crate edn;
extern crate uuid; extern crate uuid;
extern crate itertools;
pub mod schema; pub mod schema;
pub mod metadata; pub mod metadata;
pub mod tx_processor;
error_chain! { pub mod errors;
types {
Error, ErrorKind, ResultExt, Result;
}
foreign_links {
IOError(std::io::Error);
HttpError(hyper::Error);
SqlError(rusqlite::Error);
UuidParseError(edn::UuidParseError);
}
}

View file

@ -14,7 +14,7 @@ use rusqlite;
use uuid::Uuid; use uuid::Uuid;
use schema; use schema;
use Result; use errors::Result;
trait HeadTrackable { trait HeadTrackable {
fn remote_head(&self) -> Result<Uuid>; fn remote_head(&self) -> Result<Uuid>;

View file

@ -9,7 +9,7 @@
// specific language governing permissions and limitations under the License. // specific language governing permissions and limitations under the License.
use rusqlite; use rusqlite;
use Result; use errors::Result;
pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#; pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#;

171
tolstoy/src/tx_processor.rs Normal file
View file

@ -0,0 +1,171 @@
// // Copyright 2016 Mozilla
// //
// // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// // this file except in compliance with the License. You may obtain a copy of the
// // License at http://www.apache.org/licenses/LICENSE-2.0
// // Unless required by applicable law or agreed to in writing, software distributed
// // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// // CONDITIONS OF ANY KIND, either express or implied. See the License for the
// // specific language governing permissions and limitations under the License.
extern crate core;
use self::core::iter::Peekable;
use rusqlite;
use errors::{
Result,
};
use mentat_db::{
TypedSQLValue,
};
use mentat_core::{
TypedValue,
Entid,
};
#[derive(Debug, Clone)]
pub struct TxPart {
pub e: Entid,
pub a: Entid,
pub v: TypedValue,
pub tx: Entid,
pub added: bool,
}
#[derive(Debug, Clone)]
pub struct Tx {
pub tx: Entid,
pub tx_instant: TypedValue,
}
pub trait TxReceiver {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart>;
fn done(&mut self) -> Result<()>;
}
pub struct Processor {}
pub struct DatomsIterator<'conn, 't, T>
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
at_first: bool,
at_last: bool,
first: &'conn TxPart,
rows: &'t mut Peekable<T>,
}
impl<'conn, 't, T> DatomsIterator<'conn, 't, T>
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
fn new(first: &'conn TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'conn, 't, T>
{
DatomsIterator {
at_first: true,
at_last: false,
first: first,
rows: rows,
}
}
}
impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T>
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
type Item = TxPart;
fn next(&mut self) -> Option<Self::Item> {
if self.at_last {
return None;
}
if self.at_first {
self.at_first = false;
return Some(self.first.clone());
}
// Look ahead to see if we're about to cross into
// the next partition.
{
let next_option = self.rows.peek();
match next_option {
Some(&Ok(ref next)) => {
if next.tx != self.first.tx {
self.at_last = true;
return None;
}
},
_ => ()
}
}
// We're in the correct partition.
if let Some(result) = self.rows.next() {
match result {
Err(_) => None,
Ok(datom) => {
Some(TxPart {
e: datom.e,
a: datom.a,
v: datom.v.clone(),
tx: datom.tx,
added: datom.added,
})
},
}
} else {
self.at_last = true;
None
}
}
}
fn to_tx_part(row: &::rusqlite::Row) -> Result<TxPart> {
Ok(TxPart {
e: row.get(0),
a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
tx: row.get(3),
added: row.get(4),
})
}
impl Processor {
pub fn process<R>(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()>
where R: TxReceiver {
let mut stmt = sqlite.prepare(
"SELECT e, a, v, tx, added, value_type_tag FROM transactions ORDER BY tx"
)?;
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
let mut current_tx = None;
while let Some(row) = rows.next() {
let datom = row?;
// if current_tx == Some(row.tx)
match current_tx {
Some(tx) => {
if tx == datom.tx {
continue;
} else {
current_tx = Some(datom.tx);
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
)?;
continue;
}
},
None => {
current_tx = Some(datom.tx);
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
)?;
}
}
}
receiver.done()?;
Ok(())
}
}