Populate changeset of attributes inside TxReport during transact.

Batch up TxReports for entire transaction.
Notify observers about committed transaction.
Store transaction observer service inside Conn
This commit is contained in:
Emily Toop 2018-03-09 12:16:55 +00:00
parent ba08807137
commit cf510e758f
2 changed files with 32 additions and 8 deletions

View file

@ -51,8 +51,9 @@ use std::collections::{
BTreeSet, BTreeSet,
VecDeque, VecDeque,
}; };
use std::rc::{
use std::rc::Rc; Rc,
};
use db; use db;
use db::{ use db::{
@ -106,6 +107,7 @@ use schema::{
}; };
use types::{ use types::{
Attribute, Attribute,
AttributeSet,
AVPair, AVPair,
AVMap, AVMap,
Entid, Entid,
@ -527,8 +529,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer. // TODO: move this to the transactor layer.
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport> pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
where I: IntoIterator<Item=Entity>, where I: IntoIterator<Item=Entity> {
W: TransactWatcher {
// Pipeline stage 1: entities -> terms with tempids and lookup refs. // Pipeline stage 1: entities -> terms with tempids and lookup refs.
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
@ -542,8 +543,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
} }
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
where I: IntoIterator<Item=TermWithTempIds>, where I: IntoIterator<Item=TermWithTempIds> {
W: TransactWatcher {
// TODO: push these into an internal transaction report? // TODO: push these into an internal transaction report?
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default(); let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
@ -614,7 +614,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
final_populations.allocated, final_populations.allocated,
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
let tx_instant; let tx_instant;
let mut affected_attrs = AttributeSet::new();
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
@ -669,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
} }
self.watcher.datom(op, e, a, &v); self.watcher.datom(op, e, a, &v);
affected_attrs.insert(a);
let reduced = (e, a, attribute, v, added); let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) { match (attribute.fulltext, attribute.multival) {
@ -735,6 +738,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
tx_id: self.tx_id, tx_id: self.tx_id,
tx_instant, tx_instant,
tempids: tempids, tempids: tempids,
changeset: affected_attrs,
}) })
} }
} }

View file

@ -57,6 +57,7 @@ use mentat_db::{
transact, transact,
transact_terms, transact_terms,
PartitionMap, PartitionMap,
TxObservationService,
TxReport, TxReport,
}; };
@ -140,6 +141,7 @@ pub struct Conn {
// TODO: maintain cache of query plans that could be shared across threads and invalidated when // TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315. // the schema changes. #315.
tx_observer_service: Mutex<TxObservationService>,
} }
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
@ -202,10 +204,10 @@ pub struct InProgress<'a, 'c> {
generation: u64, generation: u64,
partition_map: PartitionMap, partition_map: PartitionMap,
schema: Schema, schema: Schema,
cache: InProgressSQLiteAttributeCache, cache: InProgressSQLiteAttributeCache,
use_caching: bool, use_caching: bool,
tx_reports: Vec<TxReport>,
observer_service: Option<&'a Mutex<TxObservationService>>,
} }
/// Represents an in-progress set of reads to the store. Just like `InProgress`, /// Represents an in-progress set of reads to the store. Just like `InProgress`,
@ -374,6 +376,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.cache.transact_watcher(), self.cache.transact_watcher(),
terms, terms,
tempid_set)?; tempid_set)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {
self.schema = schema; self.schema = schema;
@ -397,6 +400,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
&self.schema, &self.schema,
self.cache.transact_watcher(), self.cache.transact_watcher(),
entities)?; entities)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {
self.schema = schema; self.schema = schema;
@ -440,6 +445,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
metadata.generation += 1; metadata.generation += 1;
metadata.partition_map = self.partition_map; metadata.partition_map = self.partition_map;
// let the transaction observer know that there have been some transactions committed.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.transaction_did_commit(self.tx_reports);
}
// Update the conn's cache if we made any changes. // Update the conn's cache if we made any changes.
self.cache.commit_to(&mut metadata.attribute_cache); self.cache.commit_to(&mut metadata.attribute_cache);
@ -451,6 +462,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
} }
// run any commands that we've created along the way.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.run();
}
Ok(()) Ok(())
} }
@ -564,6 +581,7 @@ impl Conn {
fn new(partition_map: PartitionMap, schema: Schema) -> Conn { fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn { Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())), metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
tx_observer_service: Mutex::new(TxObservationService::new()),
} }
} }
@ -705,6 +723,8 @@ impl Conn {
schema: (*current_schema).clone(), schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true, use_caching: true,
tx_reports: Vec::new(),
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
}) })
} }