address review comments

This commit is contained in:
Emily Toop 2018-03-15 16:57:04 +00:00
parent 352c16425d
commit dfe433d370
7 changed files with 37 additions and 17 deletions

View file

@ -29,6 +29,7 @@ rustc_version = "0.2"
chrono = "0.4" chrono = "0.4"
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
lazy_static = "0.2" lazy_static = "0.2"
smallvec = "0.6"
time = "0.1" time = "0.1"
uuid = "0.5" uuid = "0.5"

View file

@ -10,6 +10,7 @@ itertools = "0.7"
lazy_static = "0.2" lazy_static = "0.2"
num = "0.1" num = "0.1"
ordered-float = "0.5" ordered-float = "0.5"
smallvec = "0.6"
time = "0.1" time = "0.1"
[dependencies.rusqlite] [dependencies.rusqlite]

View file

@ -21,6 +21,7 @@ extern crate lazy_static;
extern crate num; extern crate num;
extern crate rusqlite; extern crate rusqlite;
extern crate smallvec;
extern crate tabwriter; extern crate tabwriter;
extern crate time; extern crate time;

View file

@ -671,6 +671,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
} }
self.watcher.datom(op, e, a, &v); self.watcher.datom(op, e, a, &v);
// TODO: Create something like a watcher to do this for us.
affected_attrs.insert(a); affected_attrs.insert(a);
let reduced = (e, a, attribute, v, added); let reduced = (e, a, attribute, v, added);

View file

