diff --git a/Cargo.toml b/Cargo.toml index 9aaa8af1..161e53ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,9 @@ path = "query-sql" [dependencies.mentat_query_translator] path = "query-translator" +[dependencies.mentat_tx] +path = "tx" + [dependencies.mentat_tx_parser] path = "tx-parser" diff --git a/db/src/bootstrap.rs b/db/src/bootstrap.rs index 22af6bb0..3a864f29 100644 --- a/db/src/bootstrap.rs +++ b/db/src/bootstrap.rs @@ -32,6 +32,9 @@ use types::{Partition, PartitionMap}; /// This is the start of the :db.part/tx partition. pub const TX0: i64 = 0x10000000; +/// This is the start of the :db.part/user partition. +pub const USER0: i64 = 0x10000; + lazy_static! { static ref V1_IDENTS: Vec<(symbols::NamespacedKeyword, i64)> = { vec![(ns_keyword!("db", "ident"), entids::DB_IDENT), @@ -78,7 +81,7 @@ lazy_static! { static ref V1_PARTS: Vec<(symbols::NamespacedKeyword, i64, i64)> = { vec![(ns_keyword!("db.part", "db"), 0, (1 + V1_IDENTS.len()) as i64), - (ns_keyword!("db.part", "user"), 0x10000, 0x10000), + (ns_keyword!("db.part", "user"), USER0, USER0), (ns_keyword!("db.part", "tx"), TX0, TX0), ] }; diff --git a/db/src/db.rs b/db/src/db.rs index a05d55df..8b430548 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -21,8 +21,9 @@ use std::rc::Rc; use itertools; use itertools::Itertools; use rusqlite; -use rusqlite::types::{ToSql, ToSqlOutput}; +use rusqlite::TransactionBehavior; use rusqlite::limits::Limit; +use rusqlite::types::{ToSql, ToSqlOutput}; use ::{repeat_values, to_namespaced_keyword}; use bootstrap; @@ -208,7 +209,7 @@ fn get_user_version(conn: &rusqlite::Connection) -> Result { // TODO: rename "SQL" functions to align with "datoms" functions. pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { - let tx = conn.transaction()?; + let tx = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; for statement in (&V1_STATEMENTS).iter() { tx.execute(statement, &[])?; @@ -518,7 +519,7 @@ pub trait MentatStoring { /// /// Use this to create temporary tables, prepare indices, set pragmas, etc, before the initial /// `insert_non_fts_searches` invocation. - fn begin_transaction(&self) -> Result<()>; + fn begin_tx_application(&self) -> Result<()>; // TODO: this is not a reasonable abstraction, but I don't want to really consider non-SQL storage just yet. fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; @@ -710,7 +711,7 @@ impl MentatStoring for rusqlite::Connection { } /// Create empty temporary tables for search parameters and search results. - fn begin_transaction(&self) -> Result<()> { + fn begin_tx_application(&self) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. let statements = [ r#"DROP TABLE IF EXISTS temp.exact_searches"#, @@ -1163,7 +1164,8 @@ mod tests { let details = { // The block scopes the borrow of self.sqlite. - let tx = self.sqlite.transaction()?; + // We're about to write, so go straight ahead and get an IMMEDIATE transaction. + let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; // Applying the transaction can fail, so we don't unwrap. let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, entities)?; tx.commit()?; diff --git a/db/src/lib.rs b/db/src/lib.rs index e15148f0..51af3a58 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -48,6 +48,13 @@ mod internal_types; mod upsert_resolution; mod tx; +// Export these for reference from tests. cfg(test) should work, but doesn't. +// #[cfg(test)] +pub use bootstrap::{ + TX0, + USER0, +}; + pub use db::{ TypedSQLValue, new_connection, diff --git a/db/src/tx.rs b/db/src/tx.rs index 04ede770..190a9fab 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -700,7 +700,7 @@ pub fn transact<'conn, 'a, I>( let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. let tx_id = partition_map.allocate_entid(":db.part/tx"); - conn.begin_transaction()?; + conn.begin_tx_application()?; let mut tx = Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant); diff --git a/query-projector/src/lib.rs b/query-projector/src/lib.rs index f7e963f0..0737cd80 100644 --- a/query-projector/src/lib.rs +++ b/query-projector/src/lib.rs @@ -27,11 +27,9 @@ use rusqlite::{ use mentat_core::{ SQLValueType, - SQLValueTypeSet, TypedValue, ValueType, ValueTypeTag, - ValueTypeSet, }; use mentat_db::{ @@ -73,7 +71,7 @@ error_chain! { } } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum QueryResults { Scalar(Option), Tuple(Option>), diff --git a/query-translator/src/translate.rs b/query-translator/src/translate.rs index 47019a1e..528c445f 100644 --- a/query-translator/src/translate.rs +++ b/query-translator/src/translate.rs @@ -10,7 +10,6 @@ use mentat_core::{ SQLValueType, - SQLValueTypeSet, TypedValue, ValueType, }; diff --git a/src/conn.rs b/src/conn.rs index d6820fc0..20b342b8 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -13,6 +13,9 @@ use std::sync::{Arc, Mutex}; use rusqlite; +use rusqlite::{ + TransactionBehavior, +}; use edn; @@ -27,6 +30,8 @@ use mentat_db::{ TxReport, }; +use mentat_tx; + use mentat_tx_parser; use errors::*; @@ -78,6 +83,83 @@ pub struct Conn { // the schema changes. #315. } +/// Represents an in-progress, not yet committed, set of changes to the store. +/// Call `commit` to commit your changes, or `rollback` to discard them. +/// A transaction is held open until you do so. +/// Your changes will be implicitly dropped along with this struct. +pub struct InProgress<'a, 'c> { + transaction: rusqlite::Transaction<'c>, + mutex: &'a Mutex, + generation: u64, + partition_map: PartitionMap, + schema: Schema, + last_report: Option, // For now we track only the last, but we could accumulate all. +} + +impl<'a, 'c> InProgress<'a, 'c> { + pub fn transact_entities(mut self, entities: I) -> Result> where I: IntoIterator { + let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map, &self.schema, &self.schema, entities)?; + self.partition_map = next_partition_map; + if let Some(schema) = next_schema { + self.schema = schema; + } + self.last_report = Some(report); + Ok(self) + } + + /// Query the Mentat store, using the given connection and the current metadata. + pub fn q_once(&self, + query: &str, + inputs: T) -> Result + where T: Into> + { + + q_once(&*(self.transaction), + &self.schema, + query, + inputs) + } + + pub fn transact(self, transaction: &str) -> Result> { + let assertion_vector = edn::parse::value(transaction)?; + let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?; + self.transact_entities(entities) + } + + pub fn last_report(&self) -> Option<&TxReport> { + self.last_report.as_ref() + } + + pub fn rollback(mut self) -> Result<()> { + self.last_report = None; + self.transaction.rollback().map_err(|e| e.into()) + } + + pub fn commit(self) -> Result> { + // The mutex is taken during this entire method. + let mut metadata = self.mutex.lock().unwrap(); + + if self.generation != metadata.generation { + // Somebody else wrote! + // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. + // This should not occur -- an attempt to take a competing IMMEDIATE transaction + // will fail with `SQLITE_BUSY`, causing this function to abort. + bail!("Lost the transact() race!"); + } + + // Commit the SQLite transaction while we hold the mutex. + self.transaction.commit()?; + + metadata.generation += 1; + metadata.partition_map = self.partition_map; + if self.schema != *(metadata.schema) { + metadata.schema = Arc::new(self.schema); + } + + Ok(self.last_report) + } +} + impl Conn { // Intentionally not public. fn new(partition_map: PartitionMap, schema: Schema) -> Conn { @@ -123,17 +205,8 @@ impl Conn { inputs) } - /// Transact entities against the Mentat store, using the given connection and the current - /// metadata. - pub fn transact(&mut self, - sqlite: &mut rusqlite::Connection, - transaction: &str) -> Result { - - let assertion_vector = edn::parse::value(transaction)?; - let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?; - - let tx = sqlite.transaction()?; - + pub fn begin_transaction<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { + let tx = sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; let (current_generation, current_partition_map, current_schema) = { // The mutex is taken during this block. @@ -145,28 +218,32 @@ impl Conn { current.schema.clone()) }; - // The transaction is processed while the mutex is not held. - let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, &*current_schema, entities)?; + Ok(InProgress { + mutex: &self.metadata, + transaction: tx, + generation: current_generation, + partition_map: current_partition_map, + schema: (*current_schema).clone(), + last_report: None, + }) + } - { - // The mutex is taken during this block. - let mut metadata = self.metadata.lock().unwrap(); + /// Transact entities against the Mentat store, using the given connection and the current + /// metadata. + pub fn transact(&mut self, + sqlite: &mut rusqlite::Connection, + transaction: &str) -> Result { + // Parse outside the SQL transaction. This is a tradeoff: we are limiting the scope of the + // transaction, and indeed we don't even create a SQL transaction if the provided input is + // invalid, but it means SQLite errors won't be found until the parse is complete, and if + // there's a race for the database (don't do that!) we are less likely to win it. + let assertion_vector = edn::parse::value(transaction)?; + let entities = mentat_tx_parser::Tx::parse(&assertion_vector)?; - if current_generation != metadata.generation { - // Somebody else wrote! - // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. - bail!("Lost the transact() race!"); - } - - // Commit the SQLite transaction while we hold the mutex. - tx.commit()?; - - metadata.generation += 1; - metadata.partition_map = next_partition_map; - if let Some(next_schema) = next_schema { - metadata.schema = Arc::new(next_schema); - } - } + let report = self.begin_transaction(sqlite)? + .transact_entities(entities)? + .commit()? + .expect("we always get a report"); Ok(report) } @@ -177,6 +254,11 @@ mod tests { use super::*; extern crate mentat_parser_utils; + use mentat_core::{ + TypedValue, + }; + + use mentat_db::USER0; #[test] fn test_transact_does_not_collide_existing_entids() { @@ -232,6 +314,98 @@ mod tests { assert_eq!(report.tempids["temp"], next); } + /// Return the entid that will be allocated to the next transacted tempid. + fn get_next_entid(conn: &Conn) -> i64 { + let partition_map = &conn.metadata.lock().unwrap().partition_map; + partition_map.get(":db.part/user").unwrap().index + } + + #[test] + fn test_compound_transact() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let tempid_offset = get_next_entid(&conn); + + let t = "[[:db/add \"one\" :db/ident :a/keyword1] \ + [:db/add \"two\" :db/ident :a/keyword2]]"; + + // This can refer to `t`, 'cos they occur in separate txes. + let t2 = "[{:db.schema/attribute \"three\", :db/ident :a/keyword1}]"; + + // Scoped borrow of `conn`. + { + let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + let in_progress = in_progress.transact(t).expect("transacted successfully"); + let one = in_progress.last_report().unwrap().tempids.get("one").expect("found one").clone(); + let two = in_progress.last_report().unwrap().tempids.get("two").expect("found two").clone(); + assert!(one != two); + assert!(one == tempid_offset || one == tempid_offset + 1); + assert!(two == tempid_offset || two == tempid_offset + 1); + + let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None) + .expect("query succeeded"); + assert_eq!(during, QueryResults::Scalar(Some(TypedValue::Ref(one)))); + + let report = in_progress.transact(t2) + .expect("t2 succeeded") + .commit() + .expect("commit succeeded"); + let three = report.unwrap().tempids.get("three").expect("found three").clone(); + assert!(one != three); + assert!(two != three); + } + + // The DB part table changed. + let tempid_offset_after = get_next_entid(&conn); + assert_eq!(tempid_offset + 3, tempid_offset_after); + } + + #[test] + fn test_compound_rollback() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let tempid_offset = get_next_entid(&conn); + + // Nothing in the store => USER0 should be our starting point. + assert_eq!(tempid_offset, USER0); + + let t = "[[:db/add \"one\" :db/ident :a/keyword1] \ + [:db/add \"two\" :db/ident :a/keyword2]]"; + + // Scoped borrow of `sqlite`. + { + let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + let in_progress = in_progress.transact(t).expect("transacted successfully"); + + let one = in_progress.last_report().unwrap().tempids.get("one").expect("found it").clone(); + let two = in_progress.last_report().unwrap().tempids.get("two").expect("found it").clone(); + + // The IDs are contiguous, starting at the previous part index. + assert!(one != two); + assert!(one == tempid_offset || one == tempid_offset + 1); + assert!(two == tempid_offset || two == tempid_offset + 1); + + // Inside the InProgress we can see our changes. + let during = in_progress.q_once("[:find ?x . :where [?x :db/ident :a/keyword1]]", None) + .expect("query succeeded"); + + assert_eq!(during, QueryResults::Scalar(Some(TypedValue::Ref(one)))); + + in_progress.rollback() + .expect("rollback succeeded"); + } + + let after = conn.q_once(&mut sqlite, "[:find ?x . :where [?x :db/ident :a/keyword1]]", None) + .expect("query succeeded"); + assert_eq!(after, QueryResults::Scalar(None)); + + // The DB part table is unchanged. + let tempid_offset_after = get_next_entid(&conn); + assert_eq!(tempid_offset, tempid_offset_after); + } + #[test] fn test_transact_errors() { let mut sqlite = db::new_connection("").unwrap(); diff --git a/src/lib.rs b/src/lib.rs index d635a651..0fcf3675 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ extern crate mentat_query_parser; extern crate mentat_query_projector; extern crate mentat_query_translator; extern crate mentat_sql; +extern crate mentat_tx; extern crate mentat_tx_parser; use rusqlite::Connection;