Compare commits
6 commits
master
...
static_man
Author | SHA1 | Date | |
---|---|---|---|
|
9e6d6c2834 | ||
|
baa7cb4f8f | ||
|
9d8c666052 | ||
|
10eba4fdbf | ||
|
8b3f5a8478 | ||
|
65fc352802 |
4 changed files with 996 additions and 572 deletions
581
src/conn.rs
581
src/conn.rs
|
@ -88,10 +88,6 @@ use edn::entities::{
|
||||||
OpType,
|
OpType,
|
||||||
};
|
};
|
||||||
|
|
||||||
use mentat_tolstoy::Syncer;
|
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use entity_builder::{
|
use entity_builder::{
|
||||||
InProgressBuilder,
|
InProgressBuilder,
|
||||||
TermBuilder,
|
TermBuilder,
|
||||||
|
@ -119,6 +115,7 @@ use query::{
|
||||||
/// changing parts (schema) that we want to share across threads.
|
/// changing parts (schema) that we want to share across threads.
|
||||||
///
|
///
|
||||||
/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust.
|
/// See https://github.com/mozilla/mentat/wiki/Thoughts:-modeling-db-conn-in-Rust.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Metadata {
|
pub struct Metadata {
|
||||||
pub generation: u64,
|
pub generation: u64,
|
||||||
pub partition_map: PartitionMap,
|
pub partition_map: PartitionMap,
|
||||||
|
@ -139,6 +136,7 @@ impl Metadata {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A mutable, safe reference to the current Mentat store.
|
/// A mutable, safe reference to the current Mentat store.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct Conn {
|
pub struct Conn {
|
||||||
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
|
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
|
||||||
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
|
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
|
||||||
|
@ -155,55 +153,13 @@ pub struct Conn {
|
||||||
/// gives us copy-on-write semantics.
|
/// gives us copy-on-write semantics.
|
||||||
/// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully
|
/// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully
|
||||||
/// replaced on commit.
|
/// replaced on commit.
|
||||||
metadata: Mutex<Metadata>,
|
metadata: Arc<Mutex<Metadata>>,
|
||||||
|
|
||||||
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
|
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
|
||||||
|
|
||||||
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
||||||
// the schema changes. #315.
|
// the schema changes. #315.
|
||||||
tx_observer_service: Mutex<TxObservationService>,
|
pub(crate) tx_observer_service: Arc<Mutex<TxObservationService>>,
|
||||||
}
|
|
||||||
|
|
||||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
|
||||||
/// for applications that don't require complex connection management.
|
|
||||||
pub struct Store {
|
|
||||||
conn: Conn,
|
|
||||||
sqlite: rusqlite::Connection,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Store {
|
|
||||||
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
|
|
||||||
pub fn open(path: &str) -> Result<Store> {
|
|
||||||
let mut connection = ::new_connection(path)?;
|
|
||||||
let conn = Conn::connect(&mut connection)?;
|
|
||||||
Ok(Store {
|
|
||||||
conn: conn,
|
|
||||||
sqlite: connection,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a totally blank store with no bootstrap schema. Use `open` instead.
|
|
||||||
pub fn open_empty(path: &str) -> Result<Store> {
|
|
||||||
if !path.is_empty() {
|
|
||||||
if Path::new(path).exists() {
|
|
||||||
bail!(ErrorKind::PathAlreadyExists(path.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut connection = ::new_connection(path)?;
|
|
||||||
let conn = Conn::empty(&mut connection)?;
|
|
||||||
Ok(Store {
|
|
||||||
conn: conn,
|
|
||||||
sqlite: connection,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
|
|
||||||
let mut ip = self.begin_transaction()?;
|
|
||||||
let report = ip.transact(transaction)?;
|
|
||||||
ip.commit()?;
|
|
||||||
Ok(report)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Queryable {
|
pub trait Queryable {
|
||||||
|
@ -240,8 +196,8 @@ pub struct InProgress<'a, 'c> {
|
||||||
mutex: &'a Mutex<Metadata>,
|
mutex: &'a Mutex<Metadata>,
|
||||||
generation: u64,
|
generation: u64,
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema: Schema,
|
pub(crate) schema: Schema,
|
||||||
cache: InProgressSQLiteAttributeCache,
|
pub(crate) cache: InProgressSQLiteAttributeCache,
|
||||||
use_caching: bool,
|
use_caching: bool,
|
||||||
tx_observer: &'a Mutex<TxObservationService>,
|
tx_observer: &'a Mutex<TxObservationService>,
|
||||||
tx_observer_watcher: InProgressObserverTransactWatcher,
|
tx_observer_watcher: InProgressObserverTransactWatcher,
|
||||||
|
@ -600,93 +556,6 @@ impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
|
||||||
/// Intended for use from tests.
|
|
||||||
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
|
|
||||||
&mut self.sqlite
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn is_registered_as_observer(&self, key: &String) -> bool {
|
|
||||||
self.conn.tx_observer_service.lock().unwrap().is_registered(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Store {
|
|
||||||
pub fn dismantle(self) -> (rusqlite::Connection, Conn) {
|
|
||||||
(self.sqlite, self.conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn conn(&self) -> &Conn {
|
|
||||||
&self.conn
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
|
|
||||||
self.conn.begin_read(&mut self.sqlite)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
|
|
||||||
self.conn.begin_transaction(&mut self.sqlite)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
|
|
||||||
let schema = &self.conn.current_schema();
|
|
||||||
self.conn.cache(&mut self.sqlite,
|
|
||||||
schema,
|
|
||||||
attr,
|
|
||||||
direction,
|
|
||||||
CacheAction::Register)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
|
|
||||||
self.conn.register_observer(key, observer);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn unregister_observer(&mut self, key: &String) {
|
|
||||||
self.conn.unregister_observer(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Queryable for Store {
|
|
||||||
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
|
|
||||||
where T: Into<Option<QueryInputs>> {
|
|
||||||
self.conn.q_once(&self.sqlite, query, inputs)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
|
|
||||||
where T: Into<Option<QueryInputs>> {
|
|
||||||
self.conn.q_prepare(&self.sqlite, query, inputs)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
|
|
||||||
where T: Into<Option<QueryInputs>> {
|
|
||||||
self.conn.q_explain(&self.sqlite, query, inputs)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
|
|
||||||
where E: Into<Entid> {
|
|
||||||
self.conn.lookup_values_for_attribute(&self.sqlite, entity.into(), attribute)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
|
|
||||||
where E: Into<Entid> {
|
|
||||||
self.conn.lookup_value_for_attribute(&self.sqlite, entity.into(), attribute)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Pullable for Store {
|
|
||||||
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
|
|
||||||
where E: IntoIterator<Item=Entid>,
|
|
||||||
A: IntoIterator<Item=Entid> {
|
|
||||||
self.conn.pull_attributes_for_entities(&self.sqlite, entities, attributes)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
|
|
||||||
where A: IntoIterator<Item=Entid> {
|
|
||||||
self.conn.pull_attributes_for_entity(&self.sqlite, entity, attributes)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||||
pub enum CacheDirection {
|
pub enum CacheDirection {
|
||||||
Forward,
|
Forward,
|
||||||
|
@ -700,26 +569,19 @@ pub enum CacheAction {
|
||||||
Deregister,
|
Deregister,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Syncable for Store {
|
|
||||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
|
|
||||||
let uuid = Uuid::parse_str(&user_uuid)?;
|
|
||||||
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Conn {
|
impl Conn {
|
||||||
// Intentionally not public.
|
// Intentionally not public.
|
||||||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||||
Conn {
|
Conn {
|
||||||
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
|
metadata: Arc::new(Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default()))),
|
||||||
tx_observer_service: Mutex::new(TxObservationService::new()),
|
tx_observer_service: Arc::new(Mutex::new(TxObservationService::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
||||||
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
||||||
/// consumers that expect to populate raw transaction data themselves.
|
/// consumers that expect to populate raw transaction data themselves.
|
||||||
fn empty(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
|
pub(crate) fn empty(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
|
||||||
let (tx, db) = db::create_empty_current_version(sqlite)
|
let (tx, db) = db::create_empty_current_version(sqlite)
|
||||||
.chain_err(|| "Unable to initialize Mentat store")?;
|
.chain_err(|| "Unable to initialize Mentat store")?;
|
||||||
tx.commit()?;
|
tx.commit()?;
|
||||||
|
@ -976,15 +838,7 @@ mod tests {
|
||||||
extern crate time;
|
extern crate time;
|
||||||
extern crate mentat_parser_utils;
|
extern crate mentat_parser_utils;
|
||||||
|
|
||||||
use std::collections::{
|
|
||||||
BTreeSet,
|
|
||||||
};
|
|
||||||
use std::path::{
|
|
||||||
PathBuf,
|
|
||||||
};
|
|
||||||
use std::sync::mpsc;
|
|
||||||
use std::time::{
|
use std::time::{
|
||||||
Duration,
|
|
||||||
Instant,
|
Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -994,12 +848,7 @@ mod tests {
|
||||||
TypedValue,
|
TypedValue,
|
||||||
};
|
};
|
||||||
|
|
||||||
use ::entity_builder::{
|
|
||||||
BuildTerms,
|
|
||||||
};
|
|
||||||
|
|
||||||
use ::query::{
|
use ::query::{
|
||||||
PreparedQuery,
|
|
||||||
Variable,
|
Variable,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1009,16 +858,6 @@ mod tests {
|
||||||
QueryResults,
|
QueryResults,
|
||||||
};
|
};
|
||||||
|
|
||||||
use ::vocabulary::{
|
|
||||||
AttributeBuilder,
|
|
||||||
Definition,
|
|
||||||
VersionedStore,
|
|
||||||
};
|
|
||||||
|
|
||||||
use ::vocabulary::attribute::{
|
|
||||||
Unique,
|
|
||||||
};
|
|
||||||
|
|
||||||
use mentat_db::USER0;
|
use mentat_db::USER0;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1391,406 +1230,4 @@ mod tests {
|
||||||
assert!(conn.current_cache().is_attribute_cached_forward(db_ident));
|
assert!(conn.current_cache().is_attribute_cached_forward(db_ident));
|
||||||
assert!(conn.current_cache().is_attribute_cached_forward(db_type));
|
assert!(conn.current_cache().is_attribute_cached_forward(db_type));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fixture_path(rest: &str) -> PathBuf {
|
|
||||||
let fixtures = Path::new("fixtures/");
|
|
||||||
fixtures.join(Path::new(rest))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_prepared_query_with_cache() {
|
|
||||||
let mut store = Store::open("").expect("opened");
|
|
||||||
let mut in_progress = store.begin_transaction().expect("began");
|
|
||||||
in_progress.import(fixture_path("cities.schema")).expect("transacted schema");
|
|
||||||
in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data");
|
|
||||||
in_progress.cache(&kw!(:neighborhood/district), CacheDirection::Forward, CacheAction::Register).expect("cache done");
|
|
||||||
in_progress.cache(&kw!(:district/name), CacheDirection::Forward, CacheAction::Register).expect("cache done");
|
|
||||||
in_progress.cache(&kw!(:neighborhood/name), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
|
|
||||||
|
|
||||||
let query = r#"[:find ?district
|
|
||||||
:in ?hood
|
|
||||||
:where
|
|
||||||
[?neighborhood :neighborhood/name ?hood]
|
|
||||||
[?neighborhood :neighborhood/district ?d]
|
|
||||||
[?d :district/name ?district]]"#;
|
|
||||||
let hood = "Beacon Hill";
|
|
||||||
let inputs = QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood).into())]);
|
|
||||||
let mut prepared = in_progress.q_prepare(query, inputs)
|
|
||||||
.expect("prepared");
|
|
||||||
match &prepared {
|
|
||||||
&PreparedQuery::Constant { select: ref _select } => {},
|
|
||||||
_ => panic!(),
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
let start = time::PreciseTime::now();
|
|
||||||
let results = prepared.run(None).expect("results");
|
|
||||||
let end = time::PreciseTime::now();
|
|
||||||
println!("Prepared cache execution took {}µs", start.to(end).num_microseconds().unwrap());
|
|
||||||
assert_eq!(results.into_rel().expect("result"),
|
|
||||||
vec![vec![TypedValue::typed_string("Greater Duwamish")]].into());
|
|
||||||
}
|
|
||||||
|
|
||||||
trait StoreCache {
|
|
||||||
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
|
|
||||||
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
|
|
||||||
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StoreCache for Store {
|
|
||||||
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
|
|
||||||
let cache = self.conn.current_cache();
|
|
||||||
cache.get_entid_for_value(attr, val)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
|
|
||||||
self.conn.current_cache().is_attribute_cached_forward(attr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
|
|
||||||
self.conn.current_cache().is_attribute_cached_reverse(attr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_cache_mutation() {
|
|
||||||
let mut store = Store::open("").expect("opened");
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut in_progress = store.begin_transaction().expect("begun");
|
|
||||||
in_progress.transact(r#"[
|
|
||||||
{ :db/ident :foo/bar
|
|
||||||
:db/cardinality :db.cardinality/one
|
|
||||||
:db/index true
|
|
||||||
:db/unique :db.unique/identity
|
|
||||||
:db/valueType :db.type/long },
|
|
||||||
{ :db/ident :foo/baz
|
|
||||||
:db/cardinality :db.cardinality/one
|
|
||||||
:db/valueType :db.type/boolean }
|
|
||||||
{ :db/ident :foo/x
|
|
||||||
:db/cardinality :db.cardinality/many
|
|
||||||
:db/valueType :db.type/long }]"#).expect("transact");
|
|
||||||
|
|
||||||
// Cache one….
|
|
||||||
in_progress.cache(&kw!(:foo/bar), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
|
|
||||||
in_progress.commit().expect("commit");
|
|
||||||
}
|
|
||||||
|
|
||||||
let foo_bar = store.conn.current_schema().get_entid(&kw!(:foo/bar)).expect("foo/bar").0;
|
|
||||||
let foo_baz = store.conn.current_schema().get_entid(&kw!(:foo/baz)).expect("foo/baz").0;
|
|
||||||
let foo_x = store.conn.current_schema().get_entid(&kw!(:foo/x)).expect("foo/x").0;
|
|
||||||
|
|
||||||
// … and cache the others via the store.
|
|
||||||
store.cache(&kw!(:foo/baz), CacheDirection::Both).expect("cache done");
|
|
||||||
store.cache(&kw!(:foo/x), CacheDirection::Forward).expect("cache done");
|
|
||||||
{
|
|
||||||
assert!(store.is_attribute_cached_reverse(foo_bar));
|
|
||||||
assert!(store.is_attribute_cached_forward(foo_baz));
|
|
||||||
assert!(store.is_attribute_cached_reverse(foo_baz));
|
|
||||||
assert!(store.is_attribute_cached_forward(foo_x));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add some data.
|
|
||||||
{
|
|
||||||
let mut in_progress = store.begin_transaction().expect("begun");
|
|
||||||
|
|
||||||
{
|
|
||||||
assert!(in_progress.cache.is_attribute_cached_reverse(foo_bar));
|
|
||||||
assert!(in_progress.cache.is_attribute_cached_forward(foo_baz));
|
|
||||||
assert!(in_progress.cache.is_attribute_cached_reverse(foo_baz));
|
|
||||||
assert!(in_progress.cache.is_attribute_cached_forward(foo_x));
|
|
||||||
|
|
||||||
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_bar));
|
|
||||||
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_baz));
|
|
||||||
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_baz));
|
|
||||||
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_x));
|
|
||||||
}
|
|
||||||
|
|
||||||
in_progress.transact(r#"[
|
|
||||||
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
|
|
||||||
{:foo/bar 99, :foo/baz true}
|
|
||||||
{:foo/bar -2, :foo/baz true}
|
|
||||||
]"#).expect("transact");
|
|
||||||
|
|
||||||
// Data is in the cache.
|
|
||||||
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
assert_eq!(in_progress.cache.get_value_for_entid(&in_progress.schema, foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
|
||||||
|
|
||||||
// All three values for :foo/x.
|
|
||||||
let all_three: BTreeSet<TypedValue> = in_progress.cache
|
|
||||||
.get_values_for_entid(&in_progress.schema, foo_x, first)
|
|
||||||
.expect("val")
|
|
||||||
.iter().cloned().collect();
|
|
||||||
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
|
||||||
|
|
||||||
in_progress.commit().expect("commit");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Data is still in the cache.
|
|
||||||
{
|
|
||||||
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
|
||||||
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
|
||||||
|
|
||||||
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
|
||||||
.expect("val")
|
|
||||||
.iter().cloned().collect();
|
|
||||||
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
|
||||||
}
|
|
||||||
|
|
||||||
// We can remove data and the cache reflects it, immediately and after commit.
|
|
||||||
{
|
|
||||||
let mut in_progress = store.begin_transaction().expect("began");
|
|
||||||
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
|
|
||||||
|
|
||||||
let only_two: BTreeSet<TypedValue> = in_progress.cache
|
|
||||||
.get_values_for_entid(&in_progress.schema, foo_x, first)
|
|
||||||
.expect("val")
|
|
||||||
.iter().cloned().collect();
|
|
||||||
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
|
||||||
|
|
||||||
// Rollback: unchanged.
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
|
||||||
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
|
||||||
|
|
||||||
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
|
||||||
.expect("val")
|
|
||||||
.iter().cloned().collect();
|
|
||||||
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try again, but this time commit.
|
|
||||||
{
|
|
||||||
let mut in_progress = store.begin_transaction().expect("began");
|
|
||||||
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
|
|
||||||
in_progress.commit().expect("committed");
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
|
||||||
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
|
||||||
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
|
||||||
|
|
||||||
let only_two: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
|
||||||
.expect("val")
|
|
||||||
.iter().cloned().collect();
|
|
||||||
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn test_register_observer() {
|
|
||||||
let mut conn = Store::open("").unwrap();
|
|
||||||
|
|
||||||
let key = "Test Observer".to_string();
|
|
||||||
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
|
||||||
|
|
||||||
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
|
||||||
assert!(conn.is_registered_as_observer(&key));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_deregister_observer() {
|
|
||||||
let mut conn = Store::open("").unwrap();
|
|
||||||
|
|
||||||
let key = "Test Observer".to_string();
|
|
||||||
|
|
||||||
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
|
||||||
|
|
||||||
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
|
||||||
assert!(conn.is_registered_as_observer(&key));
|
|
||||||
|
|
||||||
conn.unregister_observer(&key);
|
|
||||||
|
|
||||||
assert!(!conn.is_registered_as_observer(&key));
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_schema(conn: &mut Store) {
|
|
||||||
// transact some schema
|
|
||||||
let mut in_progress = conn.begin_transaction().expect("expected in progress");
|
|
||||||
in_progress.ensure_vocabulary(&Definition::new(
|
|
||||||
kw!(:todo/items),
|
|
||||||
1,
|
|
||||||
vec![
|
|
||||||
(kw!(:todo/uuid),
|
|
||||||
AttributeBuilder::helpful()
|
|
||||||
.value_type(ValueType::Uuid)
|
|
||||||
.multival(false)
|
|
||||||
.unique(Unique::Value)
|
|
||||||
.index(true)
|
|
||||||
.build()),
|
|
||||||
(kw!(:todo/name),
|
|
||||||
AttributeBuilder::helpful()
|
|
||||||
.value_type(ValueType::String)
|
|
||||||
.multival(false)
|
|
||||||
.fulltext(true)
|
|
||||||
.build()),
|
|
||||||
(kw!(:todo/completion_date),
|
|
||||||
AttributeBuilder::helpful()
|
|
||||||
.value_type(ValueType::Instant)
|
|
||||||
.multival(false)
|
|
||||||
.build()),
|
|
||||||
(kw!(:label/name),
|
|
||||||
AttributeBuilder::helpful()
|
|
||||||
.value_type(ValueType::String)
|
|
||||||
.multival(false)
|
|
||||||
.unique(Unique::Value)
|
|
||||||
.fulltext(true)
|
|
||||||
.index(true)
|
|
||||||
.build()),
|
|
||||||
(kw!(:label/color),
|
|
||||||
AttributeBuilder::helpful()
|
|
||||||
.value_type(ValueType::String)
|
|
||||||
.multival(false)
|
|
||||||
.build()),
|
|
||||||
],
|
|
||||||
)).expect("expected vocubulary");
|
|
||||||
in_progress.commit().expect("Expected vocabulary committed");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
|
||||||
struct ObserverOutput {
|
|
||||||
txids: Vec<i64>,
|
|
||||||
changes: Vec<BTreeSet<i64>>,
|
|
||||||
called_key: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_observer_notified_on_registered_change() {
|
|
||||||
let mut conn = Store::open("").unwrap();
|
|
||||||
add_schema(&mut conn);
|
|
||||||
|
|
||||||
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
|
||||||
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
|
||||||
let mut registered_attrs = BTreeSet::new();
|
|
||||||
registered_attrs.insert(name_entid.clone());
|
|
||||||
registered_attrs.insert(date_entid.clone());
|
|
||||||
|
|
||||||
let key = "Test Observing".to_string();
|
|
||||||
|
|
||||||
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
|
||||||
|
|
||||||
let mut_output = Arc::downgrade(&output);
|
|
||||||
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
|
||||||
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
|
|
||||||
// make it Sync.
|
|
||||||
let thread_tx = Mutex::new(tx);
|
|
||||||
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
|
||||||
if let Some(out) = mut_output.upgrade() {
|
|
||||||
let mut o = out.lock().unwrap();
|
|
||||||
o.called_key = Some(obs_key.to_string());
|
|
||||||
for (tx_id, changes) in batch.into_iter() {
|
|
||||||
o.txids.push(*tx_id);
|
|
||||||
o.changes.push(changes.clone());
|
|
||||||
}
|
|
||||||
o.txids.sort();
|
|
||||||
}
|
|
||||||
thread_tx.lock().unwrap().send(()).unwrap();
|
|
||||||
}));
|
|
||||||
|
|
||||||
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
|
||||||
assert!(conn.is_registered_as_observer(&key));
|
|
||||||
|
|
||||||
let mut tx_ids = Vec::new();
|
|
||||||
let mut changesets = Vec::new();
|
|
||||||
let db_tx_instant_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:db/txInstant)).expect("entid to exist for :db/txInstant").into();
|
|
||||||
let uuid_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into();
|
|
||||||
{
|
|
||||||
let mut in_progress = conn.begin_transaction().expect("expected transaction");
|
|
||||||
for i in 0..3 {
|
|
||||||
let mut changeset = BTreeSet::new();
|
|
||||||
changeset.insert(db_tx_instant_entid.clone());
|
|
||||||
let name = format!("todo{}", i);
|
|
||||||
let uuid = Uuid::new_v4();
|
|
||||||
let mut builder = in_progress.builder().describe_tempid(&name);
|
|
||||||
builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
|
|
||||||
changeset.insert(uuid_entid.clone());
|
|
||||||
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
|
||||||
changeset.insert(name_entid.clone());
|
|
||||||
if i % 2 == 0 {
|
|
||||||
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
|
|
||||||
changeset.insert(date_entid.clone());
|
|
||||||
}
|
|
||||||
let (ip, r) = builder.transact();
|
|
||||||
let report = r.expect("expected a report");
|
|
||||||
tx_ids.push(report.tx_id.clone());
|
|
||||||
changesets.push(changeset);
|
|
||||||
in_progress = ip;
|
|
||||||
}
|
|
||||||
let mut builder = in_progress.builder().describe_tempid("Label");
|
|
||||||
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
|
|
||||||
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
|
||||||
builder.commit().expect("expect transaction to occur");
|
|
||||||
}
|
|
||||||
|
|
||||||
let delay = Duration::from_millis(100);
|
|
||||||
let _ = rx.recv_timeout(delay);
|
|
||||||
|
|
||||||
let out = Arc::try_unwrap(output).expect("unwrapped");
|
|
||||||
let o = out.into_inner().expect("Expected an Output");
|
|
||||||
assert_eq!(o.called_key, Some(key.clone()));
|
|
||||||
assert_eq!(o.txids, tx_ids);
|
|
||||||
assert_eq!(o.changes, changesets);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_observer_not_notified_on_unregistered_change() {
|
|
||||||
let mut conn = Store::open("").unwrap();
|
|
||||||
add_schema(&mut conn);
|
|
||||||
|
|
||||||
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
|
||||||
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
|
||||||
let mut registered_attrs = BTreeSet::new();
|
|
||||||
registered_attrs.insert(name_entid.clone());
|
|
||||||
registered_attrs.insert(date_entid.clone());
|
|
||||||
|
|
||||||
let key = "Test Observing".to_string();
|
|
||||||
|
|
||||||
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
|
||||||
|
|
||||||
let mut_output = Arc::downgrade(&output);
|
|
||||||
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
|
||||||
let thread_tx = Mutex::new(tx);
|
|
||||||
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
|
||||||
if let Some(out) = mut_output.upgrade() {
|
|
||||||
let mut o = out.lock().unwrap();
|
|
||||||
o.called_key = Some(obs_key.to_string());
|
|
||||||
for (tx_id, changes) in batch.into_iter() {
|
|
||||||
o.txids.push(*tx_id);
|
|
||||||
o.changes.push(changes.clone());
|
|
||||||
}
|
|
||||||
o.txids.sort();
|
|
||||||
}
|
|
||||||
thread_tx.lock().unwrap().send(()).unwrap();
|
|
||||||
}));
|
|
||||||
|
|
||||||
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
|
||||||
assert!(conn.is_registered_as_observer(&key));
|
|
||||||
|
|
||||||
let tx_ids = Vec::<Entid>::new();
|
|
||||||
let changesets = Vec::<BTreeSet<Entid>>::new();
|
|
||||||
{
|
|
||||||
let mut in_progress = conn.begin_transaction().expect("expected transaction");
|
|
||||||
for i in 0..3 {
|
|
||||||
let name = format!("label{}", i);
|
|
||||||
let mut builder = in_progress.builder().describe_tempid(&name);
|
|
||||||
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
|
||||||
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
|
||||||
let (ip, _) = builder.transact();
|
|
||||||
in_progress = ip;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let delay = Duration::from_millis(100);
|
|
||||||
let _ = rx.recv_timeout(delay);
|
|
||||||
|
|
||||||
let out = Arc::try_unwrap(output).expect("unwrapped");
|
|
||||||
let o = out.into_inner().expect("Expected an Output");
|
|
||||||
assert_eq!(o.called_key, None);
|
|
||||||
assert_eq!(o.txids, tx_ids);
|
|
||||||
assert_eq!(o.changes, changesets);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,5 +109,15 @@ error_chain! {
|
||||||
description("provided value doesn't match value type")
|
description("provided value doesn't match value type")
|
||||||
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
|
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StoreNotFound(path: String) {
|
||||||
|
description("the Store provided does not exist or is not yet open.")
|
||||||
|
display("the Store at {:?} does not exist or is not yet open.", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
StoreConnectionStillActive(path: String) {
|
||||||
|
description("the Store provided has active connections and cannot be closed.")
|
||||||
|
display("the Store at {:?} has active connections and cannot be closed.", path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,6 +101,7 @@ pub mod errors;
|
||||||
pub mod ident;
|
pub mod ident;
|
||||||
pub mod vocabulary;
|
pub mod vocabulary;
|
||||||
pub mod conn;
|
pub mod conn;
|
||||||
|
pub mod stores;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod entity_builder;
|
pub mod entity_builder;
|
||||||
pub mod query_builder;
|
pub mod query_builder;
|
||||||
|
@ -132,7 +133,11 @@ pub use conn::{
|
||||||
Pullable,
|
Pullable,
|
||||||
Queryable,
|
Queryable,
|
||||||
Syncable,
|
Syncable,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub use stores::{
|
||||||
Store,
|
Store,
|
||||||
|
Stores,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
972
src/stores.rs
Normal file
972
src/stores.rs
Normal file
|
@ -0,0 +1,972 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::path::{
|
||||||
|
Path,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::sync::{
|
||||||
|
Arc,
|
||||||
|
RwLock,
|
||||||
|
};
|
||||||
|
|
||||||
|
use rusqlite;
|
||||||
|
|
||||||
|
use edn;
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
Keyword,
|
||||||
|
StructuredMap,
|
||||||
|
TypedValue,
|
||||||
|
ValueRc,
|
||||||
|
};
|
||||||
|
use mentat_db::{
|
||||||
|
TxObserver,
|
||||||
|
TxReport,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_tolstoy::Syncer;
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use conn::{
|
||||||
|
CacheAction,
|
||||||
|
CacheDirection,
|
||||||
|
Conn,
|
||||||
|
InProgress,
|
||||||
|
InProgressRead,
|
||||||
|
Pullable,
|
||||||
|
Queryable,
|
||||||
|
Syncable
|
||||||
|
};
|
||||||
|
|
||||||
|
use errors::*;
|
||||||
|
|
||||||
|
use query::{
|
||||||
|
PreparedResult,
|
||||||
|
QueryExplanation,
|
||||||
|
QueryInputs,
|
||||||
|
QueryOutput,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A process is only permitted to have one open handle to each database. This manager
|
||||||
|
/// exists to enforce that constraint: don't open databases directly.
|
||||||
|
thread_local! {
|
||||||
|
static LOCAL_STORES: RefCell<Stores> = {
|
||||||
|
RefCell::new(Stores::new())
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static! {
    // Process-wide registry mapping a database path to its single shared `Conn`.
    // The `Arc`'s strong count doubles as the open-handle count for that path
    // (see `Stores::open_connections_for_store`).
    // NOTE(review): on modern Rust, `std::sync::LazyLock` would replace the
    // `lazy_static!` macro — confirm the project's minimum toolchain first.
    static ref CONNECTIONS: RwLock<BTreeMap<String, Arc<Conn>>> = RwLock::new(BTreeMap::default());
}
|
||||||
|
|
||||||
|
/// Thread-local collection of open `Store`s, keyed by database path.
/// Paired with the process-wide `CONNECTIONS` map of shared `Conn`s:
/// the `Conn` for a path is shared across threads, the `Store` is not.
pub struct Stores {
    // Per-thread map: database path → this thread's open `Store`.
    connections: BTreeMap<String, Store>,
}
|
||||||
|
|
||||||
|
impl Stores {
|
||||||
|
fn new() -> Stores {
|
||||||
|
Stores {
|
||||||
|
connections: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stores {
    /// True if any thread in this process currently holds the store at `path` open.
    fn is_open(path: &str) -> bool {
        CONNECTIONS.read().unwrap().contains_key(path)
    }

    /// True if the *current thread* holds the store at `path` open.
    fn is_open_on_thread(path: &str) -> bool {
        LOCAL_STORES.with(|s| (*s.borrow()).connections.contains_key(path))
    }

    /// Open (or re-use) the store at `path`: the process-wide `Conn` is shared,
    /// but this thread gets its own `Store` with a private SQLite connection.
    ///
    /// NOTE(review): as written this looks uncompilable — the returned
    /// `&mut Store` is borrowed out of the thread-local map beyond the
    /// `with` closure, and the `Some(conn)` arm moves the `RefCell` contents
    /// out (`*s.borrow()`) and calls `get_mut` through that immutable borrow.
    /// Confirm against the branch's build status before relying on it.
    pub fn open(path: &str) -> Result<&mut Store> {
        let p = path.to_string(); // NOTE(review): unused binding — candidate for removal.
        let connections = CONNECTIONS.read().unwrap();
        let store = match connections.get(path) {
            Some(conn) => {
                // A shared Conn already exists for this path: fetch (or lazily
                // create) this thread's Store wrapping it.
                LOCAL_STORES.with(|s| {
                    let readable = *s.borrow();
                    match readable.connections.get_mut(path) {
                        Some(store) => store,
                        None => {
                            // First open on this thread: pair the shared Conn
                            // with a fresh SQLite connection.
                            let store = Store {
                                conn: conn.clone(),
                                sqlite: ::new_connection(path).expect("connection"),
                            };
                            (*s.borrow_mut()).connections.insert(path.to_string(), store);
                            readable.connections.get_mut(path).unwrap()
                        }
                    }
                })
            },
            None => {
                // First open anywhere in the process: bootstrap the Store and
                // publish its Conn in the global registry.
                let store = Store::open(path)?;
                CONNECTIONS.write().unwrap().insert(path.to_string(), store.conn().clone());

                LOCAL_STORES.with(|s| {
                    (*s.borrow_mut()).connections.insert(path.to_string(), store);
                    (*s.borrow()).connections.get_mut(path).unwrap()
                })
            }
        };
        Ok(store)
    }

    /// Borrow this thread's open Store at `path`, if any.
    /// NOTE(review): the reference escapes the `with` closure — see `open`.
    pub fn get(path: &str) -> Result<Option<&Store>> {
        Ok(LOCAL_STORES.with(|s| (*s.borrow()).connections.get(path)))
    }

    /// Mutably borrow this thread's open Store at `path`, if any.
    /// NOTE(review): the reference escapes the `with` closure — see `open`.
    pub fn get_mut(path: &str) -> Result<Option<&mut Store>> {
        Ok(LOCAL_STORES.with(|s| (*s.borrow_mut()).connections.get_mut(path)))
    }

    /// Produce a new `Store` (fresh SQLite connection, shared `Conn`) forked
    /// from the store this thread already has open at `path`.
    /// Errors with `StoreNotFound` when the path is not open on this thread.
    pub fn connect(path: &str) -> Result<Store> {
        let store = LOCAL_STORES.with(|s| (*s.borrow()).connections.get_mut(path)).ok_or(ErrorKind::StoreNotFound(path.to_string()))?;
        let sqlite = ::new_connection(path)?;
        store.fork(sqlite)
    }

    /// Number of live handles to the shared `Conn` for `path`, measured via
    /// `Arc::strong_count`. Errors with `StoreNotFound` when `path` is unknown.
    fn open_connections_for_store(path: &str) -> Result<usize> {
        Ok(Arc::strong_count(CONNECTIONS.read().unwrap().get(path).ok_or(ErrorKind::StoreNotFound(path.to_string()))?))
    }

    /// Drop this thread's handle to the store at `path`; when that was the last
    /// handle process-wide (strong count <= 1), also retire the shared `Conn`.
    pub fn close(path: &str) -> Result<()> {
        LOCAL_STORES.with(|s| (*s.borrow_mut()).connections.remove(path));
        if Stores::open_connections_for_store(path)? <= 1 {
            CONNECTIONS.write().unwrap().remove(path);
        }
        Ok(())
    }
}
|
||||||
|
|
||||||
|
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||||
|
/// for applications that don't require complex connection management.
|
||||||
|
pub struct Store {
    // Shared, thread-safe Mentat connection state (metadata, caches, observers),
    // potentially held by several `Store`s across threads.
    conn: Arc<Conn>,
    // This handle's private SQLite connection; never shared.
    sqlite: rusqlite::Connection,
}
|
||||||
|
|
||||||
|
impl Store {
|
||||||
|
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
|
||||||
|
pub fn open(path: &str) -> Result<Store> {
|
||||||
|
let mut connection = ::new_connection(path)?;
|
||||||
|
let conn = Conn::connect(&mut connection)?;
|
||||||
|
Ok(Store {
|
||||||
|
conn: Arc::new(conn),
|
||||||
|
sqlite: connection,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a totally blank store with no bootstrap schema. Use `open` instead.
|
||||||
|
pub fn open_empty(path: &str) -> Result<Store> {
|
||||||
|
if !path.is_empty() {
|
||||||
|
if Path::new(path).exists() {
|
||||||
|
bail!(ErrorKind::PathAlreadyExists(path.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut connection = ::new_connection(path)?;
|
||||||
|
let conn = Conn::empty(&mut connection)?;
|
||||||
|
Ok(Store {
|
||||||
|
conn: Arc::new(conn),
|
||||||
|
sqlite: connection,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
|
||||||
|
let mut ip = self.begin_transaction()?;
|
||||||
|
let report = ip.transact(transaction)?;
|
||||||
|
ip.commit()?;
|
||||||
|
Ok(report)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Store {
    /// Intended for use from tests.
    ///
    /// Mutable access to the underlying SQLite connection, bypassing the Conn.
    pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
        &mut self.sqlite
    }

    #[cfg(test)]
    // Test-only: ask the shared Conn's observer service whether an observer is
    // registered under `key`.
    pub fn is_registered_as_observer(&self, key: &String) -> bool {
        self.conn.tx_observer_service.lock().unwrap().is_registered(key)
    }
}
|
||||||
|
|
||||||
|
impl Store {
    /// Produce a sibling `Store` that shares this store's `Conn` but owns the
    /// supplied SQLite connection (e.g. for use on another thread).
    pub fn fork(&mut self, sqlite: rusqlite::Connection) -> Result<Store> {
        Ok(Store {
            conn: self.conn.clone(),
            sqlite: sqlite,
        })
    }

    /// Consume this store, yielding its SQLite connection and shared `Conn`.
    pub fn dismantle(self) -> (rusqlite::Connection, Arc<Conn>) {
        (self.sqlite, self.conn)
    }

    /// Borrow the shared `Conn` handle.
    pub fn conn(&self) -> &Arc<Conn> {
        &self.conn
    }

    // NOTE(review): every mutating method below goes through `Arc::make_mut`,
    // which *clones the entire Conn* whenever another Store shares it (hence
    // the new `#[derive(Clone)]` on Conn/Metadata). Confirm that diverging
    // copies of observers/caches across handles is the intended semantics.

    /// Begin a read-only transaction on this store.
    pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
        Arc::make_mut(&mut self.conn).begin_read(&mut self.sqlite)
    }

    /// Begin a read-write transaction on this store.
    pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
        Arc::make_mut(&mut self.conn).begin_transaction(&mut self.sqlite)
    }

    /// Register the attribute `attr` for caching in the given direction.
    pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
        // Snapshot the schema first: `cache` needs it alongside the mutable Conn.
        let schema = &self.conn.current_schema();
        Arc::make_mut(&mut self.conn).cache(&mut self.sqlite,
                                            schema,
                                            attr,
                                            direction,
                                            CacheAction::Register)
    }

    /// Register a transaction observer under `key`, replacing any existing one.
    pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
        Arc::make_mut(&mut self.conn).register_observer(key, observer);
    }

    /// Remove the transaction observer registered under `key`, if any.
    pub fn unregister_observer(&mut self, key: &String) {
        Arc::make_mut(&mut self.conn).unregister_observer(key);
    }
}
|
||||||
|
|
||||||
|
// All query operations delegate to the shared Conn, reading through this
// Store's private SQLite connection.
impl Queryable for Store {
    /// Run a query once against the current state of the store.
    fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
        where T: Into<Option<QueryInputs>> {
        self.conn.q_once(&self.sqlite, query, inputs)
    }

    /// Prepare a query for repeated execution.
    fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
        where T: Into<Option<QueryInputs>> {
        self.conn.q_prepare(&self.sqlite, query, inputs)
    }

    /// Explain how a query would be executed, without running it.
    fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
        where T: Into<Option<QueryInputs>> {
        self.conn.q_explain(&self.sqlite, query, inputs)
    }

    /// All values of `attribute` asserted for `entity`.
    fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Vec<TypedValue>>
        where E: Into<Entid> {
        self.conn.lookup_values_for_attribute(&self.sqlite, entity.into(), attribute)
    }

    /// A single value of `attribute` for `entity`, if one is asserted.
    fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::Keyword) -> Result<Option<TypedValue>>
        where E: Into<Entid> {
        self.conn.lookup_value_for_attribute(&self.sqlite, entity.into(), attribute)
    }
}
|
||||||
|
|
||||||
|
// Pull operations delegate to the shared Conn through this Store's SQLite
// connection.
impl Pullable for Store {
    /// Pull the given `attributes` for each of `entities`, keyed by entid.
    fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
        where E: IntoIterator<Item=Entid>,
              A: IntoIterator<Item=Entid> {
        self.conn.pull_attributes_for_entities(&self.sqlite, entities, attributes)
    }

    /// Pull the given `attributes` for a single `entity`.
    fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
        where A: IntoIterator<Item=Entid> {
        self.conn.pull_attributes_for_entity(&self.sqlite, entity, attributes)
    }
}
|
||||||
|
|
||||||
|
impl Syncable for Store {
|
||||||
|
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
|
||||||
|
let uuid = Uuid::parse_str(&user_uuid)?;
|
||||||
|
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
extern crate time;
|
||||||
|
extern crate mentat_parser_utils;
|
||||||
|
|
||||||
|
use std::collections::{
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
use std::path::{
|
||||||
|
PathBuf,
|
||||||
|
};
|
||||||
|
use std::sync::mpsc;
|
||||||
|
use std::sync::{
|
||||||
|
Mutex,
|
||||||
|
};
|
||||||
|
use std::time::{
|
||||||
|
Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_db::cache::{
|
||||||
|
SQLiteAttributeCache,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
CachedAttributes,
|
||||||
|
HasSchema,
|
||||||
|
TypedValue,
|
||||||
|
ValueType,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::entity_builder::{
|
||||||
|
BuildTerms,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::query::{
|
||||||
|
PreparedQuery,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::{
|
||||||
|
QueryInputs,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::vocabulary::{
|
||||||
|
AttributeBuilder,
|
||||||
|
Definition,
|
||||||
|
VersionedStore,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::vocabulary::attribute::{
|
||||||
|
Unique,
|
||||||
|
};
|
||||||
|
|
||||||
|
fn fixture_path(rest: &str) -> PathBuf {
|
||||||
|
let fixtures = Path::new("fixtures/");
|
||||||
|
fixtures.join(Path::new(rest))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_prepared_query_with_cache() {
|
||||||
|
let mut store = Store::open("").expect("opened");
|
||||||
|
let mut in_progress = store.begin_transaction().expect("began");
|
||||||
|
in_progress.import(fixture_path("cities.schema")).expect("transacted schema");
|
||||||
|
in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data");
|
||||||
|
in_progress.cache(&kw!(:neighborhood/district), CacheDirection::Forward, CacheAction::Register).expect("cache done");
|
||||||
|
in_progress.cache(&kw!(:district/name), CacheDirection::Forward, CacheAction::Register).expect("cache done");
|
||||||
|
in_progress.cache(&kw!(:neighborhood/name), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
|
||||||
|
|
||||||
|
let query = r#"[:find ?district
|
||||||
|
:in ?hood
|
||||||
|
:where
|
||||||
|
[?neighborhood :neighborhood/name ?hood]
|
||||||
|
[?neighborhood :neighborhood/district ?d]
|
||||||
|
[?d :district/name ?district]]"#;
|
||||||
|
let hood = "Beacon Hill";
|
||||||
|
let inputs = QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood).into())]);
|
||||||
|
let mut prepared = in_progress.q_prepare(query, inputs)
|
||||||
|
.expect("prepared");
|
||||||
|
match &prepared {
|
||||||
|
&PreparedQuery::Constant { select: ref _select } => {},
|
||||||
|
_ => panic!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
let start = time::PreciseTime::now();
|
||||||
|
let results = prepared.run(None).expect("results");
|
||||||
|
let end = time::PreciseTime::now();
|
||||||
|
println!("Prepared cache execution took {}µs", start.to(end).num_microseconds().unwrap());
|
||||||
|
assert_eq!(results.into_rel().expect("result"),
|
||||||
|
vec![vec![TypedValue::typed_string("Greater Duwamish")]].into());
|
||||||
|
}
|
||||||
|
|
||||||
|
trait StoreCache {
|
||||||
|
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
|
||||||
|
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
|
||||||
|
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StoreCache for Store {
|
||||||
|
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
|
||||||
|
let cache = self.conn.current_cache();
|
||||||
|
cache.get_entid_for_value(attr, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
|
||||||
|
self.conn.current_cache().is_attribute_cached_forward(attr)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
|
||||||
|
self.conn.current_cache().is_attribute_cached_reverse(attr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cache_mutation() {
|
||||||
|
let mut store = Store::open("").expect("opened");
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut in_progress = store.begin_transaction().expect("begun");
|
||||||
|
in_progress.transact(r#"[
|
||||||
|
{ :db/ident :foo/bar
|
||||||
|
:db/cardinality :db.cardinality/one
|
||||||
|
:db/index true
|
||||||
|
:db/unique :db.unique/identity
|
||||||
|
:db/valueType :db.type/long },
|
||||||
|
{ :db/ident :foo/baz
|
||||||
|
:db/cardinality :db.cardinality/one
|
||||||
|
:db/valueType :db.type/boolean }
|
||||||
|
{ :db/ident :foo/x
|
||||||
|
:db/cardinality :db.cardinality/many
|
||||||
|
:db/valueType :db.type/long }]"#).expect("transact");
|
||||||
|
|
||||||
|
// Cache one….
|
||||||
|
in_progress.cache(&kw!(:foo/bar), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
|
||||||
|
in_progress.commit().expect("commit");
|
||||||
|
}
|
||||||
|
|
||||||
|
let foo_bar = store.conn.current_schema().get_entid(&kw!(:foo/bar)).expect("foo/bar").0;
|
||||||
|
let foo_baz = store.conn.current_schema().get_entid(&kw!(:foo/baz)).expect("foo/baz").0;
|
||||||
|
let foo_x = store.conn.current_schema().get_entid(&kw!(:foo/x)).expect("foo/x").0;
|
||||||
|
|
||||||
|
// … and cache the others via the store.
|
||||||
|
store.cache(&kw!(:foo/baz), CacheDirection::Both).expect("cache done");
|
||||||
|
store.cache(&kw!(:foo/x), CacheDirection::Forward).expect("cache done");
|
||||||
|
{
|
||||||
|
assert!(store.is_attribute_cached_reverse(foo_bar));
|
||||||
|
assert!(store.is_attribute_cached_forward(foo_baz));
|
||||||
|
assert!(store.is_attribute_cached_reverse(foo_baz));
|
||||||
|
assert!(store.is_attribute_cached_forward(foo_x));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add some data.
|
||||||
|
{
|
||||||
|
let mut in_progress = store.begin_transaction().expect("begun");
|
||||||
|
|
||||||
|
{
|
||||||
|
assert!(in_progress.cache.is_attribute_cached_reverse(foo_bar));
|
||||||
|
assert!(in_progress.cache.is_attribute_cached_forward(foo_baz));
|
||||||
|
assert!(in_progress.cache.is_attribute_cached_reverse(foo_baz));
|
||||||
|
assert!(in_progress.cache.is_attribute_cached_forward(foo_x));
|
||||||
|
|
||||||
|
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_bar));
|
||||||
|
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_baz));
|
||||||
|
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_baz));
|
||||||
|
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_x));
|
||||||
|
}
|
||||||
|
|
||||||
|
in_progress.transact(r#"[
|
||||||
|
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
|
||||||
|
{:foo/bar 99, :foo/baz true}
|
||||||
|
{:foo/bar -2, :foo/baz true}
|
||||||
|
]"#).expect("transact");
|
||||||
|
|
||||||
|
// Data is in the cache.
|
||||||
|
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
assert_eq!(in_progress.cache.get_value_for_entid(&in_progress.schema, foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
||||||
|
|
||||||
|
// All three values for :foo/x.
|
||||||
|
let all_three: BTreeSet<TypedValue> = in_progress.cache
|
||||||
|
.get_values_for_entid(&in_progress.schema, foo_x, first)
|
||||||
|
.expect("val")
|
||||||
|
.iter().cloned().collect();
|
||||||
|
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
|
|
||||||
|
in_progress.commit().expect("commit");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Data is still in the cache.
|
||||||
|
{
|
||||||
|
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
||||||
|
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
||||||
|
|
||||||
|
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
||||||
|
.expect("val")
|
||||||
|
.iter().cloned().collect();
|
||||||
|
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can remove data and the cache reflects it, immediately and after commit.
|
||||||
|
{
|
||||||
|
let mut in_progress = store.begin_transaction().expect("began");
|
||||||
|
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
|
||||||
|
|
||||||
|
let only_two: BTreeSet<TypedValue> = in_progress.cache
|
||||||
|
.get_values_for_entid(&in_progress.schema, foo_x, first)
|
||||||
|
.expect("val")
|
||||||
|
.iter().cloned().collect();
|
||||||
|
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
|
|
||||||
|
// Rollback: unchanged.
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
||||||
|
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
||||||
|
|
||||||
|
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
||||||
|
.expect("val")
|
||||||
|
.iter().cloned().collect();
|
||||||
|
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try again, but this time commit.
|
||||||
|
{
|
||||||
|
let mut in_progress = store.begin_transaction().expect("began");
|
||||||
|
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
|
||||||
|
in_progress.commit().expect("committed");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
|
||||||
|
let cache: SQLiteAttributeCache = store.conn.current_cache();
|
||||||
|
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
|
||||||
|
|
||||||
|
let only_two: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
|
||||||
|
.expect("val")
|
||||||
|
.iter().cloned().collect();
|
||||||
|
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_register_observer() {
|
||||||
|
let mut conn = Store::open("").unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deregister_observer() {
|
||||||
|
let mut conn = Store::open("").unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
conn.unregister_observer(&key);
|
||||||
|
|
||||||
|
assert!(!conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_schema(conn: &mut Store) {
|
||||||
|
// transact some schema
|
||||||
|
let mut in_progress = conn.begin_transaction().expect("expected in progress");
|
||||||
|
in_progress.ensure_vocabulary(&Definition::new(
|
||||||
|
kw!(:todo/items),
|
||||||
|
1,
|
||||||
|
vec![
|
||||||
|
(kw!(:todo/uuid),
|
||||||
|
AttributeBuilder::helpful()
|
||||||
|
.value_type(ValueType::Uuid)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/name),
|
||||||
|
AttributeBuilder::helpful()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.fulltext(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/completion_date),
|
||||||
|
AttributeBuilder::helpful()
|
||||||
|
.value_type(ValueType::Instant)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/name),
|
||||||
|
AttributeBuilder::helpful()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.fulltext(true)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/color),
|
||||||
|
AttributeBuilder::helpful()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
],
|
||||||
|
)).expect("expected vocubulary");
|
||||||
|
in_progress.commit().expect("Expected vocabulary committed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
struct ObserverOutput {
|
||||||
|
txids: Vec<i64>,
|
||||||
|
changes: Vec<BTreeSet<i64>>,
|
||||||
|
called_key: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_notified_on_registered_change() {
|
||||||
|
let mut conn = Store::open("").unwrap();
|
||||||
|
add_schema(&mut conn);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||||
|
|
||||||
|
let mut_output = Arc::downgrade(&output);
|
||||||
|
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
||||||
|
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
|
||||||
|
// make it Sync.
|
||||||
|
let thread_tx = Mutex::new(tx);
|
||||||
|
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
if let Some(out) = mut_output.upgrade() {
|
||||||
|
let mut o = out.lock().unwrap();
|
||||||
|
o.called_key = Some(obs_key.to_string());
|
||||||
|
for (tx_id, changes) in batch.into_iter() {
|
||||||
|
o.txids.push(*tx_id);
|
||||||
|
o.changes.push(changes.clone());
|
||||||
|
}
|
||||||
|
o.txids.sort();
|
||||||
|
}
|
||||||
|
thread_tx.lock().unwrap().send(()).unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
let mut tx_ids = Vec::new();
|
||||||
|
let mut changesets = Vec::new();
|
||||||
|
let db_tx_instant_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:db/txInstant)).expect("entid to exist for :db/txInstant").into();
|
||||||
|
let uuid_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into();
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction().expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let mut changeset = BTreeSet::new();
|
||||||
|
changeset.insert(db_tx_instant_entid.clone());
|
||||||
|
let name = format!("todo{}", i);
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
|
||||||
|
changeset.insert(uuid_entid.clone());
|
||||||
|
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
changeset.insert(name_entid.clone());
|
||||||
|
if i % 2 == 0 {
|
||||||
|
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
|
||||||
|
changeset.insert(date_entid.clone());
|
||||||
|
}
|
||||||
|
let (ip, r) = builder.transact();
|
||||||
|
let report = r.expect("expected a report");
|
||||||
|
tx_ids.push(report.tx_id.clone());
|
||||||
|
changesets.push(changeset);
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
let mut builder = in_progress.builder().describe_tempid("Label");
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
builder.commit().expect("expect transaction to occur");
|
||||||
|
}
|
||||||
|
|
||||||
|
let delay = Duration::from_millis(100);
|
||||||
|
let _ = rx.recv_timeout(delay);
|
||||||
|
|
||||||
|
let out = Arc::try_unwrap(output).expect("unwrapped");
|
||||||
|
let o = out.into_inner().expect("Expected an Output");
|
||||||
|
assert_eq!(o.called_key, Some(key.clone()));
|
||||||
|
assert_eq!(o.txids, tx_ids);
|
||||||
|
assert_eq!(o.changes, changesets);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_not_notified_on_unregistered_change() {
|
||||||
|
let mut conn = Store::open("").unwrap();
|
||||||
|
add_schema(&mut conn);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||||
|
|
||||||
|
let mut_output = Arc::downgrade(&output);
|
||||||
|
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
||||||
|
let thread_tx = Mutex::new(tx);
|
||||||
|
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
if let Some(out) = mut_output.upgrade() {
|
||||||
|
let mut o = out.lock().unwrap();
|
||||||
|
o.called_key = Some(obs_key.to_string());
|
||||||
|
for (tx_id, changes) in batch.into_iter() {
|
||||||
|
o.txids.push(*tx_id);
|
||||||
|
o.changes.push(changes.clone());
|
||||||
|
}
|
||||||
|
o.txids.sort();
|
||||||
|
}
|
||||||
|
thread_tx.lock().unwrap().send(()).unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
let tx_ids = Vec::<Entid>::new();
|
||||||
|
let changesets = Vec::<BTreeSet<Entid>>::new();
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction().expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let name = format!("label{}", i);
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
let (ip, _) = builder.transact();
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let delay = Duration::from_millis(100);
|
||||||
|
let _ = rx.recv_timeout(delay);
|
||||||
|
|
||||||
|
let out = Arc::try_unwrap(output).expect("unwrapped");
|
||||||
|
let o = out.into_inner().expect("Expected an Output");
|
||||||
|
assert_eq!(o.called_key, None);
|
||||||
|
assert_eq!(o.txids, tx_ids);
|
||||||
|
assert_eq!(o.changes, changesets);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_open_new_store() {
    // Opening a store for the first time should leave exactly one strong
    // reference on the shared connection (the registry's own).
    // NOTE(review): "test.db" is a real on-disk file also used by
    // test_stores_connect_perform_mutable_operations — confirm isolation.
    let opened = Stores::open("test.db").expect("Expected a store to be opened");
    let refs = Arc::strong_count(opened.conn());
    assert_eq!(1, refs);
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_open_new_in_memory_store() {
    // An empty path opens an in-memory store; it too starts with a single
    // strong reference on the shared connection.
    let in_memory_path = "";
    let opened = Stores::open(in_memory_path).expect("Expected a store to be opened");
    assert_eq!(1, Arc::strong_count(opened.conn()));
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_open_existing_store() {
    // Open the same (in-memory, path "") store twice, each in its own scope;
    // both opens should observe a single strong reference on the shared conn.
    {
        let first = Stores::open("").expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(first.conn()));
    }
    {
        let second = Stores::open("").expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(second.conn()));
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_get_open_store() {
    let path = "";
    {
        // Opening registers the store; one strong reference on the conn.
        let opened = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(opened.conn()));
    }
    {
        // `get` hands back the registered store without adding a strong
        // reference to the shared connection.
        let fetched = Stores::get(path).expect("Expected a store to be fetched").unwrap();
        assert_eq!(1, Arc::strong_count(fetched.conn()));
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_get_closed_store() {
    // A store that was never opened is absent from the registry, so `get`
    // succeeds but yields None.
    if Stores::get("").expect("Expected a store to be fetched").is_some() {
        panic!("Store is not open and so none should be returned");
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_get_mut_open_store() {
    let path = "";
    {
        // Opening registers the store; one strong reference on the conn.
        let opened = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(opened.conn()));
    }
    {
        // A mutable fetch of the registered store likewise adds no strong
        // reference to the shared connection.
        let fetched = Stores::get_mut(path).expect("Expected a store to be fetched").unwrap();
        assert_eq!(1, Arc::strong_count(fetched.conn()));
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_get_mut_closed_store() {
    // `get_mut` on a never-opened path must also come back empty.
    if Stores::get_mut("").expect("Expected a store to be fetched").is_some() {
        panic!("Store is not open and so none should be returned");
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_connect_open_store() {
    let path = "";
    {
        let original = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(original.conn()));
    }

    // Forking an open store bumps the shared conn's strong count to 2
    // (the registry plus this fork).
    let fork = Stores::connect(path).expect("expected a new store");
    assert_eq!(2, Arc::strong_count(fork.conn()));

    {
        // Fetching a reference to the original store observes the same
        // count of 2: `get` itself adds no strong reference.
        let fetched = Stores::get(path).expect("Expected a store to be fetched").unwrap();
        assert_eq!(2, Arc::strong_count(fetched.conn()));
    }

    {
        // A second fork, confined to its own scope, raises the count to 3
        // as seen from either handle...
        let second_fork = Stores::connect(path).expect("expected a new store");
        assert_eq!(3, Arc::strong_count(fork.conn()));
        assert_eq!(3, Arc::strong_count(second_fork.conn()));
    }

    // ...and once that scope ends, the previous count is restored.
    assert_eq!(2, Arc::strong_count(fork.conn()));
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_connect_closed_store() {
    // Connecting to a path that was never opened must fail with a
    // StoreNotFound error naming that path.
    let path = "";
    match Stores::connect(path).err().unwrap() {
        Error(ErrorKind::StoreNotFound(name), _) => assert_eq!(path, name),
        other => panic!("expected Store Not Found error, got {:?}", other),
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_close_store_with_one_reference() {
    let path = "";
    {
        let opened = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(opened.conn()));
    }

    // With only the registry's reference outstanding, close succeeds...
    assert!(Stores::close(path).is_ok());

    // ...and the store is no longer present in the registry.
    assert!(Stores::get(path).expect("expected an empty result").is_none())
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_close_store_with_multiple_references() {
    let path = "";
    {
        let opened = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(opened.conn()));
    }

    // Forking keeps a second strong reference to the shared conn alive...
    let fork = Stores::connect(path).expect("expected a new store");
    assert_eq!(2, Arc::strong_count(fork.conn()));

    // ...so closing must be refused while that reference still exists.
    match Stores::close(path).err().unwrap() {
        Error(ErrorKind::StoreConnectionStillActive(name), _) => assert_eq!(path, name),
        other => panic!("expected StoreConnectionStillActive error, got {:?}", other),
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_close_store_with_scoped_multiple_references() {
    let path = "";
    {
        let opened = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(opened.conn()));
    }

    {
        // While a fork holds a second strong reference to the shared conn,
        // closing is refused.
        let fork = Stores::connect(path).expect("expected a new store");
        assert_eq!(2, Arc::strong_count(fork.conn()));

        match Stores::close(path).err().unwrap() {
            Error(ErrorKind::StoreConnectionStillActive(name), _) => assert_eq!(path, name),
            other => panic!("expected StoreConnectionStillActive error, got {:?}", other),
        }
    }

    // Once the fork's scope ends only one strong reference remains, so the
    // connection can be closed and the store drops out of the registry.
    assert!(Stores::close(path).is_ok());
    assert!(Stores::get(path).expect("expected an empty result").is_none())
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_close_unopened_store() {
    // Closing a path that was never opened fails with StoreNotFound
    // naming that path.
    let path = "";
    match Stores::close(path).err().unwrap() {
        Error(ErrorKind::StoreNotFound(name), _) => assert_eq!(path, name),
        other => panic!("expected StoreNotFound error, got {:?}", other),
    }
}
|
||||||
|
|
||||||
|
#[test]
fn test_stores_connect_perform_mutable_operations() {
    // Verifies that a forked (connected) store can perform writes, not just
    // the originally opened one.
    // NOTE(review): "test.db" is a real on-disk file also used by
    // test_stores_open_new_store — confirm test isolation/cleanup.
    let path = "test.db";

    {
        let store1 = Stores::open(path).expect("Expected a store to be opened");
        assert_eq!(1, Arc::strong_count(store1.conn()));
        // Install a small schema (:foo/bar, :foo/baz, :foo/x) via the
        // original store so the forked connection has attributes to write.
        let mut in_progress = store1.begin_transaction().expect("begun");
        in_progress.transact(r#"[
            { :db/ident :foo/bar
              :db/cardinality :db.cardinality/one
              :db/index true
              :db/unique :db.unique/identity
              :db/valueType :db.type/long },
            { :db/ident :foo/baz
              :db/cardinality :db.cardinality/one
              :db/valueType :db.type/boolean }
            { :db/ident :foo/x
              :db/cardinality :db.cardinality/many
              :db/valueType :db.type/long }]"#).expect("transact");

        in_progress.commit().expect("commit");
    }

    {
        // forking an open store leads to a ref count of 2 on the shared conn.
        // we should be able to perform write operations on this connection
        let mut store2 = Stores::connect(path).expect("expected a new store");
        assert_eq!(2, Arc::strong_count(store2.conn()));
        // Write three entities through the fork; commit must succeed.
        let mut in_progress = store2.begin_transaction().expect("begun");
        in_progress.transact(r#"[
            {:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
            {:foo/bar 99, :foo/baz true}
            {:foo/bar -2, :foo/baz true}
            ]"#).expect("transact");
        in_progress.commit().expect("commit");
    }
}
|
||||||
|
}
|
Loading…
Reference in a new issue