Populate changeset of attributes inside TxReport during transact.

Batch up TxReports for entire transaction.
Notify observers about committed transaction.
Store transaction observer service inside Conn
This commit is contained in:
Emily Toop 2018-03-09 12:16:55 +00:00
parent 8d60f2b3d1
commit 9f3d2c08b2
2 changed files with 32 additions and 8 deletions

View file

@ -51,8 +51,9 @@ use std::collections::{
BTreeSet,
VecDeque,
};
use std::rc::Rc;
use std::rc::{
Rc,
};
use db;
use db::{
@ -106,6 +107,7 @@ use schema::{
};
use types::{
Attribute,
AttributeSet,
AVPair,
AVMap,
Entid,
@ -527,8 +529,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
where I: IntoIterator<Item=Entity>,
W: TransactWatcher {
where I: IntoIterator<Item=Entity> {
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
@ -542,8 +543,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
}
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
where I: IntoIterator<Item=TermWithTempIds>,
W: TransactWatcher {
where I: IntoIterator<Item=TermWithTempIds> {
// TODO: push these into an internal transaction report?
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
@ -614,7 +614,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
final_populations.allocated,
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
let tx_instant;
let mut affected_attrs = AttributeSet::new();
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
@ -669,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
}
self.watcher.datom(op, e, a, &v);
affected_attrs.insert(a);
let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) {
@ -735,6 +738,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
tx_id: self.tx_id,
tx_instant,
tempids: tempids,
changeset: affected_attrs,
})
}
}

View file

@ -57,6 +57,7 @@ use mentat_db::{
transact,
transact_terms,
PartitionMap,
TxObservationService,
TxReport,
};
@ -140,6 +141,7 @@ pub struct Conn {
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
tx_observer_service: Mutex<TxObservationService>,
}
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
@ -209,10 +211,10 @@ pub struct InProgress<'a, 'c> {
generation: u64,
partition_map: PartitionMap,
schema: Schema,
cache: InProgressSQLiteAttributeCache,
use_caching: bool,
tx_reports: Vec<TxReport>,
observer_service: Option<&'a Mutex<TxObservationService>>,
}
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
@ -381,6 +383,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.cache.transact_watcher(),
terms,
tempid_set)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -404,6 +407,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
&self.schema,
self.cache.transact_watcher(),
entities)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -447,6 +452,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
metadata.generation += 1;
metadata.partition_map = self.partition_map;
// let the transaction observer know that there have been some transactions committed.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.transaction_did_commit(self.tx_reports);
}
// Update the conn's cache if we made any changes.
self.cache.commit_to(&mut metadata.attribute_cache);
@ -458,6 +469,12 @@ impl<'a, 'c> InProgress<'a, 'c> {
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
}
// run any commands that we've created along the way.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.run();
}
Ok(())
}
@ -571,6 +588,7 @@ impl Conn {
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
tx_observer_service: Mutex::new(TxObservationService::new()),
}
}
@ -712,6 +730,8 @@ impl Conn {
schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true,
tx_reports: Vec::new(),
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
})
}