Add top-level Conn. Fixes #296. (#342) r=rnewman

* Add top-level `Conn`. Fixes #296.

This is a little different than the API rnewman and I originally
discussed in https://public.etherpad-mozilla.org/p/db-conn-thoughts.
A few notes:

- I was led to make a `Schema` instance the thing that is shared,
  rather than a `db::DB`.  It's possible that queries will want to
  know the current transaction at some point (to prevent races, or to
  query historical data), but that can be a future consideration.

- The generation number just allows for a cheap comparison.  I don't
  care to handle races to transact just yet; the long term plan might
  be to make embedding applications responsible for avoiding races, or
  we might handle queuing transactions and yielding report futures in
  Mentat itself.

- The sharing of the partition maps is a little more subtle than
  expected.  Partition maps are volatile: a successful Mentat
  transaction always advances the :db.part/tx partition, so it's not
  worth passing references around.  This means that consumers must
  clone in order to maintain just a single clone per transaction.

Clean some cruft.

* Review comments.
This commit is contained in:
Nick Alexander 2017-03-03 15:03:59 -08:00 committed by GitHub
parent ecf56395b9
commit f86b24001f
4 changed files with 227 additions and 16 deletions

View file

@ -208,7 +208,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction. // TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
let bootstrap_schema = bootstrap::bootstrap_schema(); let bootstrap_schema = bootstrap::bootstrap_schema();
let (_report, next_partition_map, next_schema) = transact(&tx, &bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?; let (_report, next_partition_map, next_schema) = transact(&tx, bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?;
if next_schema.is_some() { if next_schema.is_some() {
// TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117 // TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117
bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema"))); bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema")));
@ -902,7 +902,7 @@ mod tests {
let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap();
let maybe_report = transact(&conn, partition_map, schema, entities); let maybe_report = transact(&conn, partition_map.clone(), schema, entities);
if let Some(expected_transaction) = expected_transaction { if let Some(expected_transaction) = expected_transaction {
if !expected_transaction.is_nil() { if !expected_transaction.is_nil() {

View file

@ -46,6 +46,7 @@
//! keep everything straight. //! keep everything straight.
use std; use std;
use std::borrow::Cow;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use ::{to_namespaced_keyword}; use ::{to_namespaced_keyword};
@ -275,7 +276,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect();
let final_populations = generation.into_final_populations(&temp_id_allocations)?; let final_populations = generation.into_final_populations(&temp_id_allocations)?;
{
/// Assertions that are :db.cardinality/one and not :db.fulltext. /// Assertions that are :db.cardinality/one and not :db.fulltext.
let mut non_fts_one: Vec<db::ReducedEntity> = vec![]; let mut non_fts_one: Vec<db::ReducedEntity> = vec![];
@ -325,10 +326,6 @@ impl<'conn, 'a> Tx<'conn, 'a> {
} }
self.store.commit_transaction(self.tx_id)?; self.store.commit_transaction(self.tx_id)?;
}
// let mut next_schema = self.schema.to_mut();
// next_schema.ident_map.insert(NamespacedKeyword::new("db", "new"), 1000);
// TODO: update idents and schema materialized views. // TODO: update idents and schema materialized views.
db::update_partition_map(self.store, &self.partition_map)?; db::update_partition_map(self.store, &self.partition_map)?;
@ -340,25 +337,20 @@ impl<'conn, 'a> Tx<'conn, 'a> {
} }
} }
use std::borrow::Cow; /// Transact the given `entities` against the given SQLite `conn`, using the given metadata.
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
/// `self.DB`.
/// ///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer. // TODO: move this to the transactor layer.
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, partition_map: &'a PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)> where I: IntoIterator<Item=Entity> { pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)> where I: IntoIterator<Item=Entity> {
// Eventually, this function will be responsible for managing a SQLite transaction. For // Eventually, this function will be responsible for managing a SQLite transaction. For
// now, it's just about the tx details. // now, it's just about the tx details.
let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge.
let tx_id = partition_map.allocate_entid(":db.part/tx");
let mut next_partition_map: PartitionMap = partition_map.clone();
let tx_id = next_partition_map.allocate_entid(":db.part/tx");
conn.begin_transaction()?; conn.begin_transaction()?;
let mut tx = Tx::new(conn, next_partition_map, schema, tx_id, tx_instant); let mut tx = Tx::new(conn, partition_map, schema, tx_id, tx_instant);
let report = tx.transact_entities(entities)?; let report = tx.transact_entities(entities)?;

218
src/conn.rs Normal file
View file

@ -0,0 +1,218 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use rusqlite;
use edn;
use errors::*;
use mentat_core::{
Schema,
TypedValue,
};
use mentat_db::db;
use mentat_db::{
transact,
PartitionMap,
TxReport,
};
use mentat_tx_parser;
use query::{
q_once,
QueryResults,
};
/// Connection metadata required to query from, or apply transactions to, a Mentat store.
///
/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently
/// changing parts (schema) that we want to share across threads.
///
/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust.
pub struct Metadata {
pub generation: u64,
pub partition_map: PartitionMap,
pub schema: Arc<Schema>,
}
impl Metadata {
// Intentionally not public.
fn new(generation: u64, partition_map: PartitionMap, schema: Arc<Schema>) -> Metadata {
Metadata {
generation: generation,
partition_map: partition_map,
schema: schema,
}
}
}
/// A mutable, safe reference to the current Mentat store.
struct Conn {
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
/// (schema) that we want to share across threads. A consuming thread may use a shared
/// reference after the `Conn`'s `Metadata` has moved on.
///
/// The motivating case is multiple query threads taking references to the current schema to
/// perform long-running queries while a single writer thread moves the metadata -- partition
/// map and schema -- forward.
metadata: Mutex<Metadata>,
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema)))
}
}
pub fn connect(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let db = db::ensure_current_version(sqlite)
.chain_err(|| "Unable to initialize Mentat store")?;
Ok(Conn::new(db.partition_map, db.schema))
}
/// Yield the current `Schema` instance.
pub fn current_schema(&self) -> Arc<Schema> {
// We always unwrap the mutex lock: if it's poisoned, this will propogate panics to all
// accessing threads. This is perhaps not reasonable; we expect the mutex to be held for
// very short intervals, but a panic during a critical update section is possible, since the
// lock encapsulates committing a SQL transaction.
//
// That being said, in the future we will provide an interface to take the mutex, providing
// maximum flexibility for Mentat consumers.
//
// This approach might need to change when we support interrupting query threads (#297), and
// will definitely need to change if we support interrupting transactor threads.
//
// Improving this is tracked by https://github.com/mozilla/mentat/issues/356.
self.metadata.lock().unwrap().schema.clone()
}
/// Query the Mentat store, using the given connection and the current metadata.
pub fn q_once<T>(&self,
sqlite: &rusqlite::Connection,
query: &str,
inputs: T) -> Result<QueryResults>
where T: Into<Option<HashMap<String, TypedValue>>> {
q_once(sqlite,
&*self.current_schema(),
query,
inputs.into())
}
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact(&mut self,
sqlite: &mut rusqlite::Connection,
transaction: &str) -> Result<TxReport> {
let assertion_vector = edn::parse::value(transaction)
.map(|x| x.without_spans())?;
let entities = mentat_tx_parser::Tx::parse(&[assertion_vector][..])?;
let tx = sqlite.transaction()?;
let (current_generation, current_partition_map, current_schema) =
{
// The mutex is taken during this block.
let ref current: Metadata = *self.metadata.lock().unwrap();
(current.generation,
// Expensive, but the partition map is updated after every committed transaction.
current.partition_map.clone(),
// Cheap.
current.schema.clone())
};
// The transaction is processed while the mutex is not held.
let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, entities)?;
{
// The mutex is taken during this block.
let mut metadata = self.metadata.lock().unwrap();
if current_generation != metadata.generation {
// Somebody else wrote!
// Retrying is tracked by https://github.com/mozilla/mentat/issues/357.
bail!("Lost the transact() race!");
}
// Commit the SQLite transaction while we hold the mutex.
tx.commit()?;
metadata.generation += 1;
metadata.partition_map = next_partition_map;
if let Some(next_schema) = next_schema {
metadata.schema = Arc::new(next_schema);
}
}
Ok(report)
}
}
#[cfg(test)]
mod tests {
use super::*;
extern crate mentat_parser_utils;
use self::mentat_parser_utils::ValueParseError;
#[test]
fn test_transact_errors() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
// Good: empty transaction.
let report = conn.transact(&mut sqlite, "[]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 1);
// Bad EDN: missing closing ']'.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]");
match report.unwrap_err() {
Error(ErrorKind::EdnParseError(_), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
// Good EDN.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 2);
// Bad transaction data: missing leading :db/add.
let report = conn.transact(&mut sqlite, "[[\"t\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::TxParseError(::mentat_tx_parser::errors::ErrorKind::ParseError(ValueParseError { .. })), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
// Good transaction data.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :b/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 3);
// Bad transaction based on state of store: conflicting upsert.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
[:db/add \"u\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
}
}

View file

@ -33,6 +33,7 @@ use rusqlite::Connection;
pub mod errors; pub mod errors;
pub mod ident; pub mod ident;
pub mod conn;
pub mod query; pub mod query;
pub fn get_name() -> String { pub fn get_name() -> String {