From 099bde4b13e63c2146d2a7b5bcf71a12a3612008 Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Tue, 13 Mar 2018 11:25:33 +0000 Subject: [PATCH] Address review comments --- db/src/tx_observer.rs | 49 +++++++++++++++++++------------------------ src/conn.rs | 6 +++--- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 240e9ae0..9d4fa3e7 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -10,6 +10,7 @@ use std::sync::{ Arc, + Weak, }; use std::thread; @@ -23,34 +24,28 @@ use types::{ }; pub struct TxObserver { - notify_fn: Arc) + Send + Sync>>>, + notify_fn: Arc>) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec>) + 'static + Send + Sync { TxObserver { - notify_fn: Arc::new(Some(Box::new(notify_fn))), + notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports(&self, reports: &Vec) -> Vec { + pub fn applicable_reports(&self, reports: &Vec>) -> Vec> { reports.into_iter().filter_map( |report| { - if self.attributes.intersection(&report.changeset).next().is_some(){ - Some(report.clone()) - } else { - None - } + self.attributes.intersection(&report.changeset) + .next() + .and_then(|_| Some(Arc::clone(report))) }).collect() } - fn notify(&self, key: String, reports: Vec) { - if let Some(ref notify_fn) = *self.notify_fn { - (notify_fn)(key, reports); - } else { - eprintln!("no notify function specified for TxObserver"); - } + fn notify(&self, key: &String, reports: &Vec>) { + (*self.notify_fn)(key, reports); } } @@ -77,12 +72,12 @@ impl Clone for Box { #[derive(Clone)] pub struct NotifyTxObserver { key: String, - reports: Vec, - observer: Arc, + reports: Vec>, + observer: Weak, } impl NotifyTxObserver { - pub fn new(key: String, reports: Vec, observer: Arc) -> Self { + pub fn new(key: String, reports: Vec>, observer: Weak) -> Self { NotifyTxObserver { key, reports, @@ -93,7 +88,7 @@ impl NotifyTxObserver { impl Command for NotifyTxObserver { fn execute(&self) { - self.observer.notify(self.key.clone(), self.reports.clone()); + self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports)); } } @@ -104,6 +99,7 @@ pub struct AsyncBatchExecutor { impl Command for AsyncBatchExecutor { fn execute(&self) { + // need to clone to move to a new thread. let command_queue = self.commands.clone(); thread::spawn (move ||{ for command in command_queue.iter() { @@ -113,7 +109,6 @@ impl Command for AsyncBatchExecutor { } } -#[derive(Clone)] pub struct TxObservationService { observers: IndexMap>, pub command_queue: Vec>, @@ -132,7 +127,7 @@ impl TxObservationService { } pub fn register(&mut self, key: String, observer: Arc) { - self.observers.insert(key.clone(), observer); + self.observers.insert(key, observer); } pub fn deregister(&mut self, key: &String) { @@ -143,21 +138,21 @@ impl TxObservationService { !self.observers.is_empty() } - fn command_from_reports(&self, key: &String, reports: &Vec, observer: &Arc) -> Option> { + fn command_from_reports(&self, key: &String, reports: &Vec>, observer: &Arc) -> Option> { let applicable_reports = observer.applicable_reports(reports); if !applicable_reports.is_empty() { - Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::clone(observer)))) + Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer)))) } else { None } } - pub fn transaction_did_commit(&mut self, reports: Vec) { + pub fn transaction_did_commit(&mut self, reports: Vec>) { // notify all observers about their relevant transactions let commands: Vec> = self.observers - .iter() - .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) - .collect(); + .iter() + .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) + .collect(); self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); } diff --git a/src/conn.rs b/src/conn.rs index 1bd666a0..00b8cf72 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -207,7 +207,7 @@ pub struct InProgress<'a, 'c> { schema: Schema, cache: InProgressSQLiteAttributeCache, use_caching: bool, - tx_reports: Vec, + tx_reports: Vec>, observer_service: Option<&'a Mutex>, } @@ -377,7 +377,7 @@ impl<'a, 'c> InProgress<'a, 'c> { self.cache.transact_watcher(), terms, tempid_set)?; - self.tx_reports.push(report.clone()); + self.tx_reports.push(Arc::new(report.clone())); self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -401,7 +401,7 @@ impl<'a, 'c> InProgress<'a, 'c> { &self.schema, self.cache.transact_watcher(), entities)?; - self.tx_reports.push(report.clone()); + self.tx_reports.push(Arc::new(report.clone())); self.partition_map = next_partition_map; if let Some(schema) = next_schema {