From 58716ae22e6330c044f842d5a9974b1e26fcc73e Mon Sep 17 00:00:00 2001 From: Emily Toop Date: Tue, 13 Feb 2018 10:07:48 +0000 Subject: [PATCH] Transaction observation - Creation of transaction observer service that takes `TxObserver`s and registers them against keys and for sets of attributes. - Observer service called when InProgress commits and filters observers that are affected by the tx's that occured and notifies them of what changed - InProgress batches up tx's as it goes along so granular notification can be provided. --- Cargo.toml | 2 +- db/src/lib.rs | 3 + db/src/tx.rs | 119 +++++++++++++- db/src/types.rs | 19 ++- db/tests/tx_observer_tests.rs | 265 ++++++++++++++++++++++++++++++ ffi/Cargo.toml | 7 + ffi/src/android.rs | 11 ++ ffi/src/lib.rs | 150 +++++++++++++++++ ffi/src/utils.rs | 56 +++++++ src/conn.rs | 298 +++++++++++++++++++++++++++++++++- src/lib.rs | 1 + tolstoy/src/syncer.rs | 65 +++++--- tolstoy/src/tx_processor.rs | 45 ++++- 13 files changed, 1001 insertions(+), 40 deletions(-) create mode 100644 db/tests/tx_observer_tests.rs create mode 100644 ffi/Cargo.toml create mode 100644 ffi/src/android.rs create mode 100644 ffi/src/lib.rs create mode 100644 ffi/src/utils.rs diff --git a/Cargo.toml b/Cargo.toml index d7db3d90..78cf730b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ bundled_sqlite3 = ["rusqlite/bundled"] syncable = ["mentat_tolstoy"] [workspace] -members = ["tools/cli"] +members = ["tools/cli", "ffi"] [build-dependencies] rustc_version = "0.1.7" diff --git a/db/src/lib.rs b/db/src/lib.rs index d7270591..74094b3b 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -75,9 +75,12 @@ pub use db::{ pub use tx::{ transact, transact_terms, + TxObservationService, + TxObserver, }; pub use types::{ + AttributeSet, DB, PartitionMap, TxReport, diff --git a/db/src/tx.rs b/db/src/tx.rs index c364d0c8..abb50cac 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -51,7 +51,9 @@ use std::collections::{ BTreeSet, VecDeque, }; -use std::rc::Rc; +use std::rc::{ + Rc, +}; use db; use db::{ @@ -105,6 +107,7 @@ use schema::{ }; use types::{ Attribute, + AttributeSet, AVPair, AVMap, Entid, @@ -115,6 +118,30 @@ use types::{ }; use upsert_resolution::Generation; +use std::os::raw::c_char; +use std::os::raw::c_int; +use std::ffi::CString; + +pub const ANDROID_LOG_DEBUG: i32 = 3; +#[cfg(all(target_os="android", not(test)))] +extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } + +#[cfg(all(target_os="android", not(test)))] +pub fn d(message: &str) { + let tag = "mentat_db::tx"; + let message = CString::new(message).unwrap(); + let message = message.as_ptr(); + let tag = CString::new(tag).unwrap(); + let tag = tag.as_ptr(); + unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; +} + +#[cfg(all(not(target_os="android")))] +pub fn d(message: &str) { + let tag = "mentat_db::tx"; + println!("d: {}: {}", tag, message); +} + /// A transaction on its way to being applied. #[derive(Debug)] pub struct Tx<'conn, 'a> { @@ -600,6 +627,11 @@ impl<'conn, 'a> Tx<'conn, 'a> { final_populations.allocated, inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + let affected_attrs: AttributeSet = final_terms.iter().filter_map(|t| { + match t { + &Term::AddOrRetract(_, _, attrid, _) => Some(attrid), + } + }). collect(); let tx_instant; { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. @@ -689,7 +721,6 @@ impl<'conn, 'a> Tx<'conn, 'a> { if !fts_many.is_empty() { self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?; } - self.store.commit_transaction(self.tx_id)?; } @@ -718,6 +749,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { tx_id: self.tx_id, tx_instant, tempids: tempids, + changeset: affected_attrs, }) } } @@ -749,7 +781,7 @@ fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Opti /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. -pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, +pub fn transact<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection, partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema, @@ -762,15 +794,92 @@ pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, } /// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface. -pub fn transact_terms<'conn, 'a, I>(conn: &'conn rusqlite::Connection, +pub fn transact_terms<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection, partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema, terms: I, tempid_set: InternSet) -> Result<(TxReport, PartitionMap, Option)> where I: IntoIterator { - let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?; let report = tx.transact_simple_terms(terms, tempid_set)?; conclude_tx(tx, report) } + +pub struct TxObserver { + notify_fn: Option)>>, + attributes: AttributeSet, + registration_time: Option>, +} + +impl TxObserver { + pub fn new(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: FnMut(String, Vec) + 'static { + TxObserver { + notify_fn: Some(Box::new(notify_fn)), + attributes, + registration_time: None, + } + } + + fn set_registered(&mut self) { + self.registration_time = Some(now()); + } + + fn notify(&mut self, key: String, reports: &Vec) { + let mut matching_reports: Vec = vec![]; + // if let Some(registration_time) = self.registration_time { + // if registration_time.le(&reports[0].tx_instant) { + matching_reports = reports.iter().filter_map( |report| { + if self.attributes.intersection(&report.changeset).next().is_some(){ + Some(report.clone()) + } else { + None + } + }).collect(); + // } + // } + if !matching_reports.is_empty() { + if let Some(ref mut notify_fn) = self.notify_fn { + d(&format!("Notifying {:?} about tx {:?}", key, matching_reports)); + (notify_fn)(key, matching_reports); + } else { + d("no notify function specified for TxObserver"); + } + } else { + d(&format!("No matching reports for observer {:?} registered for updates on attributes {:?}: {:?}", key, reports, self.attributes)); + } + } +} + +#[derive(Default)] +pub struct TxObservationService { + observers: BTreeMap, +} + +impl TxObservationService { + // For testing purposes + pub fn is_registered(&self, key: &String) -> bool { + self.observers.contains_key(key) + } + + pub fn register(&mut self, key: String, mut observer: TxObserver) { + observer.set_registered(); + self.observers.insert(key.clone(), observer); + } + + pub fn deregister(&mut self, key: &String) { + self.observers.remove(key); + } + + pub fn has_observers(&self) -> bool { + !self.observers.is_empty() + } + + pub fn transaction_did_commit(&mut self, reports: &Vec) { + // notify all observers about their relevant transactions + for (key, observer) in self.observers.iter_mut() { + d(&format!("Notifying observer registered for key {:?}", key)); + observer.notify(key.clone(), reports); + } + } +} diff --git a/db/src/types.rs b/db/src/types.rs index 4fd40bd5..5e19da73 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -11,19 +11,22 @@ #![allow(dead_code)] use std::collections::HashMap; -use std::collections::BTreeMap; +use std::collections::{ + BTreeMap, + BTreeSet, +}; extern crate mentat_core; pub use self::mentat_core::{ - DateTime, - Entid, - ValueType, - TypedValue, Attribute, AttributeBitFlags, + DateTime, + Entid, Schema, + TypedValue, Utc, + ValueType, }; /// Represents one partition of the entid space. @@ -82,6 +85,9 @@ pub type AVPair = (Entid, TypedValue); /// Used to resolve lookup-refs and upserts. pub type AVMap<'a> = HashMap<&'a AVPair, Entid>; +// represents a set of entids that are correspond to attributes +pub type AttributeSet = BTreeSet; + /// A transaction report summarizes an applied transaction. #[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub struct TxReport { @@ -97,4 +103,7 @@ pub struct TxReport { /// existing entid, or is allocated a new entid. (It is possible for multiple distinct string /// literal tempids to all unify to a single freshly allocated entid.) pub tempids: BTreeMap, + + // A set of entids for attributes that were affected inside this transaction + pub changeset: AttributeSet, } diff --git a/db/tests/tx_observer_tests.rs b/db/tests/tx_observer_tests.rs new file mode 100644 index 00000000..00b99e09 --- /dev/null +++ b/db/tests/tx_observer_tests.rs @@ -0,0 +1,265 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +extern crate mentat_db; +extern crate mentat_core; + +use std::cell::{ + RefCell +}; +use std::collections::{ + BTreeMap, + BTreeSet, +}; +use std::ops::Deref; +use std::rc::{ + Rc, +}; + +use mentat_core::{ + now, +}; + +use mentat_db::{ + TxObserver, + TxObservationService, +}; + +use mentat_db::types::TxReport; + +fn get_registered_observer_attributes() -> BTreeSet { + let mut registered_attrs = BTreeSet::new(); + registered_attrs.insert(100); + registered_attrs.insert(200); + registered_attrs.insert(300); + registered_attrs +} + +fn tx_report(tx_id: i64, changes: BTreeSet) -> TxReport { + TxReport { + tx_id: tx_id, + tx_instant: now(), + tempids: BTreeMap::new(), + changeset: changes, + } +} + +#[test] +fn test_register_observer() { + let mut observer_service = TxObservationService::default(); + let key = "Test Observing".to_string(); + let registered_attrs = BTreeSet::new(); + + let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {}); + + observer_service.register(key.clone(), tx_observer); + assert!(observer_service.is_registered(&key)); +} + +#[test] +fn test_deregister_observer() { + let mut observer_service = TxObservationService::default(); + let key = "Test Observing".to_string(); + let registered_attrs = BTreeSet::new(); + + let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {}); + + observer_service.register(key.clone(), tx_observer); + assert!(observer_service.is_registered(&key)); + + observer_service.deregister(&key); + + assert!(!observer_service.is_registered(&key)); +} + +#[test] +fn test_observer_notified_on_registered_change() { + let mut observer_service = TxObservationService::default(); + let key = "Test Observing".to_string(); + let registered_attrs = get_registered_observer_attributes(); + + let txids = Rc::new(RefCell::new(Vec::new())); + let changes = Rc::new(RefCell::new(Vec::new())); + let called_key: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids = Rc::clone(&txids); + let mut_changes = Rc::clone(&changes); + let mut_key = Rc::clone(&called_key); + let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| { + let mut k = mut_key.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids.borrow_mut(); + let mut c = mut_changes.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + + observer_service.register(key.clone(), tx_observer); + assert!(observer_service.is_registered(&key)); + + let mut tx_set_1 = BTreeSet::new(); + tx_set_1.insert(100); + tx_set_1.insert(400); + tx_set_1.insert(700); + let mut tx_set_2 = BTreeSet::new(); + tx_set_2.insert(200); + tx_set_2.insert(300); + let mut tx_set_3 = BTreeSet::new(); + tx_set_3.insert(600); + let mut batch = Vec::new(); + batch.push(tx_report(10, tx_set_1.clone())); + batch.push(tx_report(11, tx_set_2.clone())); + batch.push(tx_report(12, tx_set_3)); + observer_service.transaction_did_commit(&batch); + + let val = called_key.deref(); + assert_eq!(val, &RefCell::new(Some(key.clone()))); + let t = txids.deref(); + assert_eq!(t, &RefCell::new(vec![10, 11])); + + let c = changes.deref(); + assert_eq!(c, &RefCell::new(vec![tx_set_1, tx_set_2])); +} + +#[test] +fn test_observer_not_notified_on_unregistered_change() { + let mut observer_service = TxObservationService::default(); + let key = "Test Observing".to_string(); + let registered_attrs = get_registered_observer_attributes(); + + let txids = Rc::new(RefCell::new(Vec::new())); + let changes = Rc::new(RefCell::new(Vec::new())); + let called_key: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids = Rc::clone(&txids); + let mut_changes = Rc::clone(&changes); + let mut_key = Rc::clone(&called_key); + let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| { + let mut k = mut_key.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids.borrow_mut(); + let mut c = mut_changes.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + + observer_service.register(key.clone(), tx_observer); + assert!(observer_service.is_registered(&key)); + + let mut tx_set_1 = BTreeSet::new(); + tx_set_1.insert(101); + tx_set_1.insert(401); + tx_set_1.insert(701); + let mut tx_set_2 = BTreeSet::new(); + tx_set_2.insert(201); + tx_set_2.insert(301); + let mut tx_set_3 = BTreeSet::new(); + tx_set_3.insert(601); + let mut batch = Vec::new(); + batch.push(tx_report(10, tx_set_1)); + batch.push(tx_report(11, tx_set_2)); + batch.push(tx_report(12, tx_set_3)); + observer_service.transaction_did_commit(&batch); + + let val = called_key.deref(); + assert_eq!(val, &RefCell::new(None)); + let t = txids.deref(); + assert_eq!(t, &RefCell::new(vec![])); + let c = changes.deref(); + assert_eq!(c, &RefCell::new(vec![])); +} + +#[test] +fn test_only_notifies_observers_registered_at_transact_start() { + let mut observer_service = TxObservationService::default(); + let key_1 = "Test Observing 1".to_string(); + let registered_attrs = get_registered_observer_attributes(); + + let txids_1 = Rc::new(RefCell::new(Vec::new())); + let changes_1 = Rc::new(RefCell::new(Vec::new())); + let called_key_1: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids_1 = Rc::clone(&txids_1); + let mut_changes_1 = Rc::clone(&changes_1); + let mut_key_1 = Rc::clone(&called_key_1); + + let tx_observer_1 = TxObserver::new(registered_attrs.clone(), move |obs_key, batch| { + let mut k = mut_key_1.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids_1.borrow_mut(); + let mut c = mut_changes_1.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + + observer_service.register(key_1.clone(), tx_observer_1); + assert!(observer_service.is_registered(&key_1)); + + let mut tx_set_1 = BTreeSet::new(); + tx_set_1.insert(100); + tx_set_1.insert(400); + tx_set_1.insert(700); + + let mut batch = Vec::new(); + batch.push(tx_report(10, tx_set_1.clone())); + + // register second observer after one transact has occured + let key_2 = "Test Observing 2".to_string(); + let txids_2 = Rc::new(RefCell::new(Vec::new())); + let changes_2 = Rc::new(RefCell::new(Vec::new())); + let called_key_2: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids_2 = Rc::clone(&txids_2); + let mut_changes_2 = Rc::clone(&changes_2); + let mut_key_2 = Rc::clone(&called_key_2); + + let tx_observer_2 = TxObserver::new(registered_attrs, move |obs_key, batch| { + let mut k = mut_key_2.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids_2.borrow_mut(); + let mut c = mut_changes_2.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + observer_service.register(key_2.clone(), tx_observer_2); + assert!(observer_service.is_registered(&key_2)); + + let mut tx_set_2 = BTreeSet::new(); + tx_set_2.insert(200); + tx_set_2.insert(300); + batch.push(tx_report(11, tx_set_2.clone())); + observer_service.transaction_did_commit(&batch); + + let val = called_key_1.deref(); + assert_eq!(val, &RefCell::new(Some(key_1.clone()))); + let t = txids_1.deref(); + assert_eq!(t, &RefCell::new(vec![10, 11])); + let c = changes_1.deref(); + assert_eq!(c, &RefCell::new(vec![tx_set_1.clone(), tx_set_2.clone()])); + + let val = called_key_2.deref(); + assert_eq!(val, &RefCell::new(None)); + let t = txids_2.deref(); + assert_eq!(t, &RefCell::new(vec![])); + let c = changes_2.deref(); + assert_eq!(c, &RefCell::new(vec![])); +} diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml new file mode 100644 index 00000000..89c817cb --- /dev/null +++ b/ffi/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "mentat_ffi" +version = "0.1.0" +authors = ["Emily Toop "] + +[dependencies.mentat] +path = ".." diff --git a/ffi/src/android.rs b/ffi/src/android.rs new file mode 100644 index 00000000..084bf74e --- /dev/null +++ b/ffi/src/android.rs @@ -0,0 +1,11 @@ +// TODO just use https://github.com/tomaka/android-rs-glue somehow? + +use std::os::raw::c_char; +use std::os::raw::c_int; + +// Logging +pub const ANDROID_LOG_DEBUG: i32 = 3; +pub const ANDROID_LOG_INFO: i32 = 4; +pub const ANDROID_LOG_WARN: i32 = 5; +pub const ANDROID_LOG_ERROR: i32 = 6; +extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } \ No newline at end of file diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs new file mode 100644 index 00000000..c5af9ab0 --- /dev/null +++ b/ffi/src/lib.rs @@ -0,0 +1,150 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +extern crate mentat; + +use std::collections::{ + BTreeSet, +}; +use std::os::raw::{ + c_char, + c_int, +}; +use std::slice; + +pub use mentat::{ + NamespacedKeyword, + HasSchema, + Store, + TxObserver, +}; + +pub mod utils; +pub mod android; + +pub use utils::strings::{ + c_char_to_string, + string_to_c_char, +}; + +use utils::log; + +#[repr(C)] +#[derive(Debug, Clone)] +pub struct ExternTxReport { + pub txid: i64, + pub changes: Box<[i64]>, + pub changes_len: usize +} + +#[repr(C)] +#[derive(Debug)] +pub struct ExternTxReportList { + pub reports: Box<[ExternTxReport]>, + pub len: usize +} + +#[no_mangle] +pub extern "C" fn new_store(uri: *const c_char) -> *mut Store { + let uri = c_char_to_string(uri); + let store = Store::open(&uri).expect("expected a store"); + Box::into_raw(Box::new(store)) +} + +#[no_mangle] +pub unsafe extern "C" fn store_destroy(store: *mut Store) { + let _ = Box::from_raw(store); +} + +#[no_mangle] +pub unsafe extern "C" fn store_register_observer(store: *mut Store, + key: *const c_char, + attributes: *const i64, + attributes_len: usize, + callback: extern fn(key: *const c_char)) {//, reports: &ExternTxReportList)) { + let store = &mut*store; + let mut attribute_set = BTreeSet::new(); + let slice = slice::from_raw_parts(attributes, attributes_len); + log::d(&format!("Observer attribute slice: {:?}", slice)); + for i in 0..attributes_len { + let attr = slice[i].into(); + attribute_set.insert(attr); + } + log::d(&format!("Observer attribute set: {:?}", attribute_set)); + let key = c_char_to_string(key); + let tx_observer = TxObserver::new(attribute_set, move |obs_key, batch| { + log::d(&format!("Calling observer registered for {:?}", obs_key)); + let extern_reports: Vec = batch.iter().map(|report| { + let changes: Vec = report.changeset.iter().map(|i|i.clone()).collect(); + let len = changes.len(); + ExternTxReport { + txid: report.tx_id.clone(), + changes: changes.into_boxed_slice(), + changes_len: len, + } + }).collect(); + let len = extern_reports.len(); + let reports = ExternTxReportList { + reports: extern_reports.into_boxed_slice(), + len: len, + }; + callback(string_to_c_char(obs_key));//, &reports); + }); + log::d(&format!("Registering observer for key: {:?}", key)); + store.register_observer(key, tx_observer); +} + +#[no_mangle] +pub unsafe extern "C" fn store_unregister_observer(store: *mut Store, key: *const c_char) { + let store = &mut*store; + let key = c_char_to_string(key); + log::d(&format!("Unregistering observer for key: {:?}", key)); + store.unregister_observer(&key); +} + +use std::panic; + +#[no_mangle] +pub unsafe extern "C" fn store_entid_for_attribute(store: *mut Store, attr: *const c_char) -> i64 { + let store = &mut*store; + log::d(&format!("store_entid_for_attribute got store")); + let mut keyword_string = c_char_to_string(attr); + log::d(&format!("store_entid_for_attribute keyword_string {:?}", keyword_string)); + let attr_name = keyword_string.split_off(1); + log::d(&format!("store_entid_for_attribute attr_name {:?}", attr_name)); + let parts: Vec<&str> = attr_name.split("/").collect(); + log::d(&format!("store_entid_for_attribute parts {:?}", parts)); + let kw = NamespacedKeyword::new(parts[0], parts[1]); + log::d(&format!("store_entid_for_attribute kw {:?}", kw)); + let conn = store.conn(); + log::d(&format!("store_entid_for_attribute conn")); + let current_schema = conn.current_schema(); + log::d(&format!("store_entid_for_attribute current_schema {:?}", current_schema)); + let got_entid = current_schema.get_entid(&kw); + log::d(&format!("store_entid_for_attribute got_entid {:?}", got_entid)); + let entid = got_entid.unwrap(); + log::d(&format!("store_entid_for_attribute entid {:?}", entid)); + entid.into() +} + +#[no_mangle] +pub unsafe extern "C" fn tx_report_list_entry_at(tx_report_list: *mut ExternTxReportList, index: c_int) -> *const ExternTxReport { + let tx_report_list = &*tx_report_list; + let index = index as usize; + let report = Box::new(tx_report_list.reports[index].clone()); + Box::into_raw(report) +} + +#[no_mangle] +pub unsafe extern "C" fn changelist_entry_at(tx_report: *mut ExternTxReport, index: c_int) -> i64 { + let tx_report = &*tx_report; + let index = index as usize; + tx_report.changes[index].clone() +} diff --git a/ffi/src/utils.rs b/ffi/src/utils.rs new file mode 100644 index 00000000..c5ad7fc4 --- /dev/null +++ b/ffi/src/utils.rs @@ -0,0 +1,56 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +pub mod strings { + use std::os::raw::c_char; + use std::ffi::{ + CString, + CStr + }; + + pub fn c_char_to_string(cchar: *const c_char) -> String { + let c_str = unsafe { CStr::from_ptr(cchar) }; + let r_str = match c_str.to_str() { + Err(_) => "", + Ok(string) => string, + }; + r_str.to_string() + } + + pub fn string_to_c_char(r_string: String) -> *mut c_char { + CString::new(r_string).unwrap().into_raw() + } +} + +pub mod log { + #[cfg(all(target_os="android", not(test)))] + use std::ffi::CString; + + #[cfg(all(target_os="android", not(test)))] + use android; + + // TODO far from ideal. And, we might actually want to println in tests. + #[cfg(all(not(target_os="android"), not(target_os="ios")))] + pub fn d(_: &str) {} + + #[cfg(all(target_os="ios", not(test)))] + pub fn d(message: &str) { + eprintln!("{}", message); + } + + #[cfg(all(target_os="android", not(test)))] + pub fn d(message: &str) { + let message = CString::new(message).unwrap(); + let message = message.as_ptr(); + let tag = CString::new("mentat_ffi::utils").unwrap(); + let tag = tag.as_ptr(); + unsafe { android::__android_log_write(android::ANDROID_LOG_DEBUG, tag, message) }; + } +} diff --git a/src/conn.rs b/src/conn.rs index 1c648c28..ad8b69ff 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -48,6 +48,8 @@ use mentat_db::{ transact, transact_terms, PartitionMap, + TxObservationService, + TxObserver, TxReport, }; @@ -119,8 +121,9 @@ pub struct Conn { // TODO: maintain cache of query plans that could be shared across threads and invalidated when // the schema changes. #315. - attribute_cache: RwLock, + + tx_observer_service: Mutex, } /// A convenience wrapper around a single SQLite connection and a Conn. This is suitable @@ -186,8 +189,9 @@ pub struct InProgress<'a, 'c> { partition_map: PartitionMap, schema: Schema, cache: RwLockWriteGuard<'a, SQLiteAttributeCache>, - use_caching: bool, + tx_reports: Vec, + observer_service: Option<&'a Mutex>, } /// Represents an in-progress set of reads to the store. Just like `InProgress`, @@ -336,6 +340,29 @@ impl<'a, 'c> HasSchema for InProgress<'a, 'c> { } } +use std::os::raw::c_char; +use std::os::raw::c_int; +use std::ffi::CString; + +pub const ANDROID_LOG_DEBUG: i32 = 3; +#[cfg(all(target_os="android", not(test)))] +extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } + +#[cfg(all(target_os="android", not(test)))] +pub fn d(message: &str) { + let tag = "mentat::conn"; + let message = CString::new(message).unwrap(); + let message = message.as_ptr(); + let tag = CString::new(tag).unwrap(); + let tag = tag.as_ptr(); + unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; +} + +#[cfg(all(not(target_os="android")))] +pub fn d(message: &str) { + let tag = "mentat::conn"; + println!("d: {}: {}", tag, message); +} impl<'a, 'c> InProgress<'a, 'c> { pub fn builder(self) -> InProgressBuilder<'a, 'c> { @@ -354,6 +381,7 @@ impl<'a, 'c> InProgress<'a, 'c> { &self.schema, terms, tempid_set)?; + self.tx_reports.push(report.clone()); self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -371,6 +399,8 @@ impl<'a, 'c> InProgress<'a, 'c> { // `Default::default` in those situations to extract the partition map, and so there // would still be some cost. let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?; + self.tx_reports.push(report.clone()); + self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -403,6 +433,14 @@ impl<'a, 'c> InProgress<'a, 'c> { // Commit the SQLite transaction while we hold the mutex. self.transaction.commit()?; + // notify any observers about a transaction that has been made. + if let Some(observer_service) = self.observer_service { + d(&format!("got an observer!")); + observer_service.lock().unwrap().transaction_did_commit(&self.tx_reports); + } else { + d(&format!("don't got no observer!")); + } + metadata.generation += 1; metadata.partition_map = self.partition_map; @@ -450,6 +488,16 @@ impl Store { direction, CacheAction::Register) } + + pub fn register_observer(&mut self, key: String, observer: TxObserver) { + self.conn.register_observer(key, observer); + } + + pub fn unregister_observer(&mut self, key: &String) { + println!("Store: unregistering observer for key {:?}", key); + self.conn.unregister_observer(key); + println!("Store: unregistered observer for key {:?}", key); + } } impl Queryable for Store { @@ -497,10 +545,16 @@ impl Conn { fn new(partition_map: PartitionMap, schema: Schema) -> Conn { Conn { metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))), - attribute_cache: Default::default() + attribute_cache: Default::default(), + tx_observer_service: Mutex::new(TxObservationService::default()), } } + #[cfg(test)] + pub fn is_registered_as_observer(&self, key: &String) -> bool { + self.tx_observer_service.lock().unwrap().is_registered(key) + } + /// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but /// _does not_ write the bootstrap schema. This constructor should only be used by /// consumers that expect to populate raw transaction data themselves. @@ -648,6 +702,8 @@ impl Conn { schema: (*current_schema).clone(), cache: self.attribute_cache.write().unwrap(), use_caching: true, + tx_reports: Vec::new(), + observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None }, }) } @@ -731,6 +787,16 @@ impl Conn { }, } } + + pub fn register_observer(&mut self, key: String, observer: TxObserver) { + self.tx_observer_service.lock().unwrap().register(key, observer); + } + + pub fn unregister_observer(&mut self, key: &String) { + println!("Conn: unregistering observer for key {:?}", key); + self.tx_observer_service.lock().unwrap().deregister(key); + println!("Conn: unregistered observer for key {:?}", key); + } } #[cfg(test)] @@ -739,17 +805,42 @@ mod tests { extern crate mentat_parser_utils; + use std::cell::{ + RefCell + }; + use std::collections::{ + BTreeSet, + }; + use std::ops::Deref; + use std::rc::{ + Rc, + }; use std::time::Instant; use mentat_core::{ TypedValue, }; - use query::{ + + use ::entity_builder::{ + BuildTerms, + }; + + use ::query::{ Variable, }; use ::QueryResults; + use ::vocabulary::{ + AttributeBuilder, + Definition, + VersionedStore, + }; + + use ::vocabulary::attribute::{ + Unique + }; + use mentat_db::USER0; #[test] @@ -1057,4 +1148,203 @@ mod tests { assert!(cached_elapsed_time < uncached_elapsed_time); } } + + #[test] + fn test_register_observer() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let key = "Test Observer".to_string(); + let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {}); + + conn.register_observer(key.clone(), tx_observer); + assert!(conn.is_registered_as_observer(&key)); + } + + #[test] + fn test_deregister_observer() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let key = "Test Observer".to_string(); + + let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {}); + + conn.register_observer(key.clone(), tx_observer); + assert!(conn.is_registered_as_observer(&key)); + + conn.unregister_observer(&key); + + assert!(!conn.is_registered_as_observer(&key)); + } + + fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) { + // transact some schema + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress"); + in_progress.ensure_vocabulary(&Definition { + name: kw!(:todo/items), + version: 1, + attributes: vec![ + (kw!(:todo/uuid), + AttributeBuilder::new() + .value_type(ValueType::Uuid) + .multival(false) + .unique(Unique::Value) + .index(true) + .build()), + (kw!(:todo/name), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .fulltext(true) + .build()), + (kw!(:todo/completion_date), + AttributeBuilder::new() + .value_type(ValueType::Instant) + .multival(false) + .build()), + (kw!(:label/name), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .unique(Unique::Value) + .fulltext(true) + .index(true) + .build()), + (kw!(:label/color), + AttributeBuilder::new() + .value_type(ValueType::String) + .multival(false) + .build()), + ], + }).expect("expected vocubulary"); + in_progress.commit().expect("Expected vocabulary committed"); + } + + #[test] + fn test_observer_notified_on_registered_change() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + add_schema(&mut conn, &mut sqlite); + + let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into(); + let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into(); + let mut registered_attrs = BTreeSet::new(); + registered_attrs.insert(name_entid.clone()); + registered_attrs.insert(date_entid.clone()); + + let key = "Test Observing".to_string(); + + let txids = Rc::new(RefCell::new(Vec::new())); + let changes = Rc::new(RefCell::new(Vec::new())); + let called_key: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids = Rc::clone(&txids); + let mut_changes = Rc::clone(&changes); + let mut_key = Rc::clone(&called_key); + let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| { + let mut k = mut_key.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids.borrow_mut(); + let mut c = mut_changes.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + + conn.register_observer(key.clone(), tx_observer); + assert!(conn.is_registered_as_observer(&key)); + + let mut tx_ids = Vec::new(); + let mut changesets = Vec::new(); + { + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction"); + for i in 0..3 { + let name = format!("todo{}", i); + let uuid = Uuid::new_v4(); + let mut builder = in_progress.builder().describe_tempid(&name); + builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid"); + builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name"); + if i % 2 == 0 { + builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date"); + } + let (ip, r) = builder.transact(); + let report = r.expect("expected a report"); + tx_ids.push(report.tx_id.clone()); + changesets.push(report.changeset.clone()); + in_progress = ip; + } + let mut builder = in_progress.builder().describe_tempid("Label"); + builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name"); + builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color"); + builder.commit().expect("expect transaction to occur"); + } + + let val = called_key.deref(); + assert_eq!(val, &RefCell::new(Some(key.clone()))); + let t = txids.deref(); + assert_eq!(t, &RefCell::new(tx_ids)); + + let c = changes.deref(); + assert_eq!(c, &RefCell::new(changesets)); + } + + #[test] + fn test_observer_not_notified_on_unregistered_change() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + add_schema(&mut conn, &mut sqlite); + + let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into(); + let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into(); + let mut registered_attrs = BTreeSet::new(); + registered_attrs.insert(name_entid.clone()); + registered_attrs.insert(date_entid.clone()); + + let key = "Test Observing".to_string(); + + let txids = Rc::new(RefCell::new(Vec::new())); + let changes = Rc::new(RefCell::new(Vec::new())); + let called_key: Rc>> = Rc::new(RefCell::new(None)); + + let mut_txids = Rc::clone(&txids); + let mut_changes = Rc::clone(&changes); + let mut_key = Rc::clone(&called_key); + let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| { + let mut k = mut_key.borrow_mut(); + *k = Some(obs_key.clone()); + let mut t = mut_txids.borrow_mut(); + let mut c = mut_changes.borrow_mut(); + for report in batch.iter() { + t.push(report.tx_id.clone()); + c.push(report.changeset.clone()); + } + t.sort(); + }); + + conn.register_observer(key.clone(), tx_observer); + assert!(conn.is_registered_as_observer(&key)); + + { + let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction"); + for i in 0..3 { + let name = format!("label{}", i); + let mut builder = in_progress.builder().describe_tempid(&name); + builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name"); + builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color"); + let (ip, _) = builder.transact(); + in_progress = ip; + } + } + + let val = called_key.deref(); + assert_eq!(val, &RefCell::new(None)); + let t = txids.deref(); + assert_eq!(t, &RefCell::new(vec![])); + + let c = changes.deref(); + assert_eq!(c, &RefCell::new(vec![])); + } } diff --git a/src/lib.rs b/src/lib.rs index d939bef1..6704d3e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,7 @@ pub use mentat_query::{ pub use mentat_db::{ CORE_SCHEMA_VERSION, DB_SCHEMA_CORE, + TxObserver, TxReport, new_connection, }; diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index b8b05b14..671aa41a 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -49,19 +49,27 @@ use tx_mapper::TxMapper; // See https://github.com/mozilla/mentat/issues/571 // Below is some debug Android-friendly logging: -// use std::os::raw::c_char; -// use std::os::raw::c_int; -// use std::ffi::CString; -// pub const ANDROID_LOG_DEBUG: i32 = 3; -// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } +use std::os::raw::c_char; +use std::os::raw::c_int; +use std::ffi::CString; +pub const ANDROID_LOG_DEBUG: i32 = 3; +#[cfg(all(target_os="android", not(test)))] +extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } +#[cfg(all(target_os="android", not(test)))] pub fn d(message: &str) { - println!("d: {}", message); - // let message = CString::new(message).unwrap(); - // let message = message.as_ptr(); - // let tag = CString::new("RustyToodle").unwrap(); - // let tag = tag.as_ptr(); - // unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; + let tag = "mentat_tolstoy::syncer"; + let message = CString::new(message).unwrap(); + let message = message.as_ptr(); + let tag = CString::new(tag).unwrap(); + let tag = tag.as_ptr(); + unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; +} + +#[cfg(all(not(target_os="android")))] +pub fn d(message: &str) { + let tag = "mentat_tolstoy::syncer"; + println!("d: {}: {}", tag, message); } pub struct Syncer {} @@ -88,11 +96,13 @@ impl TxReceiver for InquiringTxReceiver { fn tx(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()> where T: Iterator { self.last_tx = Some(tx_id); + d(&format!("got new last_tx: {:?}", self.last_tx)); Ok(()) } fn done(&mut self) -> Result<()> { self.is_done = true; + d(&format!("done!")); Ok(()) } } @@ -154,7 +164,7 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { Some(parent) => { d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks)); self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?; - + }, None => { d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks)); @@ -194,6 +204,7 @@ impl Syncer { let mut uploader = UploadingTxReceiver::new(remote_client, remote_head); Processor::process(db_tx, from_tx, &mut uploader)?; if !uploader.is_done { + d(&format!("upload_ours: TxProcessorUnfinished!")); bail!(ErrorKind::TxProcessorUnfinished); } // Last tx uuid uploaded by the tx receiver. @@ -214,11 +225,13 @@ impl Syncer { fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result> { let new_txs = remote_client.get_transactions(remote_head)?; + d(&format!("there are {} new_txs on the remote client", new_txs.len())); let mut tx_list = Vec::new(); for tx in new_txs { let mut tx_parts = Vec::new(); let chunks = remote_client.get_chunks(&tx)?; + d(&format!("received {} chunks from remote client", chunks.len())); // We pass along all of the downloaded parts, including transaction's // metadata datom. Transactor is expected to do the right thing, and @@ -233,7 +246,7 @@ impl Syncer { parts: tx_parts }); } - + d(&format!("got tx list: {:?}", &tx_list)); Ok(tx_list) @@ -243,7 +256,7 @@ impl Syncer { d(&format!("sync flowing")); ensure_current_version(db_tx)?; - + // TODO configure this sync with some auth data let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone()); @@ -261,10 +274,20 @@ impl Syncer { let mut inquiring_tx_receiver = InquiringTxReceiver::new(); // TODO don't just start from the beginning... but then again, we should do this // without walking the table at all, and use the tx index. - Processor::process(db_tx, None, &mut inquiring_tx_receiver)?; + let inq_res = Processor::process(db_tx, None, &mut inquiring_tx_receiver); + match inq_res { + Ok(_) => d(&format!("inquiry ok")), + Err(e) => { + d(&format!("inquiry err: {:?}", e)); + return Err(e); + } + } + d(&format!("After Processor::process inquiring_tx_receiver")); if !inquiring_tx_receiver.is_done { + d(&format!("!inquiring_tx_receiver.is_done")); bail!(ErrorKind::TxProcessorUnfinished); } + d(&format!("TxMapper::get... local head")); let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx { Some(tx) => { match TxMapper::get(db_tx, tx)? { @@ -274,6 +297,7 @@ impl Syncer { }, None => (false, true) }; + d(&format!("has_local_changes {:?}, local_empty {:?}", have_local_changes, local_store_empty)); // Check if the server is empty - populate it. if remote_head == Uuid::nil() { @@ -284,7 +308,7 @@ impl Syncer { // Check if the server is the same as us, and if our HEAD moved. } else if locally_known_remote_head == remote_head { d(&format!("server unchanged since last sync.")); - + if !have_local_changes { d(&format!("local HEAD did not move. Nothing to do!")); return Ok(SyncResult::NoChanges); @@ -385,7 +409,7 @@ impl RemoteClient { let uri = uri.parse()?; d(&format!("parsed uri {:?}", uri)); - + let work = client.get(uri).and_then(|res| { println!("Response: {}", res.status()); @@ -400,7 +424,6 @@ impl RemoteClient { d(&format!("running...")); let head_json = core.run(work)?; - d(&format!("got head: {:?}", &head_json.head)); Ok(head_json.head) } @@ -450,7 +473,7 @@ impl RemoteClient { let uri = uri.parse()?; d(&format!("parsed uri {:?}", uri)); - + let work = client.get(uri).and_then(|res| { println!("Response: {}", res.status()); @@ -483,7 +506,7 @@ impl RemoteClient { let uri = uri.parse()?; d(&format!("parsed uri {:?}", uri)); - + let work = client.get(uri).and_then(|res| { println!("Response: {}", res.status()); @@ -516,7 +539,7 @@ impl RemoteClient { let uri = uri.parse()?; d(&format!("parsed uri {:?}", uri)); - + let work = client.get(uri).and_then(|res| { println!("Response: {}", res.status()); diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs index 9cf88489..029a4c09 100644 --- a/tolstoy/src/tx_processor.rs +++ b/tolstoy/src/tx_processor.rs @@ -49,6 +49,29 @@ where T: Sized + Iterator> + 't { rows: &'t mut Peekable, } +use std::os::raw::c_char; +use std::os::raw::c_int; +use std::ffi::CString; +pub const ANDROID_LOG_DEBUG: i32 = 3; +#[cfg(all(target_os="android", not(test)))] +extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } + +#[cfg(all(target_os="android", not(test)))] +pub fn d(message: &str) { + let tag = "mentat_db::tx_processor"; + let message = CString::new(message).unwrap(); + let message = message.as_ptr(); + let tag = CString::new(tag).unwrap(); + let tag = tag.as_ptr(); + unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; +} + +#[cfg(all(not(target_os="android")))] +pub fn d(message: &str) { + let tag = "mentat_db::tx_processor"; + println!("d: {}: {}", tag, message); +} + impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T> where T: Sized + Iterator> + 't { fn new(first: &'dbtx TxPart, rows: &'t mut Peekable) -> DatomsIterator<'dbtx, 't, T> @@ -142,28 +165,40 @@ impl Processor { let mut stmt = sqlite.prepare(&select_query)?; let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable(); - let mut at_first_tx = true; + let mut at_tx = 1; let mut current_tx = None; while let Some(row) = rows.next() { let datom = row?; - + d(&format!("datom! {:?}", datom)); match current_tx { Some(tx) => { if tx != datom.tx { + d(&format!("new tx! {:?}", datom.tx)); + at_tx = at_tx + 1; + d(&format!("at_tx! {:?}", at_tx)); current_tx = Some(datom.tx); + if at_tx <= 2 && skip_first_tx { + d(&format!("skipping subsequent")); + continue; + } + d(&format!("Some: calling receiver.tx")); receiver.tx( datom.tx, &mut DatomsIterator::new(&datom, &mut rows) )?; + d(&format!("Some: returned from receiver.tx")); + } else { + d(&format!("skipping over datom in current tx block")); } }, None => { current_tx = Some(datom.tx); - if at_first_tx && skip_first_tx { - at_first_tx = false; + if at_tx <= 3 && skip_first_tx { + d(&format!("skipping first")); continue; } + d(&format!("None: calling receiver.tx")); receiver.tx( datom.tx, &mut DatomsIterator::new(&datom, &mut rows) @@ -171,7 +206,9 @@ impl Processor { } } } + d(&format!("calling receiver.done")); receiver.done()?; Ok(()) } } +