First pass at Stores

This commit is contained in:
Emily Toop 2018-05-16 17:29:27 +01:00
parent 65fc352802
commit 8b3f5a8478
3 changed files with 77 additions and 15 deletions

View file

@ -115,6 +115,7 @@ use query::{
/// changing parts (schema) that we want to share across threads. /// changing parts (schema) that we want to share across threads.
/// ///
/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust. /// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust.
#[derive(Clone)]
pub struct Metadata { pub struct Metadata {
pub generation: u64, pub generation: u64,
pub partition_map: PartitionMap, pub partition_map: PartitionMap,
@ -135,6 +136,7 @@ impl Metadata {
} }
/// A mutable, safe reference to the current Mentat store. /// A mutable, safe reference to the current Mentat store.
#[derive(Clone)]
pub struct Conn { pub struct Conn {
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the /// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts /// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
@ -151,13 +153,12 @@ pub struct Conn {
/// gives us copy-on-write semantics. /// gives us copy-on-write semantics.
/// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully /// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully
/// replaced on commit. /// replaced on commit.
metadata: Mutex<Metadata>, metadata: Arc<Mutex<Metadata>>,
// TODO: maintain set of change listeners or handles to transaction report queues. #298. // TODO: maintain set of change listeners or handles to transaction report queues. #298.
// TODO: maintain cache of query plans that could be shared across threads and invalidated when // TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315. // the schema changes. #315.
tx_observer_service: Mutex<TxObservationService>,
pub(crate) tx_observer_service: Arc<Mutex<TxObservationService>>, pub(crate) tx_observer_service: Arc<Mutex<TxObservationService>>,
} }
@ -572,8 +573,8 @@ impl Conn {
// Intentionally not public. // Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn { fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn { Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())), metadata: Arc::new(Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default()))),
tx_observer_service: Mutex::new(TxObservationService::new()), tx_observer_service: Arc::new(Mutex::new(TxObservationService::new())),
} }
} }

View file

@ -137,6 +137,7 @@ pub use conn::{
pub use stores::{ pub use stores::{
Store, Store,
Stores,
}; };
#[cfg(test)] #[cfg(test)]

View file

@ -16,6 +16,7 @@ use std::collections::{
use std::path::{ use std::path::{
Path, Path,
PathBuf,
}; };
use std::sync::{ use std::sync::{
@ -62,10 +63,66 @@ use query::{
QueryOutput, QueryOutput,
}; };
pub struct Stores {
stores: BTreeMap<PathBuf, Store>,
}
impl Stores {
fn new() -> Stores {
Stores {
stores: Default::default(),
}
}
}
impl Stores {
fn is_open(&self, path: PathBuf) -> bool {
self.stores.contains_key(&path)
}
pub fn open<'p, P>(&mut self, path: P) -> Result<&mut Store> where P: Into<&'p Path> {
let path = path.into();
let canonical = path.canonicalize()?;
Ok(self.stores.entry(canonical).or_insert(Store::open(path.to_str().unwrap())?))
}
pub fn get<'p, P>(&mut self, path: P) -> Result<Option<&Store>> where P: Into<&'p Path> {
let canonical = path.into().canonicalize()?;
Ok(self.stores.get(&canonical))
}
pub fn get_mut<'p, P>(&mut self, path: P) -> Result<Option<&mut Store>> where P: Into<&'p Path> {
let canonical = path.into().canonicalize()?;
Ok(self.stores.get_mut(&canonical))
}
pub fn connect<'p, P>(&mut self, path: P) -> Result<Store> where P: Into<&'p Path> {
let path = path.into();
let canonical = path.canonicalize()?;
let store = self.stores.get_mut(&canonical).unwrap();
let connection = ::new_connection(path.to_str().unwrap())?;
Ok(store.fork(connection))
}
fn open_connections_for_store(&self, path: &PathBuf) -> usize {
Arc::strong_count(self.stores.get(path).unwrap().conn())
}
pub fn close<'p, P>(&mut self, path: P) -> Result<()> where P: Into<&'p Path> {
let canonical = path.into().canonicalize()?;
if self.open_connections_for_store(&canonical) <= 1 {
self.stores.remove(&canonical);
Ok(())
} else {
Ok(())
}
}
}
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management. /// for applications that don't require complex connection management.
pub struct Store { pub struct Store {
conn: Conn, conn: Arc<Conn>,
sqlite: rusqlite::Connection, sqlite: rusqlite::Connection,
} }
@ -75,11 +132,15 @@ impl Store {
let mut connection = ::new_connection(path)?; let mut connection = ::new_connection(path)?;
let conn = Conn::connect(&mut connection)?; let conn = Conn::connect(&mut connection)?;
Ok(Store { Ok(Store {
conn: conn, conn: Arc::new(conn),
sqlite: connection, sqlite: connection,
}) })
} }
pub fn open_in_memory() -> Result<Store> {
Store::open("")
}
/// Returns a totally blank store with no bootstrap schema. Use `open` instead. /// Returns a totally blank store with no bootstrap schema. Use `open` instead.
pub fn open_empty(path: &str) -> Result<Store> { pub fn open_empty(path: &str) -> Result<Store> {
if !path.is_empty() { if !path.is_empty() {
@ -91,7 +152,7 @@ impl Store {
let mut connection = ::new_connection(path)?; let mut connection = ::new_connection(path)?;
let conn = Conn::empty(&mut connection)?; let conn = Conn::empty(&mut connection)?;
Ok(Store { Ok(Store {
conn: conn, conn: Arc::new(conn),
sqlite: connection, sqlite: connection,
}) })
} }
@ -124,25 +185,25 @@ impl Store {
} }
} }
pub fn dismantle(self) -> (rusqlite::Connection, Conn) { pub fn dismantle(self) -> (rusqlite::Connection, Arc<Conn>) {
(self.sqlite, self.conn) (self.sqlite, self.conn)
} }
pub fn conn(&self) -> &Conn { pub fn conn(&self) -> &Arc<Conn> {
&self.conn &self.conn
} }
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> { pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
self.conn.begin_read(&mut self.sqlite) Arc::make_mut(&mut self.conn).begin_read(&mut self.sqlite)
} }
pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> { pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
self.conn.begin_transaction(&mut self.sqlite) Arc::make_mut(&mut self.conn).begin_transaction(&mut self.sqlite)
} }
pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> { pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
let schema = &self.conn.current_schema(); let schema = &self.conn.current_schema();
self.conn.cache(&mut self.sqlite, Arc::make_mut(&mut self.conn).cache(&mut self.sqlite,
schema, schema,
attr, attr,
direction, direction,
@ -150,11 +211,11 @@ impl Store {
} }
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) { pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
self.conn.register_observer(key, observer); Arc::make_mut(&mut self.conn).register_observer(key, observer);
} }
pub fn unregister_observer(&mut self, key: &String) { pub fn unregister_observer(&mut self, key: &String) {
self.conn.unregister_observer(key); Arc::make_mut(&mut self.conn).unregister_observer(key);
} }
} }
@ -232,7 +293,6 @@ mod tests {
use mentat_core::{ use mentat_core::{
CachedAttributes, CachedAttributes,
HasSchema,
TypedValue, TypedValue,
ValueType, ValueType,
}; };