Simplify.

This has a watcher collect txid -> AttributeSet mappings each time a
transact occurs. On commit we retrieve those mappings and hand them over
to the observer service, which filters them and packages them up for
dispatch.
This commit is contained in:
Richard Newman 2018-03-19 11:30:06 -07:00
parent afe6444943
commit 5b37001726
4 changed files with 51 additions and 114 deletions

View file

@ -1369,14 +1369,6 @@ impl<'a> InProgressCacheTransactWatcher<'a> {
} }
impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> { impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> {
type Result = ();
fn tx_id(&mut self) -> Option<Entid> {
None
}
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) {
if !self.active { if !self.active {
return; return;
@ -1410,7 +1402,7 @@ impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> {
} }
} }
fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<Self::Result> { fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<()> {
// Oh, I wish we had impl trait. Without it we have a six-line type signature if we // Oh, I wish we had impl trait. Without it we have a six-line type signature if we
// try to break this out as a helper function. // try to break this out as a helper function.
let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default()); let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default());

View file

@ -8,20 +8,18 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the // CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License. // specific language governing permissions and limitations under the License.
use std::collections::{
BTreeMap,
};
use std::sync::{ use std::sync::{
Arc, Arc,
Mutex,
Weak, Weak,
}; };
use std::sync::mpsc::{ use std::sync::mpsc::{
channel, channel,
Receiver, Receiver,
RecvError, RecvError,
Sender, Sender,
}; };
use std::thread; use std::thread;
use indexmap::{ use indexmap::{
@ -33,6 +31,7 @@ use mentat_core::{
Schema, Schema,
TypedValue, TypedValue,
}; };
use mentat_tx::entities::{ use mentat_tx::entities::{
OpType, OpType,
}; };
@ -40,37 +39,33 @@ use mentat_tx::entities::{
use errors::{ use errors::{
Result, Result,
}; };
use types::{ use types::{
AccumulatedTxids,
AttributeSet, AttributeSet,
}; };
use watcher::TransactWatcher; use watcher::TransactWatcher;
pub struct TxObserver { pub struct TxObserver {
notify_fn: Arc<Box<Fn(&str, BTreeMap<&Entid, &AttributeSet>) + Send + Sync>>, notify_fn: Arc<Box<Fn(&str, IndexMap<&Entid, &AttributeSet>) + Send + Sync>>,
attributes: AttributeSet, attributes: AttributeSet,
} }
impl TxObserver { impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, BTreeMap<&Entid, &AttributeSet>) + 'static + Send + Sync { pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, IndexMap<&Entid, &AttributeSet>) + 'static + Send + Sync {
TxObserver { TxObserver {
notify_fn: Arc::new(Box::new(notify_fn)), notify_fn: Arc::new(Box::new(notify_fn)),
attributes, attributes,
} }
} }
pub fn applicable_reports<'r>(&self, reports: &'r BTreeMap<Entid, AttributeSet>) -> BTreeMap<&'r Entid, &'r AttributeSet> { pub fn applicable_reports<'r>(&self, reports: &'r IndexMap<Entid, AttributeSet>) -> IndexMap<&'r Entid, &'r AttributeSet> {
reports.into_iter().filter_map(|(txid, changeset)| { reports.into_iter()
self.attributes.intersection(changeset) .filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs))
.next() .collect()
.and_then(|_| Some((txid, changeset)))
}).fold(BTreeMap::new(), |mut map, (txid, changeset)| {
map.insert(txid, changeset);
map
})
} }
fn notify(&self, key: &str, reports: BTreeMap<&Entid, &AttributeSet>) { fn notify(&self, key: &str, reports: IndexMap<&Entid, &AttributeSet>) {
(*self.notify_fn)(key, reports); (*self.notify_fn)(key, reports);
} }
} }
@ -80,12 +75,12 @@ pub trait Command {
} }
pub struct TxCommand { pub struct TxCommand {
reports: BTreeMap<Entid, AttributeSet>, reports: IndexMap<Entid, AttributeSet>,
observers: Weak<IndexMap<String, Arc<TxObserver>>>, observers: Weak<IndexMap<String, Arc<TxObserver>>>,
} }
impl TxCommand { impl TxCommand {
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: BTreeMap<Entid, AttributeSet>) -> Self { fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: IndexMap<Entid, AttributeSet>) -> Self {
TxCommand { TxCommand {
reports, reports,
observers: Arc::downgrade(observers), observers: Arc::downgrade(observers),
@ -108,7 +103,6 @@ impl Command for TxCommand {
pub struct TxObservationService { pub struct TxObservationService {
observers: Arc<IndexMap<String, Arc<TxObserver>>>, observers: Arc<IndexMap<String, Arc<TxObserver>>>,
transactions: BTreeMap<Entid, AttributeSet>,
executor: Option<Sender<Box<Command + Send>>>, executor: Option<Sender<Box<Command + Send>>>,
} }
@ -116,7 +110,6 @@ impl TxObservationService {
pub fn new() -> Self { pub fn new() -> Self {
TxObservationService { TxObservationService {
observers: Arc::new(IndexMap::new()), observers: Arc::new(IndexMap::new()),
transactions: Default::default(),
executor: None, executor: None,
} }
} }
@ -138,21 +131,8 @@ impl TxObservationService {
!self.observers.is_empty() !self.observers.is_empty()
} }
pub fn add_transaction(&mut self, tx_id: Entid, attributes: AttributeSet) { pub fn in_progress_did_commit(&mut self, txes: IndexMap<Entid, AttributeSet>) {
self.transactions.insert(tx_id, attributes); let executor = self.executor.get_or_insert_with(|| {
}
pub fn transaction_did_commit(&mut self, txids: &AccumulatedTxids) {
// collect the changesets relating to this commit
let reports: BTreeMap<Entid, AttributeSet> = txids.into_iter().filter_map(|tx_id| {
self.transactions.remove(&tx_id).map_or(None, |changeset| Some((tx_id, changeset)))
})
.fold(BTreeMap::new(), |mut map, (tx_id, changeset)| {
map.insert(*tx_id, changeset);
map
});
let executor = self.executor.get_or_insert_with(||{
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel(); let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
let mut worker = CommandExecutor::new(rx); let mut worker = CommandExecutor::new(rx);
@ -163,7 +143,7 @@ impl TxObservationService {
tx tx
}); });
let cmd = Box::new(TxCommand::new(&self.observers, reports)); let cmd = Box::new(TxCommand::new(&self.observers, txes));
executor.send(cmd).unwrap(); executor.send(cmd).unwrap();
} }
} }
@ -174,42 +154,28 @@ impl Drop for TxObservationService {
} }
} }
pub struct InProgressObserverTransactWatcher<'a> { pub struct InProgressObserverTransactWatcher {
collected_datoms: AttributeSet, collected_attributes: AttributeSet,
observer_service: &'a Mutex<TxObservationService>, pub txes: IndexMap<Entid, AttributeSet>,
active: bool
} }
impl<'a> InProgressObserverTransactWatcher<'a> { impl InProgressObserverTransactWatcher {
pub fn new(observer_service: &'a Mutex<TxObservationService>) -> InProgressObserverTransactWatcher { pub fn new() -> InProgressObserverTransactWatcher {
let mut w = InProgressObserverTransactWatcher { InProgressObserverTransactWatcher {
collected_datoms: Default::default(), collected_attributes: Default::default(),
observer_service, txes: Default::default(),
active: true
};
w.active = observer_service.lock().unwrap().has_observers();
w
}
}
impl<'a> TransactWatcher for InProgressObserverTransactWatcher<'a> {
type Result = ();
fn tx_id(&mut self) -> Option<Entid> {
None
}
fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) {
if !self.active {
return
} }
self.collected_datoms.insert(a); }
}
impl TransactWatcher for InProgressObserverTransactWatcher {
fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) {
self.collected_attributes.insert(a);
} }
fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<Self::Result> { fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<()> {
let collected_datoms = ::std::mem::replace(&mut self.collected_datoms, Default::default()); let collected_attributes = ::std::mem::replace(&mut self.collected_attributes, Default::default());
self.observer_service.lock().unwrap().add_transaction(t.clone(), collected_datoms); self.txes.insert(*t, collected_attributes);
Ok(()) Ok(())
} }
} }

