use mem::replace instead of clone to get command queue into separate thread.
Remove Clone implementation from Command
This commit is contained in:
parent
099bde4b13
commit
421b7ad436
1 changed files with 135 additions and 32 deletions
|
@ -8,10 +8,19 @@
|
|||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
use std::collections::{
|
||||
VecDeque,
|
||||
};
|
||||
use std::sync::{
|
||||
Arc,
|
||||
Mutex,
|
||||
Weak,
|
||||
};
|
||||
use std::sync::mpsc::{
|
||||
channel,
|
||||
Receiver,
|
||||
Sender,
|
||||
};
|
||||
use std::thread;
|
||||
|
||||
use indexmap::{
|
||||
|
@ -49,27 +58,10 @@ impl TxObserver {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait CommandClone {
|
||||
fn clone_box(&self) -> Box<Command + Send>;
|
||||
pub trait Command {
|
||||
fn execute(&mut self);
|
||||
}
|
||||
|
||||
impl<T> CommandClone for T where T: 'static + Command + Clone + Send {
|
||||
fn clone_box(&self) -> Box<Command + Send> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Command: CommandClone {
|
||||
fn execute(&self);
|
||||
}
|
||||
|
||||
impl Clone for Box<Command + Send> {
|
||||
fn clone(&self) -> Box<Command + Send> {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct NotifyTxObserver {
|
||||
key: String,
|
||||
reports: Vec<Arc<TxReport>>,
|
||||
|
@ -87,22 +79,21 @@ impl NotifyTxObserver {
|
|||
}
|
||||
|
||||
impl Command for NotifyTxObserver {
|
||||
fn execute(&self) {
|
||||
fn execute(&mut self) {
|
||||
self.observer.upgrade().map(|o| o.notify(&self.key, &self.reports));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AsyncBatchExecutor {
|
||||
commands: Vec<Box<Command + Send>>,
|
||||
}
|
||||
|
||||
impl Command for AsyncBatchExecutor {
|
||||
fn execute(&self) {
|
||||
fn execute(&mut self) {
|
||||
// need to clone to move to a new thread.
|
||||
let command_queue = self.commands.clone();
|
||||
thread::spawn (move ||{
|
||||
for command in command_queue.iter() {
|
||||
let command_queue = ::std::mem::replace(&mut self.commands, Vec::new());
|
||||
thread::spawn (move || {
|
||||
for mut command in command_queue.into_iter() {
|
||||
command.execute();
|
||||
}
|
||||
});
|
||||
|
@ -111,16 +102,21 @@ impl Command for AsyncBatchExecutor {
|
|||
|
||||
pub struct TxObservationService {
|
||||
observers: IndexMap<String, Arc<TxObserver>>,
|
||||
pub command_queue: Vec<Box<Command + Send>>,
|
||||
pub command_queue: VecDeque<Box<Command + Send>>,
|
||||
}
|
||||
|
||||
impl TxObservationService {
|
||||
pub fn new() -> Self {
|
||||
// let (tx, rx) = channel();
|
||||
// let worker = ThreadWorker::new(0, rx);
|
||||
// thread::spawn(move || worker.main());
|
||||
TxObservationService {
|
||||
observers: IndexMap::new(),
|
||||
command_queue: Vec::new(),
|
||||
command_queue: VecDeque::new(),
|
||||
// sender: tx,
|
||||
}
|
||||
}
|
||||
|
||||
// For testing purposes
|
||||
pub fn is_registered(&self, key: &String) -> bool {
|
||||
self.observers.contains_key(key)
|
||||
|
@ -153,14 +149,121 @@ impl TxObservationService {
|
|||
.iter()
|
||||
.filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) })
|
||||
.collect();
|
||||
self.command_queue.push(Box::new(AsyncBatchExecutor{ commands }));
|
||||
self.command_queue.push_back(Box::new(AsyncBatchExecutor { commands }));
|
||||
}
|
||||
|
||||
pub fn run(&mut self) {
|
||||
for command in self.command_queue.iter() {
|
||||
command.execute();
|
||||
let mut command = self.command_queue.pop_front();
|
||||
while command.is_some() {
|
||||
command.map(|mut c| c.execute());
|
||||
command = self.command_queue.pop_front();
|
||||
}
|
||||
|
||||
self.command_queue.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// impl CommandQueueObserver for TxObservationService {
|
||||
// fn inserted_item(&self, command: Command) {
|
||||
// command.execute();
|
||||
// }
|
||||
// }
|
||||
|
||||
// pub trait CommandQueueObserver: Send + 'static {
|
||||
// fn inserted_item(&self, command: Command);
|
||||
// }
|
||||
|
||||
// struct Inner<Observer: CommandQueueObserver> {
|
||||
// command_queue: VecDeque<Box<Command + Send>>,
|
||||
// observer: Observer,
|
||||
// }
|
||||
|
||||
// impl<Observer: CommandQueueObserver> Inner<Observer> {
|
||||
// fn new(cq: VecDeque<Box<Command + Send>>, observer: Observer) -> Inner<Observer> {
|
||||
// Inner {
|
||||
// command_queue: cq,
|
||||
// observer: observer,
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// pub struct CommandQueueHandle(Vec<Sender<()>>);
|
||||
|
||||
// impl CommandQueueHandle {
|
||||
// pub fn new<Observer>(num_threads: usize, observer: Observer) -> (CommandQueueHandle, VecDeque<Box<Command + Send>>)
|
||||
// where Observer: CommandQueueObserver
|
||||
// {
|
||||
// let queue = VecDeque::new();
|
||||
// let inner = Arc::new(Mutex::new(Inner::new(queue.clone(), observer)));
|
||||
// let mut worker_channels = Vec::with_capacity(num_threads);
|
||||
|
||||
// for i in 0..num_threads {
|
||||
// let (tx, rx) = mpsc::channel();
|
||||
// worker_channels.push(tx);
|
||||
|
||||
// let worker = ThreadWorker::new(i, inner.clone(), rx);
|
||||
// thread::spawn(move || worker.main());
|
||||
// }
|
||||
// (ViewModelHandle(worker_channels), starting_vm)
|
||||
// }
|
||||
// }
|
||||
|
||||
// struct ThreadWorker<Observer: CommandQueueObserver> {
|
||||
// inner: Arc<Mutex<Inner<Observer>>>,
|
||||
// thread_id: usize,
|
||||
// shutdown: Receiver<()>,
|
||||
// }
|
||||
|
||||
// impl<Observer: CommandQueueObserver> ThreadWorker<Observer> {
|
||||
// fn new(thread_id: usize,
|
||||
// inner: Arc<Mutex<Inner<Observer>>>,
|
||||
// shutdown: Receiver<()>)
|
||||
// -> ThreadWorker<Observer> {
|
||||
// ThreadWorker {
|
||||
// inner: inner,
|
||||
// thread_id: thread_id,
|
||||
// shutdown: shutdown,
|
||||
// }
|
||||
// }
|
||||
|
||||
// fn main(&self) {
|
||||
// let mut rng = rand::thread_rng();
|
||||
// let between = Range::new(0i32, 10);
|
||||
|
||||
// loop {
|
||||
// thread::sleep(Duration::from_millis(1000 + rng.gen_range(0, 3000)));
|
||||
|
||||
// if self.should_shutdown() {
|
||||
// println!("thread {} exiting", self.thread_id);
|
||||
// return;
|
||||
// }
|
||||
|
||||
// let observer = self.inner.lock().unwrap();
|
||||
// let queue = self.inner.lock().unwrap().command_queue;
|
||||
|
||||
// if queue.is_empty() {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// while !queue.is_empty() {
|
||||
// let cmd = queue.front();
|
||||
|
||||
// }
|
||||
|
||||
// // // 20% of the time, add a new item.
|
||||
// // // 10% of the time, remove an item.
|
||||
// // // 70% of the time, modify an existing item.
|
||||
// // match between.ind_sample(&mut rng) {
|
||||
// // 0 | 1 => self.add_new_item(),
|
||||
// // 2 => self.remove_existing_item(&mut rng),
|
||||
// // _ => self.modify_existing_item(&mut rng),
|
||||
// // }
|
||||
// }
|
||||
// }
|
||||
|
||||
// fn should_shutdown(&self) -> bool {
|
||||
// match self.shutdown.try_recv() {
|
||||
// Err(mpsc::TryRecvError::Disconnected) => true,
|
||||
// Err(mpsc::TryRecvError::Empty) => false,
|
||||
// Ok(()) => unreachable!("thread worker channels should not be used directly"),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
|
Loading…
Reference in a new issue