From 22b17a6779f176faed6f02d986b3cb729d311aa5 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Wed, 15 Aug 2018 13:55:46 -0700 Subject: [PATCH] Split "mentat transaction" logic away from the main crate Sync needs to operate over a "mentat transaction", not just a "db transaction". This shuffle allows internal mentat crates to consume InProgress, which models the concept of a "mentat transaction". --- .travis.yml | 2 +- Cargo.toml | 3 + db/src/db.rs | 2 +- public-traits/errors.rs | 4 +- src/conn.rs | 485 +------------------ src/lib.rs | 30 +- src/query_builder.rs | 2 +- src/store.rs | 26 +- src/vocabulary.rs | 14 +- tests/entity_builder.rs | 182 +++++++ tests/query.rs | 5 +- tests/vocabulary.rs | 8 +- transaction/Cargo.toml | 47 ++ {src => transaction/src}/entity_builder.rs | 159 +----- transaction/src/lib.rs | 538 +++++++++++++++++++++ transaction/src/metadata.rs | 41 ++ {src => transaction/src}/query.rs | 0 17 files changed, 879 insertions(+), 669 deletions(-) create mode 100644 tests/entity_builder.rs create mode 100644 transaction/Cargo.toml rename {src => transaction/src}/entity_builder.rs (58%) create mode 100644 transaction/src/lib.rs create mode 100644 transaction/src/metadata.rs rename {src => transaction/src}/query.rs (100%) diff --git a/.travis.yml b/.travis.yml index 45a88372..51ce16ef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ script: # this all-together, see https://github.com/rust-lang/cargo/issues/5364 for more information). To # work around this, we run tests individually for subcrates that rely on `rusqlite`. - | - for crate in "" "db" "db-traits" "ffi" "public-traits" "query-projector" "query-projector-traits" "query-pull" "sql" "tolstoy" "tolstoy-traits" "tools/cli"; do + for crate in "" "db" "db-traits" "ffi" "public-traits" "query-projector" "query-projector-traits" "query-pull" "sql" "tolstoy" "tolstoy-traits" "transaction" "tools/cli"; do cargo test --manifest-path ./$crate/Cargo.toml --verbose --no-default-features --features sqlcipher done after_success: diff --git a/Cargo.toml b/Cargo.toml index a769d2a4..4b3a5e79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,9 @@ path = "sql-traits" [dependencies.public_traits] path = "public-traits" +[dependencies.mentat_transaction] +path = "transaction" + [dependencies.mentat_tolstoy] path = "tolstoy" optional = true diff --git a/db/src/db.rs b/db/src/db.rs index ad28069b..881f83e0 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -51,10 +51,10 @@ use core_traits::{ }; use mentat_core::{ + AttributeMap, FromMicros, IdentMap, Schema, - AttributeMap, ToMicros, ValueRc, }; diff --git a/public-traits/errors.rs b/public-traits/errors.rs index 561a8d2c..5f212ab6 100644 --- a/public-traits/errors.rs +++ b/public-traits/errors.rs @@ -23,7 +23,9 @@ use core_traits::{ ValueType, }; -use db_traits::errors::DbError; +use db_traits::errors::{ + DbError, +}; use query_algebrizer_traits::errors::{ AlgebrizerError, }; diff --git a/src/conn.rs b/src/conn.rs index 921da9a5..1462f73f 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -18,18 +18,6 @@ use std::collections::{ BTreeMap, }; -use std::fs::{ - File, -}; - -use std::io::{ - Read, -}; - -use std::path::{ - Path, -}; - use std::sync::{ Arc, Mutex, @@ -41,9 +29,6 @@ use rusqlite::{ }; use edn; -use edn::{ - InternSet, -}; pub use core_traits::{ Attribute, @@ -63,38 +48,29 @@ use mentat_core::{ }; use mentat_db::cache::{ - InProgressCacheTransactWatcher, InProgressSQLiteAttributeCache, SQLiteAttributeCache, }; use mentat_db::db; use mentat_db::{ - transact, - transact_terms, InProgressObserverTransactWatcher, PartitionMap, - TransactableValue, - TransactWatcher, TxObservationService, TxObserver, }; -use mentat_db::internal_types::TermWithTempIds; - use mentat_query_pull::{ pull_attributes_for_entities, pull_attributes_for_entity, }; -use edn::entities::{ - TempId, - OpType, -}; - -use entity_builder::{ - InProgressBuilder, - TermBuilder, +use mentat_transaction::{ + CacheAction, + CacheDirection, + Metadata, + InProgress, + InProgressRead, }; use public_traits::errors::{ @@ -102,7 +78,7 @@ use public_traits::errors::{ MentatError, }; -use query::{ +use mentat_transaction::query::{ Known, PreparedResult, QueryExplanation, @@ -116,31 +92,6 @@ use query::{ q_uncached, }; -/// Connection metadata required to query from, or apply transactions to, a Mentat store. -/// -/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently -/// changing parts (schema) that we want to share across threads. -/// -/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust. -pub struct Metadata { - pub generation: u64, - pub partition_map: PartitionMap, - pub schema: Arc, - pub attribute_cache: SQLiteAttributeCache, -} - -impl Metadata { - // Intentionally not public. - fn new(generation: u64, partition_map: PartitionMap, schema: Arc, cache: SQLiteAttributeCache) -> Metadata { - Metadata { - generation: generation, - partition_map: partition_map, - schema: schema, - attribute_cache: cache, - } - } -} - /// A mutable, safe reference to the current Mentat store. pub struct Conn { /// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the @@ -167,422 +118,10 @@ pub struct Conn { pub(crate) tx_observer_service: Mutex, } -pub trait Queryable { - fn q_explain(&self, query: &str, inputs: T) -> Result - where T: Into>; - fn q_once(&self, query: &str, inputs: T) -> Result - where T: Into>; - fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult - where T: Into>; - fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into; - fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into; -} - -pub trait Pullable { - fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> - where E: IntoIterator, - A: IntoIterator; - fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result - where A: IntoIterator; -} - pub trait Syncable { fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>; } -/// Represents an in-progress, not yet committed, set of changes to the store. -/// Call `commit` to commit your changes, or `rollback` to discard them. -/// A transaction is held open until you do so. -/// Your changes will be implicitly dropped along with this struct. -pub struct InProgress<'a, 'c> { - transaction: rusqlite::Transaction<'c>, - mutex: &'a Mutex, - generation: u64, - partition_map: PartitionMap, - pub(crate) schema: Schema, - pub(crate) cache: InProgressSQLiteAttributeCache, - use_caching: bool, - tx_observer: &'a Mutex, - tx_observer_watcher: InProgressObserverTransactWatcher, -} - -/// Represents an in-progress set of reads to the store. Just like `InProgress`, -/// which is read-write, but only allows for reads. -pub struct InProgressRead<'a, 'c>(InProgress<'a, 'c>); - -impl<'a, 'c> Queryable for InProgressRead<'a, 'c> { - fn q_once(&self, query: &str, inputs: T) -> Result - where T: Into> { - self.0.q_once(query, inputs) - } - - fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult - where T: Into> { - self.0.q_prepare(query, inputs) - } - - fn q_explain(&self, query: &str, inputs: T) -> Result - where T: Into> { - self.0.q_explain(query, inputs) - } - - fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into { - self.0.lookup_values_for_attribute(entity, attribute) - } - - fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into { - self.0.lookup_value_for_attribute(entity, attribute) - } -} - -impl<'a, 'c> Pullable for InProgressRead<'a, 'c> { - fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> - where E: IntoIterator, - A: IntoIterator { - self.0.pull_attributes_for_entities(entities, attributes) - } - - fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result - where A: IntoIterator { - self.0.pull_attributes_for_entity(entity, attributes) - } -} - -impl<'a, 'c> Queryable for InProgress<'a, 'c> { - fn q_once(&self, query: &str, inputs: T) -> Result - where T: Into> { - - if self.use_caching { - let known = Known::new(&self.schema, Some(&self.cache)); - q_once(&*(self.transaction), - known, - query, - inputs) - } else { - q_uncached(&*(self.transaction), - &self.schema, - query, - inputs) - } - } - - fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult - where T: Into> { - - let known = Known::new(&self.schema, Some(&self.cache)); - q_prepare(&*(self.transaction), - known, - query, - inputs) - } - - fn q_explain(&self, query: &str, inputs: T) -> Result - where T: Into> { - - let known = Known::new(&self.schema, Some(&self.cache)); - q_explain(&*(self.transaction), - known, - query, - inputs) - } - - fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into { - let known = Known::new(&self.schema, Some(&self.cache)); - lookup_values_for_attribute(&*(self.transaction), known, entity, attribute) - } - - fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> - where E: Into { - let known = Known::new(&self.schema, Some(&self.cache)); - lookup_value_for_attribute(&*(self.transaction), known, entity, attribute) - } -} - -impl<'a, 'c> Pullable for InProgress<'a, 'c> { - fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> - where E: IntoIterator, - A: IntoIterator { - pull_attributes_for_entities(&self.schema, &*(self.transaction), entities, attributes) - .map_err(|e| e.into()) - } - - fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result - where A: IntoIterator { - pull_attributes_for_entity(&self.schema, &*(self.transaction), entity, attributes) - .map_err(|e| e.into()) - } -} - -impl<'a, 'c> HasSchema for InProgressRead<'a, 'c> { - fn entid_for_type(&self, t: ValueType) -> Option { - self.0.entid_for_type(t) - } - - fn get_ident(&self, x: T) -> Option<&Keyword> where T: Into { - self.0.get_ident(x) - } - - fn get_entid(&self, x: &Keyword) -> Option { - self.0.get_entid(x) - } - - fn attribute_for_entid(&self, x: T) -> Option<&Attribute> where T: Into { - self.0.attribute_for_entid(x) - } - - fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> { - self.0.attribute_for_ident(ident) - } - - /// Return true if the provided entid identifies an attribute in this schema. - fn is_attribute(&self, x: T) -> bool where T: Into { - self.0.is_attribute(x) - } - - /// Return true if the provided ident identifies an attribute in this schema. - fn identifies_attribute(&self, x: &Keyword) -> bool { - self.0.identifies_attribute(x) - } - - fn component_attributes(&self) -> &[Entid] { - self.0.component_attributes() - } -} - -impl<'a, 'c> HasSchema for InProgress<'a, 'c> { - fn entid_for_type(&self, t: ValueType) -> Option { - self.schema.entid_for_type(t) - } - - fn get_ident(&self, x: T) -> Option<&Keyword> where T: Into { - self.schema.get_ident(x) - } - - fn get_entid(&self, x: &Keyword) -> Option { - self.schema.get_entid(x) - } - - fn attribute_for_entid(&self, x: T) -> Option<&Attribute> where T: Into { - self.schema.attribute_for_entid(x) - } - - fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> { - self.schema.attribute_for_ident(ident) - } - - /// Return true if the provided entid identifies an attribute in this schema. - fn is_attribute(&self, x: T) -> bool where T: Into { - self.schema.is_attribute(x) - } - - /// Return true if the provided ident identifies an attribute in this schema. - fn identifies_attribute(&self, x: &Keyword) -> bool { - self.schema.identifies_attribute(x) - } - - fn component_attributes(&self) -> &[Entid] { - self.schema.component_attributes() - } -} - -impl<'a, 'c> InProgressRead<'a, 'c> { - pub fn last_tx_id(&self) -> Entid { - self.0.last_tx_id() - } -} - -impl<'a, 'c> InProgress<'a, 'c> { - pub fn builder(self) -> InProgressBuilder<'a, 'c> { - InProgressBuilder::new(self) - } - - /// Choose whether to use in-memory caches for running queries. - pub fn use_caching(&mut self, yesno: bool) { - self.use_caching = yesno; - } - - /// If you only have a reference to an `InProgress`, you can't use the easy builder. - /// This exists so you can make your own. - pub fn transact_builder(&mut self, builder: TermBuilder) -> Result { - builder.build() - .and_then(|(terms, _tempid_set)| { - self.transact_entities(terms) - }) - } - - pub fn transact_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { - let w = InProgressTransactWatcher::new( - &mut self.tx_observer_watcher, - self.cache.transact_watcher()); - let (report, next_partition_map, next_schema, _watcher) = - transact_terms(&self.transaction, - self.partition_map.clone(), - &self.schema, - &self.schema, - w, - terms, - tempid_set)?; - self.partition_map = next_partition_map; - if let Some(schema) = next_schema { - self.schema = schema; - } - Ok(report) - } - - pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator> { - // We clone the partition map here, rather than trying to use a Cell or using a mutable - // reference, for two reasons: - // 1. `transact` allocates new IDs in partitions before and while doing work that might - // fail! We don't want to mutate this map on failure, so we can't just use &mut. - // 2. Even if we could roll that back, we end up putting this `PartitionMap` into our - // `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using - // `Default::default` in those situations to extract the partition map, and so there - // would still be some cost. - let w = InProgressTransactWatcher::new( - &mut self.tx_observer_watcher, - self.cache.transact_watcher()); - let (report, next_partition_map, next_schema, _watcher) = - transact(&self.transaction, - self.partition_map.clone(), - &self.schema, - &self.schema, - w, - entities)?; - self.partition_map = next_partition_map; - if let Some(schema) = next_schema { - self.schema = schema; - } - Ok(report) - } - - pub fn transact(&mut self, transaction: B) -> Result where B: Borrow { - let entities = edn::parse::entities(transaction.borrow())?; - self.transact_entities(entities) - } - - pub fn import

