Execute commands in a separate thread

Command Queue Executor to watch for new commands and execute on longer running background thread
This commit is contained in:
Emily Toop 2018-03-13 11:25:33 +00:00
parent ecc4a7a35a
commit d4365fa4cd
7 changed files with 130 additions and 104 deletions

View file

@ -29,6 +29,7 @@ rustc_version = "0.2"
chrono = "0.4" chrono = "0.4"
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
lazy_static = "0.2" lazy_static = "0.2"
smallvec = "0.6"
time = "0.1" time = "0.1"
uuid = "0.5" uuid = "0.5"

View file

@ -10,6 +10,7 @@ itertools = "0.7"
lazy_static = "0.2" lazy_static = "0.2"
num = "0.1" num = "0.1"
ordered-float = "0.5" ordered-float = "0.5"
smallvec = "0.6"
time = "0.1" time = "0.1"
[dependencies.rusqlite] [dependencies.rusqlite]

View file

@ -21,6 +21,7 @@ extern crate lazy_static;
extern crate num; extern crate num;
extern crate rusqlite; extern crate rusqlite;
extern crate smallvec;
extern crate tabwriter; extern crate tabwriter;
extern crate time; extern crate time;

View file

@ -671,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
} }
self.watcher.datom(op, e, a, &v); self.watcher.datom(op, e, a, &v);
// TODO: Create something like a watcher to do this for us.
affected_attrs.insert(a); affected_attrs.insert(a);
let reduced = (e, a, attribute, v, added); let reduced = (e, a, attribute, v, added);

View file

