From a0c52fa425beedb9372f26218a47bc2cf1130ca9 Mon Sep 17 00:00:00 2001 From: Richard Newman Date: Wed, 14 Mar 2018 08:18:22 -0700 Subject: [PATCH] NON-WORKING: can't spawn a thread because the closure would outlive reports. --- db/src/tx_observer.rs | 56 +++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index d2011294..dbaee0a5 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -24,27 +24,30 @@ use types::{ }; pub struct TxObserver { - notify_fn: Arc>) + Send + Sync>>, + notify_fn: Arc) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec>) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<&TxReport>) + 'static + Send + Sync { TxObserver { notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports(&self, reports: &Vec>) -> Vec> { - reports.into_iter().filter_map( |report| { - self.attributes.intersection(&report.changeset) - .next() - .and_then(|_| Some(Arc::clone(report))) - }).collect() + pub fn applicable_reports<'r>(&self, reports: &'r Vec) -> Vec<&'r TxReport> { + let mut out = Vec::with_capacity(reports.len()); + for report in reports { + if self.attributes.intersection(&report.changeset).next().is_none() { + continue; + } + out.push(report); + } + out } - fn notify(&self, key: &String, reports: &Vec>) { + fn notify(&self, key: &String, reports: &Vec<&TxReport>) { (*self.notify_fn)(key, reports); } } @@ -53,14 +56,14 @@ pub trait Command { fn execute(&mut self); } -pub struct NotifyTxObserver { +pub struct NotifyTxObserver<'r> { key: String, - reports: Vec>, + reports: Vec<&'r TxReport>, observer: Weak, } -impl NotifyTxObserver { - pub fn new(key: String, reports: Vec>, observer: Weak) -> Self { +impl<'r> NotifyTxObserver<'r> { + pub fn new(key: String, reports: Vec<&'r TxReport>, observer: Weak) -> Self { NotifyTxObserver { key, reports, @@ -69,17 +72,17 @@ impl NotifyTxObserver { } } -impl Command for NotifyTxObserver { +impl<'r> Command for NotifyTxObserver<'r> { fn execute(&mut self) { self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports)); } } -pub struct AsyncBatchExecutor { - commands: Vec>, +pub struct AsyncBatchExecutor<'r> { + commands: Vec>, } -impl Command for AsyncBatchExecutor { +impl<'r> Command for AsyncBatchExecutor<'r> { fn execute(&mut self) { // need to clone to move to a new thread. let command_queue = ::std::mem::replace(&mut self.commands, Vec::new()); @@ -121,8 +124,8 @@ impl TxObservationService { !self.observers.is_empty() } - fn command_from_reports(&self, key: &String, reports: &Vec>, observer: &Arc) -> Option> { - let applicable_reports = observer.applicable_reports(reports); + fn command_from_reports<'r>(&self, key: &String, reports: &'r Vec, observer: &Arc) -> Option> { + let applicable_reports: Vec<&'r TxReport> = observer.applicable_reports(reports); if !applicable_reports.is_empty() { Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer)))) } else { @@ -130,13 +133,14 @@ impl TxObservationService { } } - pub fn transaction_did_commit(&mut self, reports: Vec>) { - // notify all observers about their relevant transactions - let commands: Vec> = self.observers - .iter() - .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) - .collect(); - self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); + pub fn transaction_did_commit(&mut self, reports: Vec) { + let mut commands = Vec::with_capacity(self.observers.len()); + for (key, observer) in self.observers.iter() { + if let Some(command) = self.command_from_reports(key, &reports, &observer) { + commands.push(command); + } + } + self.command_queue.push(Box::new(AsyncBatchExecutor { commands })); } pub fn run(&mut self) {