From ab957948b4f642e9d4e5e6ac8169c8b382295573 Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Mon, 19 Mar 2018 17:09:38 +0000 Subject: [PATCH] Move to using watcher. Simplify. This has a watcher collect txid -> AttributeSet mappings each time a transact occurs. On commit we retrieve those mappings and hand them over to the observer service, which filters them and packages them up for dispatch. Tidy up --- Cargo.toml | 1 - db/Cargo.toml | 1 - db/src/cache.rs | 2 +- db/src/lib.rs | 2 +- db/src/tx.rs | 9 +-- db/src/tx_observer.rs | 107 ++++++++++++++++++++------------ db/src/types.rs | 3 - db/src/watcher.rs | 4 +- src/conn.rs | 141 +++++++++++++++++++++++------------------- src/lib.rs | 2 - 10 files changed, 152 insertions(+), 120 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c622804c..3f6d558b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,6 @@ rustc_version = "0.2" chrono = "0.4" error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } lazy_static = "0.2" -smallvec = "0.6" time = "0.1" uuid = "0.5" diff --git a/db/Cargo.toml b/db/Cargo.toml index ecbded5d..634d9d1a 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -10,7 +10,6 @@ itertools = "0.7" lazy_static = "0.2" num = "0.1" ordered-float = "0.5" -smallvec = "0.6" time = "0.1" [dependencies.rusqlite] diff --git a/db/src/cache.rs b/db/src/cache.rs index bcb3c12f..c1e1cb4c 100644 --- a/db/src/cache.rs +++ b/db/src/cache.rs @@ -1402,7 +1402,7 @@ impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> { } } - fn done(&mut self, schema: &Schema) -> Result<()> { + fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<()> { // Oh, I wish we had impl trait. Without it we have a six-line type signature if we // try to break this out as a helper function. let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default()); diff --git a/db/src/lib.rs b/db/src/lib.rs index debed886..bb4d257c 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -21,7 +21,6 @@ extern crate lazy_static; extern crate num; extern crate rusqlite; -extern crate smallvec; extern crate tabwriter; extern crate time; @@ -89,6 +88,7 @@ pub use tx::{ }; pub use tx_observer::{ + InProgressObserverTransactWatcher, TxObservationService, TxObserver, }; diff --git a/db/src/tx.rs b/db/src/tx.rs index 20d35bfe..a08128f2 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -107,7 +107,6 @@ use schema::{ }; use types::{ Attribute, - AttributeSet, AVPair, AVMap, Entid, @@ -616,8 +615,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { let tx_instant; - let mut affected_attrs = AttributeSet::new(); - { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. // Assertions that are :db.cardinality/one and not :db.fulltext. @@ -671,8 +668,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.watcher.datom(op, e, a, &v); - // TODO: Create something like a watcher to do this for us. - affected_attrs.insert(a); let reduced = (e, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { @@ -714,7 +709,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } db::update_partition_map(self.store, &self.partition_map)?; - self.watcher.done(self.schema)?; + self.watcher.done(&self.tx_id, self.schema)?; if tx_might_update_metadata { // Extract changes to metadata from the store. @@ -739,7 +734,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { tx_id: self.tx_id, tx_instant, tempids: tempids, - changeset: affected_attrs, }) } } @@ -752,7 +746,6 @@ fn start_tx<'conn, 'a, W>(conn: &'conn rusqlite::Connection, watcher: W) -> Result> where W: TransactWatcher { let tx_id = partition_map.allocate_entid(":db.part/tx"); - conn.begin_tx_application()?; Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id)) diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 456bd51a..1b041e96 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -12,49 +12,60 @@ use std::sync::{ Arc, Weak, }; + use std::sync::mpsc::{ channel, Receiver, RecvError, Sender, }; + use std::thread; use indexmap::{ IndexMap, }; -use smallvec::{ - SmallVec, +use mentat_core::{ + Entid, + Schema, + TypedValue, +}; + +use mentat_tx::entities::{ + OpType, +}; + +use errors::{ + Result, }; use types::{ AttributeSet, - TxReport, }; +use watcher::TransactWatcher; + pub struct TxObserver { - notify_fn: Arc) + Send + Sync>>, + notify_fn: Arc) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, IndexMap<&Entid, &AttributeSet>) + 'static + Send + Sync { TxObserver { notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> { - reports.into_iter().filter_map(|report| { - self.attributes.intersection(&report.changeset) - .next() - .and_then(|_| Some(report)) - }).collect() + pub fn applicable_reports<'r>(&self, reports: &'r IndexMap) -> IndexMap<&'r Entid, &'r AttributeSet> { + reports.into_iter() + .filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs)) + .collect() } - fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) { + fn notify(&self, key: &str, reports: IndexMap<&Entid, &AttributeSet>) { (*self.notify_fn)(key, reports); } } @@ -64,12 +75,12 @@ pub trait Command { } pub struct TxCommand { - reports: SmallVec<[TxReport; 4]>, + reports: IndexMap, observers: Weak>>, } impl TxCommand { - fn new(observers: &Arc>>, reports: SmallVec<[TxReport; 4]>) -> Self { + fn new(observers: &Arc>>, reports: IndexMap) -> Self { TxCommand { reports, observers: Arc::downgrade(observers), @@ -93,7 +104,6 @@ impl Command for TxCommand { pub struct TxObservationService { observers: Arc>>, executor: Option>>, - in_progress_count: i32, } impl TxObservationService { @@ -101,7 +111,6 @@ impl TxObservationService { TxObservationService { observers: Arc::new(IndexMap::new()), executor: None, - in_progress_count: 0, } } @@ -122,49 +131,69 @@ impl TxObservationService { !self.observers.is_empty() } - pub fn transaction_did_start(&mut self) { - self.in_progress_count += 1; - } + pub fn in_progress_did_commit(&mut self, txes: IndexMap) { + let executor = self.executor.get_or_insert_with(|| { + let (tx, rx): (Sender>, Receiver>) = channel(); + let mut worker = CommandExecutor::new(rx); - pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) { - { - let executor = self.executor.get_or_insert_with(||{ - let (tx, rx): (Sender>, Receiver>) = channel(); - let mut worker = CommandExecutor::new(rx); - - thread::spawn(move || { - worker.main(); - }); - - tx + thread::spawn(move || { + worker.main(); }); - let cmd = Box::new(TxCommand::new(&self.observers, reports)); - executor.send(cmd).unwrap(); - } + tx + }); - self.in_progress_count -= 1; + let cmd = Box::new(TxCommand::new(&self.observers, txes)); + executor.send(cmd).unwrap(); + } +} - if self.in_progress_count == 0 { - self.executor = None; +impl Drop for TxObservationService { + fn drop(&mut self) { + self.executor = None; + } +} + +pub struct InProgressObserverTransactWatcher { + collected_attributes: AttributeSet, + pub txes: IndexMap, +} + +impl InProgressObserverTransactWatcher { + pub fn new() -> InProgressObserverTransactWatcher { + InProgressObserverTransactWatcher { + collected_attributes: Default::default(), + txes: Default::default(), } } } +impl TransactWatcher for InProgressObserverTransactWatcher { + fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) { + self.collected_attributes.insert(a); + } + + fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<()> { + let collected_attributes = ::std::mem::replace(&mut self.collected_attributes, Default::default()); + self.txes.insert(*t, collected_attributes); + Ok(()) + } +} + struct CommandExecutor { - reciever: Receiver>, + receiver: Receiver>, } impl CommandExecutor { fn new(rx: Receiver>) -> Self { CommandExecutor { - reciever: rx, + receiver: rx, } } fn main(&mut self) { loop { - match self.reciever.recv() { + match self.receiver.recv() { Err(RecvError) => { eprintln!("Disconnected, terminating CommandExecutor"); return diff --git a/db/src/types.rs b/db/src/types.rs index 5e19da73..14f8b78e 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -103,7 +103,4 @@ pub struct TxReport { /// existing entid, or is allocated a new entid. (It is possible for multiple distinct string /// literal tempids to all unify to a single freshly allocated entid.) pub tempids: BTreeMap, - - // A set of entids for attributes that were affected inside this transaction - pub changeset: AttributeSet, } diff --git a/db/src/watcher.rs b/db/src/watcher.rs index fa22cb6c..15d8b3b7 100644 --- a/db/src/watcher.rs +++ b/db/src/watcher.rs @@ -38,7 +38,7 @@ pub trait TransactWatcher { /// Called with the schema _prior to_ the transact -- any attributes or /// attribute changes transacted during this transact are not reflected in /// the schema. - fn done(&mut self, schema: &Schema) -> Result<()>; + fn done(&mut self, t: &Entid, schema: &Schema) -> Result<()>; } pub struct NullWatcher(); @@ -47,7 +47,7 @@ impl TransactWatcher for NullWatcher { fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) { } - fn done(&mut self, _schema: &Schema) -> Result<()> { + fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<()> { Ok(()) } } diff --git a/src/conn.rs b/src/conn.rs index 665ea9a0..cd671f91 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -32,8 +32,6 @@ use rusqlite::{ TransactionBehavior, }; -use smallvec::SmallVec; - use edn; use mentat_core::{ @@ -50,6 +48,7 @@ use mentat_core::{ use mentat_core::intern_set::InternSet; use mentat_db::cache::{ + InProgressCacheTransactWatcher, InProgressSQLiteAttributeCache, SQLiteAttributeCache, }; @@ -58,7 +57,9 @@ use mentat_db::db; use mentat_db::{ transact, transact_terms, + InProgressObserverTransactWatcher, PartitionMap, + TransactWatcher, TxObservationService, TxObserver, TxReport, @@ -68,7 +69,10 @@ use mentat_db::internal_types::TermWithTempIds; use mentat_tx; -use mentat_tx::entities::TempId; +use mentat_tx::entities::{ + TempId, + OpType, +}; use mentat_tx_parser; @@ -216,9 +220,8 @@ pub struct InProgress<'a, 'c> { schema: Schema, cache: InProgressSQLiteAttributeCache, use_caching: bool, - // TODO: Collect txids/affected datoms in a better way - tx_reports: SmallVec<[TxReport; 4]>, - observer_service: Option<&'a Mutex>, + tx_observer: &'a Mutex, + tx_observer_watcher: InProgressObserverTransactWatcher, } /// Represents an in-progress set of reads to the store. Just like `InProgress`, @@ -379,15 +382,17 @@ impl<'a, 'c> InProgress<'a, 'c> { } pub fn transact_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { + let w = InProgressTransactWatcher::new( + &mut self.tx_observer_watcher, + self.cache.transact_watcher()); let (report, next_partition_map, next_schema, _watcher) = transact_terms(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, - self.cache.transact_watcher(), + w, terms, tempid_set)?; - self.tx_reports.push(report.clone()); self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -404,15 +409,16 @@ impl<'a, 'c> InProgress<'a, 'c> { // `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using // `Default::default` in those situations to extract the partition map, and so there // would still be some cost. + let w = InProgressTransactWatcher::new( + &mut self.tx_observer_watcher, + self.cache.transact_watcher()); let (report, next_partition_map, next_schema, _watcher) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, - self.cache.transact_watcher(), + w, entities)?; - self.tx_reports.push(report.clone()); - self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -467,11 +473,8 @@ impl<'a, 'c> InProgress<'a, 'c> { // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. } - // let the transaction observer know that there have been some transactions committed. - if let Some(ref observer_service) = self.observer_service { - let mut os = observer_service.lock().unwrap(); - os.transaction_did_commit(self.tx_reports); - } + let txes = self.tx_observer_watcher.txes; + self.tx_observer.lock().unwrap().in_progress_did_commit(txes); Ok(()) } @@ -500,6 +503,36 @@ impl<'a, 'c> InProgress<'a, 'c> { } } +struct InProgressTransactWatcher<'a, 'o> { + cache_watcher: InProgressCacheTransactWatcher<'a>, + observer_watcher: &'o mut InProgressObserverTransactWatcher, + tx_id: Option, +} + +impl<'a, 'o> InProgressTransactWatcher<'a, 'o> { + fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { + InProgressTransactWatcher { + cache_watcher: cache_watcher, + observer_watcher: observer_watcher, + tx_id: None, + } + } +} + +impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { + self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v); + self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v); + } + + fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> { + self.cache_watcher.done(t, schema)?; + self.observer_watcher.done(t, schema)?; + self.tx_id = Some(t.clone()); + Ok(()) + } +} + impl Store { /// Intended for use from tests. pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection { @@ -725,14 +758,6 @@ impl Conn { current.attribute_cache.clone()) }; - let mut obs = self.tx_observer_service.lock().unwrap(); - let observer_service = if obs.has_observers() { - obs.transaction_did_start(); - Some(&self.tx_observer_service) - } else { - None - }; - Ok(InProgress { mutex: &self.metadata, transaction: tx, @@ -741,8 +766,8 @@ impl Conn { schema: (*current_schema).clone(), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, - tx_reports: SmallVec::new(), - observer_service: observer_service, + tx_observer: &self.tx_observer_service, + tx_observer_watcher: InProgressObserverTransactWatcher::new(), }) } @@ -847,9 +872,10 @@ mod tests { use std::path::{ PathBuf, }; + use std::sync::mpsc; use std::time::{ Duration, - Instant + Instant, }; use mentat_core::{ @@ -879,7 +905,7 @@ mod tests { }; use ::vocabulary::attribute::{ - Unique + Unique, }; use mentat_db::USER0; @@ -1514,7 +1540,7 @@ mod tests { in_progress.commit().expect("Expected vocabulary committed"); } - #[derive(Default)] + #[derive(Default, Debug)] struct ObserverOutput { txids: Vec, changes: Vec>, @@ -1538,7 +1564,7 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); - let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel(); // because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also // make it Sync. let thread_tx = Mutex::new(tx); @@ -1546,9 +1572,9 @@ mod tests { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); o.called_key = Some(obs_key.to_string()); - for report in batch.iter() { - o.txids.push(report.tx_id.clone()); - o.changes.push(report.changeset.clone()); + for (tx_id, changes) in batch.into_iter() { + o.txids.push(*tx_id); + o.changes.push(changes.clone()); } o.txids.sort(); } @@ -1560,21 +1586,26 @@ mod tests { let mut tx_ids = Vec::new(); let mut changesets = Vec::new(); + let uuid_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into(); { let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction"); for i in 0..3 { + let mut changeset = BTreeSet::new(); let name = format!("todo{}", i); let uuid = Uuid::new_v4(); let mut builder = in_progress.builder().describe_tempid(&name); builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); + changeset.insert(uuid_entid.clone()); builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); + changeset.insert(name_entid.clone()); if i % 2 == 0 { builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); + changeset.insert(date_entid.clone()); } let (ip, r) = builder.transact(); let report = r.expect("expected a report"); tx_ids.push(report.tx_id.clone()); - changesets.push(report.changeset.clone()); + changesets.push(changeset); in_progress = ip; } let mut builder = in_progress.builder().describe_tempid("Label"); @@ -1586,18 +1617,11 @@ mod tests { let delay = Duration::from_millis(100); let _ = rx.recv_timeout(delay); - match Arc::try_unwrap(output) { - Ok(out) => { - let o = out.into_inner().expect("Expected an Output"); - assert_eq!(o.called_key, Some(key.clone())); - assert_eq!(o.txids, tx_ids); - assert_eq!(o.changes, changesets); - }, - _ => { - println!("Unable to unwrap output"); - assert!(false); - } - } + let out = Arc::try_unwrap(output).expect("unwrapped"); + let o = out.into_inner().expect("Expected an Output"); + assert_eq!(o.called_key, Some(key.clone())); + assert_eq!(o.txids, tx_ids); + assert_eq!(o.changes, changesets); } #[test] @@ -1617,15 +1641,15 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); - let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel(); let thread_tx = Mutex::new(tx); let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); o.called_key = Some(obs_key.to_string()); - for report in batch.iter() { - o.txids.push(report.tx_id.clone()); - o.changes.push(report.changeset.clone()); + for (tx_id, changes) in batch.into_iter() { + o.txids.push(*tx_id); + o.changes.push(changes.clone()); } o.txids.sort(); } @@ -1652,17 +1676,10 @@ mod tests { let delay = Duration::from_millis(100); let _ = rx.recv_timeout(delay); - match Arc::try_unwrap(output) { - Ok(out) => { - let o = out.into_inner().expect("Expected an Output"); - assert_eq!(o.called_key, None); - assert_eq!(o.txids, tx_ids); - assert_eq!(o.changes, changesets); - }, - _ => { - println!("Unable to unwrap output"); - assert!(false); - } - } + let out = Arc::try_unwrap(output).expect("unwrapped"); + let o = out.into_inner().expect("Expected an Output"); + assert_eq!(o.called_key, None); + assert_eq!(o.txids, tx_ids); + assert_eq!(o.changes, changesets); } } diff --git a/src/lib.rs b/src/lib.rs index fdc28dff..d51eb60c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,8 +18,6 @@ extern crate lazy_static; extern crate rusqlite; -extern crate smallvec; - extern crate uuid; pub extern crate edn;