866 lines
29 KiB
Rust
866 lines
29 KiB
Rust
// Copyright 2016 Mozilla
|
||
//
|
||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||
// this file except in compliance with the License. You may obtain a copy of the
|
||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||
// Unless required by applicable law or agreed to in writing, software distributed
|
||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||
// specific language governing permissions and limitations under the License.
|
||
|
||
#![allow(dead_code)]
|
||
|
||
use std::collections::BTreeMap;
|
||
|
||
use std::sync::Arc;
|
||
|
||
use core_traits::{Entid, StructuredMap, TypedValue};
|
||
|
||
use mentat_core::{Keyword, TxReport, ValueRc};
|
||
use mentat_db::TxObserver;
|
||
|
||
use mentat_transaction::{
|
||
CacheAction, CacheDirection, InProgress, InProgressRead, Pullable, Queryable,
|
||
};
|
||
|
||
use crate::conn::Conn;
|
||
|
||
use public_traits::errors::Result;
|
||
|
||
use mentat_transaction::query::{PreparedResult, QueryExplanation, QueryInputs, QueryOutput};
|
||
|
||
#[cfg(feature = "syncable")]
|
||
use mentat_tolstoy::{SyncFollowup, SyncReport, SyncResult};
|
||
|
||
#[cfg(feature = "syncable")]
|
||
use crate::sync::Syncable;
|
||
|
||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||
/// for applications that don't require complex connection management.
|
||
pub struct Store {
|
||
conn: Conn,
|
||
sqlite: rusqlite::Connection,
|
||
}
|
||
|
||
impl Store {
|
||
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
|
||
pub fn open(path: &str) -> Result<Store> {
|
||
let mut connection = crate::new_connection(path)?;
|
||
let conn = Conn::connect(&mut connection)?;
|
||
Ok(Store {
|
||
conn,
|
||
sqlite: connection,
|
||
})
|
||
}
|
||
|
||
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
|
||
let mut ip = self.begin_transaction()?;
|
||
let report = ip.transact(transaction)?;
|
||
ip.commit()?;
|
||
Ok(report)
|
||
}
|
||
|
||
#[cfg(feature = "syncable")]
|
||
pub fn sync(&mut self, server_uri: &str, user_uuid: &str) -> Result<SyncResult> {
|
||
let mut reports = vec![];
|
||
loop {
|
||
let mut ip = self.begin_transaction()?;
|
||
let report = ip.sync(server_uri, user_uuid)?;
|
||
ip.commit()?;
|
||
|
||
match report {
|
||
SyncReport::Merge(SyncFollowup::FullSync) => {
|
||
reports.push(report);
|
||
continue;
|
||
}
|
||
_ => {
|
||
reports.push(report);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
if reports.len() == 1 {
|
||
Ok(SyncResult::Atomic(reports[0].clone()))
|
||
} else {
|
||
Ok(SyncResult::NonAtomic(reports))
|
||
}
|
||
}
|
||
}
|
||
|
||
#[cfg(feature = "sqlcipher")]
impl Store {
    /// Variant of `open` that allows a key (for encryption/decryption) to be
    /// supplied. Fails unless linked against sqlcipher (or something else that
    /// supports the Sqlite Encryption Extension).
    ///
    /// # Errors
    ///
    /// Propagates any failure from creating the keyed SQLite connection or
    /// from `Conn::connect`.
    pub fn open_with_key(path: &str, encryption_key: &str) -> Result<Store> {
        let mut sqlite = crate::new_connection_with_key(path, encryption_key)?;
        let conn = Conn::connect(&mut sqlite)?;
        Ok(Store { conn, sqlite })
    }

    /// Change the key for a database that was opened using `open_with_key` (using `PRAGMA
    /// rekey`). Fails unless linked against sqlcipher (or something else that supports the Sqlite
    /// Encryption Extension).
    pub fn change_encryption_key(&mut self, new_encryption_key: &str) -> Result<()> {
        let sqlite = &self.sqlite;
        crate::change_encryption_key(sqlite, new_encryption_key)?;
        Ok(())
    }
}
|
||
|
||
impl Store {
|
||
/// Intended for use from tests.
|
||
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
|
||
&mut self.sqlite
|
||
}
|
||
|
||
#[cfg(test)]
|
||
pub fn is_registered_as_observer(&self, key: &str) -> bool {
|
||
self.conn
|
||
.tx_observer_service
|
||
.lock()
|
||
.unwrap()
|
||
.is_registered(key)
|
||
}
|
||
}
|
||
|
||
impl Store {
|
||
pub fn dismantle(self) -> (rusqlite::Connection, Conn) {
|
||
(self.sqlite, self.conn)
|
||
}
|
||
|
||
pub fn conn(&self) -> &Conn {
|
||
&self.conn
|
||
}
|
||
|
||
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
|
||
self.conn.begin_read(&mut self.sqlite)
|
||
}
|
||
|
||
pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
|
||
self.conn.begin_transaction(&mut self.sqlite)
|
||
}
|
||
|
||
pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
|
||
let schema = &self.conn.current_schema();
|
||
self.conn.cache(
|
||
&mut self.sqlite,
|
||
schema,
|
||
attr,
|
||
direction,
|
||
CacheAction::Register,
|
||
)
|
||
}
|
||
|
||
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
|
||
self.conn.register_observer(key, observer);
|
||
}
|
||
|
||
pub fn unregister_observer(&mut self, key: &str) {
|
||
self.conn.unregister_observer(key);
|
||
}
|
||
|
||
pub fn last_tx_id(&self) -> Entid {
|
||
self.conn.last_tx_id()
|
||
}
|
||
}
|
||
|
||
impl Queryable for Store {
|
||
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
|
||
where
|
||
T: Into<Option<QueryInputs>>,
|
||
{
|
||
self.conn.q_once(&self.sqlite, query, inputs)
|
||
}
|
||
|
||
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult<'_>
|
||
where
|
||
T: Into<Option<QueryInputs>>,
|
||
{
|
||
self.conn.q_prepare(&self.sqlite, query, inputs)
|
||
}
|
||
|
||
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
|
||
where
|
||
T: Into<Option<QueryInputs>>,
|
||
{
|
||
self.conn.q_explain(&self.sqlite, query, inputs)
|
||
}
|
||
|
||
fn lookup_values_for_attribute<E>(
|
||
&self,
|
||
entity: E,
|
||
attribute: &edn::Keyword,
|
||
) -> Result<Vec<TypedValue>>
|
||
where
|
||
E: Into<Entid>,
|
||
{
|
||
self.conn
|
||
.lookup_values_for_attribute(&self.sqlite, entity.into(), attribute)
|
||
}
|
||
|
||
fn lookup_value_for_attribute<E>(
|
||
&self,
|
||
entity: E,
|
||
attribute: &edn::Keyword,
|
||
) -> Result<Option<TypedValue>>
|
||
where
|
||
E: Into<Entid>,
|
||
{
|
||
self.conn
|
||
.lookup_value_for_attribute(&self.sqlite, entity.into(), attribute)
|
||
}
|
||
}
|
||
|
||
impl Pullable for Store {
|
||
fn pull_attributes_for_entities<E, A>(
|
||
&self,
|
||
entities: E,
|
||
attributes: A,
|
||
) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
|
||
where
|
||
E: IntoIterator<Item = Entid>,
|
||
A: IntoIterator<Item = Entid>,
|
||
{
|
||
self.conn
|
||
.pull_attributes_for_entities(&self.sqlite, entities, attributes)
|
||
}
|
||
|
||
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
|
||
where
|
||
A: IntoIterator<Item = Entid>,
|
||
{
|
||
self.conn
|
||
.pull_attributes_for_entity(&self.sqlite, entity, attributes)
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
use uuid::Uuid;
|
||
|
||
use std::collections::BTreeSet;
|
||
use std::path::{Path, PathBuf};
|
||
use std::sync::mpsc;
|
||
use std::sync::Mutex;
|
||
use std::time::Duration;
|
||
|
||
use mentat_db::cache::SQLiteAttributeCache;
|
||
|
||
use core_traits::{TypedValue, ValueType};
|
||
|
||
use mentat_core::{CachedAttributes, HasSchema};
|
||
|
||
use mentat_transaction::entity_builder::BuildTerms;
|
||
|
||
use mentat_transaction::query::PreparedQuery;
|
||
|
||
use mentat_query_algebrizer::QueryInputs;
|
||
|
||
use crate::vocabulary::{AttributeBuilder, Definition, VersionedStore};
|
||
|
||
use core_traits::attribute::Unique;
|
||
|
||
/// Build the path of a test fixture by appending `rest` to the relative
/// `fixtures/` directory.
fn fixture_path(rest: &str) -> PathBuf {
    let mut path = PathBuf::from("fixtures/");
    path.push(rest);
    path
}
|
||
|
||
// Imports the cities fixtures, registers caches for the three attributes the
// query touches, and checks that the prepared query is the `Constant` variant
// and yields the expected district.
#[test]
fn test_prepared_query_with_cache() {
    let mut store = Store::open("").expect("opened");
    let mut in_progress = store.begin_transaction().expect("began");

    in_progress
        .import(fixture_path("cities.schema"))
        .expect("transacted schema");
    in_progress
        .import(fixture_path("all_seattle.edn"))
        .expect("transacted data");

    // Same registrations, in the same order, expressed as data.
    let registrations = vec![
        (kw!(:neighborhood/district), CacheDirection::Forward),
        (kw!(:district/name), CacheDirection::Forward),
        (kw!(:neighborhood/name), CacheDirection::Reverse),
    ];
    for (attribute, direction) in registrations {
        in_progress
            .cache(&attribute, direction, CacheAction::Register)
            .expect("cache done");
    }

    let query = r#"[:find ?district
                    :in ?hood
                    :where
                    [?neighborhood :neighborhood/name ?hood]
                    [?neighborhood :neighborhood/district ?d]
                    [?d :district/name ?district]]"#;
    let hood = "Beacon Hill";
    let inputs =
        QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood))]);
    let mut prepared = in_progress.q_prepare(query, inputs).expect("prepared");

    // Anything other than a constant prepared query fails the test.
    match prepared {
        PreparedQuery::Constant { .. } => (),
        _ => panic!(),
    }

    let started = time::Instant::now();
    let results = prepared.run(None).expect("results");
    let finished = time::Instant::now();
    let elapsed = finished - started;
    println!(
        "Prepared cache execution took {}µs",
        elapsed.whole_microseconds()
    );

    let rel = results.into_rel().expect("result");
    assert_eq!(
        rel,
        vec![vec![TypedValue::typed_string("Greater Duwamish")]].into()
    );
}
|
||
|
||
trait StoreCache {
|
||
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
|
||
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
|
||
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
|
||
}
|
||
|
||
impl StoreCache for Store {
|
||
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
|
||
let cache = self.conn.current_cache();
|
||
cache.get_entid_for_value(attr, val)
|
||
}
|
||
|
||
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
|
||
self.conn.current_cache().is_attribute_cached_forward(attr)
|
||
}
|
||
|
||
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
|
||
self.conn.current_cache().is_attribute_cached_reverse(attr)
|
||
}
|
||
}
|
||
|
||
// Exercises the attribute cache across transactions: registration inside and
// outside a transaction, visibility of freshly transacted data, an uncommitted
// retraction, and a committed retraction.
#[test]
fn test_cache_mutation() {
    // Expected cache contents: the given integers as a set of `TypedValue::Long`s.
    fn longs(values: Vec<i64>) -> BTreeSet<TypedValue> {
        values.into_iter().map(TypedValue::Long).collect()
    }

    let mut store = Store::open("").expect("opened");

    {
        let mut in_progress = store.begin_transaction().expect("begun");
        in_progress
            .transact(
                r#"[
                    { :db/ident :foo/bar
                    :db/cardinality :db.cardinality/one
                    :db/index true
                    :db/unique :db.unique/identity
                    :db/valueType :db.type/long },
                    { :db/ident :foo/baz
                    :db/cardinality :db.cardinality/one
                    :db/valueType :db.type/boolean }
                    { :db/ident :foo/x
                    :db/cardinality :db.cardinality/many
                    :db/valueType :db.type/long }]"#,
            )
            .expect("transact");

        // Cache one….
        in_progress
            .cache(
                &kw!(:foo/bar),
                CacheDirection::Reverse,
                CacheAction::Register,
            )
            .expect("cache done");
        in_progress.commit().expect("commit");
    }

    // Resolve all three attribute ids from one schema snapshot.
    let (foo_bar, foo_baz, foo_x) = {
        let schema = store.conn.current_schema();
        (
            schema.get_entid(&kw!(:foo/bar)).expect("foo/bar").0,
            schema.get_entid(&kw!(:foo/baz)).expect("foo/baz").0,
            schema.get_entid(&kw!(:foo/x)).expect("foo/x").0,
        )
    };

    // … and cache the others via the store.
    store
        .cache(&kw!(:foo/baz), CacheDirection::Both)
        .expect("cache done");
    store
        .cache(&kw!(:foo/x), CacheDirection::Forward)
        .expect("cache done");

    assert!(store.is_attribute_cached_reverse(foo_bar));
    assert!(store.is_attribute_cached_forward(foo_baz));
    assert!(store.is_attribute_cached_reverse(foo_baz));
    assert!(store.is_attribute_cached_forward(foo_x));

    // Add some data.
    {
        let mut in_progress = store.begin_transaction().expect("begun");

        {
            // The in-progress cache and its overlay must both report the registrations.
            let cache = &in_progress.cache;
            assert!(cache.is_attribute_cached_reverse(foo_bar));
            assert!(cache.is_attribute_cached_forward(foo_baz));
            assert!(cache.is_attribute_cached_reverse(foo_baz));
            assert!(cache.is_attribute_cached_forward(foo_x));

            let overlay = &cache.overlay;
            assert!(overlay.is_attribute_cached_reverse(foo_bar));
            assert!(overlay.is_attribute_cached_forward(foo_baz));
            assert!(overlay.is_attribute_cached_reverse(foo_baz));
            assert!(overlay.is_attribute_cached_forward(foo_x));
        }

        in_progress
            .transact(
                r#"[
                    {:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
                    {:foo/bar 99, :foo/baz true}
                    {:foo/bar -2, :foo/baz true}
                    ]"#,
            )
            .expect("transact");

        // Data is in the cache.
        let first = in_progress
            .cache
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        assert_eq!(
            in_progress
                .cache
                .get_value_for_entid(&in_progress.schema, foo_baz, first)
                .expect("val"),
            &TypedValue::Boolean(false)
        );

        // All three values for :foo/x.
        let all_three: BTreeSet<TypedValue> = in_progress
            .cache
            .get_values_for_entid(&in_progress.schema, foo_x, first)
            .expect("val")
            .iter()
            .cloned()
            .collect();
        assert_eq!(all_three, longs(vec![1, 2, 3]));

        in_progress.commit().expect("commit");
    }

    // Data is still in the cache.
    {
        let first = store
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        let cache: SQLiteAttributeCache = store.conn.current_cache();
        let schema = store.conn.current_schema();
        assert_eq!(
            cache
                .get_value_for_entid(&schema, foo_baz, first)
                .expect("val"),
            &TypedValue::Boolean(false)
        );

        let all_three: BTreeSet<TypedValue> = cache
            .get_values_for_entid(&schema, foo_x, first)
            .expect("val")
            .iter()
            .cloned()
            .collect();
        assert_eq!(all_three, longs(vec![1, 2, 3]));
    }

    // We can remove data and the cache reflects it, immediately and after commit.
    {
        let mut in_progress = store.begin_transaction().expect("began");
        let first = in_progress
            .cache
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        let retraction = format!("[[:db/retract {} :foo/x 2]]", first);
        in_progress
            .transact(retraction.as_str())
            .expect("transact");

        let only_two: BTreeSet<TypedValue> = in_progress
            .cache
            .get_values_for_entid(&in_progress.schema, foo_x, first)
            .expect("val")
            .iter()
            .cloned()
            .collect();
        assert_eq!(only_two, longs(vec![1, 3]));

        // Rollback: unchanged. (`in_progress` goes out of scope without a commit.)
    }
    {
        let first = store
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        let cache: SQLiteAttributeCache = store.conn.current_cache();
        let schema = store.conn.current_schema();
        assert_eq!(
            cache
                .get_value_for_entid(&schema, foo_baz, first)
                .expect("val"),
            &TypedValue::Boolean(false)
        );

        let all_three: BTreeSet<TypedValue> = cache
            .get_values_for_entid(&schema, foo_x, first)
            .expect("val")
            .iter()
            .cloned()
            .collect();
        assert_eq!(all_three, longs(vec![1, 2, 3]));
    }

    // Try again, but this time commit.
    {
        let mut in_progress = store.begin_transaction().expect("began");
        let first = in_progress
            .cache
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        let retraction = format!("[[:db/retract {} :foo/x 2]]", first);
        in_progress
            .transact(retraction.as_str())
            .expect("transact");
        in_progress.commit().expect("committed");
    }
    {
        let first = store
            .get_entid_for_value(foo_bar, &TypedValue::Long(15))
            .expect("id");
        let cache: SQLiteAttributeCache = store.conn.current_cache();
        let schema = store.conn.current_schema();
        assert_eq!(
            cache
                .get_value_for_entid(&schema, foo_baz, first)
                .expect("val"),
            &TypedValue::Boolean(false)
        );

        let only_two: BTreeSet<TypedValue> = cache
            .get_values_for_entid(&schema, foo_x, first)
            .expect("val")
            .iter()
            .cloned()
            .collect();
        assert_eq!(only_two, longs(vec![1, 3]));
    }
}
|
||
|
||
// Registering an observer under a key makes the store report that key as
// registered.
//
// The `#[test]` attribute is required: without it the harness never runs this
// function, and the file-level `#![allow(dead_code)]` hides the resulting
// dead-code warning.
#[test]
fn test_register_observer() {
    let mut conn = Store::open("").unwrap();

    let key = "Test Observer".to_string();
    // An observer with no attributes of interest and a no-op callback is enough here.
    let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});

    conn.register_observer(key.clone(), Arc::new(tx_observer));
    assert!(conn.is_registered_as_observer(&key));
}
|
||
|
||
// An observer that has been registered and then unregistered is no longer
// reported as registered.
#[test]
fn test_deregister_observer() {
    let mut store = Store::open("").unwrap();
    let observer_key = "Test Observer".to_string();

    let noop_observer = Arc::new(TxObserver::new(
        BTreeSet::new(),
        move |_obs_key, _batch| {},
    ));
    store.register_observer(observer_key.clone(), noop_observer);
    assert!(store.is_registered_as_observer(&observer_key));

    store.unregister_observer(&observer_key);
    assert!(!store.is_registered_as_observer(&observer_key));
}
|
||
|
||
fn add_schema(conn: &mut Store) {
|
||
// transact some schema
|
||
let mut in_progress = conn.begin_transaction().expect("expected in progress");
|
||
in_progress
|
||
.ensure_vocabulary(&Definition::new(
|
||
kw!(:todo/items),
|
||
1,
|
||
vec![
|
||
(
|
||
kw!(:todo/uuid),
|
||
AttributeBuilder::helpful()
|
||
.value_type(ValueType::Uuid)
|
||
.multival(false)
|
||
.unique(Unique::Value)
|
||
.index(true)
|
||
.build(),
|
||
),
|
||
(
|
||
kw!(:todo/name),
|
||
AttributeBuilder::helpful()
|
||
.value_type(ValueType::String)
|
||
.multival(false)
|
||
.fulltext(true)
|
||
.build(),
|
||
),
|
||
(
|
||
kw!(:todo/completion_date),
|
||
AttributeBuilder::helpful()
|
||
.value_type(ValueType::Instant)
|
||
.multival(false)
|
||
.build(),
|
||
),
|
||
(
|
||
kw!(:label/name),
|
||
AttributeBuilder::helpful()
|
||
.value_type(ValueType::String)
|
||
.multival(false)
|
||
.unique(Unique::Value)
|
||
.fulltext(true)
|
||
.index(true)
|
||
.build(),
|
||
),
|
||
(
|
||
kw!(:label/color),
|
||
AttributeBuilder::helpful()
|
||
.value_type(ValueType::String)
|
||
.multival(false)
|
||
.build(),
|
||
),
|
||
],
|
||
))
|
||
.expect("expected vocubulary");
|
||
in_progress.commit().expect("Expected vocabulary committed");
|
||
}
|
||
|
||
/// Accumulates what a test observer callback was invoked with.
#[derive(Debug, Default)]
struct ObserverOutput {
    /// Transaction ids seen by the callback.
    txids: Vec<i64>,
    /// One set of changed attribute ids per recorded transaction.
    changes: Vec<BTreeSet<i64>>,
    /// The key the callback was last invoked with, or `None` if it never ran.
    called_key: Option<String>,
}
|
||
|
||
// An observer registered for `:todo/name` and `:todo/completion_date` is
// called with the ids and changed-attribute sets of the transactions that
// touch those attributes.
#[test]
fn test_observer_notified_on_registered_change() {
    let mut conn = Store::open("").unwrap();
    add_schema(&mut conn);

    let name_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:todo/name))
        .expect("entid to exist for name")
        .into();
    let date_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:todo/completion_date))
        .expect("entid to exist for completion_date")
        .into();
    let mut registered_attrs = BTreeSet::new();
    registered_attrs.insert(name_entid);
    registered_attrs.insert(date_entid);

    let key = "Test Observing".to_string();

    let output = Arc::new(Mutex::new(ObserverOutput::default()));

    // The callback only holds a weak handle, so `Arc::try_unwrap` below can
    // succeed once the callback is not running.
    let mut_output = Arc::downgrade(&output);
    let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
    // because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
    // make it Sync.
    let thread_tx = Mutex::new(tx);
    let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
        if let Some(out) = mut_output.upgrade() {
            let mut o = out.lock().unwrap();
            o.called_key = Some(obs_key.to_string());
            for (tx_id, changes) in batch.into_iter() {
                o.txids.push(*tx_id);
                o.changes.push(changes.clone());
            }
            o.txids.sort_unstable();
        }
        // Signal the test body that the callback has finished recording.
        thread_tx.lock().unwrap().send(()).unwrap();
    }));

    conn.register_observer(key.clone(), Arc::clone(&tx_observer));
    assert!(conn.is_registered_as_observer(&key));

    let mut tx_ids = Vec::new();
    let mut changesets = Vec::new();
    let db_tx_instant_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:db/txInstant))
        .expect("entid to exist for :db/txInstant")
        .into();
    // The message names the attribute actually being looked up, so a failure
    // here points at `:todo/uuid` rather than `:todo/name`.
    let uuid_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:todo/uuid))
        .expect("entid to exist for uuid")
        .into();
    {
        let mut in_progress = conn.begin_transaction().expect("expected transaction");
        for i in 0..3 {
            // Track the attributes each todo transaction is expected to report.
            let mut changeset = BTreeSet::new();
            changeset.insert(db_tx_instant_entid);
            let name = format!("todo{}", i);
            let uuid = Uuid::new_v4();
            let mut builder = in_progress.builder().describe_tempid(&name);
            builder
                .add(kw!(:todo/uuid), TypedValue::Uuid(uuid))
                .expect("Expected added uuid");
            changeset.insert(uuid_entid);
            builder
                .add(kw!(:todo/name), TypedValue::typed_string(name))
                .expect("Expected added name");
            changeset.insert(name_entid);
            // Only every other todo gets a completion date.
            if i % 2 == 0 {
                builder
                    .add(kw!(:todo/completion_date), TypedValue::current_instant())
                    .expect("Expected added date");
                changeset.insert(date_entid);
            }
            let (ip, r) = builder.transact();
            let report = r.expect("expected a report");
            tx_ids.push(report.tx_id);
            changesets.push(changeset);
            in_progress = ip;
        }
        // The label transaction is not added to the expected ids or change sets.
        let mut builder = in_progress.builder().describe_tempid("Label");
        builder
            .add(kw!(:label/name), TypedValue::typed_string("Label 1"))
            .expect("Expected added name");
        builder
            .add(kw!(:label/color), TypedValue::typed_string("blue"))
            .expect("Expected added color");
        builder.commit().expect("expect transaction to occur");
    }

    // Wait briefly for the callback's signal; a timeout is tolerated here
    // because the assertions below fail anyway if the callback never ran.
    let delay = Duration::from_millis(100);
    let _ = rx.recv_timeout(delay);

    let out = Arc::try_unwrap(output).expect("unwrapped");
    let o = out.into_inner().expect("Expected an Output");
    assert_eq!(o.called_key, Some(key));
    assert_eq!(o.txids, tx_ids);
    assert_eq!(o.changes, changesets);
}
|
||
|
||
// An observer registered for `:todo/name` and `:todo/completion_date` records
// nothing when only `:label/*` attributes are transacted.
#[test]
fn test_observer_not_notified_on_unregistered_change() {
    let mut conn = Store::open("").unwrap();
    add_schema(&mut conn);

    let name_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:todo/name))
        .expect("entid to exist for name")
        .into();
    let date_entid: Entid = conn
        .conn()
        .current_schema()
        .get_entid(&kw!(:todo/completion_date))
        .expect("entid to exist for completion_date")
        .into();
    let mut registered_attrs = BTreeSet::new();
    registered_attrs.insert(name_entid);
    registered_attrs.insert(date_entid);

    let key = "Test Observing".to_string();

    let output = Arc::new(Mutex::new(ObserverOutput::default()));
    // The callback keeps only a weak handle so `Arc::try_unwrap` can succeed below.
    let mut_output = Arc::downgrade(&output);

    let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
    let thread_tx = Mutex::new(tx);
    let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
        if let Some(out) = mut_output.upgrade() {
            let mut o = out.lock().unwrap();
            o.called_key = Some(obs_key.to_string());
            for (tx_id, changes) in batch.into_iter() {
                o.txids.push(*tx_id);
                o.changes.push(changes.clone());
            }
            o.txids.sort_unstable();
        }
        thread_tx.lock().unwrap().send(()).unwrap();
    }));

    conn.register_observer(key.clone(), Arc::clone(&tx_observer));
    assert!(conn.is_registered_as_observer(&key));

    // Nothing is expected to be recorded.
    let expected_tx_ids = Vec::<Entid>::new();
    let expected_changesets = Vec::<BTreeSet<Entid>>::new();

    {
        let mut in_progress = conn.begin_transaction().expect("expected transaction");
        for i in 0..3 {
            let name = format!("label{}", i);
            let mut builder = in_progress.builder().describe_tempid(&name);
            builder
                .add(kw!(:label/name), TypedValue::typed_string(name))
                .expect("Expected added name");
            builder
                .add(kw!(:label/color), TypedValue::typed_string("blue"))
                .expect("Expected added color");
            let (ip, _) = builder.transact();
            in_progress = ip;
        }
        // `in_progress` goes out of scope here without `commit` being called.
    }

    // Give a callback the chance to fire; the timeout result is deliberately ignored.
    let delay = Duration::from_millis(100);
    let _ = rx.recv_timeout(delay);

    let recorded = Arc::try_unwrap(output)
        .expect("unwrapped")
        .into_inner()
        .expect("Expected an Output");
    assert_eq!(recorded.called_key, None);
    assert_eq!(recorded.txids, expected_tx_ids);
    assert_eq!(recorded.changes, expected_changesets);
}
|
||
}
|