Compare commits
12 commits
master
...
rnewman/tr
Author | SHA1 | Date | |
---|---|---|---|
|
5b37001726 | ||
|
afe6444943 | ||
|
dfe433d370 | ||
|
352c16425d | ||
|
42329df63b | ||
|
421b7ad436 | ||
|
099bde4b13 | ||
|
1b9c338973 | ||
|
77c91c71df | ||
|
cf510e758f | ||
|
ba08807137 | ||
|
78d632cd31 |
10 changed files with 563 additions and 26 deletions
|
@ -29,6 +29,7 @@ rustc_version = "0.2"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
||||||
lazy_static = "0.2"
|
lazy_static = "0.2"
|
||||||
|
smallvec = "0.6"
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
uuid = "0.5"
|
uuid = "0.5"
|
||||||
|
|
||||||
|
|
|
@ -5,10 +5,12 @@ workspace = ".."
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
|
||||||
|
indexmap = "0.4"
|
||||||
itertools = "0.7"
|
itertools = "0.7"
|
||||||
lazy_static = "0.2"
|
lazy_static = "0.2"
|
||||||
num = "0.1"
|
num = "0.1"
|
||||||
ordered-float = "0.5"
|
ordered-float = "0.5"
|
||||||
|
smallvec = "0.6"
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
|
|
||||||
[dependencies.rusqlite]
|
[dependencies.rusqlite]
|
||||||
|
|
|
@ -1402,7 +1402,7 @@ impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn done(&mut self, schema: &Schema) -> Result<()> {
|
fn done(&mut self, _t: &Entid, schema: &Schema) -> Result<()> {
|
||||||
// Oh, I wish we had impl trait. Without it we have a six-line type signature if we
|
// Oh, I wish we had impl trait. Without it we have a six-line type signature if we
|
||||||
// try to break this out as a helper function.
|
// try to break this out as a helper function.
|
||||||
let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default());
|
let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default());
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate error_chain;
|
extern crate error_chain;
|
||||||
|
extern crate indexmap;
|
||||||
extern crate itertools;
|
extern crate itertools;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
@ -20,6 +21,7 @@ extern crate lazy_static;
|
||||||
|
|
||||||
extern crate num;
|
extern crate num;
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
|
extern crate smallvec;
|
||||||
extern crate tabwriter;
|
extern crate tabwriter;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
|
|
||||||
|
@ -45,6 +47,7 @@ pub mod errors;
|
||||||
pub mod internal_types; // pub because we need them for building entities programmatically.
|
pub mod internal_types; // pub because we need them for building entities programmatically.
|
||||||
mod metadata;
|
mod metadata;
|
||||||
mod schema;
|
mod schema;
|
||||||
|
pub mod tx_observer;
|
||||||
mod watcher;
|
mod watcher;
|
||||||
mod tx;
|
mod tx;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
|
@ -85,7 +88,14 @@ pub use tx::{
|
||||||
transact_terms,
|
transact_terms,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub use tx_observer::{
|
||||||
|
InProgressObserverTransactWatcher,
|
||||||
|
TxObservationService,
|
||||||
|
TxObserver,
|
||||||
|
};
|
||||||
|
|
||||||
pub use types::{
|
pub use types::{
|
||||||
|
AttributeSet,
|
||||||
DB,
|
DB,
|
||||||
PartitionMap,
|
PartitionMap,
|
||||||
TxReport,
|
TxReport,
|
||||||
|
|
19
db/src/tx.rs
19
db/src/tx.rs
|
@ -51,8 +51,9 @@ use std::collections::{
|
||||||
BTreeSet,
|
BTreeSet,
|
||||||
VecDeque,
|
VecDeque,
|
||||||
};
|
};
|
||||||
|
use std::rc::{
|
||||||
use std::rc::Rc;
|
Rc,
|
||||||
|
};
|
||||||
|
|
||||||
use db;
|
use db;
|
||||||
use db::{
|
use db::{
|
||||||
|
@ -106,6 +107,7 @@ use schema::{
|
||||||
};
|
};
|
||||||
use types::{
|
use types::{
|
||||||
Attribute,
|
Attribute,
|
||||||
|
AttributeSet,
|
||||||
AVPair,
|
AVPair,
|
||||||
AVMap,
|
AVMap,
|
||||||
Entid,
|
Entid,
|
||||||
|
@ -527,8 +529,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||||
// TODO: move this to the transactor layer.
|
// TODO: move this to the transactor layer.
|
||||||
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
|
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
|
||||||
where I: IntoIterator<Item=Entity>,
|
where I: IntoIterator<Item=Entity> {
|
||||||
W: TransactWatcher {
|
|
||||||
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
|
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
|
||||||
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
|
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
|
||||||
|
|
||||||
|
@ -542,8 +543,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
|
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
|
||||||
where I: IntoIterator<Item=TermWithTempIds>,
|
where I: IntoIterator<Item=TermWithTempIds> {
|
||||||
W: TransactWatcher {
|
|
||||||
// TODO: push these into an internal transaction report?
|
// TODO: push these into an internal transaction report?
|
||||||
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
|
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
|
||||||
|
|
||||||
|
@ -614,7 +614,9 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
final_populations.allocated,
|
final_populations.allocated,
|
||||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||||
|
|
||||||
|
|
||||||
let tx_instant;
|
let tx_instant;
|
||||||
|
let mut affected_attrs = AttributeSet::new();
|
||||||
|
|
||||||
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
||||||
|
|
||||||
|
@ -669,6 +671,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.watcher.datom(op, e, a, &v);
|
self.watcher.datom(op, e, a, &v);
|
||||||
|
// TODO: Create something like a watcher to do this for us.
|
||||||
|
affected_attrs.insert(a);
|
||||||
|
|
||||||
let reduced = (e, a, attribute, v, added);
|
let reduced = (e, a, attribute, v, added);
|
||||||
match (attribute.fulltext, attribute.multival) {
|
match (attribute.fulltext, attribute.multival) {
|
||||||
|
@ -710,7 +714,7 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
db::update_partition_map(self.store, &self.partition_map)?;
|
db::update_partition_map(self.store, &self.partition_map)?;
|
||||||
self.watcher.done(self.schema)?;
|
self.watcher.done(&self.tx_id, self.schema)?;
|
||||||
|
|
||||||
if tx_might_update_metadata {
|
if tx_might_update_metadata {
|
||||||
// Extract changes to metadata from the store.
|
// Extract changes to metadata from the store.
|
||||||
|
@ -747,7 +751,6 @@ fn start_tx<'conn, 'a, W>(conn: &'conn rusqlite::Connection,
|
||||||
watcher: W) -> Result<Tx<'conn, 'a, W>>
|
watcher: W) -> Result<Tx<'conn, 'a, W>>
|
||||||
where W: TransactWatcher {
|
where W: TransactWatcher {
|
||||||
let tx_id = partition_map.allocate_entid(":db.part/tx");
|
let tx_id = partition_map.allocate_entid(":db.part/tx");
|
||||||
|
|
||||||
conn.begin_tx_application()?;
|
conn.begin_tx_application()?;
|
||||||
|
|
||||||
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id))
|
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id))
|
||||||
|
|
208
db/src/tx_observer.rs
Normal file
208
db/src/tx_observer.rs
Normal file
|
@ -0,0 +1,208 @@
|
||||||
|
// Copyright 2018 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
use std::sync::{
|
||||||
|
Arc,
|
||||||
|
Weak,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::sync::mpsc::{
|
||||||
|
channel,
|
||||||
|
Receiver,
|
||||||
|
RecvError,
|
||||||
|
Sender,
|
||||||
|
};
|
||||||
|
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use indexmap::{
|
||||||
|
IndexMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
Schema,
|
||||||
|
TypedValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_tx::entities::{
|
||||||
|
OpType,
|
||||||
|
};
|
||||||
|
|
||||||
|
use errors::{
|
||||||
|
Result,
|
||||||
|
};
|
||||||
|
|
||||||
|
use types::{
|
||||||
|
AttributeSet,
|
||||||
|
};
|
||||||
|
|
||||||
|
use watcher::TransactWatcher;
|
||||||
|
|
||||||
|
pub struct TxObserver {
|
||||||
|
notify_fn: Arc<Box<Fn(&str, IndexMap<&Entid, &AttributeSet>) + Send + Sync>>,
|
||||||
|
attributes: AttributeSet,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxObserver {
|
||||||
|
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(&str, IndexMap<&Entid, &AttributeSet>) + 'static + Send + Sync {
|
||||||
|
TxObserver {
|
||||||
|
notify_fn: Arc::new(Box::new(notify_fn)),
|
||||||
|
attributes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn applicable_reports<'r>(&self, reports: &'r IndexMap<Entid, AttributeSet>) -> IndexMap<&'r Entid, &'r AttributeSet> {
|
||||||
|
reports.into_iter()
|
||||||
|
.filter(|&(_txid, attrs)| !self.attributes.is_disjoint(attrs))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn notify(&self, key: &str, reports: IndexMap<&Entid, &AttributeSet>) {
|
||||||
|
(*self.notify_fn)(key, reports);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Command {
|
||||||
|
fn execute(&mut self);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TxCommand {
|
||||||
|
reports: IndexMap<Entid, AttributeSet>,
|
||||||
|
observers: Weak<IndexMap<String, Arc<TxObserver>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxCommand {
|
||||||
|
fn new(observers: &Arc<IndexMap<String, Arc<TxObserver>>>, reports: IndexMap<Entid, AttributeSet>) -> Self {
|
||||||
|
TxCommand {
|
||||||
|
reports,
|
||||||
|
observers: Arc::downgrade(observers),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Command for TxCommand {
|
||||||
|
fn execute(&mut self) {
|
||||||
|
self.observers.upgrade().map(|observers| {
|
||||||
|
for (key, observer) in observers.iter() {
|
||||||
|
let applicable_reports = observer.applicable_reports(&self.reports);
|
||||||
|
if !applicable_reports.is_empty() {
|
||||||
|
observer.notify(&key, applicable_reports);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TxObservationService {
|
||||||
|
observers: Arc<IndexMap<String, Arc<TxObserver>>>,
|
||||||
|
executor: Option<Sender<Box<Command + Send>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxObservationService {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
TxObservationService {
|
||||||
|
observers: Arc::new(IndexMap::new()),
|
||||||
|
executor: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// For testing purposes
|
||||||
|
pub fn is_registered(&self, key: &String) -> bool {
|
||||||
|
self.observers.contains_key(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(&mut self, key: String, observer: Arc<TxObserver>) {
|
||||||
|
Arc::make_mut(&mut self.observers).insert(key, observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deregister(&mut self, key: &String) {
|
||||||
|
Arc::make_mut(&mut self.observers).remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_observers(&self) -> bool {
|
||||||
|
!self.observers.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn in_progress_did_commit(&mut self, txes: IndexMap<Entid, AttributeSet>) {
|
||||||
|
let executor = self.executor.get_or_insert_with(|| {
|
||||||
|
let (tx, rx): (Sender<Box<Command + Send>>, Receiver<Box<Command + Send>>) = channel();
|
||||||
|
let mut worker = CommandExecutor::new(rx);
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
worker.main();
|
||||||
|
});
|
||||||
|
|
||||||
|
tx
|
||||||
|
});
|
||||||
|
|
||||||
|
let cmd = Box::new(TxCommand::new(&self.observers, txes));
|
||||||
|
executor.send(cmd).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TxObservationService {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.executor = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InProgressObserverTransactWatcher {
|
||||||
|
collected_attributes: AttributeSet,
|
||||||
|
pub txes: IndexMap<Entid, AttributeSet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InProgressObserverTransactWatcher {
|
||||||
|
pub fn new() -> InProgressObserverTransactWatcher {
|
||||||
|
InProgressObserverTransactWatcher {
|
||||||
|
collected_attributes: Default::default(),
|
||||||
|
txes: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransactWatcher for InProgressObserverTransactWatcher {
|
||||||
|
fn datom(&mut self, _op: OpType, _e: Entid, a: Entid, _v: &TypedValue) {
|
||||||
|
self.collected_attributes.insert(a);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn done(&mut self, t: &Entid, _schema: &Schema) -> Result<()> {
|
||||||
|
let collected_attributes = ::std::mem::replace(&mut self.collected_attributes, Default::default());
|
||||||
|
self.txes.insert(*t, collected_attributes);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CommandExecutor {
|
||||||
|
receiver: Receiver<Box<Command + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CommandExecutor {
|
||||||
|
fn new(rx: Receiver<Box<Command + Send>>) -> Self {
|
||||||
|
CommandExecutor {
|
||||||
|
receiver: rx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main(&mut self) {
|
||||||
|
loop {
|
||||||
|
match self.receiver.recv() {
|
||||||
|
Err(RecvError) => {
|
||||||
|
eprintln!("Disconnected, terminating CommandExecutor");
|
||||||
|
return
|
||||||
|
},
|
||||||
|
|
||||||
|
Ok(mut cmd) => {
|
||||||
|
cmd.execute()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,19 +11,24 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
|
||||||
|
use smallvec::SmallVec;
|
||||||
|
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
|
|
||||||
pub use self::mentat_core::{
|
pub use self::mentat_core::{
|
||||||
DateTime,
|
|
||||||
Entid,
|
|
||||||
ValueType,
|
|
||||||
TypedValue,
|
|
||||||
Attribute,
|
Attribute,
|
||||||
AttributeBitFlags,
|
AttributeBitFlags,
|
||||||
|
DateTime,
|
||||||
|
Entid,
|
||||||
Schema,
|
Schema,
|
||||||
|
TypedValue,
|
||||||
Utc,
|
Utc,
|
||||||
|
ValueType,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Represents one partition of the entid space.
|
/// Represents one partition of the entid space.
|
||||||
|
@ -82,6 +87,11 @@ pub type AVPair = (Entid, TypedValue);
|
||||||
/// Used to resolve lookup-refs and upserts.
|
/// Used to resolve lookup-refs and upserts.
|
||||||
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
|
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
|
||||||
|
|
||||||
|
// represents a set of entids that are correspond to attributes
|
||||||
|
pub type AttributeSet = BTreeSet<Entid>;
|
||||||
|
|
||||||
|
pub type AccumulatedTxids = SmallVec<[Entid; 4]>;
|
||||||
|
|
||||||
/// A transaction report summarizes an applied transaction.
|
/// A transaction report summarizes an applied transaction.
|
||||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
||||||
pub struct TxReport {
|
pub struct TxReport {
|
||||||
|
|
|
@ -38,7 +38,7 @@ pub trait TransactWatcher {
|
||||||
/// Called with the schema _prior to_ the transact -- any attributes or
|
/// Called with the schema _prior to_ the transact -- any attributes or
|
||||||
/// attribute changes transacted during this transact are not reflected in
|
/// attribute changes transacted during this transact are not reflected in
|
||||||
/// the schema.
|
/// the schema.
|
||||||
fn done(&mut self, schema: &Schema) -> Result<()>;
|
fn done(&mut self, t: &Entid, schema: &Schema) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NullWatcher();
|
pub struct NullWatcher();
|
||||||
|
@ -47,7 +47,7 @@ impl TransactWatcher for NullWatcher {
|
||||||
fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) {
|
fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn done(&mut self, _schema: &Schema) -> Result<()> {
|
fn done(&mut self, _t: &Entid, _schema: &Schema) -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
321
src/conn.rs
321
src/conn.rs
|
@ -48,6 +48,7 @@ use mentat_core::{
|
||||||
use mentat_core::intern_set::InternSet;
|
use mentat_core::intern_set::InternSet;
|
||||||
|
|
||||||
use mentat_db::cache::{
|
use mentat_db::cache::{
|
||||||
|
InProgressCacheTransactWatcher,
|
||||||
InProgressSQLiteAttributeCache,
|
InProgressSQLiteAttributeCache,
|
||||||
SQLiteAttributeCache,
|
SQLiteAttributeCache,
|
||||||
};
|
};
|
||||||
|
@ -56,15 +57,26 @@ use mentat_db::db;
|
||||||
use mentat_db::{
|
use mentat_db::{
|
||||||
transact,
|
transact,
|
||||||
transact_terms,
|
transact_terms,
|
||||||
|
InProgressObserverTransactWatcher,
|
||||||
PartitionMap,
|
PartitionMap,
|
||||||
|
TransactWatcher,
|
||||||
|
TxObservationService,
|
||||||
|
TxObserver,
|
||||||
TxReport,
|
TxReport,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use mentat_db::types::{
|
||||||
|
AccumulatedTxids,
|
||||||
|
};
|
||||||
|
|
||||||
use mentat_db::internal_types::TermWithTempIds;
|
use mentat_db::internal_types::TermWithTempIds;
|
||||||
|
|
||||||
use mentat_tx;
|
use mentat_tx;
|
||||||
|
|
||||||
use mentat_tx::entities::TempId;
|
use mentat_tx::entities::{
|
||||||
|
TempId,
|
||||||
|
OpType,
|
||||||
|
};
|
||||||
|
|
||||||
use mentat_tx_parser;
|
use mentat_tx_parser;
|
||||||
|
|
||||||
|
@ -140,6 +152,7 @@ pub struct Conn {
|
||||||
|
|
||||||
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
||||||
// the schema changes. #315.
|
// the schema changes. #315.
|
||||||
|
tx_observer_service: Mutex<TxObservationService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||||
|
@ -202,10 +215,11 @@ pub struct InProgress<'a, 'c> {
|
||||||
generation: u64,
|
generation: u64,
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema: Schema,
|
schema: Schema,
|
||||||
|
|
||||||
cache: InProgressSQLiteAttributeCache,
|
cache: InProgressSQLiteAttributeCache,
|
||||||
|
|
||||||
use_caching: bool,
|
use_caching: bool,
|
||||||
|
tx_ids: AccumulatedTxids,
|
||||||
|
tx_observer: &'a Mutex<TxObservationService>,
|
||||||
|
tx_observer_watcher: InProgressObserverTransactWatcher,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
||||||
|
@ -366,12 +380,15 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
|
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
|
||||||
|
let w = InProgressTransactWatcher::new(
|
||||||
|
&mut self.tx_observer_watcher,
|
||||||
|
self.cache.transact_watcher());
|
||||||
let (report, next_partition_map, next_schema, _watcher) =
|
let (report, next_partition_map, next_schema, _watcher) =
|
||||||
transact_terms(&self.transaction,
|
transact_terms(&self.transaction,
|
||||||
self.partition_map.clone(),
|
self.partition_map.clone(),
|
||||||
&self.schema,
|
&self.schema,
|
||||||
&self.schema,
|
&self.schema,
|
||||||
self.cache.transact_watcher(),
|
w,
|
||||||
terms,
|
terms,
|
||||||
tempid_set)?;
|
tempid_set)?;
|
||||||
self.partition_map = next_partition_map;
|
self.partition_map = next_partition_map;
|
||||||
|
@ -390,12 +407,16 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
// `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using
|
// `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using
|
||||||
// `Default::default` in those situations to extract the partition map, and so there
|
// `Default::default` in those situations to extract the partition map, and so there
|
||||||
// would still be some cost.
|
// would still be some cost.
|
||||||
|
let w = InProgressTransactWatcher::new(
|
||||||
|
&mut self.tx_observer_watcher,
|
||||||
|
self.cache.transact_watcher());
|
||||||
let (report, next_partition_map, next_schema, _watcher) =
|
let (report, next_partition_map, next_schema, _watcher) =
|
||||||
transact(&self.transaction,
|
transact(&self.transaction,
|
||||||
self.partition_map.clone(),
|
self.partition_map.clone(),
|
||||||
&self.schema,
|
&self.schema,
|
||||||
&self.schema,
|
&
|
||||||
self.cache.transact_watcher(),
|
self.schema,
|
||||||
|
w,
|
||||||
entities)?;
|
entities)?;
|
||||||
self.partition_map = next_partition_map;
|
self.partition_map = next_partition_map;
|
||||||
if let Some(schema) = next_schema {
|
if let Some(schema) = next_schema {
|
||||||
|
@ -451,6 +472,9 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
|
// TODO: consider making vocabulary lookup lazy -- we won't need it much of the time.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let txes = self.tx_observer_watcher.txes;
|
||||||
|
self.tx_observer.lock().unwrap().in_progress_did_commit(txes);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,6 +502,36 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct InProgressTransactWatcher<'a, 'o> {
|
||||||
|
cache_watcher: InProgressCacheTransactWatcher<'a>,
|
||||||
|
observer_watcher: &'o mut InProgressObserverTransactWatcher,
|
||||||
|
tx_id: Option<Entid>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'o> InProgressTransactWatcher<'a, 'o> {
|
||||||
|
fn new(observer_watcher: &'o mut InProgressObserverTransactWatcher, cache_watcher: InProgressCacheTransactWatcher<'a>) -> Self {
|
||||||
|
InProgressTransactWatcher {
|
||||||
|
cache_watcher: cache_watcher,
|
||||||
|
observer_watcher: observer_watcher,
|
||||||
|
tx_id: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'o> TransactWatcher for InProgressTransactWatcher<'a, 'o> {
|
||||||
|
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) {
|
||||||
|
self.cache_watcher.datom(op.clone(), e.clone(), a.clone(), v);
|
||||||
|
self.observer_watcher.datom(op.clone(), e.clone(), a.clone(), v);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn done(&mut self, t: &Entid, schema: &Schema) -> ::mentat_db::errors::Result<()> {
|
||||||
|
self.cache_watcher.done(t, schema)?;
|
||||||
|
self.observer_watcher.done(t, schema)?;
|
||||||
|
self.tx_id = Some(t.clone());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
/// Intended for use from tests.
|
/// Intended for use from tests.
|
||||||
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
|
pub fn sqlite_mut(&mut self) -> &mut rusqlite::Connection {
|
||||||
|
@ -564,9 +618,15 @@ impl Conn {
|
||||||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||||
Conn {
|
Conn {
|
||||||
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
|
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
|
||||||
|
tx_observer_service: Mutex::new(TxObservationService::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn is_registered_as_observer(&self, key: &String) -> bool {
|
||||||
|
self.tx_observer_service.lock().unwrap().is_registered(key)
|
||||||
|
}
|
||||||
|
|
||||||
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
||||||
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
||||||
/// consumers that expect to populate raw transaction data themselves.
|
/// consumers that expect to populate raw transaction data themselves.
|
||||||
|
@ -705,6 +765,9 @@ impl Conn {
|
||||||
schema: (*current_schema).clone(),
|
schema: (*current_schema).clone(),
|
||||||
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
|
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
|
||||||
use_caching: true,
|
use_caching: true,
|
||||||
|
tx_ids: Default::default(),
|
||||||
|
tx_observer: &self.tx_observer_service,
|
||||||
|
tx_observer_watcher: InProgressObserverTransactWatcher::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -786,6 +849,14 @@ impl Conn {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn register_observer(&mut self, key: String, observer: Arc<TxObserver>) {
|
||||||
|
self.tx_observer_service.lock().unwrap().register(key, observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister_observer(&mut self, key: &String) {
|
||||||
|
self.tx_observer_service.lock().unwrap().deregister(key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -798,19 +869,25 @@ mod tests {
|
||||||
use std::collections::{
|
use std::collections::{
|
||||||
BTreeSet,
|
BTreeSet,
|
||||||
};
|
};
|
||||||
|
|
||||||
use std::path::{
|
use std::path::{
|
||||||
PathBuf,
|
PathBuf,
|
||||||
};
|
};
|
||||||
|
use std::sync::mpsc;
|
||||||
use std::time::Instant;
|
use std::time::{
|
||||||
|
Duration,
|
||||||
|
Instant,
|
||||||
|
};
|
||||||
|
|
||||||
use mentat_core::{
|
use mentat_core::{
|
||||||
CachedAttributes,
|
CachedAttributes,
|
||||||
TypedValue,
|
TypedValue,
|
||||||
};
|
};
|
||||||
|
|
||||||
use query::{
|
use ::entity_builder::{
|
||||||
|
BuildTerms,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::query::{
|
||||||
PreparedQuery,
|
PreparedQuery,
|
||||||
Variable,
|
Variable,
|
||||||
};
|
};
|
||||||
|
@ -821,6 +898,16 @@ mod tests {
|
||||||
QueryResults,
|
QueryResults,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use ::vocabulary::{
|
||||||
|
AttributeBuilder,
|
||||||
|
Definition,
|
||||||
|
VersionedStore,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::vocabulary::attribute::{
|
||||||
|
Unique,
|
||||||
|
};
|
||||||
|
|
||||||
use mentat_db::USER0;
|
use mentat_db::USER0;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1381,4 +1468,218 @@ mod tests {
|
||||||
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn test_register_observer() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deregister_observer() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::new(tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
conn.unregister_observer(&key);
|
||||||
|
|
||||||
|
assert!(!conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) {
|
||||||
|
// transact some schema
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress");
|
||||||
|
in_progress.ensure_vocabulary(&Definition {
|
||||||
|
name: kw!(:todo/items),
|
||||||
|
version: 1,
|
||||||
|
attributes: vec![
|
||||||
|
(kw!(:todo/uuid),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::Uuid)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/name),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.fulltext(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/completion_date),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::Instant)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/name),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.fulltext(true)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/color),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
],
|
||||||
|
}).expect("expected vocubulary");
|
||||||
|
in_progress.commit().expect("Expected vocabulary committed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
struct ObserverOutput {
|
||||||
|
txids: Vec<i64>,
|
||||||
|
changes: Vec<BTreeSet<i64>>,
|
||||||
|
called_key: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_notified_on_registered_change() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
add_schema(&mut conn, &mut sqlite);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||||
|
|
||||||
|
let mut_output = Arc::downgrade(&output);
|
||||||
|
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
||||||
|
// because the TxObserver is in an Arc and is therefore Sync, we have to wrap the Sender in a Mutex to also
|
||||||
|
// make it Sync.
|
||||||
|
let thread_tx = Mutex::new(tx);
|
||||||
|
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
if let Some(out) = mut_output.upgrade() {
|
||||||
|
let mut o = out.lock().unwrap();
|
||||||
|
o.called_key = Some(obs_key.to_string());
|
||||||
|
for (tx_id, changes) in batch.into_iter() {
|
||||||
|
o.txids.push(*tx_id);
|
||||||
|
o.changes.push(changes.clone());
|
||||||
|
}
|
||||||
|
o.txids.sort();
|
||||||
|
}
|
||||||
|
thread_tx.lock().unwrap().send(()).unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
let mut tx_ids = Vec::new();
|
||||||
|
let mut changesets = Vec::new();
|
||||||
|
let uuid_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into();
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let mut changeset = BTreeSet::new();
|
||||||
|
let name = format!("todo{}", i);
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw(&kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
|
||||||
|
changeset.insert(uuid_entid.clone());
|
||||||
|
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
changeset.insert(name_entid.clone());
|
||||||
|
if i % 2 == 0 {
|
||||||
|
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
|
||||||
|
changeset.insert(date_entid.clone());
|
||||||
|
}
|
||||||
|
let (ip, r) = builder.transact();
|
||||||
|
let report = r.expect("expected a report");
|
||||||
|
tx_ids.push(report.tx_id.clone());
|
||||||
|
changesets.push(changeset);
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
let mut builder = in_progress.builder().describe_tempid("Label");
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
builder.commit().expect("expect transaction to occur");
|
||||||
|
}
|
||||||
|
|
||||||
|
let delay = Duration::from_millis(100);
|
||||||
|
let _ = rx.recv_timeout(delay);
|
||||||
|
|
||||||
|
let out = Arc::try_unwrap(output).expect("unwrapped");
|
||||||
|
let o = out.into_inner().expect("Expected an Output");
|
||||||
|
assert_eq!(o.called_key, Some(key.clone()));
|
||||||
|
assert_eq!(o.txids, tx_ids);
|
||||||
|
assert_eq!(o.changes, changesets);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_not_notified_on_unregistered_change() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
add_schema(&mut conn, &mut sqlite);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let output = Arc::new(Mutex::new(ObserverOutput::default()));
|
||||||
|
|
||||||
|
let mut_output = Arc::downgrade(&output);
|
||||||
|
let (tx, rx): (mpsc::Sender<()>, mpsc::Receiver<()>) = mpsc::channel();
|
||||||
|
let thread_tx = Mutex::new(tx);
|
||||||
|
let tx_observer = Arc::new(TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
if let Some(out) = mut_output.upgrade() {
|
||||||
|
let mut o = out.lock().unwrap();
|
||||||
|
o.called_key = Some(obs_key.to_string());
|
||||||
|
for (tx_id, changes) in batch.into_iter() {
|
||||||
|
o.txids.push(*tx_id);
|
||||||
|
o.changes.push(changes.clone());
|
||||||
|
}
|
||||||
|
o.txids.sort();
|
||||||
|
}
|
||||||
|
thread_tx.lock().unwrap().send(()).unwrap();
|
||||||
|
}));
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), Arc::clone(&tx_observer));
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
let tx_ids = Vec::<Entid>::new();
|
||||||
|
let changesets = Vec::<BTreeSet<Entid>>::new();
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let name = format!("label{}", i);
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
let (ip, _) = builder.transact();
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let delay = Duration::from_millis(100);
|
||||||
|
let _ = rx.recv_timeout(delay);
|
||||||
|
|
||||||
|
let out = Arc::try_unwrap(output).expect("unwrapped");
|
||||||
|
let o = out.into_inner().expect("Expected an Output");
|
||||||
|
assert_eq!(o.called_key, None);
|
||||||
|
assert_eq!(o.txids, tx_ids);
|
||||||
|
assert_eq!(o.changes, changesets);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@ extern crate lazy_static;
|
||||||
|
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
|
|
||||||
|
extern crate smallvec;
|
||||||
|
|
||||||
extern crate uuid;
|
extern crate uuid;
|
||||||
|
|
||||||
pub extern crate edn;
|
pub extern crate edn;
|
||||||
|
|
Loading…
Reference in a new issue