// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::BTreeMap;
use std::sync::Arc;
use rusqlite;
use edn;
use core_traits::{Entid, StructuredMap, TypedValue};
use mentat_core::{Keyword, TxReport, ValueRc};
use mentat_db::TxObserver;
use mentat_transaction::{
CacheAction, CacheDirection, InProgress, InProgressRead, Pullable, Queryable,
};
use crate::conn::Conn;
use public_traits::errors::Result;
use mentat_transaction::query::{PreparedResult, QueryExplanation, QueryInputs, QueryOutput};
#[cfg(feature = "syncable")]
use mentat_tolstoy::{SyncFollowup, SyncReport, SyncResult};
#[cfg(feature = "syncable")]
use crate::sync::Syncable;
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
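///
/// A minimal usage sketch (illustrative only; it assumes that the crate root
/// re-exports `Store` and the `Queryable` trait):
///
/// ```no_run
/// use mentat::{Queryable, Store};
///
/// // An empty path opens an in-memory SQLite database.
/// let mut store = Store::open("").expect("opened");
/// store
///     .transact(r#"[{:db/ident :person/name
///                    :db/valueType :db.type/string
///                    :db/cardinality :db.cardinality/one}]"#)
///     .expect("schema transacted");
/// store
///     .transact(r#"[{:person/name "Alice"}]"#)
///     .expect("data transacted");
/// let names = store
///     .q_once("[:find ?name :where [?e :person/name ?name]]", None)
///     .expect("query succeeded");
/// ```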
pub struct Store {
conn: Conn,
sqlite: rusqlite::Connection,
}
impl Store {
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
pub fn open(path: &str) -> Result<Store> {
let mut connection = crate::new_connection(path)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn,
sqlite: connection,
})
}
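/// Transact the given EDN against the store in a single read-write
/// transaction, committing on success and returning the resulting
/// `TxReport`.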
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
let mut ip = self.begin_transaction()?;
let report = ip.transact(transaction)?;
ip.commit()?;
Ok(report)
}
#[cfg(feature = "syncable")]
pub fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<SyncResult> {
let mut reports = vec![];
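// Each pass is an ordinary sync. A pass that had to produce a "merge
// transaction" (a local transaction reconciling the local and remote logs)
// reports `SyncReport::Merge(SyncFollowup::FullSync)`, meaning the merged
// state still needs to be uploaded, so we loop and sync again. Any other
// report means we've converged and can stop.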
loop {
let mut ip = self.begin_transaction()?;
let report = ip.sync(server_uri, user_uuid)?;
ip.commit()?;
match report {
SyncReport::Merge(SyncFollowup::FullSync) => {
reports.push(report);
continue;
}
_ => {
reports.push(report);
break;
}
}
}
if reports.len() == 1 {
Ok(SyncResult::Atomic(reports[0].clone()))
} else {
Ok(SyncResult::NonAtomic(reports))
}
}
}
#[cfg(feature = "sqlcipher")]
impl Store {
/// Variant of `open` that allows a key (for encryption/decryption) to be
/// supplied. Fails unless linked against SQLCipher (or something else that
/// supports the SQLite Encryption Extension).
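///
/// A hedged sketch (assumes a build with the `sqlcipher` feature enabled and
/// that the crate root re-exports `Store`):
///
/// ```no_run
/// # use mentat::Store;
/// let mut store = Store::open_with_key("my.db", "some passphrase").expect("opened");
/// store.change_encryption_key("a new passphrase").expect("rekeyed");
/// ```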
pub fn open_with_key(path: &str, encryption_key: &str) -> Result<Store> {
let mut connection = crate::new_connection_with_key(path, encryption_key)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn,
sqlite: connection,
})
}
/// Change the key for a database that was opened using `open_with_key` (using `PRAGMA
/// rekey`). Fails unless linked against SQLCipher (or something else that supports the
/// SQLite Encryption Extension).
pub fn change_encryption_key(&mut self, new_encryption_key: &str) -> Result<()> {
crate::change_encryption_key(&self.sqlite, new_encryption_key)?;
Ok(())
}
}
impl Store {
/// Intended for use from tests.
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
&mut self.sqlite
}
#[cfg(test)]
pub fn is_registered_as_observer(&self, key: &String) -> bool {
self.conn
.tx_observer_service
.lock()
.unwrap()
.is_registered(key)
}
}
impl Store {
pub fn dismantle(self) -> (rusqlite::Connection, Conn) {
(self.sqlite, self.conn)
}
pub fn conn(&self) -> &Conn {
&self.conn
}
pub fn begin_read<'m>(&'m mut self) -> Result<InProgressRead<'m, 'm>> {
self.conn.begin_read(&mut self.sqlite)
}
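/// Begin a read-write transaction. Changes made through the returned
/// `InProgress` only become visible to the store when it is committed;
/// dropping it without calling `commit` rolls them back (see the cache
/// tests below).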
pub fn begin_transaction<'m>(&'m mut self) -> Result<InProgress<'m, 'm>> {
self.conn.begin_transaction(&mut self.sqlite)
}
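/// Register the attribute named by `attr` with the in-memory attribute cache.
/// `CacheDirection::Forward` caches entity-to-value lookups,
/// `CacheDirection::Reverse` caches value-to-entity lookups, and
/// `CacheDirection::Both` caches both. This is a thin wrapper around
/// `Conn::cache` with `CacheAction::Register`.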
pub fn cache(&mut self, attr: &Keyword, direction: CacheDirection) -> Result<()> {
let schema = &self.conn.current_schema();
self.conn.cache(
&mut self.sqlite,
schema,
attr,
direction,
CacheAction::Register,
)
}
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
self.conn.register_observer(key, observer);
}
pub fn unregister_observer(&mut self, key: &String) {
self.conn.unregister_observer(key);
}
pub fn last_tx_id(&self) -> Entid {
self.conn.last_tx_id()
}
}
impl Queryable for Store {
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where
T: Into<Option<QueryInputs>>,
{
self.conn.q_once(&self.sqlite, query, inputs)
}
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult<'_>
where
T: Into<Option<QueryInputs>>,
{
self.conn.q_prepare(&self.sqlite, query, inputs)
}
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where
T: Into<Option<QueryInputs>>,
{
self.conn.q_explain(&self.sqlite, query, inputs)
}
fn lookup_values_for_attribute<E>(
&self,
entity: E,
attribute: &edn::Keyword,
) -> Result<Vec<TypedValue>>
where
E: Into<Entid>,
{
self.conn
.lookup_values_for_attribute(&self.sqlite, entity.into(), attribute)
}
fn lookup_value_for_attribute<E>(
&self,
entity: E,
attribute: &edn::Keyword,
) -> Result<Option<TypedValue>>
where
E: Into<Entid>,
{
self.conn
.lookup_value_for_attribute(&self.sqlite, entity.into(), attribute)
}
}
impl Pullable for Store {
fn pull_attributes_for_entities<E, A>(
&self,
entities: E,
attributes: A,
) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where
E: IntoIterator<Item = Entid>,
A: IntoIterator<Item = Entid>,
{
self.conn
.pull_attributes_for_entities(&self.sqlite, entities, attributes)
}
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where
A: IntoIterator<Item = Entid>,
{
self.conn
.pull_attributes_for_entity(&self.sqlite, entity, attributes)
}
}
#[cfg(test)]
mod tests {
use super::*;
use time;
use uuid::Uuid;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::Mutex;
use std::time::Duration;
use mentat_db::cache::SQLiteAttributeCache;
use core_traits::{TypedValue, ValueType};
use mentat_core::{CachedAttributes, HasSchema};
use mentat_transaction::entity_builder::BuildTerms;
use mentat_transaction::query::PreparedQuery;
use mentat_query_algebrizer::QueryInputs;
use crate::vocabulary::{AttributeBuilder, Definition, VersionedStore};
use core_traits::attribute::Unique;
fn fixture_path(rest: &str) -> PathBuf {
let fixtures = Path::new("fixtures/");
fixtures.join(Path::new(rest))
}
#[test]
fn test_prepared_query_with_cache() {
let mut store = Store::open("").expect("opened");
let mut in_progress = store.begin_transaction().expect("began");
in_progress
.import(fixture_path("cities.schema"))
.expect("transacted schema");
in_progress
.import(fixture_path("all_seattle.edn"))
.expect("transacted data");
in_progress
.cache(
&kw!(:neighborhood/district),
CacheDirection::Forward,
CacheAction::Register,
)
.expect("cache done");
in_progress
.cache(
&kw!(:district/name),
CacheDirection::Forward,
CacheAction::Register,
)
.expect("cache done");
in_progress
.cache(
&kw!(:neighborhood/name),
CacheDirection::Reverse,
CacheAction::Register,
)
.expect("cache done");
let query = r#"[:find ?district
:in ?hood
:where
[?neighborhood :neighborhood/name ?hood]
[?neighborhood :neighborhood/district ?d]
[?d :district/name ?district]]"#;
let hood = "Beacon Hill";
let inputs = QueryInputs::with_value_sequence(vec![(
var!(?hood),
TypedValue::typed_string(hood).into(),
)]);
let mut prepared = in_progress.q_prepare(query, inputs).expect("prepared");
match &prepared {
&PreparedQuery::Constant {
select: ref _select,
} => {}
_ => panic!(),
};
let start = time::Instant::now();
let results = prepared.run(None).expect("results");
let end = time::Instant::now();
println!(
"Prepared cache execution took {}µs",
(end - start).whole_microseconds()
);
assert_eq!(
results.into_rel().expect("result"),
vec![vec![TypedValue::typed_string("Greater Duwamish")]].into()
);
}
trait StoreCache {
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
}
impl StoreCache for Store {
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
let cache = self.conn.current_cache();
cache.get_entid_for_value(attr, val)
}
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
self.conn.current_cache().is_attribute_cached_forward(attr)
}
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
self.conn.current_cache().is_attribute_cached_reverse(attr)
}
}
#[test]
fn test_cache_mutation() {
let mut store = Store::open("").expect("opened");
{
let mut in_progress = store.begin_transaction().expect("begun");
in_progress
.transact(
r#"[
{ :db/ident :foo/bar
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/cardinality :db.cardinality/one
:db/valueType :db.type/boolean }
{ :db/ident :foo/x
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long }]"#,
)
.expect("transact");
// Cache one….
in_progress
.cache(
&kw!(:foo/bar),
CacheDirection::Reverse,
CacheAction::Register,
)
.expect("cache done");
in_progress.commit().expect("commit");
}
let foo_bar = store
.conn
.current_schema()
.get_entid(&kw!(:foo/bar))
.expect("foo/bar")
.0;
let foo_baz = store
.conn
.current_schema()
.get_entid(&kw!(:foo/baz))
.expect("foo/baz")
.0;
let foo_x = store
.conn
.current_schema()
.get_entid(&kw!(:foo/x))
.expect("foo/x")
.0;
// … and cache the others via the store.
store
.cache(&kw!(:foo/baz), CacheDirection::Both)
.expect("cache done");
store
.cache(&kw!(:foo/x), CacheDirection::Forward)
.expect("cache done");
{
assert!(store.is_attribute_cached_reverse(foo_bar));
assert!(store.is_attribute_cached_forward(foo_baz));
assert!(store.is_attribute_cached_reverse(foo_baz));
assert!(store.is_attribute_cached_forward(foo_x));
}
// Add some data.
{
let mut in_progress = store.begin_transaction().expect("begun");
{
assert!(in_progress.cache.is_attribute_cached_reverse(foo_bar));
assert!(in_progress.cache.is_attribute_cached_forward(foo_baz));
assert!(in_progress.cache.is_attribute_cached_reverse(foo_baz));
assert!(in_progress.cache.is_attribute_cached_forward(foo_x));
assert!(in_progress
.cache
.overlay
.is_attribute_cached_reverse(foo_bar));
assert!(in_progress
.cache
.overlay
.is_attribute_cached_forward(foo_baz));
assert!(in_progress
.cache
.overlay
.is_attribute_cached_reverse(foo_baz));
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_x));
}
in_progress
.transact(
r#"[
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
{:foo/bar 99, :foo/baz true}
{:foo/bar -2, :foo/baz true}
]"#,
)
.expect("transact");
// Data is in the cache.
let first = in_progress
.cache
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
assert_eq!(
in_progress
.cache
.get_value_for_entid(&in_progress.schema, foo_baz, first)
.expect("val"),
&TypedValue::Boolean(false)
);
// All three values for :foo/x.
let all_three: BTreeSet<TypedValue> = in_progress
.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
.expect("val")
.iter()
.cloned()
.collect();
assert_eq!(
all_three,
vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()
);
in_progress.commit().expect("commit");
}
// Data is still in the cache.
{
let first = store
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(
cache
.get_value_for_entid(&store.conn.current_schema(), foo_baz, first)
.expect("val"),
&TypedValue::Boolean(false)
);
let all_three: BTreeSet<TypedValue> = cache
.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter()
.cloned()
.collect();
assert_eq!(
all_three,
vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()
);
}
// We can remove data and the cache reflects it, immediately and after commit.
{
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress
.cache
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
in_progress
.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str())
.expect("transact");
let only_two: BTreeSet<TypedValue> = in_progress
.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
.expect("val")
.iter()
.cloned()
.collect();
assert_eq!(
only_two,
vec![1, 3].into_iter().map(TypedValue::Long).collect()
);
// Rollback: unchanged.
}
{
let first = store
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(
cache
.get_value_for_entid(&store.conn.current_schema(), foo_baz, first)
.expect("val"),
&TypedValue::Boolean(false)
);
let all_three: BTreeSet<TypedValue> = cache
.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter()
.cloned()
.collect();
assert_eq!(
all_three,
vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()
);
}
// Try again, but this time commit.
{
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress
.cache
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
in_progress
.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str())
.expect("transact");
in_progress.commit().expect("committed");
}
{
let first = store
.get_entid_for_value(foo_bar, &TypedValue::Long(15))
.expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(
cache
.get_value_for_entid(&store.conn.current_schema(), foo_baz, first)
.expect("val"),
&TypedValue::Boolean(false)
);
let only_two: BTreeSet<TypedValue> = cache
.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter()
.cloned()
.collect();
assert_eq!(
only_two,
vec![1, 3].into_iter().map(TypedValue::Long).collect()
);
}
}
#[test]
fn test_register_observer() {
let mut conn = Store::open("").unwrap();
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), Arc::new(tx_observer));
assert!(conn.is_registered_as_observer(&key));
}
#[test]
fn test_deregister_observer() {
let mut conn = Store::open("").unwrap();
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), Arc::new(tx_observer));
assert!(conn.is_registered_as_observer(&key));
conn.unregister_observer(&key);
assert!(!conn.is_registered_as_observer(&key));
}
fn add_schema(conn: &mut Store) {
// transact some schema
let mut in_progress = conn.begin_transaction().expect("expected in progress");
in_progress
.ensure_vocabulary(&Definition::new(
kw!(:todo/items),
1,
vec![
(
kw!(:todo/uuid),
AttributeBuilder::helpful()
.value_type(ValueType::Uuid)
.multival(false)
.unique(Unique::Value)
.index(true)
.build(),
),
(
kw!(:todo/name),
AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.fulltext(true)
.build(),
),
(
kw!(:todo/completion_date),
AttributeBuilder::helpful()
.value_type(ValueType::Instant)
.multival(false)
.build(),
),
(
kw!(:label/name),
AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.unique(Unique::Value)
.fulltext(true)
.index(true)
.build(),
),
(
kw!(:label/color),
AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.build(),
),
],
))
.expect("expected vocubulary");
in_progress.commit().expect("Expected vocabulary committed");
}
#[derive(Default, Debug)]
struct ObserverOutput {
txids: Vec<i64>,
changes: Vec<BTreeSet<i64>>,
called_key: Option<String>,
}
#[test]
fn test_observer_notified_on_registered_change() {
let mut conn = Store::open("").unwrap();
add_schema(&mut conn);
let name_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:todo/name))
.expect("entid to exist for name")
.into();
let date_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:todo/completion_date))
.expect("entid to exist for completion_date")
.into();
let mut registered_attrs = BTreeSet::new();
registered_attrs.insert(name_entid.clone());
registered_attrs.insert(date_entid.clone());
let key = "Test Observing".to_string();
let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output);
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
// make it Sync.
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for (tx_id, changes) in batch.into_iter() {
o.txids.push(*tx_id);
o.changes.push(changes.clone());
}
o.txids.sort();
}
thread_tx.lock().unwrap().send(()).unwrap();
}));
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
assert!(conn.is_registered_as_observer(&key));
let mut tx_ids = Vec::new();
let mut changesets = Vec::new();
let db_tx_instant_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:db/txInstant))
.expect("entid to exist for :db/txInstant")
.into();
let uuid_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:todo/uuid))
.expect("entid to exist for name")
.into();
{
let mut in_progress = conn.begin_transaction().expect("expected transaction");
for i in 0..3 {
let mut changeset = BTreeSet::new();
changeset.insert(db_tx_instant_entid.clone());
let name = format!("todo{}", i);
let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name);
builder
.add(kw!(:todo/uuid), TypedValue::Uuid(uuid))
.expect("Expected added uuid");
changeset.insert(uuid_entid.clone());
builder
.add(kw!(:todo/name), TypedValue::typed_string(name))
.expect("Expected added name");
changeset.insert(name_entid.clone());
if i % 2 == 0 {
builder
.add(kw!(:todo/completion_date), TypedValue::current_instant())
.expect("Expected added date");
changeset.insert(date_entid.clone());
}
let (ip, r) = builder.transact();
let report = r.expect("expected a report");
tx_ids.push(report.tx_id.clone());
changesets.push(changeset);
in_progress = ip;
}
let mut builder = in_progress.builder().describe_tempid("Label");
builder
.add(kw!(:label/name), TypedValue::typed_string("Label 1"))
.expect("Expected added name");
builder
.add(kw!(:label/color), TypedValue::typed_string("blue"))
.expect("Expected added color");
builder.commit().expect("expect transaction to occur");
}
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, Some(key.clone()));
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
}
#[test]
fn test_observer_not_notified_on_unregistered_change() {
let mut conn = Store::open("").unwrap();
add_schema(&mut conn);
let name_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:todo/name))
.expect("entid to exist for name")
.into();
let date_entid: Entid = conn
.conn()
.current_schema()
.get_entid(&kw!(:todo/completion_date))
.expect("entid to exist for completion_date")
.into();
let mut registered_attrs = BTreeSet::new();
registered_attrs.insert(name_entid.clone());
registered_attrs.insert(date_entid.clone());
let key = "Test Observing".to_string();
let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output);
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for (tx_id, changes) in batch.into_iter() {
o.txids.push(*tx_id);
o.changes.push(changes.clone());
}
o.txids.sort();
}
thread_tx.lock().unwrap().send(()).unwrap();
}));
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
assert!(conn.is_registered_as_observer(&key));
let tx_ids = Vec::<Entid>::new();
let changesets = Vec::<BTreeSet<Entid>>::new();
{
let mut in_progress = conn.begin_transaction().expect("expected transaction");
for i in 0..3 {
let name = format!("label{}", i);
let mut builder = in_progress.builder().describe_tempid(&name);
builder
.add(kw!(:label/name), TypedValue::typed_string(name))
.expect("Expected added name");
builder
.add(kw!(:label/color), TypedValue::typed_string("blue"))
.expect("Expected added color");
let (ip, _) = builder.transact();
in_progress = ip;
}
}
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, None);
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
}
}