@ -10,6 +10,13 @@
use std::sync::{ use std::sync::{
Arc, Arc,
Weak,
};
use std::sync::mpsc::{
channel,
Receiver,
RecvError,
Sender,
}; };
use std::thread; use std::thread;
@ -17,155 +24,156 @@ use indexmap::{
IndexMap, IndexMap,
}; };
use smallvec::{
SmallVec,
};
use types::{ use types::{
AttributeSet, AttributeSet,
TxReport, TxReport,
}; };
pub struct TxObserver { pub struct TxObserver {
notify_fn: Arc<Option<Box<Fn(String, Vec<TxReport>) + Send + Sync>>>, notify_fn: Arc<Box<Fn(&str, SmallVec<[&TxReport; 4]>) + Send + Sync>>,
attributes: AttributeSet, attributes: AttributeSet,
} }
impl TxObserver { impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec<TxReport>) + 'static + Send + Sync { pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync {
TxObserver { TxObserver {
notify_fn: Arc::new(Some(Box::new(notify_fn))), notify_fn: Arc::new(Box::new(notify_fn)),
attributes, attributes,
} }
} }
pub fn applicable_reports(&self, reports: &Vec<TxReport>) -> Vec<TxReport> { pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> {
reports.into_iter().filter_map( |report| { reports.into_iter().filter_map(|report| {
if self.attributes.intersection(&report.changeset).next().is_some(){ self.attributes.intersection(&report.changeset)
Some(report.clone()) .next()
} else { .and_then(|_| Some(report))
None
}
}).collect() }).collect()
} }
fn notify(&self, key: String, reports: Vec<TxReport>) { fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) {
if let Some(ref notify_fn) = *self.notify_fn { (*self.notify_fn)(key, reports);
(notify_fn)(key, reports);
} else {
eprintln!("no notify function specified for TxObserver");
}
} }
} }
pub trait CommandClone { pub trait Command {
fn clone_box(&self) -> Box<Command + Send>; fn execute(&mut self);
} }
impl<T> CommandClone for T where T: 'static + Command + Clone + Send { pub struct TxCommand {
fn clone_box(&self) -> Box<Command + Send> { reports: SmallVec<[TxReport; 4]>,
Box::new(self.clone()) observers: Weak<IndexMap<String, Arc<TxObserver>>>,
}
} }
pub trait Command: CommandClone { impl TxCommand {
fn execute(&self); fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: SmallVec<[TxReport; 4]>) -> Self {
} TxCommand {
impl Clone for Box<Command + Send> {
fn clone(&self) -> Box<Command + Send> {
self.clone_box()
}
}
#[derive(Clone)]
pub struct NotifyTxObserver {
key: String,
reports: Vec<TxReport>,
observer: Arc<TxObserver>,
}
impl NotifyTxObserver {
pub fn new(key: String, reports: Vec<TxReport>, observer: Arc<TxObserver>) -> Self {
NotifyTxObserver {
key,
reports, reports,
observer, observers: Arc::downgrade(observers),
} }
} }
} }
impl Command for NotifyTxObserver { impl Command for TxCommand {
fn execute(&self) { fn execute(&mut self) {
self.observer.notify(self.key.clone(), self.reports.clone()); self.observers.upgrade().map(|observers| {
} for (key, observer) in observers.iter() {
} let applicable_reports = observer.applicable_reports(&self.reports);
if !applicable_reports.is_empty() {
#[derive(Clone)] observer.notify(&key, applicable_reports);
pub struct AsyncBatchExecutor { }
commands: Vec<Box<Command + Send>>,
}
impl Command for AsyncBatchExecutor {
fn execute(&self) {
let command_queue = self.commands.clone();
thread::spawn (move ||{
for command in command_queue.iter() {
command.execute();
} }
}); });
} }
} }
#[derive(Clone)]
pub struct TxObservationService { pub struct TxObservationService {
observers: IndexMap<String, Arc<TxObserver>>, observers: Arc<IndexMap<String, Arc<TxObserver>>>,
pub command_queue: Vec<Box<Command + Send>>, executor: Option<Sender<Box<Command + Send>>>,
in_progress_count: i32,
} }
impl TxObservationService { impl TxObservationService {
pub fn new() -> Self { pub fn new() -> Self {
TxObservationService { TxObservationService {
observers: IndexMap::new(), observers: Arc::new(IndexMap::new()),
command_queue: Vec::new(), executor: None,
in_progress_count: 0,
} }
} }
// For testing purposes // For testing purposes
pub fn is_registered(&self, key: &String) -> bool { pub fn is_registered(&self, key: &String) -> bool {
self.observers.contains_key(key) self.observers.contains_key(key)
} }
pub fn register(&mut self, key: String, observer: Arc<TxObserver>) { pub fn register(&mut self, key: String, observer: Arc<TxObserver>) {
self.observers.insert(key.clone(), observer); Arc::make_mut(&mut self.observers).insert(key, observer);
} }
pub fn deregister(&mut self, key: &String) { pub fn deregister(&mut self, key: &String) {
self.observers.remove(key); Arc::make_mut(&mut self.observers).remove(key);
} }
pub fn has_observers(&self) -> bool { pub fn has_observers(&self) -> bool {
!self.observers.is_empty() !self.observers.is_empty()
} }
fn command_from_reports(&self, key: &String, reports: &Vec<TxReport>, observer: &Arc<TxObserver>) -> Option<Box<Command + Send>> { pub fn transaction_did_start(&mut self) {
let applicable_reports = observer.applicable_reports(reports); self.in_progress_count += 1;
if !applicable_reports.is_empty() {
Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::clone(observer))))
} else {
None
}
} }
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) { pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) {
// notify all observers about their relevant transactions {
let commands: Vec<Box<Command + Send>> = self.observers let executor = self.executor.get_or_insert_with(||{
.iter() let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
.filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) let mut worker = CommandExecutor::new(rx);
.collect();
self.command_queue.push(Box::new(AsyncBatchExecutor{ commands }));
}
pub fn run(&mut self) { thread::spawn(move || {
for command in self.command_queue.iter() { worker.main();
command.execute(); });
tx
});
let cmd = Box::new(TxCommand::new(&self.observers, reports));
executor.send(cmd).unwrap();
} }
self.command_queue.clear(); self.in_progress_count -= 1;
if self.in_progress_count == 0 {
self.executor = None;
}
}
}
struct CommandExecutor {
reciever: Receiver<Box<Command + Send>>,
}
impl CommandExecutor {
fn new(rx: Receiver<Box<Command + Send>>) -> Self {
CommandExecutor {
reciever: rx,
}
}
fn main(&mut self) {
loop {
match self.reciever.recv() {
Err(RecvError) => {
eprintln!("Disconnected, terminating CommandExecutor");
return
},
Ok(mut cmd) => {
cmd.execute()
},
}
}
} }
} }

View file

