Compare commits

...

2 commits

Author SHA1 Message Date
Grisha Kruglov
687b2cf997 Add 'syncable' feature to 'db' crate to conditionally derive serialization for Partition*
This is leading up syncing with partition support.
2018-07-10 17:43:08 -07:00
Grisha Kruglov
26446ddb05 Add a top-level "syncable" feature.
Tested with:

cargo test --all
cargo test --all --no-default-features
cargo build --manifest-path tools/cli/Cargo.toml --no-default-features
cargo run --manifest-path tools/cli/Cargo.toml --no-default-features debugcli

Co-authored-by: Nick Alexander <nalexander@mozilla.com>
2018-07-10 17:31:18 -07:00
10 changed files with 183 additions and 140 deletions

View file

@ -16,9 +16,10 @@ version = "0.8.1"
build = "build/version.rs" build = "build/version.rs"
[features] [features]
default = ["bundled_sqlite3"] default = ["bundled_sqlite3", "syncable"]
bundled_sqlite3 = ["rusqlite/bundled"] bundled_sqlite3 = ["rusqlite/bundled"]
sqlcipher = ["rusqlite/sqlcipher", "mentat_db/sqlcipher"] sqlcipher = ["rusqlite/sqlcipher", "mentat_db/sqlcipher"]
syncable = ["mentat_tolstoy", "mentat_db/syncable"]
[workspace] [workspace]
members = ["tools/cli", "ffi"] members = ["tools/cli", "ffi"]
@ -71,6 +72,7 @@ path = "query-translator"
[dependencies.mentat_tolstoy] [dependencies.mentat_tolstoy]
path = "tolstoy" path = "tolstoy"
optional = true
[profile.release] [profile.release]
opt-level = 3 opt-level = 3

View file

@ -6,6 +6,7 @@ workspace = ".."
[features] [features]
default = [] default = []
sqlcipher = ["rusqlite/sqlcipher"] sqlcipher = ["rusqlite/sqlcipher"]
syncable = ["serde", "serde_json", "serde_derive"]
[dependencies] [dependencies]
failure = "0.1.1" failure = "0.1.1"
@ -17,6 +18,9 @@ log = "0.4"
ordered-float = "0.5" ordered-float = "0.5"
time = "0.1" time = "0.1"
petgraph = "0.4.12" petgraph = "0.4.12"
serde = { version = "1.0", optional = true }
serde_json = { version = "1.0", optional = true }
serde_derive = { version = "1.0", optional = true }
[dependencies.rusqlite] [dependencies.rusqlite]
version = "0.13" version = "0.13"

View file

@ -15,6 +15,9 @@ extern crate itertools;
#[macro_use] extern crate lazy_static; #[macro_use] extern crate lazy_static;
#[macro_use] extern crate log; #[macro_use] extern crate log;
#[cfg(feature = "syncable")]
#[macro_use] extern crate serde_derive;
extern crate petgraph; extern crate petgraph;
extern crate rusqlite; extern crate rusqlite;
extern crate tabwriter; extern crate tabwriter;

View file

