Compare commits
1 commit
master
...
grisha/tol
Author | SHA1 | Date | |
---|---|---|---|
|
b3a7dabd58 |
8 changed files with 363 additions and 20 deletions
|
@ -37,7 +37,7 @@ pub mod db;
|
||||||
mod bootstrap;
|
mod bootstrap;
|
||||||
pub mod debug;
|
pub mod debug;
|
||||||
mod add_retract_alter_set;
|
mod add_retract_alter_set;
|
||||||
mod entids;
|
pub mod entids;
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
mod metadata;
|
mod metadata;
|
||||||
mod schema;
|
mod schema;
|
||||||
|
|
144
tests/tolstoy.rs
Normal file
144
tests/tolstoy.rs
Normal file
|
@ -0,0 +1,144 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
extern crate mentat;
|
||||||
|
extern crate mentat_core;
|
||||||
|
extern crate mentat_tolstoy;
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use mentat::conn::Conn;
|
||||||
|
|
||||||
|
use mentat::new_connection;
|
||||||
|
use mentat_tolstoy::tx_processor::{
|
||||||
|
Processor,
|
||||||
|
TxReceiver,
|
||||||
|
TxPart,
|
||||||
|
};
|
||||||
|
use mentat_tolstoy::errors::Result;
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
TypedValue,
|
||||||
|
ValueType,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TxCountingReceiver {
|
||||||
|
pub tx_count: usize,
|
||||||
|
pub is_done: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxCountingReceiver {
|
||||||
|
fn new() -> TxCountingReceiver {
|
||||||
|
TxCountingReceiver {
|
||||||
|
tx_count: 0,
|
||||||
|
is_done: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxReceiver for TxCountingReceiver {
|
||||||
|
fn tx<T>(&mut self, _tx_id: Entid, _d: &mut T) -> Result<()>
|
||||||
|
where T: Iterator<Item=TxPart> {
|
||||||
|
self.tx_count = self.tx_count + 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn done(&mut self) -> Result<()> {
|
||||||
|
self.is_done = true;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct TestingReceiver {
|
||||||
|
pub txes: BTreeMap<Entid, Vec<TxPart>>,
|
||||||
|
pub is_done: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestingReceiver {
|
||||||
|
fn new() -> TestingReceiver {
|
||||||
|
TestingReceiver {
|
||||||
|
txes: BTreeMap::new(),
|
||||||
|
is_done: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxReceiver for TestingReceiver {
|
||||||
|
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
|
||||||
|
where T: Iterator<Item=TxPart> {
|
||||||
|
let datoms = self.txes.entry(tx_id).or_insert(vec![]);
|
||||||
|
datoms.extend(d);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn done(&mut self) -> Result<()> {
|
||||||
|
self.is_done = true;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
|
||||||
|
let tx = receiver.txes.keys().nth(tx_num).expect("first tx");
|
||||||
|
let datoms = receiver.txes.get(tx).expect("datoms");
|
||||||
|
assert_eq!(expected_datoms, datoms.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_reader() {
|
||||||
|
let mut c = new_connection("").expect("Couldn't open conn.");
|
||||||
|
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
|
||||||
|
|
||||||
|
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
|
||||||
|
let mut receiver = TxCountingReceiver::new();
|
||||||
|
assert_eq!(false, receiver.is_done);
|
||||||
|
Processor::process(&c, &mut receiver).expect("processor");
|
||||||
|
assert_eq!(true, receiver.is_done);
|
||||||
|
assert_eq!(1, receiver.tx_count);
|
||||||
|
|
||||||
|
let ids = conn.transact(&mut c, r#"[
|
||||||
|
[:db/add "s" :db/ident :foo/numba]
|
||||||
|
[:db/add "s" :db/valueType :db.type/long]
|
||||||
|
[:db/add "s" :db/cardinality :db.cardinality/one]
|
||||||
|
]"#).expect("successful transaction").tempids;
|
||||||
|
let numba_entity_id = ids.get("s").unwrap();
|
||||||
|
|
||||||
|
// Expect to see one more transaction of four parts (one for tx datom itself).
|
||||||
|
let mut receiver = TestingReceiver::new();
|
||||||
|
Processor::process(&c, &mut receiver).expect("processor");
|
||||||
|
|
||||||
|
println!("{:#?}", receiver);
|
||||||
|
|
||||||
|
assert_eq!(2, receiver.txes.keys().count());
|
||||||
|
assert_tx_datoms_count(&receiver, 1, 4);
|
||||||
|
|
||||||
|
let ids = conn.transact(&mut c, r#"[
|
||||||
|
[:db/add "b" :foo/numba 123]
|
||||||
|
]"#).expect("successful transaction").tempids;
|
||||||
|
let asserted_e = ids.get("b").unwrap();
|
||||||
|
|
||||||
|
// Expect to see a single two part transaction
|
||||||
|
let mut receiver = TestingReceiver::new();
|
||||||
|
Processor::process(&c, &mut receiver).expect("processor");
|
||||||
|
|
||||||
|
assert_eq!(3, receiver.txes.keys().count());
|
||||||
|
assert_tx_datoms_count(&receiver, 2, 2);
|
||||||
|
|
||||||
|
// Inspect the transaction part.
|
||||||
|
let tx_id = receiver.txes.keys().nth(2).expect("tx");
|
||||||
|
let datoms = receiver.txes.get(tx_id).expect("datoms");
|
||||||
|
let part = &datoms[0];
|
||||||
|
|
||||||
|
assert_eq!(asserted_e, &part.e);
|
||||||
|
assert_eq!(numba_entity_id, &part.a);
|
||||||
|
assert!(part.v.matches_type(ValueType::Long));
|
||||||
|
assert_eq!(TypedValue::Long(123), part.v);
|
||||||
|
assert_eq!(true, part.added);
|
||||||
|
}
|
|
@ -16,12 +16,12 @@ uuid = { version = "0.5", features = ["v4", "serde"] }
|
||||||
|
|
||||||
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
||||||
|
|
||||||
|
[dependencies.mentat_core]
|
||||||
|
path = "../core"
|
||||||
|
|
||||||
[dependencies.mentat_db]
|
[dependencies.mentat_db]
|
||||||
path = "../db"
|
path = "../db"
|
||||||
|
|
||||||
[dependencies.edn]
|
|
||||||
path = "../edn"
|
|
||||||
|
|
||||||
[dependencies.rusqlite]
|
[dependencies.rusqlite]
|
||||||
version = "0.12"
|
version = "0.12"
|
||||||
# System sqlite might be very old.
|
# System sqlite might be very old.
|
||||||
|
|
41
tolstoy/src/errors.rs
Normal file
41
tolstoy/src/errors.rs
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std;
|
||||||
|
use hyper;
|
||||||
|
use rusqlite;
|
||||||
|
use uuid;
|
||||||
|
use mentat_db;
|
||||||
|
|
||||||
|
error_chain! {
|
||||||
|
types {
|
||||||
|
Error, ErrorKind, ResultExt, Result;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreign_links {
|
||||||
|
IOError(std::io::Error);
|
||||||
|
HttpError(hyper::Error);
|
||||||
|
SqlError(rusqlite::Error);
|
||||||
|
UuidParseError(uuid::ParseError);
|
||||||
|
}
|
||||||
|
|
||||||
|
links {
|
||||||
|
DbError(mentat_db::Error, mentat_db::ErrorKind);
|
||||||
|
}
|
||||||
|
|
||||||
|
errors {
|
||||||
|
UnexpectedState(t: String) {
|
||||||
|
description("encountered unexpected state")
|
||||||
|
display("encountered unexpected state: {}", t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,22 +20,11 @@ extern crate futures;
|
||||||
extern crate serde;
|
extern crate serde;
|
||||||
extern crate serde_json;
|
extern crate serde_json;
|
||||||
extern crate mentat_db;
|
extern crate mentat_db;
|
||||||
|
extern crate mentat_core;
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
extern crate edn;
|
|
||||||
extern crate uuid;
|
extern crate uuid;
|
||||||
|
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
|
pub mod tx_processor;
|
||||||
error_chain! {
|
pub mod errors;
|
||||||
types {
|
|
||||||
Error, ErrorKind, ResultExt, Result;
|
|
||||||
}
|
|
||||||
|
|
||||||
foreign_links {
|
|
||||||
IOError(std::io::Error);
|
|
||||||
HttpError(hyper::Error);
|
|
||||||
SqlError(rusqlite::Error);
|
|
||||||
UuidParseError(edn::UuidParseError);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ use rusqlite;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use schema;
|
use schema;
|
||||||
use Result;
|
use errors::Result;
|
||||||
|
|
||||||
trait HeadTrackable {
|
trait HeadTrackable {
|
||||||
fn remote_head(&self) -> Result<Uuid>;
|
fn remote_head(&self) -> Result<Uuid>;
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
// specific language governing permissions and limitations under the License.
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
use Result;
|
use errors::Result;
|
||||||
|
|
||||||
pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#;
|
pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#;
|
||||||
|
|
||||||
|
|
169
tolstoy/src/tx_processor.rs
Normal file
169
tolstoy/src/tx_processor.rs
Normal file
|
@ -0,0 +1,169 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
use std::iter::Peekable;
|
||||||
|
|
||||||
|
use rusqlite;
|
||||||
|
|
||||||
|
use errors::{
|
||||||
|
Result,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_db::{
|
||||||
|
TypedSQLValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
TypedValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct TxPart {
|
||||||
|
pub e: Entid,
|
||||||
|
pub a: Entid,
|
||||||
|
pub v: TypedValue,
|
||||||
|
pub tx: Entid,
|
||||||
|
pub added: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Tx {
|
||||||
|
pub tx: Entid,
|
||||||
|
pub tx_instant: TypedValue,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait TxReceiver {
|
||||||
|
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
|
||||||
|
where T: Iterator<Item=TxPart>;
|
||||||
|
fn done(&mut self) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Processor {}
|
||||||
|
|
||||||
|
pub struct DatomsIterator<'conn, 't, T>
|
||||||
|
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
||||||
|
at_first: bool,
|
||||||
|
at_last: bool,
|
||||||
|
first: &'conn TxPart,
|
||||||
|
rows: &'t mut Peekable<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'conn, 't, T> DatomsIterator<'conn, 't, T>
|
||||||
|
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
||||||
|
fn new(first: &'conn TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'conn, 't, T>
|
||||||
|
{
|
||||||
|
DatomsIterator {
|
||||||
|
at_first: true,
|
||||||
|
at_last: false,
|
||||||
|
first: first,
|
||||||
|
rows: rows,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T>
|
||||||
|
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
||||||
|
type Item = TxPart;
|
||||||
|
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
if self.at_last {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.at_first {
|
||||||
|
self.at_first = false;
|
||||||
|
return Some(self.first.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look ahead to see if we're about to cross into
|
||||||
|
// the next partition.
|
||||||
|
{
|
||||||
|
let next_option = self.rows.peek();
|
||||||
|
match next_option {
|
||||||
|
Some(&Ok(ref next)) => {
|
||||||
|
if next.tx != self.first.tx {
|
||||||
|
self.at_last = true;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Empty, or error. Either way, this iterator's done.
|
||||||
|
_ => {
|
||||||
|
self.at_last = true;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We're in the correct partition, return a TxPart.
|
||||||
|
if let Some(result) = self.rows.next() {
|
||||||
|
match result {
|
||||||
|
Err(_) => None,
|
||||||
|
Ok(datom) => {
|
||||||
|
Some(TxPart {
|
||||||
|
e: datom.e,
|
||||||
|
a: datom.a,
|
||||||
|
v: datom.v.clone(),
|
||||||
|
tx: datom.tx,
|
||||||
|
added: datom.added,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.at_last = true;
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_tx_part(row: &rusqlite::Row) -> Result<TxPart> {
|
||||||
|
Ok(TxPart {
|
||||||
|
e: row.get(0),
|
||||||
|
a: row.get(1),
|
||||||
|
v: TypedValue::from_sql_value_pair(row.get(2), row.get(3))?,
|
||||||
|
tx: row.get(4),
|
||||||
|
added: row.get(5),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Processor {
|
||||||
|
pub fn process<R>(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()>
|
||||||
|
where R: TxReceiver {
|
||||||
|
let mut stmt = sqlite.prepare(
|
||||||
|
"SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx"
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
|
||||||
|
let mut current_tx = None;
|
||||||
|
while let Some(row) = rows.next() {
|
||||||
|
let datom = row?;
|
||||||
|
|
||||||
|
match current_tx {
|
||||||
|
Some(tx) => {
|
||||||
|
if tx != datom.tx {
|
||||||
|
current_tx = Some(datom.tx);
|
||||||
|
receiver.tx(
|
||||||
|
datom.tx,
|
||||||
|
&mut DatomsIterator::new(&datom, &mut rows)
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
current_tx = Some(datom.tx);
|
||||||
|
receiver.tx(
|
||||||
|
datom.tx,
|
||||||
|
&mut DatomsIterator::new(&datom, &mut rows)
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
receiver.done()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue