diff --git a/src/conn.rs b/src/conn.rs index 1462f73f..cc21bf85 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -134,7 +134,6 @@ impl Conn { /// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but /// _does not_ write the bootstrap schema. This constructor should only be used by /// consumers that expect to populate raw transaction data themselves. - pub(crate) fn empty(sqlite: &mut rusqlite::Connection) -> Result { let (tx, db) = db::create_empty_current_version(sqlite)?; tx.commit()?; @@ -276,7 +275,7 @@ impl Conn { } /// Take a SQLite transaction. - fn begin_transaction_with_behavior<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result> { + fn begin_transaction_with_behavior<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result> { let tx = sqlite.transaction_with_behavior(behavior)?; let (current_generation, current_partition_map, current_schema, cache_cow) = { @@ -305,12 +304,12 @@ impl Conn { // Helper to avoid passing connections around. // Make both args mutable so that we can't have parallel access. - pub fn begin_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { + pub fn begin_read<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result> { self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred) .map(|ip| InProgressRead { in_progress: ip }) } - pub fn begin_uncached_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { + pub fn begin_uncached_read<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result> { self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred) .map(|mut ip| { ip.use_caching(false); @@ -322,20 +321,20 @@ impl Conn { /// connections from taking immediate or exclusive transactions. This is appropriate for our /// writes and `InProgress`: it means we are ready to write whenever we want to, and nobody else /// can start a transaction that's not `DEFERRED`, but we don't need exclusivity yet. - pub fn begin_transaction<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result> { + pub fn begin_transaction<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result> { self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Immediate) } /// Transact entities against the Mentat store, using the given connection and the current /// metadata. - pub fn transact(&mut self, + pub fn transact(&self, sqlite: &mut rusqlite::Connection, - transaction: B) -> Result where B: Borrow { + transaction: T) -> Result where T: AsRef { // Parse outside the SQL transaction. This is a tradeoff: we are limiting the scope of the // transaction, and indeed we don't even create a SQL transaction if the provided input is // invalid, but it means SQLite errors won't be found until the parse is complete, and if // there's a race for the database (don't do that!) we are less likely to win it. - let entities = edn::parse::entities(transaction.borrow())?; + let entities = edn::parse::entities(transaction.as_ref())?; let mut in_progress = self.begin_transaction(sqlite)?; let report = in_progress.transact_entities(entities)?; @@ -349,7 +348,7 @@ impl Conn { /// `cache_action` determines if the attribute should be added or removed from the cache. /// CacheAction::Add is idempotent - each attribute is only added once. /// CacheAction::Remove throws an error if the attribute does not currently exist in the cache. - pub fn cache(&mut self, + pub fn cache(&self, sqlite: &mut rusqlite::Connection, schema: &Schema, attribute: &Keyword, @@ -381,11 +380,11 @@ impl Conn { } } - pub fn register_observer(&mut self, key: String, observer: Arc) { + pub fn register_observer(&self, key: String, observer: Arc) { self.tx_observer_service.lock().unwrap().register(key, observer); } - pub fn unregister_observer(&mut self, key: &String) { + pub fn unregister_observer(&self, key: &String) { self.tx_observer_service.lock().unwrap().deregister(key); } } @@ -428,7 +427,7 @@ mod tests { #[test] fn test_transact_does_not_collide_existing_entids() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); // Let's find out the next ID that'll be allocated. We're going to try to collide with it // a bit later. @@ -454,7 +453,7 @@ mod tests { #[test] fn test_transact_does_not_collide_new_entids() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); // Let's find out the next ID that'll be allocated. We're going to try to collide with it. let next = conn.metadata.lock().expect("metadata").partition_map[":db.part/user"].next_entid(); @@ -488,7 +487,7 @@ mod tests { #[test] fn test_compound_transact() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let tempid_offset = get_next_entid(&conn); @@ -529,7 +528,7 @@ mod tests { #[test] fn test_simple_prepared_query() { let mut c = db::new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "s" :db/ident :foo/boolean] [:db/add "s" :db/valueType :db.type/boolean] @@ -566,7 +565,7 @@ mod tests { #[test] fn test_compound_rollback() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let tempid_offset = get_next_entid(&conn); @@ -616,7 +615,7 @@ mod tests { #[test] fn test_transact_errors() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); // Good: empty transaction. let report = conn.transact(&mut sqlite, "[]").unwrap(); @@ -661,7 +660,7 @@ mod tests { #[test] fn test_add_to_cache_failure_no_attribute() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let _report = conn.transact(&mut sqlite, r#"[ { :db/ident :foo/bar :db/valueType :db.type/long }, @@ -682,7 +681,7 @@ mod tests { fn test_lookup_attribute_with_caching() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let _report = conn.transact(&mut sqlite, r#"[ { :db/ident :foo/bar :db/valueType :db.type/long }, @@ -741,7 +740,7 @@ mod tests { #[test] fn test_cache_usage() { let mut sqlite = db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident").0; let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident").0; diff --git a/tests/query.rs b/tests/query.rs index 819835ad..5a4de205 100644 --- a/tests/query.rs +++ b/tests/query.rs @@ -254,7 +254,7 @@ fn test_instants_and_uuids() { let start = Utc::now() + FixedOffset::west(60 * 60); let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "s" :db/ident :foo/uuid] [:db/add "s" :db/valueType :db.type/uuid] @@ -291,7 +291,7 @@ fn test_instants_and_uuids() { #[test] fn test_tx() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "s" :db/ident :foo/uuid] [:db/add "s" :db/valueType :db.type/uuid] @@ -324,7 +324,7 @@ fn test_tx() { #[test] fn test_tx_as_input() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "s" :db/ident :foo/uuid] [:db/add "s" :db/valueType :db.type/uuid] @@ -361,7 +361,7 @@ fn test_tx_as_input() { #[test] fn test_fulltext() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "a" :db/ident :foo/term] @@ -467,7 +467,7 @@ fn test_fulltext() { #[test] fn test_instant_range_query() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "a" :db/ident :foo/date] @@ -503,7 +503,7 @@ fn test_instant_range_query() { #[test] fn test_lookup() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ [:db/add "a" :db/ident :foo/date] @@ -656,7 +656,7 @@ fn test_aggregates_type_handling() { #[test] fn test_type_reqs() { let mut c = new_connection("").expect("Couldn't open conn."); - let mut conn = Conn::connect(&mut c).expect("Couldn't open DB."); + let conn = Conn::connect(&mut c).expect("Couldn't open DB."); conn.transact(&mut c, r#"[ {:db/ident :test/boolean :db/valueType :db.type/boolean :db/cardinality :db.cardinality/one} @@ -1526,3 +1526,63 @@ fn test_encrypted() { // so the specific test we use doesn't matter that much. run_tx_data_test(Store::open_with_key("", "secret").expect("opened")); } + +#[test] +fn test_conn_cross_thread() { + let file = "file:memdb?mode=memory&cache=shared"; + + let mut sqlite = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db"); + let conn = Conn::connect(&mut sqlite).expect("to connect"); + + conn.transact(&mut sqlite, r#"[ + [:db/add "a" :db/ident :foo/term] + [:db/add "a" :db/valueType :db.type/string] + [:db/add "a" :db/fulltext false] + [:db/add "a" :db/cardinality :db.cardinality/many] + ]"#).unwrap(); + + let _tx1 = conn.transact(&mut sqlite, r#"[ + [:db/add "e" :foo/term "1"] + ]"#).expect("tx1 to apply"); + + let _tx2 = conn.transact(&mut sqlite, r#"[ + [:db/add "e" :foo/term "2"] + ]"#).expect("tx2 to apply"); + + use std::sync::Arc; + use std::sync::mpsc::channel; + + let conn = Arc::new(conn); + + let (tx1, rx1) = channel(); + let (txs, rxs) = channel(); + let (tx2, rx2) = channel(); + + std::thread::spawn(move || { + let shared_conn: Arc = rx1.recv().expect("rx1"); + let mut sqlite1 = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db"); + + shared_conn.transact(&mut sqlite1, r#"[ + [:db/add "a" :db/ident :foo/bar] + [:db/add "a" :db/valueType :db.type/long] + [:db/add "a" :db/cardinality :db.cardinality/many] + ]"#).unwrap(); + + txs.send(()).expect("to sync"); + }); + + tx1.send(conn.clone()).expect("tx1"); + + rxs.recv().expect("to sync"); + + std::thread::spawn(move || { + let shared_conn: Arc = rx2.recv().expect("rx1"); + let mut sqlite2 = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db"); + + assert_eq!(None, shared_conn.current_schema().get_entid(&Keyword::namespaced("foo", "bar"))); + + shared_conn.q_once(&mut sqlite2, "[:find ?e :where [?e _ _]]", None).expect("to prepare"); + }); + + tx2.send(conn.clone()).expect("tx2"); +} diff --git a/tests/vocabulary.rs b/tests/vocabulary.rs index 979a4a24..f56421b9 100644 --- a/tests/vocabulary.rs +++ b/tests/vocabulary.rs @@ -186,7 +186,7 @@ fn test_add_vocab() { let foo_v1_b = vocabulary::Definition::new(kw!(:org.mozilla/foo), 1, bar_and_baz.clone()); let mut sqlite = mentat_db::db::new_connection("").unwrap(); - let mut conn = Conn::connect(&mut sqlite).unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); let foo_version_query = r#"[:find [?version ?aa] :where diff --git a/transaction/src/entity_builder.rs b/transaction/src/entity_builder.rs index af4ac1ac..dcfaf61a 100644 --- a/transaction/src/entity_builder.rs +++ b/transaction/src/entity_builder.rs @@ -276,3 +276,157 @@ impl<'a, 'c> EntityBuilder> { self.finish().0.commit() } } + +#[cfg(test)] +mod testing { + extern crate mentat_db; + + use ::{ + Conn, + Entid, + HasSchema, + KnownEntid, + MentatError, + Queryable, + TxReport, + TypedValue, + }; + + use super::*; + + // In reality we expect the store to hand these out safely. + fn fake_known_entid(e: Entid) -> KnownEntid { + KnownEntid(e) + } + + #[test] + fn test_entity_builder_bogus_entids() { + let mut builder = TermBuilder::new(); + let e = builder.named_tempid("x"); + let a1 = fake_known_entid(37); // :db/doc + let a2 = fake_known_entid(999); + let v = TypedValue::typed_string("Some attribute"); + let ve = fake_known_entid(12345); + + builder.add(e.clone(), a1, v).expect("add succeeded"); + builder.add(e.clone(), a2, e.clone()).expect("add succeeded, even though it's meaningless"); + builder.add(e.clone(), a2, ve).expect("add succeeded, even though it's meaningless"); + let (terms, tempids) = builder.build().expect("build succeeded"); + + assert_eq!(tempids.len(), 1); + assert_eq!(terms.len(), 3); // TODO: check the contents? + + // Now try to add them to a real store. + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // This should fail: unrecognized entid. + match in_progress.transact_entities(terms).expect_err("expected transact to fail") { + MentatError::DbError(e) => { + assert_eq!(e.kind(), mentat_db::DbErrorKind::UnrecognizedEntid(999)); + }, + _ => panic!("Should have rejected the entid."), + } + } + + #[test] + fn test_in_progress_builder() { + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); + + // Give ourselves a schema to work with! + conn.transact(&mut sqlite, r#"[ + [:db/add "o" :db/ident :foo/one] + [:db/add "o" :db/valueType :db.type/long] + [:db/add "o" :db/cardinality :db.cardinality/one] + [:db/add "m" :db/ident :foo/many] + [:db/add "m" :db/valueType :db.type/string] + [:db/add "m" :db/cardinality :db.cardinality/many] + [:db/add "r" :db/ident :foo/ref] + [:db/add "r" :db/valueType :db.type/ref] + [:db/add "r" :db/cardinality :db.cardinality/one] + ]"#).unwrap(); + + let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // We can use this or not! + let a_many = in_progress.get_entid(&kw!(:foo/many)).expect(":foo/many"); + + let mut builder = in_progress.builder(); + let e_x = builder.named_tempid("x"); + let v_many_1 = TypedValue::typed_string("Some text"); + let v_many_2 = TypedValue::typed_string("Other text"); + builder.add(e_x.clone(), kw!(:foo/many), v_many_1).expect("add succeeded"); + builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); + builder.commit().expect("commit succeeded"); + } + + #[test] + fn test_entity_builder() { + let mut sqlite = mentat_db::db::new_connection("").unwrap(); + let conn = Conn::connect(&mut sqlite).unwrap(); + + let foo_one = kw!(:foo/one); + let foo_many = kw!(:foo/many); + let foo_ref = kw!(:foo/ref); + let report: TxReport; + + // Give ourselves a schema to work with! + // Scoped borrow of conn. + { + conn.transact(&mut sqlite, r#"[ + [:db/add "o" :db/ident :foo/one] + [:db/add "o" :db/valueType :db.type/long] + [:db/add "o" :db/cardinality :db.cardinality/one] + [:db/add "m" :db/ident :foo/many] + [:db/add "m" :db/valueType :db.type/string] + [:db/add "m" :db/cardinality :db.cardinality/many] + [:db/add "r" :db/ident :foo/ref] + [:db/add "r" :db/valueType :db.type/ref] + [:db/add "r" :db/cardinality :db.cardinality/one] + ]"#).unwrap(); + + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully"); + + // Scoped borrow of in_progress. + { + let mut builder = TermBuilder::new(); + let e_x = builder.named_tempid("x"); + let e_y = builder.named_tempid("y"); + let a_ref = in_progress.get_entid(&foo_ref).expect(":foo/ref"); + let a_one = in_progress.get_entid(&foo_one).expect(":foo/one"); + let a_many = in_progress.get_entid(&foo_many).expect(":foo/many"); + let v_many_1 = TypedValue::typed_string("Some text"); + let v_many_2 = TypedValue::typed_string("Other text"); + let v_long: TypedValue = 123.into(); + + builder.add(e_x.clone(), a_many, v_many_1).expect("add succeeded"); + builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded"); + builder.add(e_y.clone(), a_ref, e_x.clone()).expect("add succeeded"); + builder.add(e_x.clone(), a_one, v_long).expect("add succeeded"); + + let (terms, tempids) = builder.build().expect("build succeeded"); + + assert_eq!(tempids.len(), 2); + assert_eq!(terms.len(), 4); + + report = in_progress.transact_entities(terms).expect("add succeeded"); + let x = report.tempids.get("x").expect("our tempid has an ID"); + let y = report.tempids.get("y").expect("our tempid has an ID"); + assert_eq!(in_progress.lookup_value_for_attribute(*y, &foo_ref).expect("lookup succeeded"), + Some(TypedValue::Ref(*x))); + assert_eq!(in_progress.lookup_value_for_attribute(*x, &foo_one).expect("lookup succeeded"), + Some(TypedValue::Long(123))); + } + + in_progress.commit().expect("commit succeeded"); + } + + // It's all still there after the commit. + let x = report.tempids.get("x").expect("our tempid has an ID"); + let y = report.tempids.get("y").expect("our tempid has an ID"); + assert_eq!(conn.lookup_value_for_attribute(&mut sqlite, *y, &foo_ref).expect("lookup succeeded"), + Some(TypedValue::Ref(*x))); + } +}