@ -38,6 +38,7 @@ use errors;
/// Represents one partition of the entid space. /// Represents one partition of the entid space.
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
#[cfg_attr(feature = "syncable", derive(Serialize,Deserialize))]
pub struct Partition { pub struct Partition {
/// The first entid in the partition. /// The first entid in the partition.
pub start: i64, pub start: i64,

View file

@ -29,6 +29,8 @@ use mentat_query_algebrizer;
use mentat_query_projector; use mentat_query_projector;
use mentat_query_pull; use mentat_query_pull;
use mentat_sql; use mentat_sql;
#[cfg(feature = "syncable")]
use mentat_tolstoy; use mentat_tolstoy;
pub type Result<T> = std::result::Result<T, MentatError>; pub type Result<T> = std::result::Result<T, MentatError>;
@ -107,6 +109,7 @@ pub enum MentatError {
#[fail(display = "{}", _0)] #[fail(display = "{}", _0)]
SQLError(#[cause] mentat_sql::SQLError), SQLError(#[cause] mentat_sql::SQLError),
#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)] #[fail(display = "{}", _0)]
TolstoyError(#[cause] mentat_tolstoy::TolstoyError), TolstoyError(#[cause] mentat_tolstoy::TolstoyError),
} }
@ -159,6 +162,7 @@ impl From<mentat_sql::SQLError> for MentatError {
} }
} }
#[cfg(feature = "syncable")]
impl From<mentat_tolstoy::TolstoyError> for MentatError { impl From<mentat_tolstoy::TolstoyError> for MentatError {
fn from(error: mentat_tolstoy::TolstoyError) -> MentatError { fn from(error: mentat_tolstoy::TolstoyError) -> MentatError {
MentatError::TolstoyError(error) MentatError::TolstoyError(error)

View file

@ -30,6 +30,8 @@ extern crate mentat_query_projector;
extern crate mentat_query_pull; extern crate mentat_query_pull;
extern crate mentat_query_translator; extern crate mentat_query_translator;
extern crate mentat_sql; extern crate mentat_sql;
#[cfg(feature = "syncable")]
extern crate mentat_tolstoy; extern crate mentat_tolstoy;
pub use mentat_core::{ pub use mentat_core::{

View file

@ -38,6 +38,7 @@ use mentat_db::{
TxObserver, TxObserver,
}; };
#[cfg(feature = "syncable")]
use mentat_tolstoy::Syncer; use mentat_tolstoy::Syncer;
use uuid::Uuid; use uuid::Uuid;
@ -237,6 +238,7 @@ impl Pullable for Store {
} }
} }
#[cfg(feature = "syncable")]
impl Syncable for Store { impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?; let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;

View file

@ -10,148 +10,154 @@
extern crate mentat; extern crate mentat;
extern crate mentat_core; extern crate mentat_core;
#[cfg(feature = "syncable")]
extern crate mentat_tolstoy; extern crate mentat_tolstoy;
use std::collections::BTreeMap; #[cfg(feature = "syncable")]
mod tests {
use std::collections::BTreeMap;
use mentat::conn::Conn; use mentat::conn::Conn;
use mentat::new_connection; use mentat::new_connection;
use mentat_tolstoy::tx_processor::{ use mentat_tolstoy::tx_processor::{
Processor, Processor,
TxReceiver, TxReceiver,
TxPart, TxPart,
}; };
use mentat_tolstoy::errors::Result; use mentat_tolstoy::errors::Result;
use mentat_core::{ use mentat_core::{
Entid, Entid,
TypedValue, TypedValue,
ValueType, ValueType,
}; };
struct TxCountingReceiver { struct TxCountingReceiver {
pub tx_count: usize, pub tx_count: usize,
pub is_done: bool, pub is_done: bool,
} }
impl TxCountingReceiver { impl TxCountingReceiver {
fn new() -> TxCountingReceiver { fn new() -> TxCountingReceiver {
TxCountingReceiver { TxCountingReceiver {
tx_count: 0, tx_count: 0,
is_done: false, is_done: false,
}
} }
} }
}
impl TxReceiver for TxCountingReceiver { impl TxReceiver for TxCountingReceiver {
fn tx<T>(&mut self, _tx_id: Entid, _d: &mut T) -> Result<()> fn tx<T>(&mut self, _tx_id: Entid, _d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> { where T: Iterator<Item=TxPart> {
self.tx_count = self.tx_count + 1; self.tx_count = self.tx_count + 1;
Ok(()) Ok(())
} }
fn done(&mut self) -> Result<()> { fn done(&mut self) -> Result<()> {
self.is_done = true; self.is_done = true;
Ok(()) Ok(())
}
}
#[derive(Debug)]
struct TestingReceiver {
pub txes: BTreeMap<Entid, Vec<TxPart>>,
pub is_done: bool,
}
impl TestingReceiver {
fn new() -> TestingReceiver {
TestingReceiver {
txes: BTreeMap::new(),
is_done: false,
} }
} }
}
#[derive(Debug)]
impl TxReceiver for TestingReceiver { struct TestingReceiver {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()> pub txes: BTreeMap<Entid, Vec<TxPart>>,
where T: Iterator<Item=TxPart> { pub is_done: bool,
let datoms = self.txes.entry(tx_id).or_insert(vec![]); }
datoms.extend(d);
Ok(()) impl TestingReceiver {
} fn new() -> TestingReceiver {
TestingReceiver {
fn done(&mut self) -> Result<()> { txes: BTreeMap::new(),
self.is_done = true; is_done: false,
Ok(()) }
} }
} }
fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) { impl TxReceiver for TestingReceiver {
let tx = receiver.txes.keys().nth(tx_num).expect("first tx"); fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
let datoms = receiver.txes.get(tx).expect("datoms"); where T: Iterator<Item=TxPart> {
assert_eq!(expected_datoms, datoms.len()); let datoms = self.txes.entry(tx_id).or_insert(vec![]);
} datoms.extend(d);
Ok(())
#[test] }
fn test_reader() {
let mut c = new_connection("").expect("Couldn't open conn."); fn done(&mut self) -> Result<()> {
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); self.is_done = true;
{ Ok(())
let db_tx = c.transaction().expect("db tx"); }
// Don't inspect the bootstrap transaction, but we'd like to see it's there. }
let mut receiver = TxCountingReceiver::new();
assert_eq!(false, receiver.is_done); fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_datoms: usize) {
Processor::process(&db_tx, None, &mut receiver).expect("processor"); let tx = receiver.txes.keys().nth(tx_num).expect("first tx");
assert_eq!(true, receiver.is_done); let datoms = receiver.txes.get(tx).expect("datoms");
assert_eq!(1, receiver.tx_count); assert_eq!(expected_datoms, datoms.len());
} }
let ids = conn.transact(&mut c, r#"[ #[test]
[:db/add "s" :db/ident :foo/numba] fn test_reader() {
[:db/add "s" :db/valueType :db.type/long] let mut c = new_connection("").expect("Couldn't open conn.");
[:db/add "s" :db/cardinality :db.cardinality/one] let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
]"#).expect("successful transaction").tempids; {
let numba_entity_id = ids.get("s").unwrap(); let db_tx = c.transaction().expect("db tx");
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
let bootstrap_tx; let mut receiver = TxCountingReceiver::new();
{ assert_eq!(false, receiver.is_done);
let db_tx = c.transaction().expect("db tx"); Processor::process(&db_tx, None, &mut receiver).expect("processor");
// Expect to see one more transaction of four parts (one for tx datom itself). assert_eq!(true, receiver.is_done);
let mut receiver = TestingReceiver::new(); assert_eq!(1, receiver.tx_count);
Processor::process(&db_tx, None, &mut receiver).expect("processor"); }
println!("{:#?}", receiver); let ids = conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/numba]
assert_eq!(2, receiver.txes.keys().count()); [:db/add "s" :db/valueType :db.type/long]
assert_tx_datoms_count(&receiver, 1, 4); [:db/add "s" :db/cardinality :db.cardinality/one]
]"#).expect("successful transaction").tempids;
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx")); let numba_entity_id = ids.get("s").unwrap();
}
let bootstrap_tx;
let ids = conn.transact(&mut c, r#"[ {
[:db/add "b" :foo/numba 123] let db_tx = c.transaction().expect("db tx");
]"#).expect("successful transaction").tempids; // Expect to see one more transaction of four parts (one for tx datom itself).
let asserted_e = ids.get("b").unwrap(); let mut receiver = TestingReceiver::new();
Processor::process(&db_tx, None, &mut receiver).expect("processor");
{
let db_tx = c.transaction().expect("db tx"); println!("{:#?}", receiver);
// Expect to see a single two part transaction assert_eq!(2, receiver.txes.keys().count());
let mut receiver = TestingReceiver::new(); assert_tx_datoms_count(&receiver, 1, 4);
// Note that we're asking for the bootstrap tx to be skipped by the processor. bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor"); }
assert_eq!(2, receiver.txes.keys().count()); let ids = conn.transact(&mut c, r#"[
assert_tx_datoms_count(&receiver, 1, 2); [:db/add "b" :foo/numba 123]
]"#).expect("successful transaction").tempids;
// Inspect the transaction part. let asserted_e = ids.get("b").unwrap();
let tx_id = receiver.txes.keys().nth(1).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms"); {
let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom"); let db_tx = c.transaction().expect("db tx");
assert_eq!(numba_entity_id, &part.a); // Expect to see a single two part transaction
assert!(part.v.matches_type(ValueType::Long)); let mut receiver = TestingReceiver::new();
assert_eq!(TypedValue::Long(123), part.v);
assert_eq!(true, part.added); // Note that we're asking for the bootstrap tx to be skipped by the processor.
} Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 2);
// Inspect the transaction part.
let tx_id = receiver.txes.keys().nth(1).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom");
assert_eq!(numba_entity_id, &part.a);
assert!(part.v.matches_type(ValueType::Long));
assert_eq!(TypedValue::Long(123), part.v);
assert_eq!(true, part.added);
}
}
} }

