NON-WORKING: can't spawn a thread because the closure would outlive reports.

This commit is contained in:
Richard Newman 2018-03-14 08:18:22 -07:00
parent 349d3d3990
commit a0c52fa425

View file

@ -24,27 +24,30 @@ use types::{
}; };
pub struct TxObserver { pub struct TxObserver {
notify_fn: Arc<Box<Fn(&String, &Vec<Arc<TxReport>>) + Send + Sync>>, notify_fn: Arc<Box<Fn(&String, &Vec<&TxReport>) + Send + Sync>>,
attributes: AttributeSet, attributes: AttributeSet,
} }
impl TxObserver { impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<Arc<TxReport>>) + 'static + Send + Sync { pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<&TxReport>) + 'static + Send + Sync {
TxObserver { TxObserver {
notify_fn: Arc::new(Box::new(notify_fn)), notify_fn: Arc::new(Box::new(notify_fn)),
attributes, attributes,
} }
} }
pub fn applicable_reports(&self, reports: &Vec<Arc<TxReport>>) -> Vec<Arc<TxReport>> { pub fn applicable_reports<'r>(&self, reports: &'r Vec<TxReport>) -> Vec<&'r TxReport> {
reports.into_iter().filter_map( |report| { let mut out = Vec::with_capacity(reports.len());
self.attributes.intersection(&report.changeset) for report in reports {
.next() if self.attributes.intersection(&report.changeset).next().is_none() {
.and_then(|_| Some(Arc::clone(report))) continue;
}).collect() }
out.push(report);
}
out
} }
fn notify(&self, key: &String, reports: &Vec<Arc<TxReport>>) { fn notify(&self, key: &String, reports: &Vec<&TxReport>) {
(*self.notify_fn)(key, reports); (*self.notify_fn)(key, reports);
} }
} }
@ -53,14 +56,14 @@ pub trait Command {
fn execute(&mut self); fn execute(&mut self);
} }
pub struct NotifyTxObserver { pub struct NotifyTxObserver<'r> {
key: String, key: String,
reports: Vec<Arc<TxReport>>, reports: Vec<&'r TxReport>,
observer: Weak<TxObserver>, observer: Weak<TxObserver>,
} }
impl NotifyTxObserver { impl<'r> NotifyTxObserver<'r> {
pub fn new(key: String, reports: Vec<Arc<TxReport>>, observer: Weak<TxObserver>) -> Self { pub fn new(key: String, reports: Vec<&'r TxReport>, observer: Weak<TxObserver>) -> Self {
NotifyTxObserver { NotifyTxObserver {
key, key,
reports, reports,
@ -69,17 +72,17 @@ impl NotifyTxObserver {
} }
} }
impl Command for NotifyTxObserver { impl<'r> Command for NotifyTxObserver<'r> {
fn execute(&mut self) { fn execute(&mut self) {
self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports)); self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports));
} }
} }
pub struct AsyncBatchExecutor { pub struct AsyncBatchExecutor<'r> {
commands: Vec<Box<Command + Send>>, commands: Vec<Box<Command + Send + 'r>>,
} }
impl Command for AsyncBatchExecutor { impl<'r> Command for AsyncBatchExecutor<'r> {
fn execute(&mut self) { fn execute(&mut self) {
// need to clone to move to a new thread. // need to clone to move to a new thread.
let command_queue = ::std::mem::replace(&mut self.commands, Vec::new()); let command_queue = ::std::mem::replace(&mut self.commands, Vec::new());
@ -121,8 +124,8 @@ impl TxObservationService {
!self.observers.is_empty() !self.observers.is_empty()
} }
fn command_from_reports(&self, key: &String, reports: &Vec<Arc<TxReport>>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send>> { fn command_from_reports<'r>(&self, key: &String, reports: &'r Vec<TxReport>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send + 'r>> {
let applicable_reports = observer.applicable_reports(reports); let applicable_reports: Vec<&'r TxReport> = observer.applicable_reports(reports);
if !applicable_reports.is_empty() { if !applicable_reports.is_empty() {
Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer)))) Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer))))
} else { } else {
@ -130,13 +133,14 @@ impl TxObservationService {
} }
} }
pub fn transaction_did_commit(&mut self, reports: Vec<Arc<TxReport>>) { pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) {
// notify all observers about their relevant transactions let mut commands = Vec::with_capacity(self.observers.len());
let commands: Vec<Box<Command + Send>> = self.observers for (key, observer) in self.observers.iter() {
.iter() if let Some(command) = self.command_from_reports(key, &reports, &observer) {
.filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) commands.push(command);
.collect(); }
self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); }
self.command_queue.push(Box::new(AsyncBatchExecutor { commands }));
} }
pub fn run(&mut self) { pub fn run(&mut self) {