diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 9d4fa3e7..2359937c 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -8,10 +8,19 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. +use std::collections::{ + VecDeque, +}; use std::sync::{ Arc, + Mutex, Weak, }; +use std::sync::mpsc::{ + channel, + Receiver, + Sender, +}; use std::thread; use indexmap::{ @@ -49,27 +58,10 @@ impl TxObserver { } } -pub trait CommandClone { - fn clone_box(&self) -> Box; +pub trait Command { + fn execute(&mut self); } -impl CommandClone for T where T: 'static + Command + Clone + Send { - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } -} - -pub trait Command: CommandClone { - fn execute(&self); -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } -} - -#[derive(Clone)] pub struct NotifyTxObserver { key: String, reports: Vec>, @@ -87,22 +79,21 @@ impl NotifyTxObserver { } impl Command for NotifyTxObserver { - fn execute(&self) { + fn execute(&mut self) { self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports)); } } -#[derive(Clone)] pub struct AsyncBatchExecutor { commands: Vec>, } impl Command for AsyncBatchExecutor { - fn execute(&self) { + fn execute(&mut self) { // need to clone to move to a new thread. - let command_queue = self.commands.clone(); - thread::spawn (move ||{ - for command in command_queue.iter() { + let command_queue = ::std::mem::replace(&mut self.commands, Vec::new()); + thread::spawn (move || { + for mut command in command_queue.into_iter() { command.execute(); } }); @@ -111,16 +102,21 @@ impl Command for AsyncBatchExecutor { pub struct TxObservationService { observers: IndexMap>, - pub command_queue: Vec>, + pub command_queue: VecDeque>, } impl TxObservationService { pub fn new() -> Self { + // let (tx, rx) = channel(); + // let worker = ThreadWorker::new(0, rx); + // thread::spawn(move || worker.main()); TxObservationService { observers: IndexMap::new(), - command_queue: Vec::new(), + command_queue: VecDeque::new(), + // sender: tx, } } + // For testing purposes pub fn is_registered(&self, key: &String) -> bool { self.observers.contains_key(key) @@ -153,14 +149,121 @@ impl TxObservationService { .iter() .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) .collect(); - self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); + self.command_queue.push_back(Box::new(AsyncBatchExecutor { commands })); } pub fn run(&mut self) { - for command in self.command_queue.iter() { - command.execute(); + let mut command = self.command_queue.pop_front(); + while command.is_some() { + command.map(|mut c| c.execute()); + command = self.command_queue.pop_front(); } - - self.command_queue.clear(); } } + +// impl CommandQueueObserver for TxObservationService { +// fn inserted_item(&self, command: Command) { +// command.execute(); +// } +// } + +// pub trait CommandQueueObserver: Send + 'static { +// fn inserted_item(&self, command: Command); +// } + +// struct Inner { +// command_queue: VecDeque>, +// observer: Observer, +// } + +// impl Inner { +// fn new(cq: VecDeque>, observer: Observer) -> Inner { +// Inner { +// command_queue: cq, +// observer: observer, +// } +// } +// } + +// pub struct CommandQueueHandle(Vec>); + +// impl CommandQueueHandle { +// pub fn new(num_threads: usize, observer: Observer) -> (CommandQueueHandle, VecDeque>) +// where Observer: CommandQueueObserver +// { +// let queue = VecDeque::new(); +// let inner = Arc::new(Mutex::new(Inner::new(queue.clone(), observer))); +// let mut worker_channels = Vec::with_capacity(num_threads); + +// for i in 0..num_threads { +// let (tx, rx) = mpsc::channel(); +// worker_channels.push(tx); + +// let worker = ThreadWorker::new(i, inner.clone(), rx); +// thread::spawn(move || worker.main()); +// } +// (ViewModelHandle(worker_channels), starting_vm) +// } +// } + +// struct ThreadWorker { +// inner: Arc>>, +// thread_id: usize, +// shutdown: Receiver<()>, +// } + +// impl ThreadWorker { +// fn new(thread_id: usize, +// inner: Arc>>, +// shutdown: Receiver<()>) +// -> ThreadWorker { +// ThreadWorker { +// inner: inner, +// thread_id: thread_id, +// shutdown: shutdown, +// } +// } + +// fn main(&self) { +// let mut rng = rand::thread_rng(); +// let between = Range::new(0i32, 10); + +// loop { +// thread::sleep(Duration::from_millis(1000 + rng.gen_range(0, 3000))); + +// if self.should_shutdown() { +// println!("thread {} exiting", self.thread_id); +// return; +// } + +// let observer = self.inner.lock().unwrap(); +// let queue = self.inner.lock().unwrap().command_queue; + +// if queue.is_empty() { +// continue; +// } + +// while !queue.is_empty() { +// let cmd = queue.front(); + +// } + +// // // 20% of the time, add a new item. +// // // 10% of the time, remove an item. +// // // 70% of the time, modify an existing item. +// // match between.ind_sample(&mut rng) { +// // 0 | 1 => self.add_new_item(), +// // 2 => self.remove_existing_item(&mut rng), +// // _ => self.modify_existing_item(&mut rng), +// // } +// } +// } + +// fn should_shutdown(&self) -> bool { +// match self.shutdown.try_recv() { +// Err(mpsc::TryRecvError::Disconnected) => true, +// Err(mpsc::TryRecvError::Empty) => false, +// Ok(()) => unreachable!("thread worker channels should not be used directly"), +// } +// } +// }