@ -32,6 +32,8 @@ use rusqlite::{
TransactionBehavior, TransactionBehavior,
}; };
use smallvec::SmallVec;
use edn; use edn;
use mentat_core::{ use mentat_core::{
@ -214,7 +216,8 @@ pub struct InProgress<'a, 'c> {
schema: Schema, schema: Schema,
cache: InProgressSQLiteAttributeCache, cache: InProgressSQLiteAttributeCache,
use_caching: bool, use_caching: bool,
tx_reports: Vec<TxReport>, // TODO: Collect txids/affected datoms in a better way
tx_reports: SmallVec<[TxReport; 4]>,
observer_service: Option<&'a Mutex<TxObservationService>>, observer_service: Option<&'a Mutex<TxObservationService>>,
} }
@ -453,12 +456,6 @@ impl<'a, 'c> InProgress<'a, 'c> {
metadata.generation += 1; metadata.generation += 1;
metadata.partition_map = self.partition_map; metadata.partition_map = self.partition_map;
// let the transaction observer know that there have been some transactions committed.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.transaction_did_commit(self.tx_reports);
}
// Update the conn's cache if we made any changes. // Update the conn's cache if we made any changes.
self.cache.commit_to(&mut metadata.attribute_cache); self.cache.commit_to(&mut metadata.attribute_cache);
@ -470,10 +467,10 @@ impl<'a, 'c> InProgress<'a, 'c> {
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
} }
// run any commands that we've created along the way. // let the transaction observer know that there have been some transactions committed.
if let Some(ref observer_service) = self.observer_service { if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap(); let mut os = observer_service.lock().unwrap();
os.run(); os.transaction_did_commit(self.tx_reports);
} }
Ok(()) Ok(())
@ -728,6 +725,14 @@ impl Conn {
current.attribute_cache.clone()) current.attribute_cache.clone())
}; };
let mut obs = self.tx_observer_service.lock().unwrap();
let observer_service = if obs.has_observers() {
obs.transaction_did_start();
Some(&self.tx_observer_service)
} else {
None
};
Ok(InProgress { Ok(InProgress {
mutex: &self.metadata, mutex: &self.metadata,
transaction: tx, transaction: tx,
@ -736,8 +741,8 @@ impl Conn {
schema: (*current_schema).clone(), schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true, use_caching: true,
tx_reports: Vec::new(), tx_reports: SmallVec::new(),
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None }, observer_service: observer_service,
}) })
} }
@ -846,7 +851,6 @@ mod tests {
Duration, Duration,
Instant Instant
}; };
use std::thread;
use mentat_core::{ use mentat_core::{
CachedAttributes, CachedAttributes,
@ -1534,16 +1538,21 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default())); let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output); let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
// make it Sync.
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() { if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap(); let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.clone()); o.called_key = Some(obs_key.to_string());
for report in batch.iter() { for report in batch.iter() {
o.txids.push(report.tx_id.clone()); o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone()); o.changes.push(report.changeset.clone());
} }
o.txids.sort(); o.txids.sort();
} }
thread_tx.lock().unwrap().send(()).unwrap();
})); }));
conn.register_observer(key.clone(), Arc::clone(&tx_observer)); conn.register_observer(key.clone(), Arc::clone(&tx_observer));
@ -1557,7 +1566,7 @@ mod tests {
let name = format!("todo{}", i); let name = format!("todo{}", i);
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name); let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
if i % 2 == 0 { if i % 2 == 0 {
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
@ -1575,7 +1584,7 @@ mod tests {
} }
let delay = Duration::from_millis(100); let delay = Duration::from_millis(100);
thread::sleep(delay); let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) { match Arc::try_unwrap(output) {
Ok(out) => { Ok(out) => {
@ -1608,16 +1617,19 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default())); let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output); let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() { if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap(); let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.clone()); o.called_key = Some(obs_key.to_string());
for report in batch.iter() { for report in batch.iter() {
o.txids.push(report.tx_id.clone()); o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone()); o.changes.push(report.changeset.clone());
} }
o.txids.sort(); o.txids.sort();
} }
thread_tx.lock().unwrap().send(()).unwrap();
})); }));
conn.register_observer(key.clone(), Arc::clone(&tx_observer)); conn.register_observer(key.clone(), Arc::clone(&tx_observer));
@ -1638,7 +1650,7 @@ mod tests {
} }
let delay = Duration::from_millis(100); let delay = Duration::from_millis(100);
thread::sleep(delay); let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) { match Arc::try_unwrap(output) {
Ok(out) => { Ok(out) => {

View file

@ -18,6 +18,8 @@ extern crate lazy_static;
extern crate rusqlite; extern crate rusqlite;
extern crate smallvec;
extern crate uuid; extern crate uuid;
pub extern crate edn; pub extern crate edn;