Add Stores to manage Conn and creation of rusqlite::Connections.

Enable ability to create named in memory stores and in memory stores with shared caches.
Include ability to create encrypted connections.
Update `Store` to take an `Arc<Conn>` so references can be shared.
Update FFI to use `Stores` instead of `Store`.
Add `store_open_named_in_memory_store` to open a named in-memory store over FFI (useful for tests).
This commit is contained in:
Emily Toop 2018-07-04 14:30:23 +01:00
parent d056c7cc10
commit e1c2c9ee77
8 changed files with 494 additions and 17 deletions

View file

@ -89,7 +89,7 @@ fn escape_string_for_pragma(s: &str) -> String {
s.replace("'", "''")
}
fn make_connection(uri: &Path, maybe_encryption_key: Option<&str>) -> rusqlite::Result<rusqlite::Connection> {
pub fn make_connection(uri: &Path, maybe_encryption_key: Option<&str>) -> rusqlite::Result<rusqlite::Connection> {
let conn = match uri.to_string_lossy().len() {
0 => rusqlite::Connection::open_in_memory()?,
_ => rusqlite::Connection::open(uri)?,

View file

@ -82,6 +82,7 @@ pub use entids::{
pub use db::{
TypedSQLValue,
new_connection,
make_connection,
};
#[cfg(feature = "sqlcipher")]

View file

@ -107,6 +107,7 @@ pub use mentat::{
QueryResults,
RelResult,
Store,
Stores,
Syncable,
TypedValue,
TxObserver,
@ -220,7 +221,14 @@ pub unsafe extern "C" fn store_open(uri: *const c_char, error: *mut ExternError)
pub unsafe extern "C" fn store_open_encrypted(uri: *const c_char, key: *const c_char, error: *mut ExternError) -> *mut Store {
let uri = c_char_to_string(uri);
let key = c_char_to_string(key);
translate_result(Store::open_with_key(&uri, &key), error)
translate_result(Stores::open_with_key(&uri, &key), error)
}
/// Variant of store_open that opens a named in-memory database.
#[no_mangle]
pub unsafe extern "C" fn store_open_named_in_memory_store(name: *const c_char, error: *mut ExternError) -> *mut Store {
let name = c_char_to_string(name);
translate_result(Stores::open_named_in_memory_store(name), error)
}
// TODO: open empty
@ -1556,7 +1564,6 @@ pub unsafe extern "C" fn typed_value_into_long(typed_value: *mut Binding) -> c_l
pub unsafe extern "C" fn typed_value_into_entid(typed_value: *mut Binding) -> Entid {
assert_not_null!(typed_value);
let typed_value = Box::from_raw(typed_value);
println!("typed value as entid {:?}", typed_value);
unwrap_conversion(typed_value.into_entid(), ValueType::Ref)
}

View file

@ -11,8 +11,8 @@
#![allow(dead_code)]
use std; // To refer to std::result::Result.
use std::collections::BTreeSet;
use std::path::PathBuf;
use rusqlite;
@ -87,6 +87,15 @@ pub enum MentatError {
#[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)]
ValueTypeMismatch(ValueType, ValueType),
#[fail(display = "Cannot open store {} at path {:?} as it does not match previous store location {:?}", _0, _1, _2)]
StorePathMismatch(String, PathBuf, PathBuf),
#[fail(display = "The Store at {} does not exist or is not yet open.", _0)]
StoreNotFound(String),
#[fail(display = "The Store at {:?} has active connections and cannot be closed.", _0)]
StoresLockPoisoned(String),
#[fail(display = "{}", _0)]
IoError(#[cause] std::io::Error),

View file

@ -10,10 +10,6 @@
#![allow(dead_code)]
use std::borrow::{
Borrow,
};
use std::collections::{
BTreeMap,
};

View file

@ -176,6 +176,7 @@ pub use mentat_transaction::query::{
pub mod conn;
pub mod query_builder;
pub mod store;
pub mod stores;
pub mod vocabulary;
pub use query_builder::{
@ -199,6 +200,10 @@ pub use store::{
Store,
};
pub use stores::{
Stores,
};
#[cfg(test)]
mod tests {
use edn::symbols::Keyword;

View file

@ -72,17 +72,25 @@ use mentat_transaction::query::{
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
pub struct Store {
conn: Conn,
conn: Arc<Conn>,
sqlite: rusqlite::Connection,
}
impl Store {
/// Create a Store from a connection and Conn.
pub fn new(conn: Arc<Conn>, connection: rusqlite::Connection) -> Result<Store> {
Ok(Store {
conn: conn,
sqlite: connection,
})
}
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
pub fn open(path: &str) -> Result<Store> {
let mut connection = ::new_connection(path)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn: conn,
conn: Arc::new(conn),
sqlite: connection,
})
}
@ -104,14 +112,33 @@ impl Store {
let mut connection = ::new_connection_with_key(path, encryption_key)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn: conn,
conn: Arc::new(conn),
sqlite: connection,
})
}
/// Change the key for a database that was opened using `open_with_key` (using `PRAGMA
/// rekey`). Fails unless linked against sqlcipher (or something else that supports the Sqlite
/// Encryption Extension).
/// Variant of `open_empty` that allows a key (for encryption/decryption) to
/// be supplied. Fails unless linked against sqlcipher (or something else
/// that supports the Sqlite Encryption Extension).
pub fn open_empty_with_key(path: &str, encryption_key: &str) -> Result<Store> {
if !path.is_empty() {
if Path::new(path).exists() {
bail!(MentatError::PathAlreadyExists(path.to_string()));
}
}
let mut connection = ::new_connection_with_key(path, encryption_key)?;
let conn = Conn::empty(&mut connection)?;
Ok(Store {
conn: Arc::new(conn),
sqlite: connection,
})
}
/// Change the key for a database that was opened using `open_with_key` or
/// `open_empty_with_key` (using `PRAGMA rekey`). Fails unless linked
/// against sqlcipher (or something else that supports the Sqlite Encryption
/// Extension).
pub fn change_encryption_key(&mut self, new_encryption_key: &str) -> Result<()> {
::change_encryption_key(&self.sqlite, new_encryption_key)?;
Ok(())
@ -131,12 +158,12 @@ impl Store {
}
impl Store {
pub fn dismantle(self) -> (rusqlite::Connection, Conn) {
pub fn dismantle(self) -> (rusqlite::Connection, Arc<Conn>) {
(self.sqlite, self.conn)
}
pub fn conn(&self) -> &Conn {
&self.conn
pub fn conn(&self) -> Arc<Conn> {
self.conn.clone()
}
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {

432
src/stores.rs Normal file
View file

@ -0,0 +1,432 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::{
BTreeMap,
};
use std::convert::{AsRef};
use std::collections::btree_map::{
Entry,
};
use std::path::{
Path,
};
use std::sync::{
Arc,
RwLock,
};
use rusqlite;
use mentat_db::{
make_connection,
};
use conn::{
Conn,
};
use errors::*;
use store::{
Store,
};
/// A process is only permitted to have one open handle to each database. This manager
/// exists to enforce that constraint: don't open databases directly.
lazy_static! {
static ref MANAGER: RwLock<Stores> = RwLock::new(Stores::new());
}
/// A struct to store a tuple of a path to a store
/// and the connection to that store. We stores these things
/// together to ensure that two stores at different paths cannot
/// be opened with the same name.
struct StoreConnection {
conn: Arc<Conn>,
file_path: String,
}
impl StoreConnection {
fn new<T>(path: T, maybe_encryption_key: Option<&str>) -> Result<(StoreConnection, rusqlite::Connection)> where T: AsRef<Path> {
let path = path.as_ref().to_path_buf();
let os_str = path.as_os_str();
let file_path = if os_str.is_empty() {
"file::memory:?cache=shared"
} else {
os_str.to_str().unwrap()
};
StoreConnection::new_connection(file_path, maybe_encryption_key)
}
fn new_named_in_memory_connection(name: &str, maybe_encryption_key: Option<&str>) -> Result<(StoreConnection, rusqlite::Connection)> {
let file = format!("file::{}?mode=memory&cache=shared", name);
StoreConnection::new_connection(&file, maybe_encryption_key)
}
fn new_connection(file: &str, maybe_encryption_key: Option<&str>) -> Result<(StoreConnection, rusqlite::Connection)> {
let mut sqlite = make_connection(file.as_ref(), maybe_encryption_key)?;
Ok((StoreConnection {
conn: Arc::new(Conn::connect(&mut sqlite)?),
file_path: file.to_string(),
}, sqlite))
}
fn store(& mut self) -> Result<Store> {
let sqlite = make_connection(&self.file_path.as_ref(), None)?;
Store::new(self.conn.clone(), sqlite)
}
fn encrypted_store(& mut self, encryption_key: &str) -> Result<Store> {
let sqlite = make_connection(&self.file_path.as_ref(), Some(encryption_key))?;
Store::new(self.conn.clone(), sqlite)
}
fn store_with_connection(& mut self, sqlite: rusqlite::Connection) -> Result<Store> {
Store::new(self.conn.clone(), sqlite)
}
}
/// Stores keeps a reference to a Conn that has been opened for a store
/// along with the path to the store and a key that uniquely identifies
/// that store. The key is stored as a String so that multiple in memory stores
/// can be named and uniquely identified.
pub struct Stores {
connections: BTreeMap<String, StoreConnection>,
}
impl Stores {
fn new() -> Stores {
Stores {
connections: Default::default(),
}
}
pub fn singleton() -> &'static RwLock<Stores> {
&*MANAGER
}
fn is_store_open(name: &str) -> bool {
Stores::singleton().read().unwrap().is_open(&name)
}
pub fn open_store<T>(path: T) -> Result<Store> where T: AsRef<Path> {
let path_ref = path.as_ref();
let name: String = path_ref.to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.open(&name, path_ref)
}
pub fn open_named_in_memory_store(name: &str) -> Result<Store> {
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.to_string()))?.open(name, "")
}
#[cfg(feature = "sqlcipher")]
pub fn open_store_with_key<T>(path: T, encryption_key: &str) -> Result<Store> where T: AsRef<Path> {
let path_ref = path.as_ref();
let name: String = path_ref.to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.open_with_key(&name, path_ref, encryption_key)
}
pub fn get_store<T>(path: T) -> Result<Option<Store>> where T: AsRef<Path> {
let name: String = path.as_ref().to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.get(&name)
}
#[cfg(feature = "sqlcipher")]
pub fn get_store_with_key<T>(path: T, encryption_key: &str) -> Result<Option<Store>> where T: AsRef<Path> {
let name: String = path.as_ref().to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.get_with_key(&name, encryption_key)
}
pub fn get_named_in_memory_store(name: &str) -> Result<Option<Store>> {
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.to_string()))?.get(name)
}
pub fn connect_store< T>(path: T) -> Result<Store> where T: AsRef<Path> {
let name: String = path.as_ref().to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.connect(&name)
}
#[cfg(feature = "sqlcipher")]
pub fn connect_store_with_key< T>(path: T, encryption_key: &str) -> Result<Store> where T: AsRef<Path> {
let name: String = path.as_ref().to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.connect_with_key(&name, encryption_key)
}
pub fn connect_named_in_memory_store(name: &str) -> Result<Store> {
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.to_string()))?.connect(name)
}
pub fn close_store<T>(path: T) -> Result<()> where T: AsRef<Path> {
let name: String = path.as_ref().to_string_lossy().into();
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.clone()))?.close(&name)
}
pub fn close_named_in_memory_store(name: &str) -> Result<()> {
Stores::singleton().write().map_err(|_| MentatError::StoresLockPoisoned(name.to_string()))?.close(name)
}
}
impl Stores {
// Returns true if there exists an entry for the provided name in the connections map.
// This does not guarentee that the weak reference we hold to the Conn is still valid.
fn is_open(&self, name: &str) -> bool {
self.connections.contains_key(name)
}
// Open a store with an existing connection if available, or
// create a new connection if not.
pub fn open<T>(&mut self, name: &str, path: T) -> Result<Store> where T: AsRef<Path> {
match self.connections.entry(name.to_string()) {
Entry::Occupied(mut entry) => {
let connection = entry.get_mut();
connection.store()
},
Entry::Vacant(entry) =>{
let path = path.as_ref().to_path_buf();
let (mut store_connection, connection) = if !name.is_empty() && path.as_os_str().is_empty() {
StoreConnection::new_named_in_memory_connection(name, None)?
} else {
StoreConnection::new(path, None)?
};
let store = store_connection.store_with_connection(connection);
entry.insert(store_connection);
store
},
}
}
// Open an encrypted store with an existing connection if available, or
// create a new connection if not.
#[cfg(feature = "sqlcipher")]
pub fn open_with_key<T>(&mut self, name: &str, path: T, encryption_key: &str) -> Result<Store> where T: AsRef<Path> {
match self.connections.entry(name.to_string()) {
Entry::Occupied(mut entry) => {
let connection = entry.get_mut();
connection.store()
},
Entry::Vacant(entry) =>{
let path = path.as_ref().to_path_buf();
let (mut store_connection, connection) = if !name.is_empty() && path.as_os_str().is_empty() {
StoreConnection::new_named_in_memory_connection(name, Some(encryption_key))?
} else {
StoreConnection::new(path, Some(encryption_key))?
};
let store = store_connection.store_with_connection(connection);
entry.insert(store_connection);
store
},
}
}
// Returns a store with an existing connection to path, if available, or None if a
// store at the provided path has not yet been opened.
pub fn get(&mut self, name: &str) -> Result<Option<Store>> {
self.connections.get_mut(name)
.map_or(Ok(None), |store_conn| store_conn.store()
.map(|s| Some(s)))
}
// Returns an encrypted store with an existing connection to path, if available, or None if a
// store at the provided path has not yet been opened.
#[cfg(feature = "sqlcipher")]
pub fn get_with_key(&mut self, name: &str, encryption_key: &str) -> Result<Option<Store>> {
self.connections.get_mut(name)
.map_or(Ok(None), |store_conn| store_conn.encrypted_store(encryption_key)
.map(|s| Some(s)))
}
// Creates a new store on an existing connection with a new rusqlite connection.
// Equivalent to forking an existing store.
pub fn connect(&mut self, name: &str) -> Result<Store> {
self.connections.get_mut(name)
.ok_or(MentatError::StoreNotFound(name.to_string()).into())
.and_then(|store_conn| store_conn.store())
}
// Creates a new store on an existing connection with a new encrypted rusqlite connection.
// Equivalent to forking an existing store.
#[cfg(feature = "sqlcipher")]
pub fn connect_with_key(&mut self, name: &str, encryption_key: &str) -> Result<Store> {
self.connections.get_mut(name)
.ok_or(MentatError::StoreNotFound(name.to_string()).into())
.and_then(|store_conn| store_conn.encrypted_store(encryption_key))
}
// Drops the weak reference we have stored to an opened store there is no more than
// one Store with a reference to the Conn for the provided path.
pub fn close(&mut self, name: &str) -> Result<()> {
self.connections.remove(name);
return Ok(());
}
}
#[cfg(test)]
mod tests {
use super::*;
use conn::{
Queryable,
};
#[test]
fn test_stores_open_new_store() {
let name = "test.db";
let _store = Stores::open_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
}
#[test]
fn test_stores_open_new_named_in_memory_store() {
let name = "test_stores_open_new_named_in_memory_store";
let _store = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
}
#[test]
fn test_stores_open_existing_store() {
let name = "test_stores_open_existing_store";
let store1 = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
let store2 = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Arc::ptr_eq(&store1.conn(), &store2.conn()));
}
#[test]
fn test_stores_get_open_store() {
let name = "test_stores_get_open_store";
let store = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
let store_ref = Stores::get_named_in_memory_store(name).expect("Expected a store to be fetched").expect("store");
assert!(Arc::ptr_eq(&store.conn(), &store_ref.conn()));
}
#[test]
fn test_stores_get_closed_store() {
match Stores::get_named_in_memory_store("test_stores_get_closed_store").expect("Expected a store to be fetched") {
None => (),
Some(_) => panic!("Store is not open and so none should be returned"),
}
}
#[test]
fn test_stores_connect_open_store() {
let name = "test_stores_connect_open_store";
let store1 = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
{
// connect to an existing store
let store2 = Stores::connect_named_in_memory_store(name).expect("expected a new store");
assert!(Arc::ptr_eq(&store1.conn(), &store2.conn()));
// get the existing store
let store3 = Stores::get_named_in_memory_store(name).expect("Expected a store to be fetched").unwrap();
assert!(Arc::ptr_eq(&store2.conn(), &store3.conn()));
}
// connect to the store again
let store4 = Stores::connect_named_in_memory_store(name).expect("expected a new store");
assert!(Arc::ptr_eq(&store1.conn(), &store4.conn()));
}
#[test]
fn test_stores_connect_closed_store() {
let name = "test_stores_connect_closed_store";
let err = Stores::connect_named_in_memory_store(name).err();
match err.unwrap() {
MentatError::StoreNotFound(message) => { assert_eq!(name, message); },
x => panic!("expected Store Not Found error, got {:?}", x),
}
}
#[test]
fn test_stores_close_store_with_one_reference() {
let name = "test_stores_close_store_with_one_reference";
let store = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert_eq!(3, Arc::strong_count(&store.conn()));
assert!(Stores::close_named_in_memory_store(name).is_ok());
assert!(Stores::get_named_in_memory_store(name).expect("expected an empty result").is_none())
}
#[test]
fn test_stores_close_store_with_multiple_references() {
let name = "test_stores_close_store_with_multiple_references";
let store1 = Stores::open_named_in_memory_store(name).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
let store2 = Stores::connect_named_in_memory_store(name).expect("expected a connected store");
assert!(Arc::ptr_eq(&store1.conn(), &store2.conn()));
Stores::close_named_in_memory_store(name).expect("succeeded");
assert!(Stores::is_store_open(name) == false);
}
#[test]
fn test_stores_close_unopened_store() {
let name = "test_stores_close_unopened_store";
Stores::close_named_in_memory_store(name).expect("succeeded");
assert!(Stores::is_store_open(name) == false);
}
#[test]
fn test_stores_connect_perform_mutable_operations() {
let path = "test.db";
let mut store1 = Stores::open_store(path).expect("Expected a store to be opened");
{
let mut in_progress = store1.begin_transaction().expect("begun");
in_progress.transact(r#"[
{ :db/ident :foo/bar
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/cardinality :db.cardinality/one
:db/valueType :db.type/boolean }
{ :db/ident :foo/x
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long }]"#).expect("transact");
in_progress.commit().expect("commit");
}
// Forking an open store leads to a ref count of 2 on the shared conn.
// We should be able to perform write operations on this connection.
let mut store2 = Stores::connect_store(path).expect("expected a new store");
let mut in_progress = store2.begin_transaction().expect("begun");
in_progress.transact(r#"[
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
{:foo/bar 99, :foo/baz true}
{:foo/bar -2, :foo/baz true}
]"#).expect("transact");
in_progress.commit().expect("commit");
// We should be able to see the changes made on `store2` on `store1`
let result = store1.q_once(r#"[:find ?e . :where [?e :foo/baz false]]"#, None).expect("succeeded");
assert!(result.into_scalar().expect("succeeded").is_some());
}
#[test]
#[cfg(feature = "sqlcipher")]
fn test_open_store_with_key() {
let secret_key = "key";
let name = "../fixtures/v1encrypted.db";
let _store = Stores::open_store_with_key(name, secret_key).expect("Expected a store to be opened");
assert!(Stores::is_store_open(name));
}
}