diff --git a/db/Cargo.toml b/db/Cargo.toml index 60d1e735..634d9d1a 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -5,6 +5,7 @@ workspace = ".." [dependencies] error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } +indexmap = "0.4" itertools = "0.7" lazy_static = "0.2" num = "0.1" diff --git a/db/src/lib.rs b/db/src/lib.rs index 19e9619b..d91c7568 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -13,6 +13,7 @@ #[macro_use] extern crate error_chain; +extern crate indexmap; extern crate itertools; #[macro_use] @@ -45,6 +46,7 @@ pub mod errors; pub mod internal_types; // pub because we need them for building entities programmatically. mod metadata; mod schema; +pub mod tx_observer; mod watcher; mod tx; pub mod types; diff --git a/db/src/tx_observer.rs b/db/src/tx_observer.rs new file mode 100644 index 00000000..240e9ae0 --- /dev/null +++ b/db/src/tx_observer.rs @@ -0,0 +1,171 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::sync::{ + Arc, +}; +use std::thread; + +use indexmap::{ + IndexMap, +}; + +use types::{ + AttributeSet, + TxReport, +}; + +pub struct TxObserver { + notify_fn: Arc) + Send + Sync>>>, + attributes: AttributeSet, +} + +impl TxObserver { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: Fn(String, Vec) + 'static + Send + Sync { + TxObserver { + notify_fn: Arc::new(Some(Box::new(notify_fn))), + attributes, + } + } + + pub fn applicable_reports(&self, reports: &Vec) -> Vec { + reports.into_iter().filter_map( |report| { + if self.attributes.intersection(&report.changeset).next().is_some(){ + Some(report.clone()) + } else { + None + } + }).collect() + } + + fn notify(&self, key: String, reports: Vec) { + if let Some(ref notify_fn) = *self.notify_fn { + (notify_fn)(key, reports); + } else { + eprintln!("no notify function specified for TxObserver"); + } + } +} + +pub trait CommandClone { + fn clone_box(&self) -> Box; +} + +impl CommandClone for T where T: 'static + Command + Clone + Send { + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +pub trait Command: CommandClone { + fn execute(&self); +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +#[derive(Clone)] +pub struct NotifyTxObserver { + key: String, + reports: Vec, + observer: Arc, +} + +impl NotifyTxObserver { + pub fn new(key: String, reports: Vec, observer: Arc) -> Self { + NotifyTxObserver { + key, + reports, + observer, + } + } +} + +impl Command for NotifyTxObserver { + fn execute(&self) { + self.observer.notify(self.key.clone(), self.reports.clone()); + } +} + +#[derive(Clone)] +pub struct AsyncBatchExecutor { + commands: Vec>, +} + +impl Command for AsyncBatchExecutor { + fn execute(&self) { + let command_queue = self.commands.clone(); + thread::spawn (move ||{ + for command in command_queue.iter() { + command.execute(); + } + }); + } +} + +#[derive(Clone)] +pub struct TxObservationService { + observers: IndexMap>, + pub command_queue: Vec>, +} + +impl TxObservationService { + pub fn new() -> Self { + TxObservationService { + observers: IndexMap::new(), + command_queue: Vec::new(), + } + } + // For testing purposes + pub fn is_registered(&self, key: &String) -> bool { + self.observers.contains_key(key) + } + + pub fn register(&mut self, key: String, observer: Arc) { + self.observers.insert(key.clone(), observer); + } + + pub fn deregister(&mut self, key: &String) { + self.observers.remove(key); + } + + pub fn has_observers(&self) -> bool { + !self.observers.is_empty() + } + + fn command_from_reports(&self, key: &String, reports: &Vec, observer: &Arc) -> Option> { + let applicable_reports = observer.applicable_reports(reports); + if !applicable_reports.is_empty() { + Some(Box::new(NotifyTxObserver::new(key.clone(), applicable_reports, Arc::clone(observer)))) + } else { + None + } + } + + pub fn transaction_did_commit(&mut self, reports: Vec) { + // notify all observers about their relevant transactions + let commands: Vec> = self.observers + .iter() + .filter_map(|(key, observer)| { self.command_from_reports(&key, &reports, &observer) }) + .collect(); + self.command_queue.push(Box::new(AsyncBatchExecutor{ commands })); + } + + pub fn run(&mut self) { + for command in self.command_queue.iter() { + command.execute(); + } + + self.command_queue.clear(); + } +} diff --git a/db/src/types.rs b/db/src/types.rs index 4fd40bd5..5e19da73 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -11,19 +11,22 @@ #![allow(dead_code)] use std::collections::HashMap; -use std::collections::BTreeMap; +use std::collections::{ + BTreeMap, + BTreeSet, +}; extern crate mentat_core; pub use self::mentat_core::{ - DateTime, - Entid, - ValueType, - TypedValue, Attribute, AttributeBitFlags, + DateTime, + Entid, Schema, + TypedValue, Utc, + ValueType, }; /// Represents one partition of the entid space. @@ -82,6 +85,9 @@ pub type AVPair = (Entid, TypedValue); /// Used to resolve lookup-refs and upserts. pub type AVMap<'a> = HashMap<&'a AVPair, Entid>; +// represents a set of entids that are correspond to attributes +pub type AttributeSet = BTreeSet; + /// A transaction report summarizes an applied transaction. #[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub struct TxReport { @@ -97,4 +103,7 @@ pub struct TxReport { /// existing entid, or is allocated a new entid. (It is possible for multiple distinct string /// literal tempids to all unify to a single freshly allocated entid.) pub tempids: BTreeMap, + + // A set of entids for attributes that were affected inside this transaction + pub changeset: AttributeSet, }