// Copyright 2016 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the // License at http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software distributed // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. #![allow(dead_code)] use std::collections::HashMap; use std::sync::{Arc, Mutex}; use rusqlite; use edn; use errors::*; use mentat_core::{ Schema, TypedValue, }; use mentat_db::db; use mentat_db::{ transact, PartitionMap, TxReport, }; use mentat_tx_parser; use query::{ q_once, QueryResults, }; /// Connection metadata required to query from, or apply transactions to, a Mentat store. /// /// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently /// changing parts (schema) that we want to share across threads. /// /// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust. pub struct Metadata { pub generation: u64, pub partition_map: PartitionMap, pub schema: Arc, } impl Metadata { // Intentionally not public. fn new(generation: u64, partition_map: PartitionMap, schema: Arc) -> Metadata { Metadata { generation: generation, partition_map: partition_map, schema: schema, } } } /// A mutable, safe reference to the current Mentat store. pub struct Conn { /// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the /// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts /// (schema) that we want to share across threads. A consuming thread may use a shared /// reference after the `Conn`'s `Metadata` has moved on. /// /// The motivating case is multiple query threads taking references to the current schema to /// perform long-running queries while a single writer thread moves the metadata -- partition /// map and schema -- forward. metadata: Mutex, // TODO: maintain set of change listeners or handles to transaction report queues. #298. // TODO: maintain cache of query plans that could be shared across threads and invalidated when // the schema changes. #315. } impl Conn { // Intentionally not public. fn new(partition_map: PartitionMap, schema: Schema) -> Conn { Conn { metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))) } } pub fn connect(sqlite: &mut rusqlite::Connection) -> Result { let db = db::ensure_current_version(sqlite) .chain_err(|| "Unable to initialize Mentat store")?; Ok(Conn::new(db.partition_map, db.schema)) } /// Yield the current `Schema` instance. pub fn current_schema(&self) -> Arc { // We always unwrap the mutex lock: if it's poisoned, this will propogate panics to all // accessing threads. This is perhaps not reasonable; we expect the mutex to be held for // very short intervals, but a panic during a critical update section is possible, since the // lock encapsulates committing a SQL transaction. // // That being said, in the future we will provide an interface to take the mutex, providing // maximum flexibility for Mentat consumers. // // This approach might need to change when we support interrupting query threads (#297), and // will definitely need to change if we support interrupting transactor threads. // // Improving this is tracked by https://github.com/mozilla/mentat/issues/356. self.metadata.lock().unwrap().schema.clone() } /// Query the Mentat store, using the given connection and the current metadata. pub fn q_once(&self, sqlite: &rusqlite::Connection, query: &str, inputs: T, limit: U) -> Result where T: Into>>, U: Into> { q_once(sqlite, &*self.current_schema(), query, inputs, limit) } /// Transact entities against the Mentat store, using the given connection and the current /// metadata. pub fn transact(&mut self, sqlite: &mut rusqlite::Connection, transaction: &str) -> Result { let assertion_vector = edn::parse::value(transaction)?; let entities = mentat_tx_parser::Tx::parse(assertion_vector)?; let tx = sqlite.transaction()?; let (current_generation, current_partition_map, current_schema) = { // The mutex is taken during this block. let ref current: Metadata = *self.metadata.lock().unwrap(); (current.generation, // Expensive, but the partition map is updated after every committed transaction. current.partition_map.clone(), // Cheap. current.schema.clone()) }; // The transaction is processed while the mutex is not held. let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, &*current_schema, entities)?; { // The mutex is taken during this block. let mut metadata = self.metadata.lock().unwrap(); if current_generation != metadata.generation { // Somebody else wrote! // Retrying is tracked by https://github.com/mozilla/mentat/issues/357. bail!("Lost the transact() race!"); } // Commit the SQLite transaction while we hold the mutex. tx.commit()?; metadata.generation += 1; metadata.partition_map = next_partition_map; if let Some(next_schema) = next_schema { metadata.schema = Arc::new(next_schema); } } Ok(report) } } #[cfg(test)] mod tests { use super::*; extern crate mentat_parser_utils; #[test] fn test_transact_errors() { let mut sqlite = db::new_connection("").unwrap(); let mut conn = Conn::connect(&mut sqlite).unwrap(); // Good: empty transaction. let report = conn.transact(&mut sqlite, "[]").unwrap(); assert_eq!(report.tx_id, 0x10000000 + 1); // Bad EDN: missing closing ']'. let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]"); match report.unwrap_err() { Error(ErrorKind::EdnParseError(_), _) => { }, x => panic!("expected EDN parse error, got {:?}", x), } // Good EDN. let report = conn.transact(&mut sqlite, "[[:db/add \"t\" :db/ident :a/keyword]]").unwrap(); assert_eq!(report.tx_id, 0x10000000 + 2); // Bad transaction data: missing leading :db/add. let report = conn.transact(&mut sqlite, "[[\"t\" :db/ident :b/keyword]]"); match report.unwrap_err() { Error(ErrorKind::TxParseError(::mentat_tx_parser::errors::ErrorKind::ParseError(_)), _) => { }, x => panic!("expected EDN parse error, got {:?}", x), } // Good transaction data. let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :b/keyword]]").unwrap(); assert_eq!(report.tx_id, 0x10000000 + 3); // Bad transaction based on state of store: conflicting upsert. let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword] [:db/add \"u\" :db/ident :b/keyword]]"); match report.unwrap_err() { Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { }, x => panic!("expected EDN parse error, got {:?}", x), } } }