mentat/src/conn.rs
2017-04-03 13:15:21 -07:00

220 lines
8.1 KiB
Rust

// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use rusqlite;
use edn;
use errors::*;
use mentat_core::{
Schema,
TypedValue,
};
use mentat_db::db;
use mentat_db::{
transact,
PartitionMap,
TxReport,
};
use mentat_tx_parser;
use query::{
q_once,
QueryResults,
};
/// Connection metadata required to query from, or apply transactions to, a Mentat store.
///
/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently
/// changing parts (schema) that we want to share across threads.
///
/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust.
pub struct Metadata {
pub generation: u64,
pub partition_map: PartitionMap,
pub schema: Arc<Schema>,
}
impl Metadata {
// Intentionally not public.
fn new(generation: u64, partition_map: PartitionMap, schema: Arc<Schema>) -> Metadata {
Metadata {
generation: generation,
partition_map: partition_map,
schema: schema,
}
}
}
/// A mutable, safe reference to the current Mentat store.
struct Conn {
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
/// (schema) that we want to share across threads. A consuming thread may use a shared
/// reference after the `Conn`'s `Metadata` has moved on.
///
/// The motivating case is multiple query threads taking references to the current schema to
/// perform long-running queries while a single writer thread moves the metadata -- partition
/// map and schema -- forward.
metadata: Mutex<Metadata>,
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema)))
}
}
pub fn connect(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let db = db::ensure_current_version(sqlite)
.chain_err(|| "Unable to initialize Mentat store")?;
Ok(Conn::new(db.partition_map, db.schema))
}
/// Yield the current `Schema` instance.
pub fn current_schema(&self) -> Arc<Schema> {
// We always unwrap the mutex lock: if it's poisoned, this will propogate panics to all
// accessing threads. This is perhaps not reasonable; we expect the mutex to be held for
// very short intervals, but a panic during a critical update section is possible, since the
// lock encapsulates committing a SQL transaction.
//
// That being said, in the future we will provide an interface to take the mutex, providing
// maximum flexibility for Mentat consumers.
//
// This approach might need to change when we support interrupting query threads (#297), and
// will definitely need to change if we support interrupting transactor threads.
//
// Improving this is tracked by https://github.com/mozilla/mentat/issues/356.
self.metadata.lock().unwrap().schema.clone()
}
/// Query the Mentat store, using the given connection and the current metadata.
pub fn q_once<T, U>(&self,
sqlite: &rusqlite::Connection,
query: &str,
inputs: T,
limit: U) -> Result<QueryResults>
where T: Into<Option<HashMap<String, TypedValue>>>,
U: Into<Option<u64>>
{
q_once(sqlite,
&*self.current_schema(),
query,
inputs,
limit)
}
/// Transact entities against the Mentat store, using the given connection and the current
/// metadata.
pub fn transact(&mut self,
sqlite: &mut rusqlite::Connection,
transaction: &str) -> Result<TxReport> {
let assertion_vector = edn::parse::value(transaction)?;
let entities = mentat_tx_parser::Tx::parse(assertion_vector)?;
let tx = sqlite.transaction()?;
let (current_generation, current_partition_map, current_schema) =
{
// The mutex is taken during this block.
let ref current: Metadata = *self.metadata.lock().unwrap();
(current.generation,
// Expensive, but the partition map is updated after every committed transaction.
current.partition_map.clone(),
// Cheap.
current.schema.clone())
};
// The transaction is processed while the mutex is not held.
let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, &*current_schema, entities)?;
{
// The mutex is taken during this block.
let mut metadata = self.metadata.lock().unwrap();
if current_generation != metadata.generation {
// Somebody else wrote!
// Retrying is tracked by https://github.com/mozilla/mentat/issues/357.
bail!("Lost the transact() race!");
}
// Commit the SQLite transaction while we hold the mutex.
tx.commit()?;
metadata.generation += 1;
metadata.partition_map = next_partition_map;
if let Some(next_schema) = next_schema {
metadata.schema = Arc::new(next_schema);
}
}
Ok(report)
}
}
#[cfg(test)]
mod tests {
use super::*;
extern crate mentat_parser_utils;
#[test]
fn test_transact_errors() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
// Good: empty transaction.
let report = conn.transact(&mut sqlite, "[]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 1);
// Bad EDN: missing closing ']'.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]");
match report.unwrap_err() {
Error(ErrorKind::EdnParseError(_), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
// Good EDN.
let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 2);
// Bad transaction data: missing leading :db/add.
let report = conn.transact(&mut sqlite, "[[\"t\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::TxParseError(::mentat_tx_parser::errors::ErrorKind::ParseError(_)), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
// Good transaction data.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :b/keyword]]").unwrap();
assert_eq!(report.tx_id, 0x10000000 + 3);
// Bad transaction based on state of store: conflicting upsert.
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
[:db/add \"u\" :db/ident :b/keyword]]");
match report.unwrap_err() {
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { },
x => panic!("expected EDN parse error, got {:?}", x),
}
}
}