Just use the one command, it keeping reference to all the reports for the transaction.

It will also do the observer filtering
@ -13,14 +13,8 @@ use std::collections::{
use std::sync::{
use std::sync::mpsc::{
use std::thread;
use indexmap::{
@ -33,27 +27,27 @@ use types::{
pub struct TxObserver {
notify_fn: Arc<Box<Fn(&String, &Vec<Arc<TxReport>>) + Send + Sync>>,
notify_fn: Arc<Box<Fn(String, Vec<&TxReport>) + Send + Sync>>,
attributes: AttributeSet,
impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&String, &Vec<Arc<TxReport>>) + 'static + Send + Sync {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec<&TxReport>) + 'static + Send + Sync {
TxObserver {
notify_fn: Arc::new(Box::new(notify_fn)),
pub fn applicable_reports(&self, reports: &Vec<Arc<TxReport>>) -> Vec<Arc<TxReport>> {
pub fn applicable_reports<'r>(&self, reports: &'r Vec<TxReport>) -> Vec<&'r TxReport> {
reports.into_iter().filter_map( |report| {
.and_then(|_| Some(Arc::clone(report)))
.and_then(|_| Some(report))
fn notify(&self, key: &String, reports: &Vec<Arc<TxReport>>) {
fn notify(&self, key: String, reports: Vec<&TxReport>) {
(*self.notify_fn)(key, reports);
@ -62,58 +56,46 @@ pub trait Command {
fn execute(&mut self);
pub struct NotifyTxObserver {
key: String,
reports: Vec<Arc<TxReport>>,
observer: Weak<TxObserver>,
pub struct AsyncTxExecutor {
reports: Vec<TxReport>,
observers: Weak<IndexMap<String, Arc<TxObserver>>>,
impl NotifyTxObserver {
pub fn new(key: String, reports: Vec<Arc<TxReport>>, observer: Weak<TxObserver>) -> Self {
NotifyTxObserver {
impl AsyncTxExecutor {
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: Vec<TxReport>) -> Self {
AsyncTxExecutor {
observers: Arc::downgrade(observers),
impl Command for NotifyTxObserver {
fn execute(&mut self) {
self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports));
impl Command for AsyncTxExecutor {
pub struct AsyncBatchExecutor {
commands: Vec<Box<Command + Send>>,
impl Command for AsyncBatchExecutor {
fn execute(&mut self) {
// need to clone to move to a new thread.
let command_queue = ::std::mem::replace(&mut self.commands, Vec::new());
let reports = ::std::mem::replace(&mut self.reports, Vec::new());
let weak_observers = ::std::mem::replace(&mut self.observers, Default::default());
thread::spawn (move || {
for mut command in command_queue.into_iter() {
weak_observers.upgrade().map(|observers| {
for (key, observer) in observers.iter() {
let applicable_reports = observer.applicable_reports(&reports);
observer.notify(key.clone(), applicable_reports);
pub struct TxObservationService {
observers: IndexMap<String, Arc<TxObserver>>,
observers: Arc<IndexMap<String, Arc<TxObserver>>>,
pub command_queue: VecDeque<Box<Command + Send>>,
impl TxObservationService {
pub fn new() -> Self {
// let (tx, rx) = channel();
// let worker = ThreadWorker::new(0, rx);
// thread::spawn(move || worker.main());
TxObservationService {
observers: IndexMap::new(),
observers: Arc::new(IndexMap::new()),
command_queue: VecDeque::new(),
// sender: tx,
@ -123,33 +105,19 @@ impl TxObservationService {
pub fn register(&mut self, key: String, observer: Arc<TxObserver>) {
self.observers.insert(key, observer);
Arc::make_mut(&mut self.observers).insert(key, observer);
pub fn deregister(&mut self, key: &String) {
Arc::make_mut(&mut self.observers).remove(key);
pub fn has_observers(&self) -> bool {
fn command_from_reports(&self, key: &String, reports: &Vec<Arc<TxReport>>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send>> {
let applicable_reports = observer.applicable_reports(reports);
if !applicable_reports.is_empty() {
Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::downgrade(observer))))
} else {
pub fn transaction_did_commit(&mut self, reports: Vec<Arc<TxReport>>) {
// notify all observers about their relevant transactions
let commands: Vec<Box<Command + Send>> = self.observers
.filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) })
self.command_queue.push_back(Box::new(AsyncBatchExecutor { commands }));
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) {
self.command_queue.push_back(Box::new(AsyncTxExecutor::new(&self.observers, reports)));
pub fn run(&mut self) {
@ -160,110 +128,3 @@ impl TxObservationService {
@ -207,7 +207,7 @@ pub struct InProgress<'a, 'c> {
schema: Schema,
cache: InProgressSQLiteAttributeCache,
use_caching: bool,
tx_reports: Vec<Arc<TxReport>>,
tx_reports: Vec<TxReport>,
observer_service: Option<&'a Mutex<TxObservationService>>,
@ -377,7 +377,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -401,7 +401,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {