Address review comments

This commit is contained in:
Emily Toop 2018-03-13 11:25:33 +00:00
parent 1b9c338973
commit 099bde4b13
2 changed files with 25 additions and 30 deletions

View file

@ -10,6 +10,7 @@
use std::sync::{ use std::sync::{
Arc, Arc,
Weak,
}; };
use std::thread; use std::thread;
@ -23,34 +24,28 @@ use types::{
}; };
pub struct TxObserver { pub struct TxObserver {
notify_fn: Arc<Option<Box<Fn(String, Vec<TxReport>) + Send + Sync>>>, notify_fn: Arc<Box<Fn(&String, &Vec<Arc<TxReport>>) + Send + Sync>>,
attributes: AttributeSet, attributes: AttributeSet,
} }
impl TxObserver { impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec<TxReport>) + 'static + Send + Sync { pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<Arc<TxReport>>) + 'static + Send + Sync {
TxObserver { TxObserver {
notify_fn: Arc::new(Some(Box::new(notify_fn))), notify_fn: Arc::new(Box::new(notify_fn)),
attributes, attributes,
} }
} }
pub fn applicable_reports(&self, reports: &Vec<TxReport>) -> Vec<TxReport> { pub fn applicable_reports(&self, reports: &Vec<Arc<TxReport>>) -> Vec<Arc<TxReport>> {
reports.into_iter().filter_map( |report| { reports.into_iter().filter_map( |report| {
if self.attributes.intersection(&report.changeset).next().is_some(){ self.attributes.intersection(&report.changeset)
Some(report.clone()) .next()
} else { .and_then(|_| Some(Arc::clone(report)))
None
}
}).collect() }).collect()
} }
fn notify(&self, key: String, reports: Vec<TxReport>) { fn notify(&self, key: &String, reports: &Vec<Arc<TxReport>>) {
if let Some(ref notify_fn) = *self.notify_fn { (*self.notify_fn)(key, reports);
(notify_fn)(key, reports);
} else {
eprintln!("no notify function specified for TxObserver");
}
} }
} }
@ -77,12 +72,12 @@ impl Clone for Box<Command + Send> {
#[derive(Clone)] #[derive(Clone)]
pub struct NotifyTxObserver { pub struct NotifyTxObserver {
key: String, key: String,
reports: Vec<TxReport>, reports: Vec<Arc<TxReport>>,
observer: Arc<TxObserver>, observer: Weak<TxObserver>,
} }
impl NotifyTxObserver { impl NotifyTxObserver {
pub fn new(key: String, reports: Vec<TxReport>, observer: Arc<TxObserver>) -> Self { pub fn new(key: String, reports: Vec<Arc<TxReport>>, observer: Weak<TxObserver>) -> Self {
NotifyTxObserver { NotifyTxObserver {
key, key,
reports, reports,
@ -93,7 +88,7 @@ impl NotifyTxObserver {
impl Command for NotifyTxObserver { impl Command for NotifyTxObserver {
fn execute(&self) { fn execute(&self) {
self.observer.notify(self.key.clone(), self.reports.clone()); self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports));
} }
} }
@ -104,6 +99,7 @@ pub struct AsyncBatchExecutor {
impl Command for AsyncBatchExecutor { impl Command for AsyncBatchExecutor {
fn execute(&self) { fn execute(&self) {
// need to clone to move to a new thread.
let command_queue = self.commands.clone(); let command_queue = self.commands.clone();
thread::spawn (move ||{ thread::spawn (move ||{
for command in command_queue.iter() { for command in command_queue.iter() {
@ -113,7 +109,6 @@ impl Command for AsyncBatchExecutor {
} }
} }
#[derive(Clone)]
pub struct TxObservationService { pub struct TxObservationService {
observers: IndexMap<String, Arc<TxObserver>>, observers: IndexMap<String, Arc<TxObserver>>,
pub command_queue: Vec<Box<Command + Send>>, pub command_queue: Vec<Box<Command + Send>>,
@ -132,7 +127,7 @@ impl TxObservationService {
} }
pub fn register(&mut self, key: String, observer: Arc<TxObserver>) { pub fn register(&mut self, key: String, observer: Arc<TxObserver>) {
self.observers.insert(key.clone(), observer); self.observers.insert(key, observer);
} }
pub fn deregister(&mut self, key: &String) { pub fn deregister(&mut self, key: &String) {
@ -143,21 +138,21 @@ impl TxObservationService {
!self.observers.is_empty() !self.observers.is_empty()
} }
fn command_from_reports(&self, key: &String, reports: &Vec<TxReport>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send>> { fn command_from_reports(&self, key: &String, reports: &Vec<Arc<TxReport>>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send>> {
let applicable_reports = observer.applicable_reports(reports); let applicable_reports = observer.applicable_reports(reports);
if !applicable_reports.is_empty() { if !applicable_reports.is_empty() {
Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::clone(observer)))) Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer))))
} else { } else {
None None
} }
} }
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) { pub fn transaction_did_commit(&mut self, reports: Vec<Arc<TxReport>>) {
// notify all observers about their relevant transactions // notify all observers about their relevant transactions
let commands: Vec<Box<Command + Send>> = self.observers let commands: Vec<Box<Command + Send>> = self.observers
.iter() .iter()
.filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) })
.collect(); .collect();
self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); self.command_queue.push(Box::new(AsyncBatchExecutor{ commands }));
} }

View file

@ -207,7 +207,7 @@ pub struct InProgress<'a, 'c> {
schema: Schema, schema: Schema,
cache: InProgressSQLiteAttributeCache, cache: InProgressSQLiteAttributeCache,
use_caching: bool, use_caching: bool,
tx_reports: Vec<TxReport>, tx_reports: Vec<Arc<TxReport>>,
observer_service: Option<&'a Mutex<TxObservationService>>, observer_service: Option<&'a Mutex<TxObservationService>>,
} }
@ -377,7 +377,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.cache.transact_watcher(), self.cache.transact_watcher(),
terms, terms,
tempid_set)?; tempid_set)?;
self.tx_reports.push(report.clone()); self.tx_reports.push(Arc::new(report.clone()));
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {
self.schema = schema; self.schema = schema;
@ -401,7 +401,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
&self.schema, &self.schema,
self.cache.transact_watcher(), self.cache.transact_watcher(),
entities)?; entities)?;
self.tx_reports.push(report.clone()); self.tx_reports.push(Arc::new(report.clone()));
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {