From f86b24001f38e0f37f791b393dfc371df7aa8205 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Fri, 3 Mar 2017 15:03:59 -0800 Subject: [PATCH] Add top-level `Conn`. Fixes #296. (#342) r=rnewman * Add top-level `Conn`. Fixes #296. This is a little different than the API rnewman and I originally discussed in https://public.etherpad-mozilla.org/p/db-conn-thoughts. A few notes: - I was led to make a `Schema` instance the thing that is shared, rather than a `db::DB`. It's possible that queries will want to know the current transaction at some point (to prevent races, or to query historical data), but that can be a future consideration. - The generation number just allows for a cheap comparison. I don't care to handle races to transact just yet; the long term plan might be to make embedding applications responsible for avoiding races, or we might handle queuing transactions and yielding report futures in Mentat itself. - The sharing of the partition maps is a little more subtle than expected. Partition maps are volatile: a successful Mentat transaction always advances the :db.part/tx partition, so it's not worth passing references around. This means that consumers must clone in order to maintain just a single clone per transaction. Clean some cruft. * Review comments. --- db/src/db.rs | 4 +- db/src/tx.rs | 20 ++--- src/conn.rs | 218 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 4 files changed, 227 insertions(+), 16 deletions(-) create mode 100644 src/conn.rs diff --git a/db/src/db.rs b/db/src/db.rs index 1a6fd5ff..37d457ce 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -208,7 +208,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. let bootstrap_schema = bootstrap::bootstrap_schema(); - let (_report, next_partition_map, next_schema) = transact(&tx, &bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?; + let (_report, next_partition_map, next_schema) = transact(&tx, bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?; if next_schema.is_some() { // TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117 bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema"))); @@ -902,7 +902,7 @@ mod tests { let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - let maybe_report = transact(&conn, partition_map, schema, entities); + let maybe_report = transact(&conn, partition_map.clone(), schema, entities); if let Some(expected_transaction) = expected_transaction { if !expected_transaction.is_nil() { diff --git a/db/src/tx.rs b/db/src/tx.rs index ee9bf174..066a911f 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -46,6 +46,7 @@ //! keep everything straight. use std; +use std::borrow::Cow; use std::collections::BTreeSet; use ::{to_namespaced_keyword}; @@ -275,7 +276,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); let final_populations = generation.into_final_populations(&temp_id_allocations)?; - { + /// Assertions that are :db.cardinality/one and not :db.fulltext. let mut non_fts_one: Vec = vec![]; @@ -325,10 +326,6 @@ impl<'conn, 'a> Tx<'conn, 'a> { } self.store.commit_transaction(self.tx_id)?; - } - - // let mut next_schema = self.schema.to_mut(); - // next_schema.ident_map.insert(NamespacedKeyword::new("db", "new"), 1000); // TODO: update idents and schema materialized views. db::update_partition_map(self.store, &self.partition_map)?; @@ -340,25 +337,20 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } -use std::borrow::Cow; - -/// Transact the given `entities` against the given SQLite `conn`, using the metadata in -/// `self.DB`. +/// Transact the given `entities` against the given SQLite `conn`, using the given metadata. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. -pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, partition_map: &'a PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option)> where I: IntoIterator { +pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option)> where I: IntoIterator { // Eventually, this function will be responsible for managing a SQLite transaction. For // now, it's just about the tx details. let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. - - let mut next_partition_map: PartitionMap = partition_map.clone(); - let tx_id = next_partition_map.allocate_entid(":db.part/tx"); + let tx_id = partition_map.allocate_entid(":db.part/tx"); conn.begin_transaction()?; - let mut tx = Tx::new(conn, next_partition_map, schema, tx_id, tx_instant); + let mut tx = Tx::new(conn, partition_map, schema, tx_id, tx_instant); let report = tx.transact_entities(entities)?; diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 00000000..4b07f6ca --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,218 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use rusqlite; + +use edn; +use errors::*; +use mentat_core::{ + Schema, + TypedValue, +}; +use mentat_db::db; +use mentat_db::{ + transact, + PartitionMap, + TxReport, +}; +use mentat_tx_parser; +use query::{ + q_once, + QueryResults, +}; + + +/// Connection metadata required to query from, or apply transactions to, a Mentat store. +/// +/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently +/// changing parts (schema) that we want to share across threads. +/// +/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust. +pub struct Metadata { + pub generation: u64, + pub partition_map: PartitionMap, + pub schema: Arc, +} + +impl Metadata { + // Intentionally not public. + fn new(generation: u64, partition_map: PartitionMap, schema: Arc) -> Metadata { + Metadata { + generation: generation, + partition_map: partition_map, + schema: schema, + } + } +} + +/// A mutable, safe reference to the current Mentat store. +struct Conn { + /// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the + /// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts + /// (schema) that we want to share across threads. A consuming thread may use a shared + /// reference after the `Conn`'s `Metadata` has moved on. + /// + /// The motivating case is multiple query threads taking references to the current schema to + /// perform long-running queries while a single writer thread moves the metadata -- partition + /// map and schema -- forward. + metadata: Mutex, + + // TODO: maintain set of change listeners or handles to transaction report queues. #298. + + // TODO: maintain cache of query plans that could be shared across threads and invalidated when + // the schema changes. #315. +} + +impl Conn { + // Intentionally not public. + fn new(partition_map: PartitionMap, schema: Schema) -> Conn { + Conn { + metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))) + } + } + + pub fn connect(sqlite: &mut rusqlite::Connection) -> Result { + let db = db::ensure_current_version(sqlite) + .chain_err(|| "Unable to initialize Mentat store")?; + Ok(Conn::new(db.partition_map, db.schema)) + } + + /// Yield the current `Schema` instance. + pub fn current_schema(&self) -> Arc { + // We always unwrap the mutex lock: if it's poisoned, this will propogate panics to all + // accessing threads. This is perhaps not reasonable; we expect the mutex to be held for + // very short intervals, but a panic during a critical update section is possible, since the + // lock encapsulates committing a SQL transaction. + // + // That being said, in the future we will provide an interface to take the mutex, providing + // maximum flexibility for Mentat consumers. + // + // This approach might need to change when we support interrupting query threads (#297), and + // will definitely need to change if we support interrupting transactor threads. + // + // Improving this is tracked by https://github.com/mozilla/mentat/issues/356. + self.metadata.lock().unwrap().schema.clone() + } + + /// Query the Mentat store, using the given connection and the current metadata. + pub fn q_once(&self, + sqlite: &rusqlite::Connection, + query: &str, + inputs: T) -> Result + where T: Into>> { + + q_once(sqlite, + &*self.current_schema(), + query, + inputs.into()) + } + + /// Transact entities against the Mentat store, using the given connection and the current + /// metadata. + pub fn transact(&mut self, + sqlite: &mut rusqlite::Connection, + transaction: &str) -> Result { + + let assertion_vector = edn::parse::value(transaction) + .map(|x| x.without_spans())?; + let entities = mentat_tx_parser::Tx::parse(&[assertion_vector][..])?; + + let tx = sqlite.transaction()?; + + let (current_generation, current_partition_map, current_schema) = + { + // The mutex is taken during this block. + let ref current: Metadata = *self.metadata.lock().unwrap(); + (current.generation, + // Expensive, but the partition map is updated after every committed transaction. + current.partition_map.clone(), + // Cheap. + current.schema.clone()) + }; + + // The transaction is processed while the mutex is not held. + let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, entities)?; + + { + // The mutex is taken during this block. + let mut metadata = self.metadata.lock().unwrap(); + + if current_generation != metadata.generation { + // Somebody else wrote! + // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. + bail!("Lost the transact() race!"); + } + + // Commit the SQLite transaction while we hold the mutex. + tx.commit()?; + + metadata.generation += 1; + metadata.partition_map = next_partition_map; + if let Some(next_schema) = next_schema { + metadata.schema = Arc::new(next_schema); + } + } + + Ok(report) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + extern crate mentat_parser_utils; + use self::mentat_parser_utils::ValueParseError; + + #[test] + fn test_transact_errors() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + // Good: empty transaction. + let report = conn.transact(&mut sqlite, "[]").unwrap(); + assert_eq!(report.tx_id, 0x10000000 + 1); + + // Bad EDN: missing closing ']'. + let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]"); + match report.unwrap_err() { + Error(ErrorKind::EdnParseError(_), _) => { }, + x => panic!("expected EDN parse error, got {:?}", x), + } + + // Good EDN. + let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]]").unwrap(); + assert_eq!(report.tx_id, 0x10000000 + 2); + + // Bad transaction data: missing leading :db/add. + let report = conn.transact(&mut sqlite, "[[\"t\" :db/ident :b/keyword]]"); + match report.unwrap_err() { + Error(ErrorKind::TxParseError(::mentat_tx_parser::errors::ErrorKind::ParseError(ValueParseError { .. })), _) => { }, + x => panic!("expected EDN parse error, got {:?}", x), + } + + // Good transaction data. + let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :b/keyword]]").unwrap(); + assert_eq!(report.tx_id, 0x10000000 + 3); + + // Bad transaction based on state of store: conflicting upsert. + let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword] + [:db/add \"u\" :db/ident :b/keyword]]"); + match report.unwrap_err() { + Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { }, + x => panic!("expected EDN parse error, got {:?}", x), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index bf635254..f8d6d864 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ use rusqlite::Connection; pub mod errors; pub mod ident; +pub mod conn; pub mod query; pub fn get_name() -> String {