Command Queue Executor to watch for new commands and execute on longer running background thread
This commit is contained in:
parent
42329df63b
commit
352c16425d
2 changed files with 80 additions and 33 deletions
|
@ -8,13 +8,16 @@
|
||||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
// specific language governing permissions and limitations under the License.
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
use std::collections::{
|
|
||||||
VecDeque,
|
|
||||||
};
|
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
Arc,
|
Arc,
|
||||||
Weak,
|
Weak,
|
||||||
};
|
};
|
||||||
|
use std::sync::mpsc::{
|
||||||
|
channel,
|
||||||
|
Receiver,
|
||||||
|
RecvError,
|
||||||
|
Sender,
|
||||||
|
};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use indexmap::{
|
use indexmap::{
|
||||||
|
@ -56,46 +59,45 @@ pub trait Command {
|
||||||
fn execute(&mut self);
|
fn execute(&mut self);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AsyncTxExecutor {
|
pub struct TxCommand {
|
||||||
reports: Vec<TxReport>,
|
reports: Vec<TxReport>,
|
||||||
observers: Weak<IndexMap<String, Arc<TxObserver>>>,
|
observers: Weak<IndexMap<String, Arc<TxObserver>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncTxExecutor {
|
impl TxCommand {
|
||||||
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: Vec<TxReport>) -> Self {
|
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: Vec<TxReport>) -> Self {
|
||||||
AsyncTxExecutor {
|
TxCommand {
|
||||||
reports,
|
reports,
|
||||||
observers: Arc::downgrade(observers),
|
observers: Arc::downgrade(observers),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Command for AsyncTxExecutor {
|
impl Command for TxCommand {
|
||||||
|
|
||||||
fn execute(&mut self) {
|
fn execute(&mut self) {
|
||||||
let reports = ::std::mem::replace(&mut self.reports, Vec::new());
|
self.observers.upgrade().map(|observers| {
|
||||||
let weak_observers = ::std::mem::replace(&mut self.observers, Default::default());
|
for (key, observer) in observers.iter() {
|
||||||
thread::spawn (move || {
|
let applicable_reports = observer.applicable_reports(&self.reports);
|
||||||
weak_observers.upgrade().map(|observers| {
|
if !applicable_reports.is_empty() {
|
||||||
for (key, observer) in observers.iter() {
|
|
||||||
let applicable_reports = observer.applicable_reports(&reports);
|
|
||||||
observer.notify(key.clone(), applicable_reports);
|
observer.notify(key.clone(), applicable_reports);
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TxObservationService {
|
pub struct TxObservationService {
|
||||||
observers: Arc<IndexMap<String, Arc<TxObserver>>>,
|
observers: Arc<IndexMap<String, Arc<TxObserver>>>,
|
||||||
pub command_queue: VecDeque<Box<Command + Send>>,
|
executor: Option<Sender<Box<Command + Send>>>,
|
||||||
|
in_progress_count: i32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TxObservationService {
|
impl TxObservationService {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
TxObservationService {
|
TxObservationService {
|
||||||
observers: Arc::new(IndexMap::new()),
|
observers: Arc::new(IndexMap::new()),
|
||||||
command_queue: VecDeque::new(),
|
executor: None,
|
||||||
|
in_progress_count: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,15 +118,58 @@ impl TxObservationService {
|
||||||
!self.observers.is_empty()
|
!self.observers.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) {
|
pub fn transaction_did_start(&mut self) {
|
||||||
self.command_queue.push_back(Box::new(AsyncTxExecutor::new(&self.observers, reports)));
|
self.in_progress_count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn run(&mut self) {
|
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) {
|
||||||
let mut command = self.command_queue.pop_front();
|
{
|
||||||
while command.is_some() {
|
let executor = self.executor.get_or_insert_with(||{
|
||||||
command.map(|mut c| c.execute());
|
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
|
||||||
command = self.command_queue.pop_front();
|
let mut worker = CommandExecutor::new(rx);
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
worker.main();
|
||||||
|
});
|
||||||
|
|
||||||
|
tx
|
||||||
|
});
|
||||||
|
|
||||||
|
let cmd = Box::new(TxCommand::new(&self.observers, reports));
|
||||||
|
executor.send(cmd).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.in_progress_count -= 1;
|
||||||
|
|
||||||
|
if self.in_progress_count == 0 {
|
||||||
|
self.executor = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CommandExecutor {
|
||||||
|
reciever: Receiver<Box<Command + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandExecutor {
|
||||||
|
fn new(rx: Receiver<Box<Command + Send>>) -> Self {
|
||||||
|
CommandExecutor {
|
||||||
|
reciever: rx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main(&mut self) {
|
||||||
|
loop {
|
||||||
|
match self.reciever.recv() {
|
||||||
|
Err(RecvError) => {
|
||||||
|
eprintln!("Disconnected, terminating CommandExecutor");
|
||||||
|
return
|
||||||
|
},
|
||||||
|
|
||||||
|
Ok(mut cmd) => {
|
||||||
|
cmd.execute()
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
20
src/conn.rs
20
src/conn.rs
|
@ -446,12 +446,6 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
metadata.generation += 1;
|
metadata.generation += 1;
|
||||||
metadata.partition_map = self.partition_map;
|
metadata.partition_map = self.partition_map;
|
||||||
|
|
||||||
// let the transaction observer know that there have been some transactions committed.
|
|
||||||
if let Some(ref observer_service) = self.observer_service {
|
|
||||||
let mut os = observer_service.lock().unwrap();
|
|
||||||
os.transaction_did_commit(self.tx_reports);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the conn's cache if we made any changes.
|
// Update the conn's cache if we made any changes.
|
||||||
self.cache.commit_to(&mut metadata.attribute_cache);
|
self.cache.commit_to(&mut metadata.attribute_cache);
|
||||||
|
|
||||||
|
@ -463,10 +457,10 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
|
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
|
||||||
}
|
}
|
||||||
|
|
||||||
// run any commands that we've created along the way.
|
// let the transaction observer know that there have been some transactions committed.
|
||||||
if let Some(ref observer_service) = self.observer_service {
|
if let Some(ref observer_service) = self.observer_service {
|
||||||
let mut os = observer_service.lock().unwrap();
|
let mut os = observer_service.lock().unwrap();
|
||||||
os.run();
|
os.transaction_did_commit(self.tx_reports);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -721,6 +715,14 @@ impl Conn {
|
||||||
current.attribute_cache.clone())
|
current.attribute_cache.clone())
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut obs = self.tx_observer_service.lock().unwrap();
|
||||||
|
let observer_service = if obs.has_observers() {
|
||||||
|
obs.transaction_did_start();
|
||||||
|
Some(&self.tx_observer_service)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
Ok(InProgress {
|
Ok(InProgress {
|
||||||
mutex: &self.metadata,
|
mutex: &self.metadata,
|
||||||
transaction: tx,
|
transaction: tx,
|
||||||
|
@ -730,7 +732,7 @@ impl Conn {
|
||||||
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
|
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
|
||||||
use_caching: true,
|
use_caching: true,
|
||||||
tx_reports: Vec::new(),
|
tx_reports: Vec::new(),
|
||||||
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
|
observer_service: observer_service,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue