diff --git a/src/conn.rs b/src/conn.rs index ce653ffd..a76d7772 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -593,6 +593,11 @@ impl Conn { } } + #[cfg(test)] + pub fn is_registered_as_observer(&self, key: &String) -> bool { + self.tx_observer_service.lock().unwrap().is_registered(key) + } + /// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but /// _does not_ write the bootstrap schema. This constructor should only be used by /// consumers that expect to populate raw transaction data themselves. @@ -834,19 +839,25 @@ mod tests { use std::collections::{ BTreeSet, }; - use std::path::{ PathBuf, }; - - use std::time::Instant; + use std::time::{ + Duration, + Instant + }; + use std::thread; use mentat_core::{ CachedAttributes, TypedValue, }; - use query::{ + use ::entity_builder::{ + BuildTerms, + }; + + use ::query::{ PreparedQuery, Variable, }; @@ -857,6 +868,16 @@ mod tests { QueryResults, }; + use ::vocabulary::{ + AttributeBuilder, + Definition, + VersionedStore, + }; + + use ::vocabulary::attribute::{ + Unique + }; + use mentat_db::USER0; #[test] @@ -1417,4 +1438,219 @@ mod tests { assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect()); } } + + fn test_register_observer() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let key = "Test Observer".to_string(); + let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {}); + + conn.register_observer(key.clone(), Arc::new(tx_observer)); + assert!(conn.is_registered_as_observer(&key)); + } + + #[test] + fn test_deregister_observer() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let key = "Test Observer".to_string(); + + let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {}); + + conn.register_observer(key.clone(), Arc::new(tx_observer)); + assert!(conn.is_registered_as_observer(&key)); + + conn.unregister_observer(&key); + + assert!(!conn.is_registered_as_observer(&key)); + } + + fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) { + // transact some schema + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress"); + in_progress.ensure_vocabulary(&Definition { + name: kw!(:todo/items), + version: 1, + attributes: vec![ + (kw!(:todo/uuid), + AttributeBuilder::new() + .value_type(ValueType::Uuid) + .multival(false) + .unique(Unique::Value) + .index(true) + .build()), + (kw!(:todo/name), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .fulltext(true) + .build()), + (kw!(:todo/completion_date), + AttributeBuilder::new() + .value_type(ValueType::Instant) + .multival(false) + .build()), + (kw!(:label/name), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .unique(Unique::Value) + .fulltext(true) + .index(true) + .build()), + (kw!(:label/color), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .build()), + ], + }).expect("expected vocubulary"); + in_progress.commit().expect("Expected vocabulary committed"); + } + + #[derive(Default)] + struct ObserverOutput { + txids: Vec, + changes: Vec>, + called_key: Option, + } + + #[test] + fn test_observer_notified_on_registered_change() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + add_schema(&mut conn, &mut sqlite); + + let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into(); + let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into(); + let mut registered_attrs = BTreeSet::new(); + registered_attrs.insert(name_entid.clone()); + registered_attrs.insert(date_entid.clone()); + + let key = "Test Observing".to_string(); + + let output = Arc::new(Mutex::new(ObserverOutput::default())); + + let mut_output = Arc::downgrade(&output); + let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { + if let Some(out) = mut_output.upgrade() { + let mut o = out.lock().unwrap(); + o.called_key = Some(obs_key.clone()); + for report in batch.iter() { + o.txids.push(report.tx_id.clone()); + o.changes.push(report.changeset.clone()); + } + o.txids.sort(); + } + })); + + conn.register_observer(key.clone(), Arc::clone(&tx_observer)); + assert!(conn.is_registered_as_observer(&key)); + + let mut tx_ids = Vec::new(); + let mut changesets = Vec::new(); + { + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction"); + for i in 0..3 { + let name = format!("todo{}", i); + let uuid = Uuid::new_v4(); + let mut builder = in_progress.builder().describe_tempid(&name); + builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); + builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); + if i % 2 == 0 { + builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); + } + let (ip, r) = builder.transact(); + let report = r.expect("expected a report"); + tx_ids.push(report.tx_id.clone()); + changesets.push(report.changeset.clone()); + in_progress = ip; + } + let mut builder = in_progress.builder().describe_tempid("Label"); + builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name"); + builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color"); + builder.commit().expect("expect transaction to occur"); + } + + let delay = Duration::from_millis(100); + thread::sleep(delay); + + match Arc::try_unwrap(output) { + Ok(out) => { + let o = out.into_inner().expect("Expected an Output"); + assert_eq!(o.called_key, Some(key.clone())); + assert_eq!(o.txids, tx_ids); + assert_eq!(o.changes, changesets); + }, + _ => { + println!("Unable to unwrap output"); + assert!(false); + } + } + } + + #[test] + fn test_observer_not_notified_on_unregistered_change() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + add_schema(&mut conn, &mut sqlite); + + let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into(); + let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into(); + let mut registered_attrs = BTreeSet::new(); + registered_attrs.insert(name_entid.clone()); + registered_attrs.insert(date_entid.clone()); + + let key = "Test Observing".to_string(); + + let output = Arc::new(Mutex::new(ObserverOutput::default())); + + let mut_output = Arc::downgrade(&output); + let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { + if let Some(out) = mut_output.upgrade() { + let mut o = out.lock().unwrap(); + o.called_key = Some(obs_key.clone()); + for report in batch.iter() { + o.txids.push(report.tx_id.clone()); + o.changes.push(report.changeset.clone()); + } + o.txids.sort(); + } + })); + + conn.register_observer(key.clone(), Arc::clone(&tx_observer)); + assert!(conn.is_registered_as_observer(&key)); + + let tx_ids = Vec::::new(); + let changesets = Vec::>::new(); + { + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction"); + for i in 0..3 { + let name = format!("label{}", i); + let mut builder = in_progress.builder().describe_tempid(&name); + builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name"); + builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color"); + let (ip, _) = builder.transact(); + in_progress = ip; + } + } + + let delay = Duration::from_millis(100); + thread::sleep(delay); + + match Arc::try_unwrap(output) { + Ok(out) => { + let o = out.into_inner().expect("Expected an Output"); + assert_eq!(o.called_key, None); + assert_eq!(o.txids, tx_ids); + assert_eq!(o.changes, changesets); + }, + _ => { + println!("Unable to unwrap output"); + assert!(false); + } + } + } }