From 9f3d2c08b2241ee9948d16707f04c719714c0a5f Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Fri, 9 Mar 2018 12:16:55 +0000 Subject: [PATCH] Populate changeset of attributes inside TxReport during transact. Batch up TxReports for entire transaction. Notify observers about committed transaction. Store transaction observer service inside Conn --- db/src/tx.rs | 16 ++++++++++------ src/conn.rs | 24 ++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/db/src/tx.rs b/db/src/tx.rs index 8264e174..96fa989f 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -51,8 +51,9 @@ use std::collections::{ BTreeSet, VecDeque, }; - -use std::rc::Rc; +use std::rc::{ + Rc, +}; use db; use db::{ @@ -106,6 +107,7 @@ use schema::{ }; use types::{ Attribute, + AttributeSet, AVPair, AVMap, Entid, @@ -527,8 +529,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. pub fn transact_entities(&mut self, entities: I) -> Result - where I: IntoIterator, - W: TransactWatcher { + where I: IntoIterator { // Pipeline stage 1: entities -> terms with tempids and lookup refs. let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; @@ -542,8 +543,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } pub fn transact_simple_terms(&mut self, terms: I, tempid_set: InternSet) -> Result - where I: IntoIterator, - W: TransactWatcher { + where I: IntoIterator { // TODO: push these into an internal transaction report? let mut tempids: BTreeMap = BTreeMap::default(); @@ -614,7 +614,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { final_populations.allocated, inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + let tx_instant; + let mut affected_attrs = AttributeSet::new(); { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. @@ -669,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { } self.watcher.datom(op, e, a, &v); + affected_attrs.insert(a); let reduced = (e, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { @@ -735,6 +738,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { tx_id: self.tx_id, tx_instant, tempids: tempids, + changeset: affected_attrs, }) } } diff --git a/src/conn.rs b/src/conn.rs index 9146d16f..eb56c7aa 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -57,6 +57,7 @@ use mentat_db::{ transact, transact_terms, PartitionMap, + TxObservationService, TxReport, }; @@ -140,6 +141,7 @@ pub struct Conn { // TODO: maintain cache of query plans that could be shared across threads and invalidated when // the schema changes. #315. + tx_observer_service: Mutex, } /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable @@ -209,10 +211,10 @@ pub struct InProgress<'a, 'c> { generation: u64, partition_map: PartitionMap, schema: Schema, - cache: InProgressSQLiteAttributeCache, - use_caching: bool, + tx_reports: Vec, + observer_service: Option<&'a Mutex>, } /// Represents an in-progress set of reads to the store. Just like `InProgress`, @@ -381,6 +383,7 @@ impl<'a, 'c> InProgress<'a, 'c> { self.cache.transact_watcher(), terms, tempid_set)?; + self.tx_reports.push(report.clone()); self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -404,6 +407,8 @@ impl<'a, 'c> InProgress<'a, 'c> { &self.schema, self.cache.transact_watcher(), entities)?; + self.tx_reports.push(report.clone()); + self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -447,6 +452,12 @@ impl<'a, 'c> InProgress<'a, 'c> { metadata.generation += 1; metadata.partition_map = self.partition_map; + // let the transaction observer know that there have been some transactions committed. + if let Some(ref observer_service) = self.observer_service { + let mut os = observer_service.lock().unwrap(); + os.transaction_did_commit(self.tx_reports); + } + // Update the conn's cache if we made any changes. self.cache.commit_to(&mut metadata.attribute_cache); @@ -458,6 +469,12 @@ impl<'a, 'c> InProgress<'a, 'c> { // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. } + // run any commands that we've created along the way. + if let Some(ref observer_service) = self.observer_service { + let mut os = observer_service.lock().unwrap(); + os.run(); + } + Ok(()) } @@ -571,6 +588,7 @@ impl Conn { fn new(partition_map: PartitionMap, schema: Schema) -> Conn { Conn { metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())), + tx_observer_service: Mutex::new(TxObservationService::new()), } } @@ -712,6 +730,8 @@ impl Conn { schema: (*current_schema).clone(), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, + tx_reports: Vec::new(), + observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None }, }) }