diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 9d4fa3e7..d2011294 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -49,27 +49,10 @@ impl TxObserver { } } -pub trait CommandClone { - fn clone_box(&self) -> Box; +pub trait Command { + fn execute(&mut self); } -impl CommandClone for T where T: 'static + Command + Clone + Send { - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } -} - -pub trait Command: CommandClone { - fn execute(&self); -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } -} - -#[derive(Clone)] pub struct NotifyTxObserver { key: String, reports: Vec>, @@ -87,22 +70,21 @@ impl NotifyTxObserver { } impl Command for NotifyTxObserver { - fn execute(&self) { + fn execute(&mut self) { self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports)); } } -#[derive(Clone)] pub struct AsyncBatchExecutor { commands: Vec>, } impl Command for AsyncBatchExecutor { - fn execute(&self) { + fn execute(&mut self) { // need to clone to move to a new thread. - let command_queue = self.commands.clone(); + let command_queue = ::std::mem::replace(&mut self.commands, Vec::new()); thread::spawn (move ||{ - for command in command_queue.iter() { + for mut command in command_queue.into_iter() { command.execute(); } }); @@ -121,7 +103,8 @@ impl TxObservationService { command_queue: Vec::new(), } } - // For testing purposes + + // For testing purposes. pub fn is_registered(&self, key: &String) -> bool { self.observers.contains_key(key) } @@ -157,10 +140,9 @@ impl TxObservationService { } pub fn run(&mut self) { - for command in self.command_queue.iter() { + let command_queue = ::std::mem::replace(&mut self.command_queue, Vec::new()); + for mut command in command_queue.into_iter() { command.execute(); } - - self.command_queue.clear(); } }