diff --git a/Cargo.toml b/Cargo.toml index 3f6d558b..c622804c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ rustc_version = "0.2" chrono = "0.4" error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } lazy_static = "0.2" +smallvec = "0.6" time = "0.1" uuid = "0.5" diff --git a/db/Cargo.toml b/db/Cargo.toml index 634d9d1a..ecbded5d 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -10,6 +10,7 @@ itertools = "0.7" lazy_static = "0.2" num = "0.1" ordered-float = "0.5" +smallvec = "0.6" time = "0.1" [dependencies.rusqlite] diff --git a/db/src/lib.rs b/db/src/lib.rs index cfa6948c..debed886 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -21,6 +21,7 @@ extern crate lazy_static; extern crate num; extern crate rusqlite; +extern crate smallvec; extern crate tabwriter; extern crate time; diff --git a/db/src/tx.rs b/db/src/tx.rs index 96fa989f..20d35bfe 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -671,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.watcher.datom(op, e, a, &v); + // TODO: Create something like a watcher to do this for us. affected_attrs.insert(a); let reduced = (e, a, attribute, v, added); diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index 240e9ae0..456bd51a 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -10,6 +10,13 @@ use std::sync::{ Arc, + Weak, +}; +use std::sync::mpsc::{ + channel, + Receiver, + RecvError, + Sender, }; use std::thread; @@ -17,155 +24,156 @@ use indexmap::{ IndexMap, }; +use smallvec::{ + SmallVec, +}; + use types::{ AttributeSet, TxReport, }; pub struct TxObserver { - notify_fn: Arc) + Send + Sync>>>, + notify_fn: Arc) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync { TxObserver { - notify_fn: Arc::new(Some(Box::new(notify_fn))), + notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports(&self, reports: &Vec) -> Vec { - reports.into_iter().filter_map( |report| { - if self.attributes.intersection(&report.changeset).next().is_some(){ - Some(report.clone()) - } else { - None - } + pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> { + reports.into_iter().filter_map(|report| { + self.attributes.intersection(&report.changeset) + .next() + .and_then(|_| Some(report)) }).collect() } - fn notify(&self, key: String, reports: Vec) { - if let Some(ref notify_fn) = *self.notify_fn { - (notify_fn)(key, reports); - } else { - eprintln!("no notify function specified for TxObserver"); - } + fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) { + (*self.notify_fn)(key, reports); } } -pub trait CommandClone { - fn clone_box(&self) -> Box; +pub trait Command { + fn execute(&mut self); } -impl CommandClone for T where T: 'static + Command + Clone + Send { - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } +pub struct TxCommand { + reports: SmallVec<[TxReport; 4]>, + observers: Weak>>, } -pub trait Command: CommandClone { - fn execute(&self); -} - -impl Clone for Box { - fn clone(&self) -> Box { - self.clone_box() - } -} - -#[derive(Clone)] -pub struct NotifyTxObserver { - key: String, - reports: Vec, - observer: Arc, -} - -impl NotifyTxObserver { - pub fn new(key: String, reports: Vec, observer: Arc) -> Self { - NotifyTxObserver { - key, +impl TxCommand { + fn new(observers: &Arc>>, reports: SmallVec<[TxReport; 4]>) -> Self { + TxCommand { reports, - observer, + observers: Arc::downgrade(observers), } } } -impl Command for NotifyTxObserver { - fn execute(&self) { - self.observer.notify(self.key.clone(), self.reports.clone()); - } -} - -#[derive(Clone)] -pub struct AsyncBatchExecutor { - commands: Vec>, -} - -impl Command for AsyncBatchExecutor { - fn execute(&self) { - let command_queue = self.commands.clone(); - thread::spawn (move ||{ - for command in command_queue.iter() { - command.execute(); +impl Command for TxCommand { + fn execute(&mut self) { + self.observers.upgrade().map(|observers| { + for (key, observer) in observers.iter() { + let applicable_reports = observer.applicable_reports(&self.reports); + if !applicable_reports.is_empty() { + observer.notify(&key, applicable_reports); + } } }); } } -#[derive(Clone)] pub struct TxObservationService { - observers: IndexMap>, - pub command_queue: Vec>, + observers: Arc>>, + executor: Option>>, + in_progress_count: i32, } impl TxObservationService { pub fn new() -> Self { TxObservationService { - observers: IndexMap::new(), - command_queue: Vec::new(), + observers: Arc::new(IndexMap::new()), + executor: None, + in_progress_count: 0, } } + // For testing purposes pub fn is_registered(&self, key: &String) -> bool { self.observers.contains_key(key) } pub fn register(&mut self, key: String, observer: Arc) { - self.observers.insert(key.clone(), observer); + Arc::make_mut(&mut self.observers).insert(key, observer); } pub fn deregister(&mut self, key: &String) { - self.observers.remove(key); + Arc::make_mut(&mut self.observers).remove(key); } pub fn has_observers(&self) -> bool { !self.observers.is_empty() } - fn command_from_reports(&self, key: &String, reports: &Vec, observer: &Arc) -> Option> { - let applicable_reports = observer.applicable_reports(reports); - if !applicable_reports.is_empty() { - Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::clone(observer)))) - } else { - None - } + pub fn transaction_did_start(&mut self) { + self.in_progress_count += 1; } - pub fn transaction_did_commit(&mut self, reports: Vec) { - // notify all observers about their relevant transactions - let commands: Vec> = self.observers - .iter() - .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) - .collect(); - self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); - } + pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) { + { + let executor = self.executor.get_or_insert_with(||{ + let (tx, rx): (Sender>, Receiver>) = channel(); + let mut worker = CommandExecutor::new(rx); - pub fn run(&mut self) { - for command in self.command_queue.iter() { - command.execute(); + thread::spawn(move || { + worker.main(); + }); + + tx + }); + + let cmd = Box::new(TxCommand::new(&self.observers, reports)); + executor.send(cmd).unwrap(); } - self.command_queue.clear(); + self.in_progress_count -= 1; + + if self.in_progress_count == 0 { + self.executor = None; + } + } +} + +struct CommandExecutor { + reciever: Receiver>, +} + +impl CommandExecutor { + fn new(rx: Receiver>) -> Self { + CommandExecutor { + reciever: rx, + } + } + + fn main(&mut self) { + loop { + match self.reciever.recv() { + Err(RecvError) => { + eprintln!("Disconnected, terminating CommandExecutor"); + return + }, + + Ok(mut cmd) => { + cmd.execute() + }, + } + } } } diff --git a/src/conn.rs b/src/conn.rs index a76d7772..665ea9a0 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -32,6 +32,8 @@ use rusqlite::{ TransactionBehavior, }; +use smallvec::SmallVec; + use edn; use mentat_core::{ @@ -214,7 +216,8 @@ pub struct InProgress<'a, 'c> { schema: Schema, cache: InProgressSQLiteAttributeCache, use_caching: bool, - tx_reports: Vec, + // TODO: Collect txids/affected datoms in a better way + tx_reports: SmallVec<[TxReport; 4]>, observer_service: Option<&'a Mutex>, } @@ -453,12 +456,6 @@ impl<'a, 'c> InProgress<'a, 'c> { metadata.generation += 1; metadata.partition_map = self.partition_map; - // let the transaction observer know that there have been some transactions committed. - if let Some(ref observer_service) = self.observer_service { - let mut os = observer_service.lock().unwrap(); - os.transaction_did_commit(self.tx_reports); - } - // Update the conn's cache if we made any changes. self.cache.commit_to(&mut metadata.attribute_cache); @@ -470,10 +467,10 @@ impl<'a, 'c> InProgress<'a, 'c> { // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. } - // run any commands that we've created along the way. + // let the transaction observer know that there have been some transactions committed. if let Some(ref observer_service) = self.observer_service { let mut os = observer_service.lock().unwrap(); - os.run(); + os.transaction_did_commit(self.tx_reports); } Ok(()) @@ -728,6 +725,14 @@ impl Conn { current.attribute_cache.clone()) }; + let mut obs = self.tx_observer_service.lock().unwrap(); + let observer_service = if obs.has_observers() { + obs.transaction_did_start(); + Some(&self.tx_observer_service) + } else { + None + }; + Ok(InProgress { mutex: &self.metadata, transaction: tx, @@ -736,8 +741,8 @@ impl Conn { schema: (*current_schema).clone(), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, - tx_reports: Vec::new(), - observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None }, + tx_reports: SmallVec::new(), + observer_service: observer_service, }) } @@ -846,7 +851,6 @@ mod tests { Duration, Instant }; - use std::thread; use mentat_core::{ CachedAttributes, @@ -1534,16 +1538,21 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); + let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + // because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also + // make it Sync. + let thread_tx = Mutex::new(tx); let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); - o.called_key = Some(obs_key.clone()); + o.called_key = Some(obs_key.to_string()); for report in batch.iter() { o.txids.push(report.tx_id.clone()); o.changes.push(report.changeset.clone()); } o.txids.sort(); } + thread_tx.lock().unwrap().send(()).unwrap(); })); conn.register_observer(key.clone(), Arc::clone(&tx_observer)); @@ -1557,7 +1566,7 @@ mod tests { let name = format!("todo{}", i); let uuid = Uuid::new_v4(); let mut builder = in_progress.builder().describe_tempid(&name); - builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); + builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); if i % 2 == 0 { builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); @@ -1575,7 +1584,7 @@ mod tests { } let delay = Duration::from_millis(100); - thread::sleep(delay); + let _ = rx.recv_timeout(delay); match Arc::try_unwrap(output) { Ok(out) => { @@ -1608,16 +1617,19 @@ mod tests { let output = Arc::new(Mutex::new(ObserverOutput::default())); let mut_output = Arc::downgrade(&output); + let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel(); + let thread_tx = Mutex::new(tx); let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { if let Some(out) = mut_output.upgrade() { let mut o = out.lock().unwrap(); - o.called_key = Some(obs_key.clone()); + o.called_key = Some(obs_key.to_string()); for report in batch.iter() { o.txids.push(report.tx_id.clone()); o.changes.push(report.changeset.clone()); } o.txids.sort(); } + thread_tx.lock().unwrap().send(()).unwrap(); })); conn.register_observer(key.clone(), Arc::clone(&tx_observer)); @@ -1638,7 +1650,7 @@ mod tests { } let delay = Duration::from_millis(100); - thread::sleep(delay); + let _ = rx.recv_timeout(delay); match Arc::try_unwrap(output) { Ok(out) => { diff --git a/src/lib.rs b/src/lib.rs index d51eb60c..fdc28dff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,8 @@ extern crate lazy_static; extern crate rusqlite; +extern crate smallvec; + extern crate uuid; pub extern crate edn;