@ -24,33 +24,37 @@ use indexmap::{
IndexMap, IndexMap,
}; };
use smallvec::{
SmallVec,
};
use types::{ use types::{
AttributeSet, AttributeSet,
TxReport, TxReport,
}; };
pub struct TxObserver { pub struct TxObserver {
notify_fn: Arc<Box<Fn(String, Vec<&TxReport>) + Send + Sync>>, notify_fn: Arc<Box<Fn(&str, SmallVec<[&TxReport; 4]>) + Send + Sync>>,
attributes: AttributeSet, attributes: AttributeSet,
} }
impl TxObserver { impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec<&TxReport>) + 'static + Send + Sync { pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, SmallVec<[&TxReport; 4]>) + 'static + Send + Sync {
TxObserver { TxObserver {
notify_fn: Arc::new(Box::new(notify_fn)), notify_fn: Arc::new(Box::new(notify_fn)),
attributes, attributes,
} }
} }
pub fn applicable_reports<'r>(&self, reports: &'r Vec<TxReport>) -> Vec<&'r TxReport> { pub fn applicable_reports<'r>(&self, reports: &'r SmallVec<[TxReport; 4]>) -> SmallVec<[&'r TxReport; 4]> {
reports.into_iter().filter_map( |report| { reports.into_iter().filter_map(|report| {
self.attributes.intersection(&report.changeset) self.attributes.intersection(&report.changeset)
.next() .next()
.and_then(|_| Some(report)) .and_then(|_| Some(report))
}).collect() }).collect()
} }
fn notify(&self, key: String, reports: Vec<&TxReport>) { fn notify(&self, key: &str, reports: SmallVec<[&TxReport; 4]>) {
(*self.notify_fn)(key, reports); (*self.notify_fn)(key, reports);
} }
} }
@ -60,12 +64,12 @@ pub trait Command {
} }
pub struct TxCommand { pub struct TxCommand {
reports: Vec<TxReport>, reports: SmallVec<[TxReport; 4]>,
observers: Weak<IndexMap<String, Arc<TxObserver>>>, observers: Weak<IndexMap<String, Arc<TxObserver>>>,
} }
impl TxCommand { impl TxCommand {
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: Vec<TxReport>) -> Self { fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: SmallVec<[TxReport; 4]>) -> Self {
TxCommand { TxCommand {
reports, reports,
observers: Arc::downgrade(observers), observers: Arc::downgrade(observers),
@ -79,7 +83,7 @@ impl Command for TxCommand {
for (key, observer) in observers.iter() { for (key, observer) in observers.iter() {
let applicable_reports = observer.applicable_reports(&self.reports); let applicable_reports = observer.applicable_reports(&self.reports);
if !applicable_reports.is_empty() { if !applicable_reports.is_empty() {
observer.notify(key.clone(), applicable_reports); observer.notify(&key, applicable_reports);
} }
} }
}); });
@ -122,7 +126,7 @@ impl TxObservationService {
self.in_progress_count += 1; self.in_progress_count += 1;
} }
pub fn transaction_did_commit(&mut self, reports: Vec<TxReport>) { pub fn transaction_did_commit(&mut self, reports: SmallVec<[TxReport; 4]>) {
{ {
let executor = self.executor.get_or_insert_with(||{ let executor = self.executor.get_or_insert_with(||{
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel(); let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();

View file

@ -32,6 +32,8 @@ use rusqlite::{
TransactionBehavior, TransactionBehavior,
}; };
use smallvec::SmallVec;
use edn; use edn;
use mentat_core::{ use mentat_core::{
@ -207,7 +209,8 @@ pub struct InProgress<'a, 'c> {
schema: Schema, schema: Schema,
cache: InProgressSQLiteAttributeCache, cache: InProgressSQLiteAttributeCache,
use_caching: bool, use_caching: bool,
tx_reports: Vec<TxReport>, // TODO: Collect txids/affected datoms in a better way
tx_reports: SmallVec<[TxReport; 4]>,
observer_service: Option<&'a Mutex<TxObservationService>>, observer_service: Option<&'a Mutex<TxObservationService>>,
} }
@ -731,7 +734,7 @@ impl Conn {
schema: (*current_schema).clone(), schema: (*current_schema).clone(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true, use_caching: true,
tx_reports: Vec::new(), tx_reports: SmallVec::new(),
observer_service: observer_service, observer_service: observer_service,
}) })
} }
@ -841,7 +844,6 @@ mod tests {
Duration, Duration,
Instant Instant
}; };
use std::thread;
use mentat_core::{ use mentat_core::{
CachedAttributes, CachedAttributes,
@ -1529,16 +1531,21 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default())); let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output); let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
// make it Sync.
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() { if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap(); let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.clone()); o.called_key = Some(obs_key.to_string());
for report in batch.iter() { for report in batch.iter() {
o.txids.push(report.tx_id.clone()); o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone()); o.changes.push(report.changeset.clone());
} }
o.txids.sort(); o.txids.sort();
} }
thread_tx.lock().unwrap().send(()).unwrap();
})); }));
conn.register_observer(key.clone(), Arc::clone(&tx_observer)); conn.register_observer(key.clone(), Arc::clone(&tx_observer));
@ -1552,7 +1559,7 @@ mod tests {
let name = format!("todo{}", i); let name = format!("todo{}", i);
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name); let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
if i % 2 == 0 { if i % 2 == 0 {
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
@ -1570,7 +1577,7 @@ mod tests {
} }
let delay = Duration::from_millis(100); let delay = Duration::from_millis(100);
thread::sleep(delay); let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) { match Arc::try_unwrap(output) {
Ok(out) => { Ok(out) => {
@ -1603,16 +1610,19 @@ mod tests {
let output = Arc::new(Mutex::new(ObserverOutput::default())); let output = Arc::new(Mutex::new(ObserverOutput::default()));
let mut_output = Arc::downgrade(&output); let mut_output = Arc::downgrade(&output);
let (tx, rx): (::std::sync::mpsc::Sender<()>, ::std::sync::mpsc::Receiver<()>) = ::std::sync::mpsc::channel();
let thread_tx = Mutex::new(tx);
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| { let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
if let Some(out) = mut_output.upgrade() { if let Some(out) = mut_output.upgrade() {
let mut o = out.lock().unwrap(); let mut o = out.lock().unwrap();
o.called_key = Some(obs_key.clone()); o.called_key = Some(obs_key.to_string());
for report in batch.iter() { for report in batch.iter() {
o.txids.push(report.tx_id.clone()); o.txids.push(report.tx_id.clone());
o.changes.push(report.changeset.clone()); o.changes.push(report.changeset.clone());
} }
o.txids.sort(); o.txids.sort();
} }
thread_tx.lock().unwrap().send(()).unwrap();
})); }));
conn.register_observer(key.clone(), Arc::clone(&tx_observer)); conn.register_observer(key.clone(), Arc::clone(&tx_observer));
@ -1633,7 +1643,7 @@ mod tests {
} }
let delay = Duration::from_millis(100); let delay = Duration::from_millis(100);
thread::sleep(delay); let _ = rx.recv_timeout(delay);
match Arc::try_unwrap(output) { match Arc::try_unwrap(output) {
Ok(out) => { Ok(out) => {

View file

@ -18,6 +18,8 @@ extern crate lazy_static;
extern crate rusqlite; extern crate rusqlite;
extern crate smallvec;
extern crate uuid; extern crate uuid;
pub extern crate edn; pub extern crate edn;