Compare commits
8 commits
master
...
rnewman/tx
Author | SHA1 | Date | |
---|---|---|---|
|
a0c52fa425 | ||
|
349d3d3990 | ||
|
099bde4b13 | ||
|
1b9c338973 | ||
|
77c91c71df | ||
|
cf510e758f | ||
|
ba08807137 | ||
|
78d632cd31 |
6 changed files with 456 additions and 17 deletions
|
@ -5,6 +5,7 @@ workspace = ".."
|
|||
|
||||
[dependencies]
|
||||
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
||||
indexmap = "0.4"
|
||||
itertools = "0.7"
|
||||
lazy_static = "0.2"
|
||||
num = "0.1"
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
extern crate indexmap;
|
||||
extern crate itertools;
|
||||
|
||||
#[macro_use]
|
||||
|
@ -45,6 +46,7 @@ pub mod errors;
|
|||
pub mod internal_types; // pub because we need them for building entities programmatically.
|
||||
mod metadata;
|
||||
mod schema;
|
||||
pub mod tx_observer;
|
||||
mod watcher;
|
||||
mod tx;
|
||||
pub mod types;
|
||||
|
@ -85,7 +87,13 @@ pub use tx::{
|
|||
transact_terms,
|
||||
};
|
||||
|
||||
pub use tx_observer::{
|
||||
TxObservationService,
|
||||
TxObserver,
|
||||
};
|
||||
|
||||
pub use types::{
|
||||
AttributeSet,
|
||||
DB,
|
||||
PartitionMap,
|
||||
TxReport,
|
||||
|
|
16
db/src/tx.rs
16
db/src/tx.rs
|
@ -51,8 +51,9 @@ use std::collections::{
|
|||
BTreeSet,
|
||||
VecDeque,
|
||||
};
|
||||
|
||||
use std::rc::Rc;
|
||||
use std::rc::{
|
||||
Rc,
|
||||
};
|
||||
|
||||
use db;
|
||||
use db::{
|
||||
|
@ -106,6 +107,7 @@ use schema::{
|
|||
};
|
||||
use types::{
|
||||
Attribute,
|
||||
AttributeSet,
|
||||
AVPair,
|
||||
AVMap,
|
||||
Entid,
|
||||
|
@ -527,8 +529,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
|||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||
// TODO: move this to the transactor layer.
|
||||
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
|
||||
where I: IntoIterator<Item=Entity>,
|
||||
W: TransactWatcher {
|
||||
where I: IntoIterator<Item=Entity> {
|
||||
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
|
||||
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
|
||||
|
||||
|
@ -542,8 +543,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
|||
}
|
||||
|
||||
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
|
||||
where I: IntoIterator<Item=TermWithTempIds>,
|
||||
W: TransactWatcher {
|
||||
where I: IntoIterator<Item=TermWithTempIds> {
|
||||
// TODO: push these into an internal transaction report?
|
||||
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
|
||||
|
||||
|
@ -614,7 +614,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
|||
final_populations.allocated,
|
||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||
|
||||
|
||||
let tx_instant;
|
||||
let mut affected_attrs = AttributeSet::new();
|
||||
|
||||
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
||||
|
||||
|
@ -669,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
|||
}
|
||||
|
||||
self.watcher.datom(op, e, a, &v);
|
||||
affected_attrs.insert(a);
|
||||
|
||||
let reduced = (e, a, attribute, v, added);
|
||||
match (attribute.fulltext, attribute.multival) {
|
||||
|
@ -735,6 +738,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
|||
tx_id: self.tx_id,
|
||||
tx_instant,
|
||||
tempids: tempids,
|
||||
changeset: affected_attrs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
152
db/src/tx_observer.rs
Normal file
152
db/src/tx_observer.rs
Normal file
|
@ -0,0 +1,152 @@
|
|||
// Copyright 2018 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
use std::sync::{
|
||||
Arc,
|
||||
Weak,
|
||||
};
|
||||
use std::thread;
|
||||
|
||||
use indexmap::{
|
||||
IndexMap,
|
||||
};
|
||||
|
||||
use types::{
|
||||
AttributeSet,
|
||||
TxReport,
|
||||
};
|
||||
|
||||
pub struct TxObserver {
|
||||
notify_fn: Arc<Box<Fn(&String, &Vec<&TxReport>) + Send + Sync>>,
|
||||
attributes: AttributeSet,
|
||||
}
|
||||
|
||||
impl TxObserver {
|
||||
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<&TxReport>) + 'static + Send + Sync {
|
||||
TxObserver {
|
||||
notify_fn: Arc::new(Box::new(notify_fn)),
|
||||
attributes,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn applicable_reports<'r>(&self, reports: &'r Vec<TxReport>) -> Vec<&'r TxReport> {
|
||||
let mut out = Vec::with_capacity(reports.len());
|
||||
for report in reports {
|
||||
if self.attributes.intersection(&report.changeset).next().is_none() {
|
||||
continue;
|
||||
}
|
||||
out.push(report);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn notify(&self, key: &String, reports: &Vec<&TxReport>) {
|
||||
(*self.notify_fn)(key, reports);
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Command {
|
||||
fn execute(&mut self);
|
||||
}
|
||||
|
||||
pub struct NotifyTxObserver<'r> {
|
||||
key: String,
|
||||
reports: Vec<&'r TxReport>,
|
||||
observer: Weak<TxObserver>,
|
||||
}
|
||||
|
||||
impl<'r> NotifyTxObserver<'r> {
|
||||
pub fn new(key: String, reports: Vec<&'r TxReport>, observer: Weak<TxObserver>) -> Self {
|
||||
NotifyTxObserver {
|
||||
key,
|
||||
reports,
|
||||
observer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'r> Command for NotifyTxObserver<'r> {
|
||||
fn execute(&mut self) {
|
||||
self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports));
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AsyncBatchExecutor<'r> {
|
||||
commands: Vec<Box<Command + Send + 'r>>,
|
||||
}
|
||||
|
||||
impl<'r> Command for AsyncBatchExecutor<'r> {
|
||||
fn execute(&mut self) {
|
||||
// need to clone to move to a new thread.
|
||||
let command_queue = ::std::mem::replace(&mut self.commands, Vec::new());
|
||||
thread::spawn (move ||{
|
||||
for mut command in command_queue.into_iter() {
|
||||
command.execute();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TxObservationService {
|
||||
observers: IndexMap<String, Arc<TxObserver>>,
|
||||
pub command_queue: Vec<Box<Command + Send>>,
|
||||
}
|
||||
|
||||
impl TxObservationService {
|
||||
pub fn new() -> Self {
|
||||
TxObservationService {
|
||||
observers: IndexMap::new(),
|
||||
command_queue: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// For testing purposes.
|
||||
pub fn is_registered(&self, key: &String) -> bool {
|
||||
self.observers.contains_key(key)
|
||||
}
|
||||
|
||||
pub fn register(&mut self, key: String, observer: Arc<TxObserver>) {
|
||||
self.observers.insert(key, observer);
|
||||
}
|
||||
|
||||
pub fn deregister(&mut self, key: &String) {
|
||||
self.observers.remove(key);
|
||||
}
|
||||
|
||||
pub fn has_observers(&self) -> bool {
|
||||
!self.observers.is_empty()
|
||||
}
|
||||
|
||||
fn command_from_reports<'r>(&self, key: &String, reports: &'r Vec<TxReport>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send + 'r>> {
|
||||
let applicable_reports: Vec<&'r TxReport> = observer.applicable_reports(reports);
|
||||
if !applicable_reports.is_empty() {
|
||||
Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer))))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) {
|
||||
let mut commands = Vec::with_capacity(self.observers.len());
|
||||
for (key, observer) in self.observers.iter() {
|
||||
if let Some(command) = self.command_from_reports(key, &reports, &observer) {
|
||||
commands.push(command);
|
||||
}
|
||||
}
|
||||
self.command_queue.push(Box::new(AsyncBatchExecutor { commands }));
|
||||
}
|
||||
|
||||
pub fn run(&mut self) {
|
||||
let command_queue = ::std::mem::replace(&mut self.command_queue, Vec::new());
|
||||
for mut command in command_queue.into_iter() {
|
||||
command.execute();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,19 +11,22 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{
|
||||
BTreeMap,
|
||||
BTreeSet,
|
||||
};
|
||||
|
||||
extern crate mentat_core;
|
||||
|
||||
pub use self::mentat_core::{
|
||||
DateTime,
|
||||
Entid,
|
||||
ValueType,
|
||||
TypedValue,
|
||||
Attribute,
|
||||
AttributeBitFlags,
|
||||
DateTime,
|
||||
Entid,
|
||||
Schema,
|
||||
TypedValue,
|
||||
Utc,
|
||||
ValueType,
|
||||
};
|
||||
|
||||
/// Represents one partition of the entid space.
|
||||
|
@ -82,6 +85,9 @@ pub type AVPair = (Entid, TypedValue);
|
|||
/// Used to resolve lookup-refs and upserts.
|
||||
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
|
||||
|
||||
// represents a set of entids that are correspond to attributes
|
||||
pub type AttributeSet = BTreeSet<Entid>;
|
||||
|
||||
/// A transaction report summarizes an applied transaction.
|
||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
||||
pub struct TxReport {
|
||||
|
@ -97,4 +103,7 @@ pub struct TxReport {
|
|||
/// existing entid, or is allocated a new entid. (It is possible for multiple distinct string
|
||||
/// literal tempids to all unify to a single freshly allocated entid.)
|
||||
pub tempids: BTreeMap<String, Entid>,
|
||||
|
||||
// A set of entids for attributes that were affected inside this transaction
|
||||
pub changeset: AttributeSet,
|
||||
}
|
||||
|
|
277
src/conn.rs
277
src/conn.rs
|
@ -57,6 +57,8 @@ use mentat_db::{
|
|||
transact,
|
||||
transact_terms,
|
||||
PartitionMap,
|
||||
TxObservationService,
|
||||
TxObserver,
|
||||
TxReport,
|
||||
};
|
||||
|
||||
|
@ -140,6 +142,7 @@ pub struct Conn {
|
|||
|
||||
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
||||
// the schema changes. #315.
|
||||
tx_observer_service: Mutex<TxObservationService>,
|
||||
}
|
||||
|
||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||
|
@ -202,10 +205,10 @@ pub struct InProgress<'a, 'c> {
|
|||
generation: u64,
|
||||
partition_map: PartitionMap,
|
||||
schema: Schema,
|
||||
|
||||
cache: InProgressSQLiteAttributeCache,
|
||||
|
||||
use_caching: bool,
|
||||
tx_reports: Vec<Arc<TxReport>>,
|
||||
observer_service: Option<&'a Mutex<TxObservationService>>,
|
||||
}
|
||||
|
||||
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
||||
|
@ -374,6 +377,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
|||
self.cache.transact_watcher(),
|
||||
terms,
|
||||
tempid_set)?;
|
||||
self.tx_reports.push(Arc::new(report.clone()));
|
||||
self.partition_map = next_partition_map;
|
||||
if let Some(schema) = next_schema {
|
||||
self.schema = schema;
|
||||
|
@ -397,6 +401,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
|||
&self.schema,
|
||||
self.cache.transact_watcher(),
|
||||
entities)?;
|
||||
self.tx_reports.push(Arc::new(report.clone()));
|
||||
|
||||
self.partition_map = next_partition_map;
|
||||
if let Some(schema) = next_schema {
|
||||
self.schema = schema;
|
||||
|
@ -440,6 +446,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
|||
metadata.generation += 1;
|
||||
metadata.partition_map = self.partition_map;
|
||||
|
||||
// let the transaction observer know that there have been some transactions committed.
|
||||
if let Some(ref observer_service) = self.observer_service {
|
||||
let mut os = observer_service.lock().unwrap();
|
||||
os.transaction_did_commit(self.tx_reports);
|
||||
}
|
||||
|
||||
// Update the conn's cache if we made any changes.
|
||||
self.cache.commit_to(&mut metadata.attribute_cache);
|
||||
|
||||
|
@ -451,6 +463,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
|||
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
|
||||
}
|
||||
|
||||
// run any commands that we've created along the way.
|
||||
if let Some(ref observer_service) = self.observer_service {
|
||||
let mut os = observer_service.lock().unwrap();
|
||||
os.run();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -564,9 +582,15 @@ impl Conn {
|
|||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||
Conn {
|
||||
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
|
||||
tx_observer_service: Mutex::new(TxObservationService::new()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn is_registered_as_observer(&self, key: &String) -> bool {
|
||||
self.tx_observer_service.lock().unwrap().is_registered(key)
|
||||
}
|
||||
|
||||
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
||||
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
||||
/// consumers that expect to populate raw transaction data themselves.
|
||||
|
@ -705,6 +729,8 @@ impl Conn {
|
|||
schema: (*current_schema).clone(),
|
||||
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
|
||||
use_caching: true,
|
||||
tx_reports: Vec::new(),
|
||||
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -786,6 +812,14 @@ impl Conn {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
|
||||
self.tx_observer_service.lock().unwrap().register(key, observer);
|
||||
}
|
||||
|
||||
pub fn unregister_observer(&mut self, key: &String) {
|
||||
self.tx_observer_service.lock().unwrap().deregister(key);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -798,19 +832,25 @@ mod tests {
|
|||
use std::collections::{
|
||||
BTreeSet,
|
||||
};
|
||||
|
||||
use std::path::{
|
||||
PathBuf,
|
||||
};
|
||||
|
||||
use std::time::Instant;
|
||||
use std::time::{
|
||||
Duration,
|
||||
Instant
|
||||
};
|
||||
use std::thread;
|
||||
|
||||
use mentat_core::{
|
||||
CachedAttributes,
|
||||
TypedValue,
|
||||
};
|
||||
|
||||
use query::{
|
||||
use ::entity_builder::{
|
||||
BuildTerms,
|
||||
};
|
||||
|
||||
use ::query::{
|
||||
PreparedQuery,
|
||||
Variable,
|
||||
};
|
||||
|
@ -821,6 +861,16 @@ mod tests {
|
|||
QueryResults,
|
||||
};
|
||||
|
||||
use ::vocabulary::{
|
||||
AttributeBuilder,
|
||||
Definition,
|
||||
VersionedStore,
|
||||
};
|
||||
|
||||
use ::vocabulary::attribute::{
|
||||
Unique
|
||||
};
|
||||
|
||||
use mentat_db::USER0;
|
||||
|
||||
#[test]
|
||||
|
@ -1381,4 +1431,219 @@ mod tests {
|
|||
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
||||
}
|
||||
}
|
||||
|
||||
fn test_register_observer() {
|
||||
let mut sqlite = db::new_connection("").unwrap();
|
||||
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||
|
||||
let key = "Test Observer".to_string();
|
||||
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||
|
||||
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||
assert!(conn.is_registered_as_observer(&key));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deregister_observer() {
|
||||
let mut sqlite = db::new_connection("").unwrap();
|
||||
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||
|
||||
let key = "Test Observer".to_string();
|
||||
|
||||
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||
|
||||
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||
assert!(conn.is_registered_as_observer(&key));
|
||||
|
||||
conn.unregister_observer(&key);
|
||||
|
||||
assert!(!conn.is_registered_as_observer(&key));
|
||||
}
|
||||
|
||||
fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) {
|
||||
// transact some schema
|
||||
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress");
|
||||
in_progress.ensure_vocabulary(&Definition {
|
||||
name: kw!(:todo/items),
|
||||
version: 1,
|
||||
attributes: vec![
|
||||
(kw!(:todo/uuid),
|
||||
AttributeBuilder::new()
|
||||
.value_type(ValueType::Uuid)
|
||||
.multival(false)
|
||||
.unique(Unique::Value)
|
||||
.index(true)
|
||||
.build()),
|
||||
(kw!(:todo/name),
|
||||
AttributeBuilder::new()
|
||||
.value_type(ValueType::String)
|
||||
.multival(false)
|
||||
.fulltext(true)
|
||||
.build()),
|
||||
(kw!(:todo/completion_date),
|
||||
AttributeBuilder::new()
|
||||
.value_type(ValueType::Instant)
|
||||
.multival(false)
|
||||
.build()),
|
||||
(kw!(:label/name),
|
||||
AttributeBuilder::new()
|
||||
.value_type(ValueType::String)
|
||||
.multival(false)
|
||||
.unique(Unique::Value)
|
||||
.fulltext(true)
|
||||
.index(true)
|
||||
.build()),
|
||||
(kw!(:label/color),
|
||||
AttributeBuilder::new()
|
||||
.value_type(ValueType::String)
|
||||
.multival(false)
|
||||
.build()),
|
||||
],
|
||||
}).expect("expected vocubulary");
|
||||
in_progress.commit().expect("Expected vocabulary committed");
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ObserverOutput {
|
||||
txids: Vec<i64>,
|
||||
changes: Vec<BTreeSet<i64>>,
|
||||
called_key: Option<String>,
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_observer_notified_on_registered_change() {
|
||||
let mut sqlite = db::new_connection("").unwrap();
|
||||
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||
add_schema(&mut conn, &mut sqlite);
|
||||
|
||||
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||
let mut registered_attrs = BTreeSet::new();
|
||||
registered_attrs.insert(name_entid.clone());
|
||||
registered_attrs.insert(date_entid.clone());
|
||||
|
||||
let key = "Test Observing".to_string();
|
||||
|
||||
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||
|
||||
let mut_output = Arc::downgrade(&output);
|
||||
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||
if let Some(out) = mut_output.upgrade() {
|
||||
let mut o = out.lock().unwrap();
|
||||
o.called_key = Some(obs_key.clone());
|
||||
for report in batch.iter() {
|
||||
o.txids.push(report.tx_id.clone());
|
||||
o.changes.push(report.changeset.clone());
|
||||
}
|
||||
o.txids.sort();
|
||||
}
|
||||
}));
|
||||
|
||||
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||
assert!(conn.is_registered_as_observer(&key));
|
||||
|
||||
let mut tx_ids = Vec::new();
|
||||
let mut changesets = Vec::new();
|
||||
{
|
||||
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||
for i in 0..3 {
|
||||
let name = format!("todo{}", i);
|
||||
let uuid = Uuid::new_v4();
|
||||
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||
builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
|
||||
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||
if i % 2 == 0 {
|
||||
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
|
||||
}
|
||||
let (ip, r) = builder.transact();
|
||||
let report = r.expect("expected a report");
|
||||
tx_ids.push(report.tx_id.clone());
|
||||
changesets.push(report.changeset.clone());
|
||||
in_progress = ip;
|
||||
}
|
||||
let mut builder = in_progress.builder().describe_tempid("Label");
|
||||
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
|
||||
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||
builder.commit().expect("expect transaction to occur");
|
||||
}
|
||||
|
||||
let delay = Duration::from_millis(100);
|
||||
thread::sleep(delay);
|
||||
|
||||
match Arc::try_unwrap(output) {
|
||||
Ok(out) => {
|
||||
let o = out.into_inner().expect("Expected an Output");
|
||||
assert_eq!(o.called_key, Some(key.clone()));
|
||||
assert_eq!(o.txids, tx_ids);
|
||||
assert_eq!(o.changes, changesets);
|
||||
},
|
||||
_ => {
|
||||
println!("Unable to unwrap output");
|
||||
assert!(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_observer_not_notified_on_unregistered_change() {
|
||||
let mut sqlite = db::new_connection("").unwrap();
|
||||
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||
add_schema(&mut conn, &mut sqlite);
|
||||
|
||||
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||
let mut registered_attrs = BTreeSet::new();
|
||||
registered_attrs.insert(name_entid.clone());
|
||||
registered_attrs.insert(date_entid.clone());
|
||||
|
||||
let key = "Test Observing".to_string();
|
||||
|
||||
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||
|
||||
let mut_output = Arc::downgrade(&output);
|
||||
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||
if let Some(out) = mut_output.upgrade() {
|
||||
let mut o = out.lock().unwrap();
|
||||
o.called_key = Some(obs_key.clone());
|
||||
for report in batch.iter() {
|
||||
o.txids.push(report.tx_id.clone());
|
||||
o.changes.push(report.changeset.clone());
|
||||
}
|
||||
o.txids.sort();
|
||||
}
|
||||
}));
|
||||
|
||||
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||
assert!(conn.is_registered_as_observer(&key));
|
||||
|
||||
let tx_ids = Vec::<Entid>::new();
|
||||
let changesets = Vec::<BTreeSet<Entid>>::new();
|
||||
{
|
||||
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||
for i in 0..3 {
|
||||
let name = format!("label{}", i);
|
||||
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||
let (ip, _) = builder.transact();
|
||||
in_progress = ip;
|
||||
}
|
||||
}
|
||||
|
||||
let delay = Duration::from_millis(100);
|
||||
thread::sleep(delay);
|
||||
|
||||
match Arc::try_unwrap(output) {
|
||||
Ok(out) => {
|
||||
let o = out.into_inner().expect("Expected an Output");
|
||||
assert_eq!(o.called_key, None);
|
||||
assert_eq!(o.txids, tx_ids);
|
||||
assert_eq!(o.changes, changesets);
|
||||
},
|
||||
_ => {
|
||||
println!("Unable to unwrap output");
|
||||
assert!(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue