diff --git a/db/src/cache.rs b/db/src/cache.rs index 2e0ba742..c1e1cb4c 100644 --- a/db/src/cache.rs +++ b/db/src/cache.rs @@ -1369,14 +1369,6 @@ impl<'a> InProgressCacheTransactWatcher<'a> { } impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> { - type Result = (); - - - - fn tx_id(&mut self) -> Option { - None - } - fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { if !self.active { return; @@ -1410,7 +1402,7 @@ impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> { } } - fn done(&mut self, _t: &Entid, schema: &Schema) -> Result { + fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<()> { // Oh, I wish we had impl trait. Without it we have a six-line type signature if we // try to break this out as a helper function. let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default()); diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs index d5444b34..1b041e96 100644 --- a/db/src/tx_observer.rs +++ b/db/src/tx_observer.rs @@ -8,20 +8,18 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use std::collections::{ - BTreeMap, -}; use std::sync::{ Arc, - Mutex, Weak, }; + use std::sync::mpsc::{ channel, Receiver, RecvError, Sender, }; + use std::thread; use indexmap::{ @@ -33,6 +31,7 @@ use mentat_core::{ Schema, TypedValue, }; + use mentat_tx::entities::{ OpType, }; @@ -40,37 +39,33 @@ use mentat_tx::entities::{ use errors::{ Result, }; + use types::{ - AccumulatedTxids, AttributeSet, }; + use watcher::TransactWatcher; pub struct TxObserver { - notify_fn: Arc) + Send + Sync>>, + notify_fn: Arc) + Send + Sync>>, attributes: AttributeSet, } impl TxObserver { - pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, BTreeMap<&Entid, &AttributeSet>) + 'static + Send + Sync { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, IndexMap<&Entid, &AttributeSet>) + 'static + Send + Sync { TxObserver { notify_fn: Arc::new(Box::new(notify_fn)), attributes, } } - pub fn applicable_reports<'r>(&self, reports: &'r BTreeMap) -> BTreeMap<&'r Entid, &'r AttributeSet> { - reports.into_iter().filter_map(|(txid, changeset)| { - self.attributes.intersection(changeset) - .next() - .and_then(|_| Some((txid, changeset))) - }).fold(BTreeMap::new(), |mut map, (txid, changeset)| { - map.insert(txid, changeset); - map - }) + pub fn applicable_reports<'r>(&self, reports: &'r IndexMap) -> IndexMap<&'r Entid, &'r AttributeSet> { + reports.into_iter() + .filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs)) + .collect() } - fn notify(&self, key: &str, reports: BTreeMap<&Entid, &AttributeSet>) { + fn notify(&self, key: &str, reports: IndexMap<&Entid, &AttributeSet>) { (*self.notify_fn)(key, reports); } } @@ -80,12 +75,12 @@ pub trait Command { } pub struct TxCommand { - reports: BTreeMap, + reports: IndexMap, observers: Weak>>, } impl TxCommand { - fn new(observers: &Arc>>, reports: BTreeMap) -> Self { + fn new(observers: &Arc>>, reports: IndexMap) -> Self { TxCommand { reports, observers: Arc::downgrade(observers), @@ -108,7 +103,6 @@ impl Command for TxCommand { pub struct TxObservationService { observers: Arc>>, - transactions: BTreeMap, executor: Option>>, } @@ -116,7 +110,6 @@ impl TxObservationService { pub fn new() -> Self { TxObservationService { observers: Arc::new(IndexMap::new()), - transactions: Default::default(), executor: None, } } @@ -138,21 +131,8 @@ impl TxObservationService { !self.observers.is_empty() } - pub fn add_transaction(&mut self, tx_id: Entid, attributes: AttributeSet) { - self.transactions.insert(tx_id, attributes); - } - - pub fn transaction_did_commit(&mut self, txids: &AccumulatedTxids) { - // collect the changesets relating to this commit - let reports: BTreeMap = txids.into_iter().filter_map(|tx_id| { - self.transactions.remove(&tx_id).map_or(None, |changeset| Some((tx_id, changeset))) - }) - .fold(BTreeMap::new(), |mut map, (tx_id, changeset)| { - map.insert(*tx_id, changeset); - map - }); - - let executor = self.executor.get_or_insert_with(||{ + pub fn in_progress_did_commit(&mut self, txes: IndexMap) { + let executor = self.executor.get_or_insert_with(|| { let (tx, rx): (Sender>, Receiver>) = channel(); let mut worker = CommandExecutor::new(rx); @@ -163,7 +143,7 @@ impl TxObservationService { tx }); - let cmd = Box::new(TxCommand::new(&self.observers, reports)); + let cmd = Box::new(TxCommand::new(&self.observers, txes)); executor.send(cmd).unwrap(); } } @@ -174,42 +154,28 @@ impl Drop for TxObservationService { } } -pub struct InProgressObserverTransactWatcher<'a> { - collected_datoms: AttributeSet, - observer_service: &'a Mutex, - active: bool +pub struct InProgressObserverTransactWatcher { + collected_attributes: AttributeSet, + pub txes: IndexMap, } -impl<'a> InProgressObserverTransactWatcher<'a> { - pub fn new(observer_service: &'a Mutex) -> InProgressObserverTransactWatcher { - let mut w = InProgressObserverTransactWatcher { - collected_datoms: Default::default(), - observer_service, - active: true - }; - - w.active = observer_service.lock().unwrap().has_observers(); - w - } -} - -impl<'a> TransactWatcher for InProgressObserverTransactWatcher<'a> { - type Result = (); - - fn tx_id(&mut self) -> Option { - None - } - - fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) { - if !self.active { - return +impl InProgressObserverTransactWatcher { + pub fn new() -> InProgressObserverTransactWatcher { + InProgressObserverTransactWatcher { + collected_attributes: Default::default(), + txes: Default::default(), } - self.collected_datoms.insert(a); + } +} + +impl TransactWatcher for InProgressObserverTransactWatcher { + fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) { + self.collected_attributes.insert(a); } - fn done(&mut self, t: &Entid, _schema: &Schema) -> Result { - let collected_datoms = ::std::mem::replace(&mut self.collected_datoms, Default::default()); - self.observer_service.lock().unwrap().add_transaction(t.clone(), collected_datoms); + fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<()> { + let collected_attributes = ::std::mem::replace(&mut self.collected_attributes, Default::default()); + self.txes.insert(*t, collected_attributes); Ok(()) } } diff --git a/db/src/watcher.rs b/db/src/watcher.rs index d8ce9302..15d8b3b7 100644 --- a/db/src/watcher.rs +++ b/db/src/watcher.rs @@ -32,33 +32,22 @@ use errors::{ }; pub trait TransactWatcher { - type Result; - - fn tx_id(&mut self) -> Option; - fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue); /// Only return an error if you want to interrupt the transact! /// Called with the schema _prior to_ the transact -- any attributes or /// attribute changes transacted during this transact are not reflected in /// the schema. - fn done(&mut self, t: &Entid, schema: &Schema) -> Result; + fn done(&mut self, t: &Entid, schema: &Schema) -> Result<()>; } pub struct NullWatcher(); impl TransactWatcher for NullWatcher { - type Result = (); - - - fn tx_id(&mut self) -> Option { - None - } - fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) { } - fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result { + fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<()> { Ok(()) } } diff --git a/src/conn.rs b/src/conn.rs index c8f343a8..72c3d7de 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -219,6 +219,7 @@ pub struct InProgress<'a, 'c> { use_caching: bool, tx_ids: AccumulatedTxids, tx_observer: &'a Mutex, + tx_observer_watcher: InProgressObserverTransactWatcher, } /// Represents an in-progress set of reads to the store. Just like `InProgress`, @@ -380,9 +381,9 @@ impl<'a, 'c> InProgress<'a, 'c> { pub fn transact_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { let w = InProgressTransactWatcher::new( - InProgressObserverTransactWatcher::new(self.tx_observer), + &mut self.tx_observer_watcher, self.cache.transact_watcher()); - let (report, next_partition_map, next_schema, mut watcher) = + let (report, next_partition_map, next_schema, _watcher) = transact_terms(&self.transaction, self.partition_map.clone(), &self.schema, @@ -390,9 +391,6 @@ impl<'a, 'c> InProgress<'a, 'c> { w, terms, tempid_set)?; - if let Some(tx_id) = watcher.tx_id() { - self.tx_ids.push(tx_id); - } self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -410,9 +408,9 @@ impl<'a, 'c> InProgress<'a, 'c> { // `Default::default` in those situations to extract the partition map, and so there // would still be some cost. let w = InProgressTransactWatcher::new( - InProgressObserverTransactWatcher::new(self.tx_observer), + &mut self.tx_observer_watcher, self.cache.transact_watcher()); - let (report, next_partition_map, next_schema, mut watcher) = + let (report, next_partition_map, next_schema, _watcher) = transact(&self.transaction, self.partition_map.clone(), &self.schema, @@ -420,10 +418,6 @@ impl<'a, 'c> InProgress<'a, 'c> { self.schema, w, entities)?; - if let Some(tx_id) = watcher.tx_id() { - self.tx_ids.push(tx_id); - } - self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -478,7 +472,8 @@ impl<'a, 'c> InProgress<'a, 'c> { // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. } - self.tx_observer.lock().unwrap().transaction_did_commit(&self.tx_ids); + let txes = self.tx_observer_watcher.txes; + self.tx_observer.lock().unwrap().in_progress_did_commit(txes); Ok(()) } @@ -507,14 +502,14 @@ impl<'a, 'c> InProgress<'a, 'c> { } } -struct InProgressTransactWatcher<'a> { +struct InProgressTransactWatcher<'a, 'o> { cache_watcher: InProgressCacheTransactWatcher<'a>, - observer_watcher: InProgressObserverTransactWatcher<'a>, + observer_watcher: &'o mut InProgressObserverTransactWatcher, tx_id: Option, } -impl<'a> InProgressTransactWatcher<'a> { - fn new(observer_watcher: InProgressObserverTransactWatcher<'a>, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { +impl<'a, 'o> InProgressTransactWatcher<'a, 'o> { + fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { InProgressTransactWatcher { cache_watcher: cache_watcher, observer_watcher: observer_watcher, @@ -523,19 +518,13 @@ impl<'a> InProgressTransactWatcher<'a> { } } -impl<'a> TransactWatcher for InProgressTransactWatcher<'a> { - type Result = (); - - fn tx_id(&mut self) -> Option { - self.tx_id.take() - } - +impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> { fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v); self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v); } - fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result { + fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> { self.cache_watcher.done(t, schema)?; self.observer_watcher.done(t, schema)?; self.tx_id = Some(t.clone()); @@ -778,6 +767,7 @@ impl Conn { use_caching: true, tx_ids: Default::default(), tx_observer: &self.tx_observer_service, + tx_observer_watcher: InProgressObserverTransactWatcher::new(), }) }