(&mut self, path: P) -> Result - where P: AsRef { - let mut file = File::open(path)?; - let mut text: String = String::new(); - file.read_to_string(&mut text)?; - self.transact(text.as_str()) - } - - pub fn rollback(self) -> Result<()> { - self.transaction.rollback().map_err(|e| e.into()) - } - - pub fn commit(self) -> Result<()> { - // The mutex is taken during this entire method. - let mut metadata = self.mutex.lock().unwrap(); - - if self.generation != metadata.generation { - // Somebody else wrote! - // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. - // This should not occur -- an attempt to take a competing IMMEDIATE transaction - // will fail with `SQLITE_BUSY`, causing this function to abort. - bail!(MentatError::UnexpectedLostTransactRace); - } - - // Commit the SQLite transaction while we hold the mutex. - self.transaction.commit()?; - - metadata.generation += 1; - metadata.partition_map = self.partition_map; - - // Update the conn's cache if we made any changes. - self.cache.commit_to(&mut metadata.attribute_cache); - - if self.schema != *(metadata.schema) { - metadata.schema = Arc::new(self.schema); - - // TODO: rebuild vocabularies and notify consumers that they've changed -- it's possible - // that a change has arrived over the wire and invalidated some local module. - // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. - } - - let txes = self.tx_observer_watcher.txes; - self.tx_observer.lock().unwrap().in_progress_did_commit(txes); - - Ok(()) - } - - pub fn cache(&mut self, - attribute: &Keyword, - cache_direction: CacheDirection, - cache_action: CacheAction) -> Result<()> { - let attribute_entid: Entid = self.schema - .attribute_for_ident(&attribute) - .ok_or_else(|| MentatError::UnknownAttribute(attribute.to_string()))?.1.into(); - - match cache_action { - CacheAction::Register => { - match cache_direction { - CacheDirection::Both => self.cache.register(&self.schema, &self.transaction, attribute_entid), - CacheDirection::Forward => self.cache.register_forward(&self.schema, &self.transaction, attribute_entid), - CacheDirection::Reverse => self.cache.register_reverse(&self.schema, &self.transaction, attribute_entid), - }.map_err(|e| e.into()) - }, - CacheAction::Deregister => { - self.cache.unregister(attribute_entid); - Ok(()) - }, - } - } - - pub fn last_tx_id(&self) -> Entid { - self.partition_map[":db.part/tx"].next_entid() - 1 - } -} - -struct InProgressTransactWatcher<'a, 'o> { - cache_watcher: InProgressCacheTransactWatcher<'a>, - observer_watcher: &'o mut InProgressObserverTransactWatcher, - tx_id: Option, -} - -impl<'a, 'o> InProgressTransactWatcher<'a, 'o> { - fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { - InProgressTransactWatcher { - cache_watcher: cache_watcher, - observer_watcher: observer_watcher, - tx_id: None, - } - } -} - -impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> { - fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { - self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v); - self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v); - } - - fn done(&mut self, t: &Entid, schema: &Schema) -> ::db_traits::errors::Result<()> { - self.cache_watcher.done(t, schema)?; - self.observer_watcher.done(t, schema)?; - self.tx_id = Some(t.clone()); - Ok(()) - } -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum CacheDirection { - Forward, - Reverse, - Both, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum CacheAction { - Register, - Deregister, -} - impl Conn { // Intentionally not public. fn new(partition_map: PartitionMap, schema: Schema) -> Conn { @@ -768,14 +307,14 @@ impl Conn { // Make both args mutable so that we can't have parallel access. pub fn begin_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred) - .map(InProgressRead) + .map(|ip| InProgressRead { in_progress: ip }) } pub fn begin_uncached_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred) .map(|mut ip| { ip.use_caching(false); - InProgressRead(ip) + InProgressRead { in_progress: ip } }) } @@ -870,7 +409,7 @@ mod tests { CachedAttributes, }; - use ::query::{ + use mentat_transaction::query::{ Variable, }; @@ -882,6 +421,10 @@ mod tests { use mentat_db::USER0; + use mentat_transaction::{ + Queryable, + }; + #[test] fn test_transact_does_not_collide_existing_entids() { let mut sqlite = db::new_connection("").unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 41be6d1b..a0e0b0c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,8 +8,6 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -#![recursion_limit="128"] - extern crate failure; #[macro_use] @@ -34,6 +32,7 @@ extern crate query_pull_traits; extern crate sql_traits; extern crate mentat_sql; extern crate public_traits; +extern crate mentat_transaction; #[cfg(feature = "syncable")] extern crate mentat_tolstoy; @@ -153,14 +152,14 @@ pub use mentat_query_projector::{ pub use query_pull_traits::errors::PullError; pub use sql_traits::errors::SQLError; -pub mod conn; -pub mod entity_builder; -pub mod query; -pub mod query_builder; -pub mod store; -pub mod vocabulary; +pub use mentat_transaction::{ + Metadata, +}; -pub use query::{ +pub use mentat_transaction::query; +pub use mentat_transaction::entity_builder; + +pub use mentat_transaction::query::{ IntoResult, PlainSymbol, QueryExecutionResult, @@ -174,19 +173,26 @@ pub use query::{ q_once, }; +pub mod conn; +pub mod query_builder; +pub mod store; +pub mod vocabulary; + pub use query_builder::{ QueryBuilder, }; pub use conn::{ + Conn, + Syncable, +}; + +pub use mentat_transaction::{ CacheAction, CacheDirection, - Conn, InProgress, - Metadata, Pullable, Queryable, - Syncable, }; pub use store::{ diff --git a/src/query_builder.rs b/src/query_builder.rs index 097eb439..833faf24 100644 --- a/src/query_builder.rs +++ b/src/query_builder.rs @@ -95,7 +95,7 @@ impl<'a> QueryBuilder<'a> { let types = ::std::mem::replace(&mut self.types, Default::default()); let query_inputs = QueryInputs::new(types, values)?; let read = self.store.begin_read()?; - read.q_once(&self.query, query_inputs) + read.q_once(&self.query, query_inputs).map_err(|e| e.into()) } pub fn execute_scalar(&mut self) -> Result> { diff --git a/src/store.rs b/src/store.rs index 5ca22ab3..591b9fad 100644 --- a/src/store.rs +++ b/src/store.rs @@ -14,10 +14,6 @@ use std::collections::{ BTreeMap, }; -use std::path::{ - Path, -}; - use std::sync::{ Arc, }; @@ -44,27 +40,29 @@ use mentat_db::{ #[cfg(feature = "syncable")] use mentat_tolstoy::Syncer; -use conn::{ +use mentat_transaction::{ CacheAction, CacheDirection, - Conn, InProgress, InProgressRead, Pullable, Queryable, }; -#[cfg(feature = "syncable")] use conn::{ - Syncable, + Conn, }; use public_traits::errors::{ - MentatError, Result, }; -use query::{ +#[cfg(feature = "syncable")] +use public_traits::errors::{ + MentatError, +}; + +use mentat_transaction::query::{ PreparedResult, QueryExplanation, QueryInputs, @@ -214,6 +212,9 @@ impl Pullable for Store { #[cfg(feature = "syncable")] use uuid::Uuid; +#[cfg(feature = "syncable")] +use conn::Syncable; + #[cfg(feature = "syncable")] impl Syncable for Store { fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { @@ -232,6 +233,7 @@ mod tests { BTreeSet, }; use std::path::{ + Path, PathBuf, }; use std::sync::mpsc; @@ -258,11 +260,11 @@ mod tests { HasSchema, }; - use ::entity_builder::{ + use mentat_transaction::entity_builder::{ BuildTerms, }; - use ::query::{ + use mentat_transaction::query::{ PreparedQuery, }; diff --git a/src/vocabulary.rs b/src/vocabulary.rs index 91a18116..e708117a 100644 --- a/src/vocabulary.rs +++ b/src/vocabulary.rs @@ -113,17 +113,17 @@ use ::{ ValueType, }; -use ::conn::{ - InProgress, - Queryable, -}; - use ::errors::{ MentatError, Result, }; -use ::entity_builder::{ +use mentat_transaction::{ + InProgress, + Queryable, +}; + +use mentat_transaction::entity_builder::{ BuildTerms, TermBuilder, Terms, @@ -476,7 +476,7 @@ impl Definition { } } - builder.build() + builder.build().map_err(|e| e.into()) } /// Return a sequence of terms that describes this vocabulary definition and its attributes. diff --git a/tests/entity_builder.rs b/tests/entity_builder.rs new file mode 100644 index 00000000..62f5202b --- /dev/null +++ b/tests/entity_builder.rs @@ -0,0 +1,182 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#[macro_use] +extern crate mentat; +extern crate mentat_core; +extern crate mentat_db; +extern crate mentat_transaction; +extern crate public_traits; +extern crate core_traits; +extern crate db_traits; + +use mentat::conn::{ + Conn, +}; + +use core_traits::{ + Entid, + KnownEntid, + TypedValue, +}; + +use mentat_core::{ + HasSchema, + TxReport, +}; + +use mentat_transaction::{ + TermBuilder, + Queryable, +}; + +use public_traits::errors::{ + MentatError, +}; + +use mentat::entity_builder::{ + BuildTerms, +}; + +// In reality we expect the store to hand these out safely. +fn fake_known_entid(e: Entid) -> KnownEntid { + KnownEntid(e) +} + +#[test] +fn test_entity_builder_bogus_entids() { + let mut builder = TermBuilder::new(); + let e = builder.named_tempid("x"); + let a1 = fake_known_entid(37); // :db/doc + let a2 = fake_known_entid(999); + let v = TypedValue::typed_string("Some attribute"); + let ve = fake_known_entid(12345); + + builder.add(e.clone(), a1, v).expect("add succeeded"); + builder.add(e.clone(), a2, e.clone()).expect("add succeeded, even though it's meaningless"); + builder.add(e.clone(), a2, ve).expect("add succeeded, even though it's meaningless"); + let (terms, tempids) = builder.build().expect("build succeeded"); + + assert_eq!(tempids.len(), 1); + assert_eq!(terms.len(), 3); // TODO: check the contents? + + // Now try to add them to a real store. + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // This should fail: unrecognized entid. + match in_progress.transact_entities(terms).expect_err("expected transact to fail") { + MentatError::DbError(e) => { + assert_eq!(e.kind(), db_traits::errors::DbErrorKind::UnrecognizedEntid(999)); + }, + _ => panic!("Should have rejected the entid."), + } +} + +#[test] +fn test_in_progress_builder() { + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + // Give ourselves a schema to work with! + conn.transact(&mut sqlite, r#"[ + [:db/add "o" :db/ident :foo/one] + [:db/add "o" :db/valueType :db.type/long] + [:db/add "o" :db/cardinality :db.cardinality/one] + [:db/add "m" :db/ident :foo/many] + [:db/add "m" :db/valueType :db.type/string] + [:db/add "m" :db/cardinality :db.cardinality/many] + [:db/add "r" :db/ident :foo/ref] + [:db/add "r" :db/valueType :db.type/ref] + [:db/add "r" :db/cardinality :db.cardinality/one] + ]"#).unwrap(); + + let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // We can use this or not! + let a_many = in_progress.get_entid(&kw!(:foo/many)).expect(":foo/many"); + + let mut builder = in_progress.builder(); + let e_x = builder.named_tempid("x"); + let v_many_1 = TypedValue::typed_string("Some text"); + let v_many_2 = TypedValue::typed_string("Other text"); + builder.add(e_x.clone(), kw!(:foo/many), v_many_1).expect("add succeeded"); + builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); + builder.commit().expect("commit succeeded"); +} + +#[test] +fn test_entity_builder() { + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let foo_one = kw!(:foo/one); + let foo_many = kw!(:foo/many); + let foo_ref = kw!(:foo/ref); + let report: TxReport; + + // Give ourselves a schema to work with! + // Scoped borrow of conn. + { + conn.transact(&mut sqlite, r#"[ + [:db/add "o" :db/ident :foo/one] + [:db/add "o" :db/valueType :db.type/long] + [:db/add "o" :db/cardinality :db.cardinality/one] + [:db/add "m" :db/ident :foo/many] + [:db/add "m" :db/valueType :db.type/string] + [:db/add "m" :db/cardinality :db.cardinality/many] + [:db/add "r" :db/ident :foo/ref] + [:db/add "r" :db/valueType :db.type/ref] + [:db/add "r" :db/cardinality :db.cardinality/one] + ]"#).unwrap(); + + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // Scoped borrow of in_progress. + { + let mut builder = TermBuilder::new(); + let e_x = builder.named_tempid("x"); + let e_y = builder.named_tempid("y"); + let a_ref = in_progress.get_entid(&foo_ref).expect(":foo/ref"); + let a_one = in_progress.get_entid(&foo_one).expect(":foo/one"); + let a_many = in_progress.get_entid(&foo_many).expect(":foo/many"); + let v_many_1 = TypedValue::typed_string("Some text"); + let v_many_2 = TypedValue::typed_string("Other text"); + let v_long: TypedValue = 123.into(); + + builder.add(e_x.clone(), a_many, v_many_1).expect("add succeeded"); + builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); + builder.add(e_y.clone(), a_ref, e_x.clone()).expect("add succeeded"); + builder.add(e_x.clone(), a_one, v_long).expect("add succeeded"); + + let (terms, tempids) = builder.build().expect("build succeeded"); + + assert_eq!(tempids.len(), 2); + assert_eq!(terms.len(), 4); + + report = in_progress.transact_entities(terms).expect("add succeeded"); + let x = report.tempids.get("x").expect("our tempid has an ID"); + let y = report.tempids.get("y").expect("our tempid has an ID"); + assert_eq!(in_progress.lookup_value_for_attribute(*y, &foo_ref).expect("lookup succeeded"), + Some(TypedValue::Ref(*x))); + assert_eq!(in_progress.lookup_value_for_attribute(*x, &foo_one).expect("lookup succeeded"), + Some(TypedValue::Long(123))); + } + + in_progress.commit().expect("commit succeeded"); + } + + // It's all still there after the commit. + let x = report.tempids.get("x").expect("our tempid has an ID"); + let y = report.tempids.get("y").expect("our tempid has an ID"); + assert_eq!(conn.lookup_value_for_attribute(&mut sqlite, *y, &foo_ref).expect("lookup succeeded"), + Some(TypedValue::Ref(*x))); +} diff --git a/tests/query.rs b/tests/query.rs index a970d524..819835ad 100644 --- a/tests/query.rs +++ b/tests/query.rs @@ -15,8 +15,11 @@ extern crate time; extern crate mentat; extern crate mentat_core; extern crate core_traits; +extern crate public_traits; extern crate mentat_db; +extern crate mentat_transaction; + // TODO: when we switch to `failure`, make this more humane. extern crate query_algebrizer_traits; // For errors; extern crate query_projector_traits; // For errors. @@ -63,7 +66,7 @@ use mentat::query::q_uncached; use mentat::conn::Conn; -use mentat::errors::{ +use public_traits::errors::{ MentatError, }; diff --git a/tests/vocabulary.rs b/tests/vocabulary.rs index 82e225b7..979a4a24 100644 --- a/tests/vocabulary.rs +++ b/tests/vocabulary.rs @@ -623,7 +623,7 @@ fn test_upgrade_with_functions() { return Ok(()); } - ip.transact_builder(builder).and(Ok(())) + ip.transact_builder(builder).and(Ok(())).map_err(|e| e.into()) } fn people_v1_to_v2(ip: &mut InProgress, from: &Vocabulary) -> mentat::errors::Result<()> { @@ -696,7 +696,7 @@ fn test_upgrade_with_functions() { return Ok(()); } - ip.transact_builder(builder).and(Ok(())) + ip.transact_builder(builder).and(Ok(())).map_err(|e| e.into()) } /// This is the function we write to dedupe. This logic is very suitable for sharing: @@ -751,7 +751,7 @@ fn test_upgrade_with_functions() { return Ok(()); } - ip.transact_builder(builder).and(Ok(())) + ip.transact_builder(builder).and(Ok(())).map_err(|e| e.into()) } // This migration is bad: it can't impose the uniqueness constraint because we end up with @@ -992,7 +992,7 @@ fn test_upgrade_with_functions() { builder.add(person_likes, db_doc, TypedValue::typed_string("Deprecated. Use :movie/likes or :food/likes instead."))?; - ip.transact_builder(builder).and(Ok(())) + ip.transact_builder(builder).and(Ok(())).map_err(|e| e.into()) } }; diff --git a/transaction/Cargo.toml b/transaction/Cargo.toml new file mode 100644 index 00000000..48ccdb0b --- /dev/null +++ b/transaction/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "mentat_transaction" +version = "0.0.1" +workspace = ".." + +[features] +sqlcipher = ["rusqlite/sqlcipher"] + +[dependencies] +failure = "0.1.1" + +[dependencies.edn] +path = "../edn" + +[dependencies.public_traits] +path = "../public-traits" + +[dependencies.mentat_core] +path = "../core" + +[dependencies.core_traits] +path = "../core-traits" + +[dependencies.mentat_db] +path = "../db" + +[dependencies.db_traits] +path = "../db-traits" + +[dependencies.mentat_sql] +path = "../sql" + +[dependencies.mentat_query_algebrizer] +path = "../query-algebrizer" + +[dependencies.mentat_query_projector] +path = "../query-projector" + +[dependencies.mentat_query_pull] +path = "../query-pull" + +[dependencies.mentat_query_sql] +path = "../query-sql" + +[dependencies.rusqlite] +version = "0.13" +features = ["limits"] diff --git a/src/entity_builder.rs b/transaction/src/entity_builder.rs similarity index 58% rename from src/entity_builder.rs rename to transaction/src/entity_builder.rs index 0ae54f10..af4ac1ac 100644 --- a/src/entity_builder.rs +++ b/transaction/src/entity_builder.rs @@ -76,9 +76,7 @@ use mentat_core::{ TxReport, }; -use conn::{ - InProgress, -}; +use ::InProgress; use public_traits::errors::{ Result, @@ -278,158 +276,3 @@ impl<'a, 'c> EntityBuilder> { self.finish().0.commit() } } - -#[cfg(test)] -mod testing { - extern crate mentat_db; - extern crate db_traits; - - use ::{ - Conn, - Entid, - HasSchema, - KnownEntid, - MentatError, - Queryable, - TxReport, - TypedValue, - }; - - use super::*; - - // In reality we expect the store to hand these out safely. - fn fake_known_entid(e: Entid) -> KnownEntid { - KnownEntid(e) - } - - #[test] - fn test_entity_builder_bogus_entids() { - let mut builder = TermBuilder::new(); - let e = builder.named_tempid("x"); - let a1 = fake_known_entid(37); // :db/doc - let a2 = fake_known_entid(999); - let v = TypedValue::typed_string("Some attribute"); - let ve = fake_known_entid(12345); - - builder.add(e.clone(), a1, v).expect("add succeeded"); - builder.add(e.clone(), a2, e.clone()).expect("add succeeded, even though it's meaningless"); - builder.add(e.clone(), a2, ve).expect("add succeeded, even though it's meaningless"); - let (terms, tempids) = builder.build().expect("build succeeded"); - - assert_eq!(tempids.len(), 1); - assert_eq!(terms.len(), 3); // TODO: check the contents? - - // Now try to add them to a real store. - let mut sqlite = mentat_db::db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); - let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); - - // This should fail: unrecognized entid. - match in_progress.transact_entities(terms).expect_err("expected transact to fail") { - MentatError::DbError(e) => { - assert_eq!(e.kind(), db_traits::errors::DbErrorKind::UnrecognizedEntid(999)); - }, - _ => panic!("Should have rejected the entid."), - } - } - - #[test] - fn test_in_progress_builder() { - let mut sqlite = mentat_db::db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); - - // Give ourselves a schema to work with! - conn.transact(&mut sqlite, r#"[ - [:db/add "o" :db/ident :foo/one] - [:db/add "o" :db/valueType :db.type/long] - [:db/add "o" :db/cardinality :db.cardinality/one] - [:db/add "m" :db/ident :foo/many] - [:db/add "m" :db/valueType :db.type/string] - [:db/add "m" :db/cardinality :db.cardinality/many] - [:db/add "r" :db/ident :foo/ref] - [:db/add "r" :db/valueType :db.type/ref] - [:db/add "r" :db/cardinality :db.cardinality/one] - ]"#).unwrap(); - - let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); - - // We can use this or not! - let a_many = in_progress.get_entid(&kw!(:foo/many)).expect(":foo/many"); - - let mut builder = in_progress.builder(); - let e_x = builder.named_tempid("x"); - let v_many_1 = TypedValue::typed_string("Some text"); - let v_many_2 = TypedValue::typed_string("Other text"); - builder.add(e_x.clone(), kw!(:foo/many), v_many_1).expect("add succeeded"); - builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); - builder.commit().expect("commit succeeded"); - } - - #[test] - fn test_entity_builder() { - let mut sqlite = mentat_db::db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); - - let foo_one = kw!(:foo/one); - let foo_many = kw!(:foo/many); - let foo_ref = kw!(:foo/ref); - let report: TxReport; - - // Give ourselves a schema to work with! - // Scoped borrow of conn. - { - conn.transact(&mut sqlite, r#"[ - [:db/add "o" :db/ident :foo/one] - [:db/add "o" :db/valueType :db.type/long] - [:db/add "o" :db/cardinality :db.cardinality/one] - [:db/add "m" :db/ident :foo/many] - [:db/add "m" :db/valueType :db.type/string] - [:db/add "m" :db/cardinality :db.cardinality/many] - [:db/add "r" :db/ident :foo/ref] - [:db/add "r" :db/valueType :db.type/ref] - [:db/add "r" :db/cardinality :db.cardinality/one] - ]"#).unwrap(); - - let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); - - // Scoped borrow of in_progress. - { - let mut builder = TermBuilder::new(); - let e_x = builder.named_tempid("x"); - let e_y = builder.named_tempid("y"); - let a_ref = in_progress.get_entid(&foo_ref).expect(":foo/ref"); - let a_one = in_progress.get_entid(&foo_one).expect(":foo/one"); - let a_many = in_progress.get_entid(&foo_many).expect(":foo/many"); - let v_many_1 = TypedValue::typed_string("Some text"); - let v_many_2 = TypedValue::typed_string("Other text"); - let v_long: TypedValue = 123.into(); - - builder.add(e_x.clone(), a_many, v_many_1).expect("add succeeded"); - builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); - builder.add(e_y.clone(), a_ref, e_x.clone()).expect("add succeeded"); - builder.add(e_x.clone(), a_one, v_long).expect("add succeeded"); - - let (terms, tempids) = builder.build().expect("build succeeded"); - - assert_eq!(tempids.len(), 2); - assert_eq!(terms.len(), 4); - - report = in_progress.transact_entities(terms).expect("add succeeded"); - let x = report.tempids.get("x").expect("our tempid has an ID"); - let y = report.tempids.get("y").expect("our tempid has an ID"); - assert_eq!(in_progress.lookup_value_for_attribute(*y, &foo_ref).expect("lookup succeeded"), - Some(TypedValue::Ref(*x))); - assert_eq!(in_progress.lookup_value_for_attribute(*x, &foo_one).expect("lookup succeeded"), - Some(TypedValue::Long(123))); - } - - in_progress.commit().expect("commit succeeded"); - } - - // It's all still there after the commit. - let x = report.tempids.get("x").expect("our tempid has an ID"); - let y = report.tempids.get("y").expect("our tempid has an ID"); - assert_eq!(conn.lookup_value_for_attribute(&mut sqlite, *y, &foo_ref).expect("lookup succeeded"), - Some(TypedValue::Ref(*x))); - } -} diff --git a/transaction/src/lib.rs b/transaction/src/lib.rs new file mode 100644 index 00000000..6260d38e --- /dev/null +++ b/transaction/src/lib.rs @@ -0,0 +1,538 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +extern crate failure; +extern crate rusqlite; + +extern crate edn; +extern crate public_traits; +#[macro_use] +extern crate core_traits; +extern crate db_traits; +extern crate mentat_core; +extern crate mentat_db; +extern crate mentat_query_algebrizer; +extern crate mentat_query_projector; +extern crate mentat_query_pull; +extern crate mentat_sql; + +use std::sync::{ + Arc, + Mutex, +}; + +use std::io::Read; + +use std::borrow::Borrow; + +use std::collections::BTreeMap; + +use std::fs::{ + File, +}; + +use std::path::{ + Path, +}; + +use edn::{ + InternSet, + Keyword, +}; +use edn::entities::{ + TempId, + OpType, +}; + +use core_traits::{ + Attribute, + Entid, + KnownEntid, + StructuredMap, + TypedValue, + ValueType, +}; + +use public_traits::errors::{ + Result, + MentatError, +}; + +use mentat_core::{ + HasSchema, + Schema, + TxReport, + ValueRc, +}; + +use mentat_query_pull::{ + pull_attributes_for_entities, + pull_attributes_for_entity, +}; + +use mentat_db::{ + transact, + transact_terms, + InProgressObserverTransactWatcher, + PartitionMap, + TransactableValue, + TransactWatcher, + TxObservationService, +}; + +use mentat_db::internal_types::TermWithTempIds; + +use mentat_db::cache::{ + InProgressCacheTransactWatcher, + InProgressSQLiteAttributeCache, +}; + +pub mod entity_builder; +pub mod metadata; +pub mod query; + +pub use entity_builder::{ + InProgressBuilder, + TermBuilder, +}; + +pub use metadata::{ + Metadata, +}; + +use query::{ + Known, + PreparedResult, + QueryExplanation, + QueryInputs, + QueryOutput, + lookup_value_for_attribute, + lookup_values_for_attribute, + q_explain, + q_once, + q_prepare, + q_uncached, +}; + + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum CacheDirection { + Forward, + Reverse, + Both, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum CacheAction { + Register, + Deregister, +} + +/// Represents an in-progress, not yet committed, set of changes to the store. +/// Call `commit` to commit your changes, or `rollback` to discard them. +/// A transaction is held open until you do so. +/// Your changes will be implicitly dropped along with this struct. +pub struct InProgress<'a, 'c> { + pub transaction: rusqlite::Transaction<'c>, + pub mutex: &'a Mutex, + pub generation: u64, + pub partition_map: PartitionMap, + pub schema: Schema, + pub cache: InProgressSQLiteAttributeCache, + pub use_caching: bool, + pub tx_observer: &'a Mutex, + pub tx_observer_watcher: InProgressObserverTransactWatcher, +} + +/// Represents an in-progress set of reads to the store. Just like `InProgress`, +/// which is read-write, but only allows for reads. +pub struct InProgressRead<'a, 'c> { + pub in_progress: InProgress<'a, 'c>, +} + +pub trait Queryable { + fn q_explain(&self, query: &str, inputs: T) -> Result + where T: Into>; + fn q_once(&self, query: &str, inputs: T) -> Result + where T: Into>; + fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult + where T: Into>; + fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into; + fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into; +} + +pub trait Pullable { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator; + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator; +} + +impl<'a, 'c> InProgress<'a, 'c> { + pub fn builder(self) -> InProgressBuilder<'a, 'c> { + InProgressBuilder::new(self) + } + + /// Choose whether to use in-memory caches for running queries. + pub fn use_caching(&mut self, yesno: bool) { + self.use_caching = yesno; + } + + /// If you only have a reference to an `InProgress`, you can't use the easy builder. + /// This exists so you can make your own. + pub fn transact_builder(&mut self, builder: TermBuilder) -> Result { + builder.build() + .and_then(|(terms, _tempid_set)| { + self.transact_entities(terms) + }) + } + + pub fn transact_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { + let w = InProgressTransactWatcher::new( + &mut self.tx_observer_watcher, + self.cache.transact_watcher()); + let (report, next_partition_map, next_schema, _watcher) = + transact_terms(&self.transaction, + self.partition_map.clone(), + &self.schema, + &self.schema, + w, + terms, + tempid_set)?; + self.partition_map = next_partition_map; + if let Some(schema) = next_schema { + self.schema = schema; + } + Ok(report) + } + + pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator> { + // We clone the partition map here, rather than trying to use a Cell or using a mutable + // reference, for two reasons: + // 1. `transact` allocates new IDs in partitions before and while doing work that might + // fail! We don't want to mutate this map on failure, so we can't just use &mut. + // 2. Even if we could roll that back, we end up putting this `PartitionMap` into our + // `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using + // `Default::default` in those situations to extract the partition map, and so there + // would still be some cost. + let w = InProgressTransactWatcher::new( + &mut self.tx_observer_watcher, + self.cache.transact_watcher()); + let (report, next_partition_map, next_schema, _watcher) = + transact(&self.transaction, + self.partition_map.clone(), + &self.schema, + &self.schema, + w, + entities)?; + self.partition_map = next_partition_map; + if let Some(schema) = next_schema { + self.schema = schema; + } + Ok(report) + } + + pub fn transact(&mut self, transaction: B) -> Result where B: Borrow { + let entities = edn::parse::entities(transaction.borrow())?; + self.transact_entities(entities) + } + + pub fn import

(&mut self, path: P) -> Result + where P: AsRef { + let mut file = File::open(path)?; + let mut text: String = String::new(); + file.read_to_string(&mut text)?; + self.transact(text.as_str()) + } + + pub fn rollback(self) -> Result<()> { + self.transaction.rollback().map_err(|e| e.into()) + } + + pub fn commit(self) -> Result<()> { + // The mutex is taken during this entire method. + let mut metadata = self.mutex.lock().unwrap(); + + if self.generation != metadata.generation { + // Somebody else wrote! + // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. + // This should not occur -- an attempt to take a competing IMMEDIATE transaction + // will fail with `SQLITE_BUSY`, causing this function to abort. + bail!(MentatError::UnexpectedLostTransactRace); + } + + // Commit the SQLite transaction while we hold the mutex. + self.transaction.commit()?; + + metadata.generation += 1; + metadata.partition_map = self.partition_map; + + // Update the conn's cache if we made any changes. + self.cache.commit_to(&mut metadata.attribute_cache); + + if self.schema != *(metadata.schema) { + metadata.schema = Arc::new(self.schema); + + // TODO: rebuild vocabularies and notify consumers that they've changed -- it's possible + // that a change has arrived over the wire and invalidated some local module. + // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. + } + + let txes = self.tx_observer_watcher.txes; + self.tx_observer.lock().unwrap().in_progress_did_commit(txes); + + Ok(()) + } + + pub fn cache(&mut self, + attribute: &Keyword, + cache_direction: CacheDirection, + cache_action: CacheAction) -> Result<()> { + let attribute_entid: Entid = self.schema + .attribute_for_ident(&attribute) + .ok_or_else(|| MentatError::UnknownAttribute(attribute.to_string()))?.1.into(); + + match cache_action { + CacheAction::Register => { + match cache_direction { + CacheDirection::Both => self.cache.register(&self.schema, &self.transaction, attribute_entid), + CacheDirection::Forward => self.cache.register_forward(&self.schema, &self.transaction, attribute_entid), + CacheDirection::Reverse => self.cache.register_reverse(&self.schema, &self.transaction, attribute_entid), + }.map_err(|e| e.into()) + }, + CacheAction::Deregister => { + self.cache.unregister(attribute_entid); + Ok(()) + }, + } + } + + pub fn last_tx_id(&self) -> Entid { + self.partition_map[":db.part/tx"].next_entid() - 1 + } +} + +impl<'a, 'c> InProgressRead<'a, 'c> { + pub fn last_tx_id(&self) -> Entid { + self.in_progress.last_tx_id() + } +} + + +impl<'a, 'c> Queryable for InProgressRead<'a, 'c> { + fn q_once(&self, query: &str, inputs: T) -> Result + where T: Into> { + self.in_progress.q_once(query, inputs) + } + + fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult + where T: Into> { + self.in_progress.q_prepare(query, inputs) + } + + fn q_explain(&self, query: &str, inputs: T) -> Result + where T: Into> { + self.in_progress.q_explain(query, inputs) + } + + fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into { + self.in_progress.lookup_values_for_attribute(entity, attribute) + } + + fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into { + self.in_progress.lookup_value_for_attribute(entity, attribute) + } +} + +impl<'a, 'c> Pullable for InProgressRead<'a, 'c> { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + self.in_progress.pull_attributes_for_entities(entities, attributes) + } + + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator { + self.in_progress.pull_attributes_for_entity(entity, attributes) + } +} + +impl<'a, 'c> Queryable for InProgress<'a, 'c> { + fn q_once(&self, query: &str, inputs: T) -> Result + where T: Into> { + + if self.use_caching { + let known = Known::new(&self.schema, Some(&self.cache)); + q_once(&*(self.transaction), + known, + query, + inputs) + } else { + q_uncached(&*(self.transaction), + &self.schema, + query, + inputs) + } + } + + fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult + where T: Into> { + + let known = Known::new(&self.schema, Some(&self.cache)); + q_prepare(&*(self.transaction), + known, + query, + inputs) + } + + fn q_explain(&self, query: &str, inputs: T) -> Result + where T: Into> { + + let known = Known::new(&self.schema, Some(&self.cache)); + q_explain(&*(self.transaction), + known, + query, + inputs) + } + + fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into { + let known = Known::new(&self.schema, Some(&self.cache)); + lookup_values_for_attribute(&*(self.transaction), known, entity, attribute) + } + + fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::Keyword) -> Result> + where E: Into { + let known = Known::new(&self.schema, Some(&self.cache)); + lookup_value_for_attribute(&*(self.transaction), known, entity, attribute) + } +} + +impl<'a, 'c> Pullable for InProgress<'a, 'c> { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + pull_attributes_for_entities(&self.schema, &*(self.transaction), entities, attributes) + .map_err(|e| e.into()) + } + + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator { + pull_attributes_for_entity(&self.schema, &*(self.transaction), entity, attributes) + .map_err(|e| e.into()) + } +} + +impl<'a, 'c> HasSchema for InProgressRead<'a, 'c> { + fn entid_for_type(&self, t: ValueType) -> Option { + self.in_progress.entid_for_type(t) + } + + fn get_ident(&self, x: T) -> Option<&Keyword> where T: Into { + self.in_progress.get_ident(x) + } + + fn get_entid(&self, x: &Keyword) -> Option { + self.in_progress.get_entid(x) + } + + fn attribute_for_entid(&self, x: T) -> Option<&Attribute> where T: Into { + self.in_progress.attribute_for_entid(x) + } + + fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> { + self.in_progress.attribute_for_ident(ident) + } + + /// Return true if the provided entid identifies an attribute in this schema. + fn is_attribute(&self, x: T) -> bool where T: Into { + self.in_progress.is_attribute(x) + } + + /// Return true if the provided ident identifies an attribute in this schema. + fn identifies_attribute(&self, x: &Keyword) -> bool { + self.in_progress.identifies_attribute(x) + } + + fn component_attributes(&self) -> &[Entid] { + self.in_progress.component_attributes() + } +} + +impl<'a, 'c> HasSchema for InProgress<'a, 'c> { + fn entid_for_type(&self, t: ValueType) -> Option { + self.schema.entid_for_type(t) + } + + fn get_ident(&self, x: T) -> Option<&Keyword> where T: Into { + self.schema.get_ident(x) + } + + fn get_entid(&self, x: &Keyword) -> Option { + self.schema.get_entid(x) + } + + fn attribute_for_entid(&self, x: T) -> Option<&Attribute> where T: Into { + self.schema.attribute_for_entid(x) + } + + fn attribute_for_ident(&self, ident: &Keyword) -> Option<(&Attribute, KnownEntid)> { + self.schema.attribute_for_ident(ident) + } + + /// Return true if the provided entid identifies an attribute in this schema. + fn is_attribute(&self, x: T) -> bool where T: Into { + self.schema.is_attribute(x) + } + + /// Return true if the provided ident identifies an attribute in this schema. + fn identifies_attribute(&self, x: &Keyword) -> bool { + self.schema.identifies_attribute(x) + } + + fn component_attributes(&self) -> &[Entid] { + self.schema.component_attributes() + } +} + +struct InProgressTransactWatcher<'a, 'o> { + cache_watcher: InProgressCacheTransactWatcher<'a>, + observer_watcher: &'o mut InProgressObserverTransactWatcher, + tx_id: Option, +} + +impl<'a, 'o> InProgressTransactWatcher<'a, 'o> { + fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { + InProgressTransactWatcher { + cache_watcher: cache_watcher, + observer_watcher: observer_watcher, + tx_id: None, + } + } +} + +impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { + self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v); + self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v); + } + + fn done(&mut self, t: &Entid, schema: &Schema) -> ::db_traits::errors::Result<()> { + self.cache_watcher.done(t, schema)?; + self.observer_watcher.done(t, schema)?; + self.tx_id = Some(t.clone()); + Ok(()) + } +} diff --git a/transaction/src/metadata.rs b/transaction/src/metadata.rs new file mode 100644 index 00000000..79946497 --- /dev/null +++ b/transaction/src/metadata.rs @@ -0,0 +1,41 @@ +/// Connection metadata required to query from, or apply transactions to, a Mentat store. +/// +/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently +/// changing parts (schema) that we want to share across threads. +/// +/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust. + +use std::sync::{ + Arc, +}; + +use mentat_core::{ + Schema, +}; + +use mentat_db::{ + PartitionMap, +}; + +use mentat_db::cache::{ + SQLiteAttributeCache, +}; + +pub struct Metadata { + pub generation: u64, + pub partition_map: PartitionMap, + pub schema: Arc, + pub attribute_cache: SQLiteAttributeCache, +} + +impl Metadata { + // Intentionally not public. + pub fn new(generation: u64, partition_map: PartitionMap, schema: Arc, cache: SQLiteAttributeCache) -> Metadata { + Metadata { + generation: generation, + partition_map: partition_map, + schema: schema, + attribute_cache: cache, + } + } +} diff --git a/src/query.rs b/transaction/src/query.rs similarity index 100% rename from src/query.rs rename to transaction/src/query.rs