From dfe433d370f7d2bc6e7a9acd0f8b9b64d130c972 Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Thu, 15 Mar 2018 16:57:04 +0000 Subject: [PATCH] address review comments --- Cargo.toml | 1 + db/Cargo.toml | 1 + db/src/lib.rs | 1 + db/src/tx.rs | 1 + db/src/tx_observer.rs | 22 +++++++++++++--------- src/conn.rs | 26 ++++++++++++++++++-------- src/lib.rs | 2 ++ 7 files changed, 37 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3f6d558b..c622804c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ rustc_version = "0.2" chrono = "0.4" error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } lazy_static = "0.2" +smallvec = "0.6" time = "0.1" uuid = "0.5" diff --git a/db/Cargo.toml b/db/Cargo.toml index 634d9d1a..ecbded5d 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -10,6 +10,7 @@ itertools = "0.7" lazy_static = "0.2" num = "0.1" ordered-float = "0.5" +smallvec = "0.6" time = "0.1" [dependencies.rusqlite] diff --git a/db/src/lib.rs b/db/src/lib.rs index cfa6948c..debed886 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -21,6 +21,7 @@ extern crate lazy_static; extern crate num; extern crate rusqlite; +extern crate smallvec; extern crate tabwriter; extern crate time; diff --git a/db/src/tx.rs b/db/src/tx.rs index 96fa989f..20d35bfe 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -671,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.watcher.datom(op, e, a, &v); + // TODO: Create something like a watcher to do this for us. affected_attrs.insert(a); let reduced = (e, a, attribute, v, added); diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index fe74b537..456bd51a 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -24,33 +24,37 @@ use indexmap::{ IndexMap, }; +use smallvec::{ + SmallVec, +}; + use types::{ AttributeSet, TxReport, }; pub struct TxObserver { - notify_fn: Arc) + Send + Sync>>, + notify_fn: Arc) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec<&TxReport>) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync { TxObserver { notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports<'r>(&self, reports: &'r Vec) -> Vec<&'r TxReport> { - reports.into_iter().filter_map( |report| { + pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> { + reports.into_iter().filter_map(|report| { self.attributes.intersection(&report.changeset) .next() .and_then(|_| Some(report)) }).collect() } - fn notify(&self, key: String, reports: Vec<&TxReport>) { + fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) { (*self.notify_fn)(key, reports); } } @@ -60,12 +64,12 @@ pub trait Command { } pub struct TxCommand { - reports: Vec, + reports: SmallVec<[TxReport; 4]>, observers: Weak>>, } impl TxCommand { - fn new(observers: &Arc>>, reports: Vec) -> Self { + fn new(observers: &Arc>>, reports: SmallVec<[TxReport; 4]>) -> Self { TxCommand { reports, observers: Arc::downgrade(observers), @@ -79,7 +83,7 @@ impl Command for TxCommand { for (key, observer) in observers.iter() { let applicable_reports = observer.applicable_reports(&self.reports); if !applicable_reports.is_empty() { - observer.notify(key.clone(), applicable_reports); + observer.notify(&key, applicable_reports); } } }); @@ -122,7 +126,7 @@ impl TxObservationService { self.in_progress_count += 1; } - pub fn transaction_did_commit(&mut self, reports: Vec) { + pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) { { let executor = self.executor.get_or_insert_with(||{ let (tx, rx): (Sender>, Receiver>) = channel(); diff --git a/src/conn.rs b/src/conn.rs index 6e58ad0d..bf407738 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -32,6 +32,8 @@ use rusqlite::{ TransactionBehavior, }; +use smallvec::SmallVec; + use edn; use mentat_core::{ @@ -207,7 +209,8 @@ pub struct InProgress<'a, 'c> { schema: Schema, cache: InProgressSQLiteAttributeCache, use_caching: bool, - tx_reports: Vec, + // TODO: Collect txids/affected datoms in a better way + tx_reports: SmallVec<[TxReport; 4]>, observer_service: Option<&'a Mutex>, } @@ -731,7 +734,7 @@ impl Conn { schema: (*current_schema).clone(), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, - tx_reports: Vec::new(), + tx_reports: SmallVec::new(), observer_service: observer_service, }) } @@ -841,7 +844,6 @@ mod tests { Duration, Instant }; - use std::thread; use mentat_core::{ CachedAttributes, @@ -1529,16 +1531,21 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); + let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + // because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also + // make it Sync. + let thread_tx = Mutex::new(tx); let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); - o.called_key = Some(obs_key.clone()); + o.called_key = Some(obs_key.to_string()); for report in batch.iter() { o.txids.push(report.tx_id.clone()); o.changes.push(report.changeset.clone()); } o.txids.sort(); } + thread_tx.lock().unwrap().send(()).unwrap(); })); conn.register_observer(key.clone(), Arc::clone(&tx_observer)); @@ -1552,7 +1559,7 @@ mod tests { let name = format!("todo{}", i); let uuid = Uuid::new_v4(); let mut builder = in_progress.builder().describe_tempid(&name); - builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); + builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); if i % 2 == 0 { builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); @@ -1570,7 +1577,7 @@ mod tests { } let delay = Duration::from_millis(100); - thread::sleep(delay); + let _ = rx.recv_timeout(delay); match Arc::try_unwrap(output) { Ok(out) => { @@ -1603,16 +1610,19 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); + let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + let thread_tx = Mutex::new(tx); let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); - o.called_key = Some(obs_key.clone()); + o.called_key = Some(obs_key.to_string()); for report in batch.iter() { o.txids.push(report.tx_id.clone()); o.changes.push(report.changeset.clone()); } o.txids.sort(); } + thread_tx.lock().unwrap().send(()).unwrap(); })); conn.register_observer(key.clone(), Arc::clone(&tx_observer)); @@ -1633,7 +1643,7 @@ mod tests { } let delay = Duration::from_millis(100); - thread::sleep(delay); + let _ = rx.recv_timeout(delay); match Arc::try_unwrap(output) { Ok(out) => { diff --git a/src/lib.rs b/src/lib.rs index d51eb60c..fdc28dff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,8 @@ extern crate lazy_static; extern crate rusqlite; +extern crate smallvec; + extern crate uuid; pub extern crate edn;