Move to using watcher.

Simplify.

A watcher now collects txid -> AttributeSet mappings each time a
transact occurs. On commit we retrieve those mappings and hand them to
the observer service, which filters them and packages them up for
dispatch.

Tidy up.
Emily Toop 2018-03-19 17:09:38 +00:00
parent d4365fa4cd
commit ab957948b4
10 changed files with 152 additions and 120 deletions
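
For orientation, the sketch below illustrates the flow described above: a transact watcher records the affected attributes per datom, done keys that set by the transaction's entid, and on commit an observer keeps only the transactions whose attribute set overlaps the attributes it registered for. This is a minimal stand-alone sketch, not Mentat's code: it substitutes plain i64/BTreeSet/BTreeMap for Entid, AttributeSet, and IndexMap, and drops the Schema and Result plumbing that the real TransactWatcher::done carries.

use std::collections::{BTreeMap, BTreeSet};

// Simplified stand-ins; Mentat's Entid, AttributeSet and IndexMap are richer.
type Entid = i64;
type AttributeSet = BTreeSet<Entid>;

// Driven once per datom, then once per transact; `done` now receives the tx
// entid so collected attributes can be keyed by transaction.
trait TransactWatcher {
    fn datom(&mut self, e: Entid, a: Entid);
    fn done(&mut self, t: &Entid);
}

#[derive(Default)]
struct InProgressObserverTransactWatcher {
    collected_attributes: AttributeSet,
    txes: BTreeMap<Entid, AttributeSet>,
}

impl TransactWatcher for InProgressObserverTransactWatcher {
    fn datom(&mut self, _e: Entid, a: Entid) {
        self.collected_attributes.insert(a);
    }
    fn done(&mut self, t: &Entid) {
        // Move the attributes gathered for this transact into the per-tx map.
        let attrs = std::mem::replace(&mut self.collected_attributes, Default::default());
        self.txes.insert(*t, attrs);
    }
}

// An observer keeps only the transactions that touched any attribute it
// registered for; the service would hand this filtered map to its notify_fn.
struct TxObserver {
    attributes: AttributeSet,
}

impl TxObserver {
    fn applicable_reports<'r>(&self, txes: &'r BTreeMap<Entid, AttributeSet>)
            -> BTreeMap<&'r Entid, &'r AttributeSet> {
        txes.iter()
            .filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs))
            .collect()
    }
}

fn main() {
    let mut watcher = InProgressObserverTransactWatcher::default();
    // Transact 1000 touches attributes 10 and 11; transact 1001 touches 12.
    watcher.datom(100, 10);
    watcher.datom(100, 11);
    watcher.done(&1000);
    watcher.datom(101, 12);
    watcher.done(&1001);

    let observer = TxObserver { attributes: [10].iter().cloned().collect() };
    let applicable = observer.applicable_reports(&watcher.txes);
    assert!(applicable.contains_key(&1000));
    assert!(!applicable.contains_key(&1001));
}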

@@ -29,7 +29,6 @@ rustc_version = "0.2"
chrono = "0.4"
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
lazy_static = "0.2"
smallvec = "0.6"
time = "0.1"
uuid = "0.5"

@@ -10,7 +10,6 @@ itertools = "0.7"
lazy_static = "0.2"
num = "0.1"
ordered-float = "0.5"
smallvec = "0.6"
time = "0.1"
[dependencies.rusqlite]

@@ -1402,7 +1402,7 @@ impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> {
}
}
fn done(&mut self, schema: &Schema) -> Result<()> {
fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<()> {
// Oh, I wish we had impl trait. Without it we have a six-line type signature if we
// try to break this out as a helper function.
let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default());

@@ -21,7 +21,6 @@ extern crate lazy_static;
extern crate num;
extern crate rusqlite;
extern crate smallvec;
extern crate tabwriter;
extern crate time;
@@ -89,6 +88,7 @@ pub use tx::{
};
pub use tx_observer::{
InProgressObserverTransactWatcher,
TxObservationService,
TxObserver,
};

@@ -107,7 +107,6 @@ use schema::{
};
use types::{
Attribute,
AttributeSet,
AVPair,
AVMap,
Entid,
@@ -616,8 +615,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
let tx_instant;
let mut affected_attrs = AttributeSet::new();
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
// Assertions that are :db.cardinality/one and not :db.fulltext.
@@ -671,8 +668,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
}
self.watcher.datom(op, e, a, &v);
// TODO: Create something like a watcher to do this for us.
affected_attrs.insert(a);
let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) {
@@ -714,7 +709,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
}
db::update_partition_map(self.store, &self.partition_map)?;
self.watcher.done(self.schema)?;
self.watcher.done(&self.tx_id, self.schema)?;
if tx_might_update_metadata {
// Extract changes to metadata from the store.
@@ -739,7 +734,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
tx_id: self.tx_id,
tx_instant,
tempids: tempids,
changeset: affected_attrs,
})
}
}
@@ -752,7 +746,6 @@ fn start_tx<'conn, 'a, W>(conn: &'conn rusqlite::Connection,
watcher: W) -> Result<Tx<'conn, 'a, W>>
where W: TransactWatcher {
let tx_id = partition_map.allocate_entid(":db.part/tx");
conn.begin_tx_application()?;
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id))

@@ -12,49 +12,60 @@ use std::sync::{
Arc,
Weak,
};
use std::sync::mpsc::{
channel,
Receiver,
RecvError,
Sender,
};
use std::thread;
use indexmap::{
IndexMap,
};
use smallvec::{
SmallVec,
use mentat_core::{
Entid,
Schema,
TypedValue,
};
use mentat_tx::entities::{
OpType,
};
use errors::{
Result,
};
use types::{
AttributeSet,
TxReport,
};
use watcher::TransactWatcher;
pub struct TxObserver {
notify_fn: Arc<Box<Fn(&str, SmallVec<[&TxReport; 4]>) + Send + Sync>>,
notify_fn: Arc<Box<Fn(&str, IndexMap<&Entid, &AttributeSet>) + Send + Sync>>,
attributes: AttributeSet,
}
impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, IndexMap<&Entid, &AttributeSet>) + 'static + Send + Sync {
TxObserver {
notify_fn: Arc::new(Box::new(notify_fn)),
attributes,
}
}
pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> {
reports.into_iter().filter_map(|report| {
self.attributes.intersection(&report.changeset)
.next()
.and_then(|_| Some(report))
}).collect()
pub fn applicable_reports<'r>(&self, reports: &'r IndexMap<Entid, AttributeSet>) -> IndexMap<&'r Entid, &'r AttributeSet> {
reports.into_iter()
.filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs))
.collect()
}
fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) {
fn notify(&self, key: &str, reports: IndexMap<&Entid, &AttributeSet>) {
(*self.notify_fn)(key, reports);
}
}
@@ -64,12 +75,12 @@ pub trait Command {
}
pub struct TxCommand {
reports: SmallVec<[TxReport; 4]>,
reports: IndexMap<Entid, AttributeSet>,
observers: Weak<IndexMap<String, Arc<TxObserver>>>,
}
impl TxCommand {
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: SmallVec<[TxReport; 4]>) -> Self {
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: IndexMap<Entid, AttributeSet>) -> Self {
TxCommand {
reports,
observers: Arc::downgrade(observers),
@@ -93,7 +104,6 @@ impl Command for TxCommand {
pub struct TxObservationService {
observers: Arc<IndexMap<String, Arc<TxObserver>>>,
executor: Option<Sender<Box<Command + Send>>>,
in_progress_count: i32,
}
impl TxObservationService {
@@ -101,7 +111,6 @@ impl TxObservationService {
TxObservationService {
observers: Arc::new(IndexMap::new()),
executor: None,
in_progress_count: 0,
}
}
@@ -122,49 +131,69 @@ impl TxObservationService {
!self.observers.is_empty()
}
pub fn transaction_did_start(&mut self) {
self.in_progress_count += 1;
}
pub fn in_progress_did_commit(&mut self, txes: IndexMap<Entid, AttributeSet>) {
let executor = self.executor.get_or_insert_with(|| {
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
let mut worker = CommandExecutor::new(rx);
pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) {
{
let executor = self.executor.get_or_insert_with(||{
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
let mut worker = CommandExecutor::new(rx);
thread::spawn(move || {
worker.main();
});
tx
thread::spawn(move || {
worker.main();
});
let cmd = Box::new(TxCommand::new(&self.observers, reports));
executor.send(cmd).unwrap();
}
tx
});
self.in_progress_count -= 1;
let cmd = Box::new(TxCommand::new(&self.observers, txes));
executor.send(cmd).unwrap();
}
}
if self.in_progress_count == 0 {
self.executor = None;
impl Drop for TxObservationService {
fn drop(&mut self) {
self.executor = None;
}
}
pub struct InProgressObserverTransactWatcher {
collected_attributes: AttributeSet,
pub txes: IndexMap<Entid, AttributeSet>,
}
impl InProgressObserverTransactWatcher {
pub fn new() -> InProgressObserverTransactWatcher {
InProgressObserverTransactWatcher {
collected_attributes: Default::default(),
txes: Default::default(),
}
}
}
impl TransactWatcher for InProgressObserverTransactWatcher {
fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) {
self.collected_attributes.insert(a);
}
fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<()> {
let collected_attributes = ::std::mem::replace(&mut self.collected_attributes, Default::default());
self.txes.insert(*t, collected_attributes);
Ok(())
}
}
struct CommandExecutor {
reciever: Receiver<Box<Command + Send>>,
receiver: Receiver<Box<Command + Send>>,
}
impl CommandExecutor {
fn new(rx: Receiver<Box<Command + Send>>) -> Self {
CommandExecutor {
reciever: rx,
receiver: rx,
}
}
fn main(&mut self) {
loop {
match self.reciever.recv() {
match self.receiver.recv() {
Err(RecvError) => {
eprintln!("Disconnected, terminating CommandExecutor");
return

@@ -103,7 +103,4 @@ pub struct TxReport {
/// existing entid, or is allocated a new entid. (It is possible for multiple distinct string
/// literal tempids to all unify to a single freshly allocated entid.)
pub tempids: BTreeMap<String, Entid>,
// A set of entids for attributes that were affected inside this transaction
pub changeset: AttributeSet,
}

@@ -38,7 +38,7 @@ pub trait TransactWatcher {
/// Called with the schema _prior to_ the transact -- any attributes or
/// attribute changes transacted during this transact are not reflected in
/// the schema.
fn done(&mut self, schema: &Schema) -> Result<()>;
fn done(&mut self, t: &Entid, schema: &Schema) -> Result<()>;
}
pub struct NullWatcher();
@@ -47,7 +47,7 @@ impl TransactWatcher for NullWatcher {
fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) {
}
fn done(&mut self, _schema: &Schema) -> Result<()> {
fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<()> {
Ok(())
}
}

@@ -32,8 +32,6 @@ use rusqlite::{
TransactionBehavior,
};
use smallvec::SmallVec;
use edn;
use mentat_core::{
@@ -50,6 +48,7 @@ use mentat_core::{
use mentat_core::intern_set::InternSet;
use mentat_db::cache::{
InProgressCacheTransactWatcher,
InProgressSQLiteAttributeCache,
SQLiteAttributeCache,
};
@@ -58,7 +57,9 @@ use mentat_db::db;
use mentat_db::{
transact,
transact_terms,
InProgressObserverTransactWatcher,
PartitionMap,
TransactWatcher,
TxObservationService,
TxObserver,
TxReport,
@@ -68,7 +69,10 @@ use mentat_db::internal_types::TermWithTempIds;
use mentat_tx;
use mentat_tx::entities::TempId;
use mentat_tx::entities::{
TempId,
OpType,
};
use mentat_tx_parser;
@@ -216,9 +220,8 @@ pub struct InProgress<'a, 'c> {
schema: Schema,
cache: InProgressSQLiteAttributeCache,
use_caching: bool,
// TODO: Collect txids/affected datoms in a better way
tx_reports: SmallVec<[TxReport; 4]>,
observer_service: Option<&'a Mutex<TxObservationService>>,
tx_observer: &'a Mutex<TxObservationService>,
tx_observer_watcher: InProgressObserverTransactWatcher,
}
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
@@ -379,15 +382,17 @@ impl<'a, 'c> InProgress<'a, 'c> {
}
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
let w = InProgressTransactWatcher::new(
&mut self.tx_observer_watcher,
self.cache.transact_watcher());
let (report, next_partition_map, next_schema, _watcher) =
transact_terms(&self.transaction,
self.partition_map.clone(),
&self.schema,
&self.schema,
self.cache.transact_watcher(),
w,
terms,
tempid_set)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@@ -404,15 +409,16 @@ impl<'a, 'c> InProgress<'a, 'c> {
// `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using
// `Default::default` in those situations to extract the partition map, and so there
// would still be some cost.
let w = InProgressTransactWatcher::new(
&mut self.tx_observer_watcher,
self.cache.transact_watcher());
let (report, next_partition_map, next_schema, _watcher) =
transact(&self.transaction,
self.partition_map.clone(),
&self.schema,
&self.schema,
self.cache.transact_watcher(),
w,
entities)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@@ -467,11 +473,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
}
// let the transaction observer know that there have been some transactions committed.
if let Some(ref observer_service) = self.observer_service {
let mut os = observer_service.lock().unwrap();
os.transaction_did_commit(self.tx_reports);
}
let txes = self.tx_observer_watcher.txes;
self.tx_observer.lock().unwrap().in_progress_did_commit(txes);
Ok(())
}
@@ -500,6 +503,36 @@ impl<'a, 'c> InProgress<'a, 'c> {
}
}
struct InProgressTransactWatcher<'a, 'o> {
cache_watcher: InProgressCacheTransactWatcher<'a>,
observer_watcher: &'o mut InProgressObserverTransactWatcher,
tx_id: Option<Entid>,
}
impl<'a, 'o> InProgressTransactWatcher<'a, 'o> {
fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self {
InProgressTransactWatcher {
cache_watcher: cache_watcher,
observer_watcher: observer_watcher,
tx_id: None,
}
}
}
impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> {
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) {
self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v);
self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v);
}
fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> {
self.cache_watcher.done(t, schema)?;
self.observer_watcher.done(t, schema)?;
self.tx_id = Some(t.clone());
Ok(())
}
}
impl Store {
/// Intended for use from tests.
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
@@ -725,14 +758,6 @@ impl Conn {
current.attribute_cache.clone())
};
let mut obs = self.tx_observer_service.lock().unwrap();
let observer_service = if obs.has_observers() {
obs.transaction_did_start();
Some(&self.tx_observer_service)
} else {
None
};
Ok(InProgress {
mutex: &self.metadata,
transaction: tx,
@@ -741,8 +766,8 @@ impl Conn {
schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true,
tx_reports: SmallVec::new(),
observer_service: observer_service,
tx_observer: &self.tx_observer_service,
tx_observer_watcher: InProgressObserverTransactWatcher::new(),
})
}
@@ -847,9 +872,10 @@ mod tests {
use std::path::{
PathBuf,
};
use std::sync::mpsc;
use std::time::{
Duration,
Instant
Instant,
};
use mentat_core::{
@@ -879,7 +905,7 @@ mod tests {
};
use ::vocabulary::attribute::{
Unique
Unique,
};
use mentat_db::USER0;
@@ -1514,7 +1540,7 @@ mod tests {
in_progress.commit().expect("Expected vocabulary committed");
}
#[derive(Default)]
#[derive(Default, Debug)]
struct ObserverOutput {
txids: Vec<i64>,
changes: Vec<BTreeSet<i64>>,
@@ -1538,7 +1564,7 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
// make it Sync.
let thread_tx = Mutex::new(tx);
@@ -1546,9 +1572,9 @@ mod tests {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for report in batch.iter() {
o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone());
for (tx_id, changes) in batch.into_iter() {
o.txids.push(*tx_id);
o.changes.push(changes.clone());
}
o.txids.sort();
}
@@ -1560,21 +1586,26 @@
let mut tx_ids = Vec::new();
let mut changesets = Vec::new();
let uuid_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into();
{
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
for i in 0..3 {
let mut changeset = BTreeSet::new();
let name = format!("todo{}", i);
let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
changeset.insert(uuid_entid.clone());
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
changeset.insert(name_entid.clone());
if i % 2 == 0 {
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
changeset.insert(date_entid.clone());
}
let (ip, r) = builder.transact();
let report = r.expect("expected a report");
tx_ids.push(report.tx_id.clone());
changesets.push(report.changeset.clone());
changesets.push(changeset);
in_progress = ip;
}
let mut builder = in_progress.builder().describe_tempid("Label");
@@ -1586,18 +1617,11 @@ mod tests {
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) {
Ok(out) => {
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, Some(key.clone()));
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
},
_ => {
println!("Unable to unwrap output");
assert!(false);
}
}
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, Some(key.clone()));
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
}
#[test]
@@ -1617,15 +1641,15 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.to_string());
for report in batch.iter() {
o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone());
for (tx_id, changes) in batch.into_iter() {
o.txids.push(*tx_id);
o.changes.push(changes.clone());
}
o.txids.sort();
}
@@ -1652,17 +1676,10 @@ mod tests {
let delay = Duration::from_millis(100);
let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) {
Ok(out) => {
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, None);
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
},
_ => {
println!("Unable to unwrap output");
assert!(false);
}
}
let out = Arc::try_unwrap(output).expect("unwrapped");
let o = out.into_inner().expect("Expected an Output");
assert_eq!(o.called_key, None);
assert_eq!(o.txids, tx_ids);
assert_eq!(o.changes, changesets);
}
}

@@ -18,8 +18,6 @@ extern crate lazy_static;
extern crate rusqlite;
extern crate smallvec;
extern crate uuid;
pub extern crate edn;