View file

@ -32,33 +32,22 @@ use errors::{
}; };
pub trait TransactWatcher { pub trait TransactWatcher {
type Result;
fn tx_id(&mut self) -> Option<Entid>;
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue); fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue);
/// Only return an error if you want to interrupt the transact! /// Only return an error if you want to interrupt the transact!
/// Called with the schema _prior to_ the transact -- any attributes or /// Called with the schema _prior to_ the transact -- any attributes or
/// attribute changes transacted during this transact are not reflected in /// attribute changes transacted during this transact are not reflected in
/// the schema. /// the schema.
fn done(&mut self, t: &Entid, schema: &Schema) -> Result<Self::Result>; fn done(&mut self, t: &Entid, schema: &Schema) -> Result<()>;
} }
pub struct NullWatcher(); pub struct NullWatcher();
impl TransactWatcher for NullWatcher { impl TransactWatcher for NullWatcher {
type Result = ();
fn tx_id(&mut self) -> Option<Entid> {
None
}
fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) { fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) {
} }
fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<Self::Result> { fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<()> {
Ok(()) Ok(())
} }
} }

View file

@ -219,6 +219,7 @@ pub struct InProgress<'a, 'c> {
use_caching: bool, use_caching: bool,
tx_ids: AccumulatedTxids, tx_ids: AccumulatedTxids,
tx_observer: &'a Mutex<TxObservationService>, tx_observer: &'a Mutex<TxObservationService>,
tx_observer_watcher: InProgressObserverTransactWatcher,
} }
/// Represents an in-progress set of reads to the store. Just like `InProgress`, /// Represents an in-progress set of reads to the store. Just like `InProgress`,
@ -380,9 +381,9 @@ impl<'a, 'c> InProgress<'a, 'c> {
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> { pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
let w = InProgressTransactWatcher::new( let w = InProgressTransactWatcher::new(
InProgressObserverTransactWatcher::new(self.tx_observer), &mut self.tx_observer_watcher,
self.cache.transact_watcher()); self.cache.transact_watcher());
let (report, next_partition_map, next_schema, mut watcher) = let (report, next_partition_map, next_schema, _watcher) =
transact_terms(&self.transaction, transact_terms(&self.transaction,
self.partition_map.clone(), self.partition_map.clone(),
&self.schema, &self.schema,
@ -390,9 +391,6 @@ impl<'a, 'c> InProgress<'a, 'c> {
w, w,
terms, terms,
tempid_set)?; tempid_set)?;
if let Some(tx_id) = watcher.tx_id() {
self.tx_ids.push(tx_id);
}
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {
self.schema = schema; self.schema = schema;
@ -410,9 +408,9 @@ impl<'a, 'c> InProgress<'a, 'c> {
// `Default::default` in those situations to extract the partition map, and so there // `Default::default` in those situations to extract the partition map, and so there
// would still be some cost. // would still be some cost.
let w = InProgressTransactWatcher::new( let w = InProgressTransactWatcher::new(
InProgressObserverTransactWatcher::new(self.tx_observer), &mut self.tx_observer_watcher,
self.cache.transact_watcher()); self.cache.transact_watcher());
let (report, next_partition_map, next_schema, mut watcher) = let (report, next_partition_map, next_schema, _watcher) =
transact(&self.transaction, transact(&self.transaction,
self.partition_map.clone(), self.partition_map.clone(),
&self.schema, &self.schema,
@ -420,10 +418,6 @@ impl<'a, 'c> InProgress<'a, 'c> {
self.schema, self.schema,
w, w,
entities)?; entities)?;
if let Some(tx_id) = watcher.tx_id() {
self.tx_ids.push(tx_id);
}
self.partition_map = next_partition_map; self.partition_map = next_partition_map;
if let Some(schema) = next_schema { if let Some(schema) = next_schema {
self.schema = schema; self.schema = schema;
@ -478,7 +472,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time. // TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
} }
self.tx_observer.lock().unwrap().transaction_did_commit(&self.tx_ids); let txes = self.tx_observer_watcher.txes;
self.tx_observer.lock().unwrap().in_progress_did_commit(txes);
Ok(()) Ok(())
} }
@ -507,14 +502,14 @@ impl<'a, 'c> InProgress<'a, 'c> {
} }
} }
struct InProgressTransactWatcher<'a> { struct InProgressTransactWatcher<'a, 'o> {
cache_watcher: InProgressCacheTransactWatcher<'a>, cache_watcher: InProgressCacheTransactWatcher<'a>,
observer_watcher: InProgressObserverTransactWatcher<'a>, observer_watcher: &'o mut InProgressObserverTransactWatcher,
tx_id: Option<Entid>, tx_id: Option<Entid>,
} }
impl<'a> InProgressTransactWatcher<'a> { impl<'a, 'o> InProgressTransactWatcher<'a, 'o> {
fn new(observer_watcher: InProgressObserverTransactWatcher<'a>, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self { fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self {
InProgressTransactWatcher { InProgressTransactWatcher {
cache_watcher: cache_watcher, cache_watcher: cache_watcher,
observer_watcher: observer_watcher, observer_watcher: observer_watcher,
@ -523,19 +518,13 @@ impl<'a> InProgressTransactWatcher<'a> {
} }
} }
impl<'a> TransactWatcher for InProgressTransactWatcher<'a> { impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> {
type Result = ();
fn tx_id(&mut self) -> Option<Entid> {
self.tx_id.take()
}
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) {
self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v); self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v);
self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v); self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v);
} }
fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<Self::Result> { fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> {
self.cache_watcher.done(t, schema)?; self.cache_watcher.done(t, schema)?;
self.observer_watcher.done(t, schema)?; self.observer_watcher.done(t, schema)?;
self.tx_id = Some(t.clone()); self.tx_id = Some(t.clone());
@ -778,6 +767,7 @@ impl Conn {
use_caching: true, use_caching: true,
tx_ids: Default::default(), tx_ids: Default::default(),
tx_observer: &self.tx_observer_service, tx_observer: &self.tx_observer_service,
tx_observer_watcher: InProgressObserverTransactWatcher::new(),
}) })
} }