Fold in @ncalexan's changes to make Conn manage its own mutability.

ref: https://github.com/ncalexan/mentat/tree/stores/src
This commit is contained in:
Emily Toop 2018-07-04 13:52:07 +01:00
parent e3113783ae
commit d056c7cc10
4 changed files with 241 additions and 28 deletions

View file

@ -134,7 +134,6 @@ impl Conn {
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
/// _does not_ write the bootstrap schema. This constructor should only be used by
/// consumers that expect to populate raw transaction data themselves.
pub(crate) fn empty(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let (tx, db) = db::create_empty_current_version(sqlite)?;
tx.commit()?;
@ -276,7 +275,7 @@ impl Conn {
}
/// Take a SQLite transaction.
fn begin_transaction_with_behavior<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result<InProgress<'m, 'conn>> {
fn begin_transaction_with_behavior<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result<InProgress<'m, 'conn>> {
let tx = sqlite.transaction_with_behavior(behavior)?;
let (current_generation, current_partition_map, current_schema, cache_cow) =
{
@ -305,12 +304,12 @@ impl Conn {
// Helper to avoid passing connections around.
// Make both args mutable so that we can't have parallel access.
pub fn begin_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
pub fn begin_read<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred)
.map(|ip| InProgressRead { in_progress: ip })
}
pub fn begin_uncached_read<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
pub fn begin_uncached_read<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgressRead<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Deferred)
.map(|mut ip| {
ip.use_caching(false);
@ -322,20 +321,20 @@ impl Conn {
/// connections from taking immediate or exclusive transactions. This is appropriate for our
/// writes and `InProgress`: it means we are ready to write whenever we want to, and nobody else
/// can start a transaction that's not `DEFERRED`, but we don't need exclusivity yet.
pub fn begin_transaction<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgress<'m, 'conn>> {
pub fn begin_transaction<'m, 'conn>(&'m self, sqlite: &'conn mut rusqlite::Connection) -> Result<InProgress<'m, 'conn>> {
self.begin_transaction_with_behavior(sqlite, TransactionBehavior::Immediate)
}
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact<B>(&mut self,
pub fn transact<T>(&self,
sqlite: &mut rusqlite::Connection,
transaction: B) -> Result<TxReport> where B: Borrow<str> {
transaction: T) -> Result<TxReport> where T: AsRef<str> {
// Parse outside the SQL transaction. This is a tradeoff: we are limiting the scope of the
// transaction, and indeed we don't even create a SQL transaction if the provided input is
// invalid, but it means SQLite errors won't be found until the parse is complete, and if
// there's a race for the database (don't do that!) we are less likely to win it.
let entities = edn::parse::entities(transaction.borrow())?;
let entities = edn::parse::entities(transaction.as_ref())?;
let mut in_progress = self.begin_transaction(sqlite)?;
let report = in_progress.transact_entities(entities)?;
@ -349,7 +348,7 @@ impl Conn {
/// `cache_action` determines if the attribute should be added or removed from the cache.
/// CacheAction::Add is idempotent - each attribute is only added once.
/// CacheAction::Remove throws an error if the attribute does not currently exist in the cache.
pub fn cache(&mut self,
pub fn cache(&self,
sqlite: &mut rusqlite::Connection,
schema: &Schema,
attribute: &Keyword,
@ -381,11 +380,11 @@ impl Conn {
}
}
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
pub fn register_observer(&self, key: String, observer: Arc<TxObserver>) {
self.tx_observer_service.lock().unwrap().register(key, observer);
}
pub fn unregister_observer(&mut self, key: &String) {
pub fn unregister_observer(&self, key: &String) {
self.tx_observer_service.lock().unwrap().deregister(key);
}
}
@ -428,7 +427,7 @@ mod tests {
#[test]
fn test_transact_does_not_collide_existing_entids() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
// Let's find out the next ID that'll be allocated. We're going to try to collide with it
// a bit later.
@ -454,7 +453,7 @@ mod tests {
#[test]
fn test_transact_does_not_collide_new_entids() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
// Let's find out the next ID that'll be allocated. We're going to try to collide with it.
let next = conn.metadata.lock().expect("metadata").partition_map[":db.part/user"].next_entid();
@ -488,7 +487,7 @@ mod tests {
#[test]
fn test_compound_transact() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
@ -529,7 +528,7 @@ mod tests {
#[test]
fn test_simple_prepared_query() {
let mut c = db::new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/boolean]
[:db/add "s" :db/valueType :db.type/boolean]
@ -566,7 +565,7 @@ mod tests {
#[test]
fn test_compound_rollback() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let tempid_offset = get_next_entid(&conn);
@ -616,7 +615,7 @@ mod tests {
#[test]
fn test_transact_errors() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
// Good: empty transaction.
let report = conn.transact(&mut sqlite, "[]").unwrap();
@ -661,7 +660,7 @@ mod tests {
#[test]
fn test_add_to_cache_failure_no_attribute() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let _report = conn.transact(&mut sqlite, r#"[
{ :db/ident :foo/bar
:db/valueType :db.type/long },
@ -682,7 +681,7 @@ mod tests {
fn test_lookup_attribute_with_caching() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let _report = conn.transact(&mut sqlite, r#"[
{ :db/ident :foo/bar
:db/valueType :db.type/long },
@ -741,7 +740,7 @@ mod tests {
#[test]
fn test_cache_usage() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident").0;
let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident").0;

View file

@ -254,7 +254,7 @@ fn test_instants_and_uuids() {
let start = Utc::now() + FixedOffset::west(60 * 60);
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/uuid]
[:db/add "s" :db/valueType :db.type/uuid]
@ -291,7 +291,7 @@ fn test_instants_and_uuids() {
#[test]
fn test_tx() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/uuid]
[:db/add "s" :db/valueType :db.type/uuid]
@ -324,7 +324,7 @@ fn test_tx() {
#[test]
fn test_tx_as_input() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "s" :db/ident :foo/uuid]
[:db/add "s" :db/valueType :db.type/uuid]
@ -361,7 +361,7 @@ fn test_tx_as_input() {
#[test]
fn test_fulltext() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "a" :db/ident :foo/term]
@ -467,7 +467,7 @@ fn test_fulltext() {
#[test]
fn test_instant_range_query() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "a" :db/ident :foo/date]
@ -503,7 +503,7 @@ fn test_instant_range_query() {
#[test]
fn test_lookup() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
[:db/add "a" :db/ident :foo/date]
@ -656,7 +656,7 @@ fn test_aggregates_type_handling() {
#[test]
fn test_type_reqs() {
let mut c = new_connection("").expect("Couldn't open conn.");
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
let conn = Conn::connect(&mut c).expect("Couldn't open DB.");
conn.transact(&mut c, r#"[
{:db/ident :test/boolean :db/valueType :db.type/boolean :db/cardinality :db.cardinality/one}
@ -1526,3 +1526,63 @@ fn test_encrypted() {
// so the specific test we use doesn't matter that much.
run_tx_data_test(Store::open_with_key("", "secret").expect("opened"));
}
#[test]
fn test_conn_cross_thread() {
let file = "file:memdb?mode=memory&cache=shared";
let mut sqlite = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db");
let conn = Conn::connect(&mut sqlite).expect("to connect");
conn.transact(&mut sqlite, r#"[
[:db/add "a" :db/ident :foo/term]
[:db/add "a" :db/valueType :db.type/string]
[:db/add "a" :db/fulltext false]
[:db/add "a" :db/cardinality :db.cardinality/many]
]"#).unwrap();
let _tx1 = conn.transact(&mut sqlite, r#"[
[:db/add "e" :foo/term "1"]
]"#).expect("tx1 to apply");
let _tx2 = conn.transact(&mut sqlite, r#"[
[:db/add "e" :foo/term "2"]
]"#).expect("tx2 to apply");
use std::sync::Arc;
use std::sync::mpsc::channel;
let conn = Arc::new(conn);
let (tx1, rx1) = channel();
let (txs, rxs) = channel();
let (tx2, rx2) = channel();
std::thread::spawn(move || {
let shared_conn: Arc<Conn> = rx1.recv().expect("rx1");
let mut sqlite1 = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db");
shared_conn.transact(&mut sqlite1, r#"[
[:db/add "a" :db/ident :foo/bar]
[:db/add "a" :db/valueType :db.type/long]
[:db/add "a" :db/cardinality :db.cardinality/many]
]"#).unwrap();
txs.send(()).expect("to sync");
});
tx1.send(conn.clone()).expect("tx1");
rxs.recv().expect("to sync");
std::thread::spawn(move || {
let shared_conn: Arc<Conn> = rx2.recv().expect("rx1");
let mut sqlite2 = mentat_db::db::new_connection(file).expect("Couldn't open in-memory db");
assert_eq!(None, shared_conn.current_schema().get_entid(&Keyword::namespaced("foo", "bar")));
shared_conn.q_once(&mut sqlite2, "[:find ?e :where [?e _ _]]", None).expect("to prepare");
});
tx2.send(conn.clone()).expect("tx2");
}

View file

@ -186,7 +186,7 @@ fn test_add_vocab() {
let foo_v1_b = vocabulary::Definition::new(kw!(:org.mozilla/foo), 1, bar_and_baz.clone());
let mut sqlite = mentat_db::db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let foo_version_query = r#"[:find [?version ?aa]
:where

View file

@ -276,3 +276,157 @@ impl<'a, 'c> EntityBuilder<InProgressBuilder<'a, 'c>> {
self.finish().0.commit()
}
}
#[cfg(test)]
mod testing {
extern crate mentat_db;
use ::{
Conn,
Entid,
HasSchema,
KnownEntid,
MentatError,
Queryable,
TxReport,
TypedValue,
};
use super::*;
// In reality we expect the store to hand these out safely.
fn fake_known_entid(e: Entid) -> KnownEntid {
KnownEntid(e)
}
#[test]
fn test_entity_builder_bogus_entids() {
let mut builder = TermBuilder::new();
let e = builder.named_tempid("x");
let a1 = fake_known_entid(37); // :db/doc
let a2 = fake_known_entid(999);
let v = TypedValue::typed_string("Some attribute");
let ve = fake_known_entid(12345);
builder.add(e.clone(), a1, v).expect("add succeeded");
builder.add(e.clone(), a2, e.clone()).expect("add succeeded, even though it's meaningless");
builder.add(e.clone(), a2, ve).expect("add succeeded, even though it's meaningless");
let (terms, tempids) = builder.build().expect("build succeeded");
assert_eq!(tempids.len(), 1);
assert_eq!(terms.len(), 3); // TODO: check the contents?
// Now try to add them to a real store.
let mut sqlite = mentat_db::db::new_connection("").unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
// This should fail: unrecognized entid.
match in_progress.transact_entities(terms).expect_err("expected transact to fail") {
MentatError::DbError(e) => {
assert_eq!(e.kind(), mentat_db::DbErrorKind::UnrecognizedEntid(999));
},
_ => panic!("Should have rejected the entid."),
}
}
#[test]
fn test_in_progress_builder() {
let mut sqlite = mentat_db::db::new_connection("").unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
// Give ourselves a schema to work with!
conn.transact(&mut sqlite, r#"[
[:db/add "o" :db/ident :foo/one]
[:db/add "o" :db/valueType :db.type/long]
[:db/add "o" :db/cardinality :db.cardinality/one]
[:db/add "m" :db/ident :foo/many]
[:db/add "m" :db/valueType :db.type/string]
[:db/add "m" :db/cardinality :db.cardinality/many]
[:db/add "r" :db/ident :foo/ref]
[:db/add "r" :db/valueType :db.type/ref]
[:db/add "r" :db/cardinality :db.cardinality/one]
]"#).unwrap();
let in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
// We can use this or not!
let a_many = in_progress.get_entid(&kw!(:foo/many)).expect(":foo/many");
let mut builder = in_progress.builder();
let e_x = builder.named_tempid("x");
let v_many_1 = TypedValue::typed_string("Some text");
let v_many_2 = TypedValue::typed_string("Other text");
builder.add(e_x.clone(), kw!(:foo/many), v_many_1).expect("add succeeded");
builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded");
builder.commit().expect("commit succeeded");
}
#[test]
fn test_entity_builder() {
let mut sqlite = mentat_db::db::new_connection("").unwrap();
let conn = Conn::connect(&mut sqlite).unwrap();
let foo_one = kw!(:foo/one);
let foo_many = kw!(:foo/many);
let foo_ref = kw!(:foo/ref);
let report: TxReport;
// Give ourselves a schema to work with!
// Scoped borrow of conn.
{
conn.transact(&mut sqlite, r#"[
[:db/add "o" :db/ident :foo/one]
[:db/add "o" :db/valueType :db.type/long]
[:db/add "o" :db/cardinality :db.cardinality/one]
[:db/add "m" :db/ident :foo/many]
[:db/add "m" :db/valueType :db.type/string]
[:db/add "m" :db/cardinality :db.cardinality/many]
[:db/add "r" :db/ident :foo/ref]
[:db/add "r" :db/valueType :db.type/ref]
[:db/add "r" :db/cardinality :db.cardinality/one]
]"#).unwrap();
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
// Scoped borrow of in_progress.
{
let mut builder = TermBuilder::new();
let e_x = builder.named_tempid("x");
let e_y = builder.named_tempid("y");
let a_ref = in_progress.get_entid(&foo_ref).expect(":foo/ref");
let a_one = in_progress.get_entid(&foo_one).expect(":foo/one");
let a_many = in_progress.get_entid(&foo_many).expect(":foo/many");
let v_many_1 = TypedValue::typed_string("Some text");
let v_many_2 = TypedValue::typed_string("Other text");
let v_long: TypedValue = 123.into();
builder.add(e_x.clone(), a_many, v_many_1).expect("add succeeded");
builder.add(e_x.clone(), a_many, v_many_2).expect("add succeeded");
builder.add(e_y.clone(), a_ref, e_x.clone()).expect("add succeeded");
builder.add(e_x.clone(), a_one, v_long).expect("add succeeded");
let (terms, tempids) = builder.build().expect("build succeeded");
assert_eq!(tempids.len(), 2);
assert_eq!(terms.len(), 4);
report = in_progress.transact_entities(terms).expect("add succeeded");
let x = report.tempids.get("x").expect("our tempid has an ID");
let y = report.tempids.get("y").expect("our tempid has an ID");
assert_eq!(in_progress.lookup_value_for_attribute(*y, &foo_ref).expect("lookup succeeded"),
Some(TypedValue::Ref(*x)));
assert_eq!(in_progress.lookup_value_for_attribute(*x, &foo_one).expect("lookup succeeded"),
Some(TypedValue::Long(123)));
}
in_progress.commit().expect("commit succeeded");
}
// It's all still there after the commit.
let x = report.tempids.get("x").expect("our tempid has an ID");
let y = report.tempids.get("y").expect("our tempid has an ID");
assert_eq!(conn.lookup_value_for_attribute(&mut sqlite, *y, &foo_ref).expect("lookup succeeded"),
Some(TypedValue::Ref(*x)));
}
}