View file

@ -4,9 +4,10 @@ version = "0.0.1"
# Forward mentat's features. # Forward mentat's features.
[features] [features]
default = ["bundled_sqlite3"] default = ["bundled_sqlite3", "syncable"]
sqlcipher = ["mentat/sqlcipher"] sqlcipher = ["mentat/sqlcipher"]
bundled_sqlite3 = ["mentat/bundled_sqlite3"] bundled_sqlite3 = ["mentat/bundled_sqlite3"]
syncable = ["mentat/syncable"]
[lib] [lib]
name = "mentat_cli" name = "mentat_cli"

View file

@ -35,19 +35,23 @@ use mentat_core::{
}; };
use mentat::{ use mentat::{
Binding,
CacheDirection, CacheDirection,
Keyword, Keyword,
Queryable,
QueryExplanation, QueryExplanation,
QueryOutput, QueryOutput,
QueryResults, QueryResults,
Queryable,
Store, Store,
Binding,
Syncable,
TxReport, TxReport,
TypedValue, TypedValue,
}; };
#[cfg(feature = "syncable")]
use mentat::{
Syncable,
};
use command_parser::{ use command_parser::{
Command, Command,
}; };
@ -66,7 +70,6 @@ use command_parser::{
COMMAND_QUERY_EXPLAIN_SHORT, COMMAND_QUERY_EXPLAIN_SHORT,
COMMAND_QUERY_PREPARED_LONG, COMMAND_QUERY_PREPARED_LONG,
COMMAND_SCHEMA, COMMAND_SCHEMA,
COMMAND_SYNC,
COMMAND_TIMER_LONG, COMMAND_TIMER_LONG,
COMMAND_TRANSACT_LONG, COMMAND_TRANSACT_LONG,
COMMAND_TRANSACT_SHORT, COMMAND_TRANSACT_SHORT,
@ -82,6 +85,11 @@ use command_parser::{
COMMAND_OPEN_ENCRYPTED, COMMAND_OPEN_ENCRYPTED,
}; };
#[cfg(feature = "syncable")]
use command_parser::{
COMMAND_SYNC,
};
use input::InputReader; use input::InputReader;
use input::InputResult::{ use input::InputResult::{
Empty, Empty,
@ -124,7 +132,9 @@ lazy_static! {
(COMMAND_TIMER_LONG, "Enable or disable timing of query and transact operations."), (COMMAND_TIMER_LONG, "Enable or disable timing of query and transact operations."),
(COMMAND_CACHE, "Cache an attribute. Usage: `.cache :foo/bar reverse`"), (COMMAND_CACHE, "Cache an attribute. Usage: `.cache :foo/bar reverse`"),
(COMMAND_SYNC, "Synchronize the database against a Sync Server URL for a provided user UUID."),
#[cfg(feature = "syncable")]
(COMMAND_SYNC, "Synchronize the database against a Mentat Sync Server URL for a provided user UUID."),
] ]
}; };
} }
@ -359,12 +369,20 @@ impl Repl {
Err(e) => eprintln!("{}", e) Err(e) => eprintln!("{}", e)
}; };
}, },
#[cfg(feature = "syncable")]
Command::Sync(args) => { Command::Sync(args) => {
match self.store.sync(&args[0], &args[1]) { match self.store.sync(&args[0], &args[1]) {
Ok(_) => println!("Synced!"), Ok(_) => println!("Synced!"),
Err(e) => eprintln!("{:?}", e) Err(e) => eprintln!("{:?}", e)
}; };
} },
#[cfg(not(feature = "syncable"))]
Command::Sync(_) => {
eprintln!(".sync requires the syncable Mentat feature");
},
Command::Timer(on) => { Command::Timer(on) => {
self.toggle_timer(on); self.toggle_timer(on);
}, },