Compare commits

...

2 commits

Author SHA1 Message Date
Emily Toop
58716ae22e Transaction observation
- Creation of transaction observer service that takes `TxObserver`s and registers them against keys and for sets of attributes.
- Observer service called when InProgress commits and filters observers that are affected by the tx's that occured and notifies them of what changed
- InProgress batches up tx's as it goes along so granular notification can be provided.
2018-03-12 10:13:49 +00:00
Grisha Kruglov
d9d2b3a89a Replication syncing 2018-03-08 02:20:50 -05:00
23 changed files with 1424 additions and 128 deletions

View file

@ -16,11 +16,12 @@ version = "0.6.1"
build = "build/version.rs"
[features]
default = ["bundled_sqlite3"]
default = ["bundled_sqlite3", "syncable"]
bundled_sqlite3 = ["rusqlite/bundled"]
syncable = ["mentat_tolstoy"]
[workspace]
members = ["tools/cli"]
members = ["tools/cli", "ffi"]
[build-dependencies]
rustc_version = "0.1.7"
@ -78,6 +79,7 @@ path = "tx-parser"
[dependencies.mentat_tolstoy]
path = "tolstoy"
optional = true
[profile.release]
debug = true

View file

@ -1115,6 +1115,7 @@ pub trait PartitionMapping {
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
fn contains_entid(&self, entid: Entid) -> bool;
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S>;
}
impl PartitionMapping for PartitionMap {
@ -1136,6 +1137,23 @@ impl PartitionMapping for PartitionMap {
}
}
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S> {
match self.get_mut(partition) {
Some(partition) => {
// Don't honour requests to shrink the partition.
if partition.index > entid {
return ()
}
let new_index = entid + 1;
if partition.index != new_index {
partition.index = new_index;
}
},
// This is a programming error.
None => panic!("Cannot expand unknown partition: {}", partition),
}
}
fn contains_entid(&self, entid: Entid) -> bool {
self.values().any(|partition| partition.contains_entid(entid))
}

View file

@ -47,8 +47,7 @@ mod tx;
pub mod types;
mod upsert_resolution;
// Export these for reference from tests. cfg(test) should work, but doesn't.
// #[cfg(test)]
// Export these for reference from sync code and tests.
pub use bootstrap::{
TX0,
USER0,
@ -76,9 +75,12 @@ pub use db::{
pub use tx::{
transact,
transact_terms,
TxObservationService,
TxObserver,
};
pub use types::{
AttributeSet,
DB,
PartitionMap,
TxReport,

View file

@ -51,7 +51,9 @@ use std::collections::{
BTreeSet,
VecDeque,
};
use std::rc::Rc;
use std::rc::{
Rc,
};
use db;
use db::{
@ -105,6 +107,7 @@ use schema::{
};
use types::{
Attribute,
AttributeSet,
AVPair,
AVMap,
Entid,
@ -115,6 +118,30 @@ use types::{
};
use upsert_resolution::Generation;
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::ffi::CString;
pub const ANDROID_LOG_DEBUG: i32 = 3;
#[cfg(all(target_os="android", not(test)))]
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
#[cfg(all(target_os="android", not(test)))]
pub fn d(message: &str) {
let tag = "mentat_db::tx";
let message = CString::new(message).unwrap();
let message = message.as_ptr();
let tag = CString::new(tag).unwrap();
let tag = tag.as_ptr();
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}
#[cfg(all(not(target_os="android")))]
pub fn d(message: &str) {
let tag = "mentat_db::tx";
println!("d: {}: {}", tag, message);
}
/// A transaction on its way to being applied.
#[derive(Debug)]
pub struct Tx<'conn, 'a> {
@ -600,6 +627,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
final_populations.allocated,
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
let affected_attrs: AttributeSet = final_terms.iter().filter_map(|t| {
match t {
&Term::AddOrRetract(_, _, attrid, _) => Some(attrid),
}
}). collect();
let tx_instant;
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
@ -689,7 +721,6 @@ impl<'conn, 'a> Tx<'conn, 'a> {
if !fts_many.is_empty() {
self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?;
}
self.store.commit_transaction(self.tx_id)?;
}
@ -718,6 +749,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
tx_id: self.tx_id,
tx_instant,
tempids: tempids,
changeset: affected_attrs,
})
}
}
@ -749,7 +781,7 @@ fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Opti
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
pub fn transact<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
@ -762,15 +794,92 @@ pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
}
/// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface.
pub fn transact_terms<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
pub fn transact_terms<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
terms: I,
tempid_set: InternSet<TempId>) -> Result<(TxReport, PartitionMap, Option<Schema>)>
where I: IntoIterator<Item=TermWithTempIds> {
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?;
let report = tx.transact_simple_terms(terms, tempid_set)?;
conclude_tx(tx, report)
}
pub struct TxObserver {
notify_fn: Option<Box<FnMut(String, Vec<TxReport>)>>,
attributes: AttributeSet,
registration_time: Option<DateTime<Utc>>,
}
impl TxObserver {
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: FnMut(String, Vec<TxReport>) + 'static {
TxObserver {
notify_fn: Some(Box::new(notify_fn)),
attributes,
registration_time: None,
}
}
fn set_registered(&mut self) {
self.registration_time = Some(now());
}
fn notify(&mut self, key: String, reports: &Vec<TxReport>) {
let mut matching_reports: Vec<TxReport> = vec![];
// if let Some(registration_time) = self.registration_time {
// if registration_time.le(&reports[0].tx_instant) {
matching_reports = reports.iter().filter_map( |report| {
if self.attributes.intersection(&report.changeset).next().is_some(){
Some(report.clone())
} else {
None
}
}).collect();
// }
// }
if !matching_reports.is_empty() {
if let Some(ref mut notify_fn) = self.notify_fn {
d(&format!("Notifying {:?} about tx {:?}", key, matching_reports));
(notify_fn)(key, matching_reports);
} else {
d("no notify function specified for TxObserver");
}
} else {
d(&format!("No matching reports for observer {:?} registered for updates on attributes {:?}: {:?}", key, reports, self.attributes));
}
}
}
#[derive(Default)]
pub struct TxObservationService {
observers: BTreeMap<String, TxObserver>,
}
impl TxObservationService {
// For testing purposes
pub fn is_registered(&self, key: &String) -> bool {
self.observers.contains_key(key)
}
pub fn register(&mut self, key: String, mut observer: TxObserver) {
observer.set_registered();
self.observers.insert(key.clone(), observer);
}
pub fn deregister(&mut self, key: &String) {
self.observers.remove(key);
}
pub fn has_observers(&self) -> bool {
!self.observers.is_empty()
}
pub fn transaction_did_commit(&mut self, reports: &Vec<TxReport>) {
// notify all observers about their relevant transactions
for (key, observer) in self.observers.iter_mut() {
d(&format!("Notifying observer registered for key {:?}", key));
observer.notify(key.clone(), reports);
}
}
}

View file

@ -11,19 +11,22 @@
#![allow(dead_code)]
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::collections::{
BTreeMap,
BTreeSet,
};
extern crate mentat_core;
pub use self::mentat_core::{
DateTime,
Entid,
ValueType,
TypedValue,
Attribute,
AttributeBitFlags,
DateTime,
Entid,
Schema,
TypedValue,
Utc,
ValueType,
};
/// Represents one partition of the entid space.
@ -82,6 +85,9 @@ pub type AVPair = (Entid, TypedValue);
/// Used to resolve lookup-refs and upserts.
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
// represents a set of entids that are correspond to attributes
pub type AttributeSet = BTreeSet<Entid>;
/// A transaction report summarizes an applied transaction.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub struct TxReport {
@ -97,4 +103,7 @@ pub struct TxReport {
/// existing entid, or is allocated a new entid. (It is possible for multiple distinct string
/// literal tempids to all unify to a single freshly allocated entid.)
pub tempids: BTreeMap<String, Entid>,
// A set of entids for attributes that were affected inside this transaction
pub changeset: AttributeSet,
}

View file

@ -0,0 +1,265 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
extern crate mentat_db;
extern crate mentat_core;
use std::cell::{
RefCell
};
use std::collections::{
BTreeMap,
BTreeSet,
};
use std::ops::Deref;
use std::rc::{
Rc,
};
use mentat_core::{
now,
};
use mentat_db::{
TxObserver,
TxObservationService,
};
use mentat_db::types::TxReport;
fn get_registered_observer_attributes() -> BTreeSet<i64> {
let mut registered_attrs = BTreeSet::new();
registered_attrs.insert(100);
registered_attrs.insert(200);
registered_attrs.insert(300);
registered_attrs
}
fn tx_report(tx_id: i64, changes: BTreeSet<i64>) -> TxReport {
TxReport {
tx_id: tx_id,
tx_instant: now(),
tempids: BTreeMap::new(),
changeset: changes,
}
}
#[test]
fn test_register_observer() {
let mut observer_service = TxObservationService::default();
let key = "Test Observing".to_string();
let registered_attrs = BTreeSet::new();
let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {});
observer_service.register(key.clone(), tx_observer);
assert!(observer_service.is_registered(&key));
}
#[test]
fn test_deregister_observer() {
let mut observer_service = TxObservationService::default();
let key = "Test Observing".to_string();
let registered_attrs = BTreeSet::new();
let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {});
observer_service.register(key.clone(), tx_observer);
assert!(observer_service.is_registered(&key));
observer_service.deregister(&key);
assert!(!observer_service.is_registered(&key));
}
#[test]
fn test_observer_notified_on_registered_change() {
let mut observer_service = TxObservationService::default();
let key = "Test Observing".to_string();
let registered_attrs = get_registered_observer_attributes();
let txids = Rc::new(RefCell::new(Vec::new()));
let changes = Rc::new(RefCell::new(Vec::new()));
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids = Rc::clone(&txids);
let mut_changes = Rc::clone(&changes);
let mut_key = Rc::clone(&called_key);
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
let mut k = mut_key.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids.borrow_mut();
let mut c = mut_changes.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
observer_service.register(key.clone(), tx_observer);
assert!(observer_service.is_registered(&key));
let mut tx_set_1 = BTreeSet::new();
tx_set_1.insert(100);
tx_set_1.insert(400);
tx_set_1.insert(700);
let mut tx_set_2 = BTreeSet::new();
tx_set_2.insert(200);
tx_set_2.insert(300);
let mut tx_set_3 = BTreeSet::new();
tx_set_3.insert(600);
let mut batch = Vec::new();
batch.push(tx_report(10, tx_set_1.clone()));
batch.push(tx_report(11, tx_set_2.clone()));
batch.push(tx_report(12, tx_set_3));
observer_service.transaction_did_commit(&batch);
let val = called_key.deref();
assert_eq!(val, &RefCell::new(Some(key.clone())));
let t = txids.deref();
assert_eq!(t, &RefCell::new(vec![10, 11]));
let c = changes.deref();
assert_eq!(c, &RefCell::new(vec![tx_set_1, tx_set_2]));
}
#[test]
fn test_observer_not_notified_on_unregistered_change() {
let mut observer_service = TxObservationService::default();
let key = "Test Observing".to_string();
let registered_attrs = get_registered_observer_attributes();
let txids = Rc::new(RefCell::new(Vec::new()));
let changes = Rc::new(RefCell::new(Vec::new()));
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids = Rc::clone(&txids);
let mut_changes = Rc::clone(&changes);
let mut_key = Rc::clone(&called_key);
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
let mut k = mut_key.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids.borrow_mut();
let mut c = mut_changes.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
observer_service.register(key.clone(), tx_observer);
assert!(observer_service.is_registered(&key));
let mut tx_set_1 = BTreeSet::new();
tx_set_1.insert(101);
tx_set_1.insert(401);
tx_set_1.insert(701);
let mut tx_set_2 = BTreeSet::new();
tx_set_2.insert(201);
tx_set_2.insert(301);
let mut tx_set_3 = BTreeSet::new();
tx_set_3.insert(601);
let mut batch = Vec::new();
batch.push(tx_report(10, tx_set_1));
batch.push(tx_report(11, tx_set_2));
batch.push(tx_report(12, tx_set_3));
observer_service.transaction_did_commit(&batch);
let val = called_key.deref();
assert_eq!(val, &RefCell::new(None));
let t = txids.deref();
assert_eq!(t, &RefCell::new(vec![]));
let c = changes.deref();
assert_eq!(c, &RefCell::new(vec![]));
}
#[test]
fn test_only_notifies_observers_registered_at_transact_start() {
let mut observer_service = TxObservationService::default();
let key_1 = "Test Observing 1".to_string();
let registered_attrs = get_registered_observer_attributes();
let txids_1 = Rc::new(RefCell::new(Vec::new()));
let changes_1 = Rc::new(RefCell::new(Vec::new()));
let called_key_1: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids_1 = Rc::clone(&txids_1);
let mut_changes_1 = Rc::clone(&changes_1);
let mut_key_1 = Rc::clone(&called_key_1);
let tx_observer_1 = TxObserver::new(registered_attrs.clone(), move |obs_key, batch| {
let mut k = mut_key_1.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids_1.borrow_mut();
let mut c = mut_changes_1.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
observer_service.register(key_1.clone(), tx_observer_1);
assert!(observer_service.is_registered(&key_1));
let mut tx_set_1 = BTreeSet::new();
tx_set_1.insert(100);
tx_set_1.insert(400);
tx_set_1.insert(700);
let mut batch = Vec::new();
batch.push(tx_report(10, tx_set_1.clone()));
// register second observer after one transact has occured
let key_2 = "Test Observing 2".to_string();
let txids_2 = Rc::new(RefCell::new(Vec::new()));
let changes_2 = Rc::new(RefCell::new(Vec::new()));
let called_key_2: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids_2 = Rc::clone(&txids_2);
let mut_changes_2 = Rc::clone(&changes_2);
let mut_key_2 = Rc::clone(&called_key_2);
let tx_observer_2 = TxObserver::new(registered_attrs, move |obs_key, batch| {
let mut k = mut_key_2.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids_2.borrow_mut();
let mut c = mut_changes_2.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
observer_service.register(key_2.clone(), tx_observer_2);
assert!(observer_service.is_registered(&key_2));
let mut tx_set_2 = BTreeSet::new();
tx_set_2.insert(200);
tx_set_2.insert(300);
batch.push(tx_report(11, tx_set_2.clone()));
observer_service.transaction_did_commit(&batch);
let val = called_key_1.deref();
assert_eq!(val, &RefCell::new(Some(key_1.clone())));
let t = txids_1.deref();
assert_eq!(t, &RefCell::new(vec![10, 11]));
let c = changes_1.deref();
assert_eq!(c, &RefCell::new(vec![tx_set_1.clone(), tx_set_2.clone()]));
let val = called_key_2.deref();
assert_eq!(val, &RefCell::new(None));
let t = txids_2.deref();
assert_eq!(t, &RefCell::new(vec![]));
let c = changes_2.deref();
assert_eq!(c, &RefCell::new(vec![]));
}

7
ffi/Cargo.toml Normal file
View file

@ -0,0 +1,7 @@
[package]
name = "mentat_ffi"
version = "0.1.0"
authors = ["Emily Toop <etoop@mozilla.com>"]
[dependencies.mentat]
path = ".."

11
ffi/src/android.rs Normal file
View file

@ -0,0 +1,11 @@
// TODO just use https://github.com/tomaka/android-rs-glue somehow?
use std::os::raw::c_char;
use std::os::raw::c_int;
// Logging
pub const ANDROID_LOG_DEBUG: i32 = 3;
pub const ANDROID_LOG_INFO: i32 = 4;
pub const ANDROID_LOG_WARN: i32 = 5;
pub const ANDROID_LOG_ERROR: i32 = 6;
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }

150
ffi/src/lib.rs Normal file
View file

@ -0,0 +1,150 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
extern crate mentat;
use std::collections::{
BTreeSet,
};
use std::os::raw::{
c_char,
c_int,
};
use std::slice;
pub use mentat::{
NamespacedKeyword,
HasSchema,
Store,
TxObserver,
};
pub mod utils;
pub mod android;
pub use utils::strings::{
c_char_to_string,
string_to_c_char,
};
use utils::log;
#[repr(C)]
#[derive(Debug, Clone)]
pub struct ExternTxReport {
pub txid: i64,
pub changes: Box<[i64]>,
pub changes_len: usize
}
#[repr(C)]
#[derive(Debug)]
pub struct ExternTxReportList {
pub reports: Box<[ExternTxReport]>,
pub len: usize
}
#[no_mangle]
pub extern "C" fn new_store(uri: *const c_char) -> *mut Store {
let uri = c_char_to_string(uri);
let store = Store::open(&uri).expect("expected a store");
Box::into_raw(Box::new(store))
}
#[no_mangle]
pub unsafe extern "C" fn store_destroy(store: *mut Store) {
let _ = Box::from_raw(store);
}
#[no_mangle]
pub unsafe extern "C" fn store_register_observer(store: *mut Store,
key: *const c_char,
attributes: *const i64,
attributes_len: usize,
callback: extern fn(key: *const c_char)) {//, reports: &ExternTxReportList)) {
let store = &mut*store;
let mut attribute_set = BTreeSet::new();
let slice = slice::from_raw_parts(attributes, attributes_len);
log::d(&format!("Observer attribute slice: {:?}", slice));
for i in 0..attributes_len {
let attr = slice[i].into();
attribute_set.insert(attr);
}
log::d(&format!("Observer attribute set: {:?}", attribute_set));
let key = c_char_to_string(key);
let tx_observer = TxObserver::new(attribute_set, move |obs_key, batch| {
log::d(&format!("Calling observer registered for {:?}", obs_key));
let extern_reports: Vec<ExternTxReport> = batch.iter().map(|report| {
let changes: Vec<i64> = report.changeset.iter().map(|i|i.clone()).collect();
let len = changes.len();
ExternTxReport {
txid: report.tx_id.clone(),
changes: changes.into_boxed_slice(),
changes_len: len,
}
}).collect();
let len = extern_reports.len();
let reports = ExternTxReportList {
reports: extern_reports.into_boxed_slice(),
len: len,
};
callback(string_to_c_char(obs_key));//, &reports);
});
log::d(&format!("Registering observer for key: {:?}", key));
store.register_observer(key, tx_observer);
}
#[no_mangle]
pub unsafe extern "C" fn store_unregister_observer(store: *mut Store, key: *const c_char) {
let store = &mut*store;
let key = c_char_to_string(key);
log::d(&format!("Unregistering observer for key: {:?}", key));
store.unregister_observer(&key);
}
use std::panic;
#[no_mangle]
pub unsafe extern "C" fn store_entid_for_attribute(store: *mut Store, attr: *const c_char) -> i64 {
let store = &mut*store;
log::d(&format!("store_entid_for_attribute got store"));
let mut keyword_string = c_char_to_string(attr);
log::d(&format!("store_entid_for_attribute keyword_string {:?}", keyword_string));
let attr_name = keyword_string.split_off(1);
log::d(&format!("store_entid_for_attribute attr_name {:?}", attr_name));
let parts: Vec<&str> = attr_name.split("/").collect();
log::d(&format!("store_entid_for_attribute parts {:?}", parts));
let kw = NamespacedKeyword::new(parts[0], parts[1]);
log::d(&format!("store_entid_for_attribute kw {:?}", kw));
let conn = store.conn();
log::d(&format!("store_entid_for_attribute conn"));
let current_schema = conn.current_schema();
log::d(&format!("store_entid_for_attribute current_schema {:?}", current_schema));
let got_entid = current_schema.get_entid(&kw);
log::d(&format!("store_entid_for_attribute got_entid {:?}", got_entid));
let entid = got_entid.unwrap();
log::d(&format!("store_entid_for_attribute entid {:?}", entid));
entid.into()
}
#[no_mangle]
pub unsafe extern "C" fn tx_report_list_entry_at(tx_report_list: *mut ExternTxReportList, index: c_int) -> *const ExternTxReport {
let tx_report_list = &*tx_report_list;
let index = index as usize;
let report = Box::new(tx_report_list.reports[index].clone());
Box::into_raw(report)
}
#[no_mangle]
pub unsafe extern "C" fn changelist_entry_at(tx_report: *mut ExternTxReport, index: c_int) -> i64 {
let tx_report = &*tx_report;
let index = index as usize;
tx_report.changes[index].clone()
}

56
ffi/src/utils.rs Normal file
View file

@ -0,0 +1,56 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
pub mod strings {
use std::os::raw::c_char;
use std::ffi::{
CString,
CStr
};
pub fn c_char_to_string(cchar: *const c_char) -> String {
let c_str = unsafe { CStr::from_ptr(cchar) };
let r_str = match c_str.to_str() {
Err(_) => "",
Ok(string) => string,
};
r_str.to_string()
}
pub fn string_to_c_char(r_string: String) -> *mut c_char {
CString::new(r_string).unwrap().into_raw()
}
}
pub mod log {
#[cfg(all(target_os="android", not(test)))]
use std::ffi::CString;
#[cfg(all(target_os="android", not(test)))]
use android;
// TODO far from ideal. And, we might actually want to println in tests.
#[cfg(all(not(target_os="android"), not(target_os="ios")))]
pub fn d(_: &str) {}
#[cfg(all(target_os="ios", not(test)))]
pub fn d(message: &str) {
eprintln!("{}", message);
}
#[cfg(all(target_os="android", not(test)))]
pub fn d(message: &str) {
let message = CString::new(message).unwrap();
let message = message.as_ptr();
let tag = CString::new("mentat_ffi::utils").unwrap();
let tag = tag.as_ptr();
unsafe { android::__android_log_write(android::ANDROID_LOG_DEBUG, tag, message) };
}
}

View file

@ -48,10 +48,13 @@ use mentat_db::{
transact,
transact_terms,
PartitionMap,
TxObservationService,
TxObserver,
TxReport,
};
use mentat_db::internal_types::TermWithTempIds;
use mentat_db::db::PartitionMapping;
use mentat_tx;
@ -59,10 +62,6 @@ use mentat_tx::entities::TempId;
use mentat_tx_parser;
use mentat_tolstoy::Syncer;
use uuid::Uuid;
use entity_builder::{
InProgressBuilder,
};
@ -122,15 +121,16 @@ pub struct Conn {
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
attribute_cache: RwLock<SQLiteAttributeCache>,
tx_observer_service: Mutex<TxObservationService>,
}
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
pub struct Store {
pub sqlite: rusqlite::Connection,
conn: Conn,
sqlite: rusqlite::Connection,
}
impl Store {
@ -157,6 +157,12 @@ impl Store {
sqlite: connection,
})
}
pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> {
let mut metadata = self.conn.metadata.lock().unwrap();
metadata.partition_map.expand_up_to(":db.part/user", new_head);
db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into())
}
}
pub trait Queryable {
@ -172,10 +178,6 @@ pub trait Queryable {
where E: Into<Entid>;
}
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
}
/// Represents an in-progress, not yet committed, set of changes to the store.
/// Call `commit` to commit your changes, or `rollback` to discard them.
/// A transaction is held open until you do so.
@ -187,8 +189,9 @@ pub struct InProgress<'a, 'c> {
partition_map: PartitionMap,
schema: Schema,
cache: RwLockWriteGuard<'a, SQLiteAttributeCache>,
use_caching: bool,
tx_reports: Vec<TxReport>,
observer_service: Option<&'a Mutex<TxObservationService>>,
}
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
@ -337,6 +340,29 @@ impl<'a, 'c> HasSchema for InProgress<'a, 'c> {
}
}
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::ffi::CString;
pub const ANDROID_LOG_DEBUG: i32 = 3;
#[cfg(all(target_os="android", not(test)))]
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
#[cfg(all(target_os="android", not(test)))]
pub fn d(message: &str) {
let tag = "mentat::conn";
let message = CString::new(message).unwrap();
let message = message.as_ptr();
let tag = CString::new(tag).unwrap();
let tag = tag.as_ptr();
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}
#[cfg(all(not(target_os="android")))]
pub fn d(message: &str) {
let tag = "mentat::conn";
println!("d: {}: {}", tag, message);
}
impl<'a, 'c> InProgress<'a, 'c> {
pub fn builder(self) -> InProgressBuilder<'a, 'c> {
@ -355,6 +381,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
&self.schema,
terms,
tempid_set)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -372,6 +399,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
// `Default::default` in those situations to extract the partition map, and so there
// would still be some cost.
let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
self.tx_reports.push(report.clone());
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -404,6 +433,14 @@ impl<'a, 'c> InProgress<'a, 'c> {
// Commit the SQLite transaction while we hold the mutex.
self.transaction.commit()?;
// notify any observers about a transaction that has been made.
if let Some(observer_service) = self.observer_service {
d(&format!("got an observer!"));
observer_service.lock().unwrap().transaction_did_commit(&self.tx_reports);
} else {
d(&format!("don't got no observer!"));
}
metadata.generation += 1;
metadata.partition_map = self.partition_map;
@ -451,6 +488,16 @@ impl Store {
direction,
CacheAction::Register)
}
pub fn register_observer(&mut self, key: String, observer: TxObserver) {
self.conn.register_observer(key, observer);
}
pub fn unregister_observer(&mut self, key: &String) {
println!("Store: unregistering observer for key {:?}", key);
self.conn.unregister_observer(key);
println!("Store: unregistered observer for key {:?}", key);
}
}
impl Queryable for Store {
@ -493,22 +540,21 @@ pub enum CacheAction {
Deregister,
}
impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid)?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
}
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))),
attribute_cache: Default::default()
attribute_cache: Default::default(),
tx_observer_service: Mutex::new(TxObservationService::default()),
}
}
#[cfg(test)]
pub fn is_registered_as_observer(&self, key: &String) -> bool {
self.tx_observer_service.lock().unwrap().is_registered(key)
}
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
/// _does not_ write the bootstrap schema. This constructor should only be used by
/// consumers that expect to populate raw transaction data themselves.
@ -656,6 +702,8 @@ impl Conn {
schema: (*current_schema).clone(),
cache: self.attribute_cache.write().unwrap(),
use_caching: true,
tx_reports: Vec::new(),
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
})
}
@ -739,6 +787,16 @@ impl Conn {
},
}
}
pub fn register_observer(&mut self, key: String, observer: TxObserver) {
self.tx_observer_service.lock().unwrap().register(key, observer);
}
pub fn unregister_observer(&mut self, key: &String) {
println!("Conn: unregistering observer for key {:?}", key);
self.tx_observer_service.lock().unwrap().deregister(key);
println!("Conn: unregistered observer for key {:?}", key);
}
}
#[cfg(test)]
@ -747,17 +805,42 @@ mod tests {
extern crate mentat_parser_utils;
use std::cell::{
RefCell
};
use std::collections::{
BTreeSet,
};
use std::ops::Deref;
use std::rc::{
Rc,
};
use std::time::Instant;
use mentat_core::{
TypedValue,
};
use query::{
use ::entity_builder::{
BuildTerms,
};
use ::query::{
Variable,
};
use ::QueryResults;
use ::vocabulary::{
AttributeBuilder,
Definition,
VersionedStore,
};
use ::vocabulary::attribute::{
Unique
};
use mentat_db::USER0;
#[test]
@ -1065,4 +1148,203 @@ mod tests {
assert!(cached_elapsed_time < uncached_elapsed_time);
}
}
#[test]
fn test_register_observer() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), tx_observer);
assert!(conn.is_registered_as_observer(&key));
}
#[test]
fn test_deregister_observer() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let key = "Test Observer".to_string();
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
conn.register_observer(key.clone(), tx_observer);
assert!(conn.is_registered_as_observer(&key));
conn.unregister_observer(&key);
assert!(!conn.is_registered_as_observer(&key));
}
fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) {
// transact some schema
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress");
in_progress.ensure_vocabulary(&Definition {
name: kw!(:todo/items),
version: 1,
attributes: vec![
(kw!(:todo/uuid),
AttributeBuilder::new()
.value_type(ValueType::Uuid)
.multival(false)
.unique(Unique::Value)
.index(true)
.build()),
(kw!(:todo/name),
AttributeBuilder::new()
.value_type(ValueType::String)
.multival(false)
.fulltext(true)
.build()),
(kw!(:todo/completion_date),
AttributeBuilder::new()
.value_type(ValueType::Instant)
.multival(false)
.build()),
(kw!(:label/name),
AttributeBuilder::new()
.value_type(ValueType::String)
.multival(false)
.unique(Unique::Value)
.fulltext(true)
.index(true)
.build()),
(kw!(:label/color),
AttributeBuilder::new()
.value_type(ValueType::String)
.multival(false)
.build()),
],
}).expect("expected vocubulary");
in_progress.commit().expect("Expected vocabulary committed");
}
#[test]
fn test_observer_notified_on_registered_change() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
add_schema(&mut conn, &mut sqlite);
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
let mut registered_attrs = BTreeSet::new();
registered_attrs.insert(name_entid.clone());
registered_attrs.insert(date_entid.clone());
let key = "Test Observing".to_string();
let txids = Rc::new(RefCell::new(Vec::new()));
let changes = Rc::new(RefCell::new(Vec::new()));
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids = Rc::clone(&txids);
let mut_changes = Rc::clone(&changes);
let mut_key = Rc::clone(&called_key);
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
let mut k = mut_key.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids.borrow_mut();
let mut c = mut_changes.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
conn.register_observer(key.clone(), tx_observer);
assert!(conn.is_registered_as_observer(&key));
let mut tx_ids = Vec::new();
let mut changesets = Vec::new();
{
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
for i in 0..3 {
let name = format!("todo{}", i);
let uuid = Uuid::new_v4();
let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
if i % 2 == 0 {
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
}
let (ip, r) = builder.transact();
let report = r.expect("expected a report");
tx_ids.push(report.tx_id.clone());
changesets.push(report.changeset.clone());
in_progress = ip;
}
let mut builder = in_progress.builder().describe_tempid("Label");
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
builder.commit().expect("expect transaction to occur");
}
let val = called_key.deref();
assert_eq!(val, &RefCell::new(Some(key.clone())));
let t = txids.deref();
assert_eq!(t, &RefCell::new(tx_ids));
let c = changes.deref();
assert_eq!(c, &RefCell::new(changesets));
}
#[test]
fn test_observer_not_notified_on_unregistered_change() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
add_schema(&mut conn, &mut sqlite);
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
let mut registered_attrs = BTreeSet::new();
registered_attrs.insert(name_entid.clone());
registered_attrs.insert(date_entid.clone());
let key = "Test Observing".to_string();
let txids = Rc::new(RefCell::new(Vec::new()));
let changes = Rc::new(RefCell::new(Vec::new()));
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
let mut_txids = Rc::clone(&txids);
let mut_changes = Rc::clone(&changes);
let mut_key = Rc::clone(&called_key);
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
let mut k = mut_key.borrow_mut();
*k = Some(obs_key.clone());
let mut t = mut_txids.borrow_mut();
let mut c = mut_changes.borrow_mut();
for report in batch.iter() {
t.push(report.tx_id.clone());
c.push(report.changeset.clone());
}
t.sort();
});
conn.register_observer(key.clone(), tx_observer);
assert!(conn.is_registered_as_observer(&key));
{
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
for i in 0..3 {
let name = format!("label{}", i);
let mut builder = in_progress.builder().describe_tempid(&name);
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
let (ip, _) = builder.transact();
in_progress = ip;
}
}
let val = called_key.deref();
assert_eq!(val, &RefCell::new(None));
let t = txids.deref();
assert_eq!(t, &RefCell::new(vec![]));
let c = changes.deref();
assert_eq!(c, &RefCell::new(vec![]));
}
}

View file

@ -108,5 +108,10 @@ error_chain! {
description("provided value doesn't match value type")
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
}
NotYetImplemented(t: String) {
description("not yet implemented")
display("not yet implemented: {}", t)
}
}
}

View file

@ -29,10 +29,12 @@ extern crate mentat_query_parser;
extern crate mentat_query_projector;
extern crate mentat_query_translator;
extern crate mentat_sql;
extern crate mentat_tolstoy;
extern crate mentat_tx;
extern crate mentat_tx_parser;
#[cfg(feature = "syncable")]
extern crate mentat_tolstoy;
pub use mentat_core::{
Attribute,
Entid,
@ -52,6 +54,7 @@ pub use mentat_query::{
pub use mentat_db::{
CORE_SCHEMA_VERSION,
DB_SCHEMA_CORE,
TxObserver,
TxReport,
new_connection,
};
@ -95,6 +98,13 @@ pub mod conn;
pub mod query;
pub mod entity_builder;
#[cfg(feature = "syncable")]
pub mod sync;
pub fn get_name() -> String {
return String::from("mentat");
}
pub use query::{
IntoResult,
PlainSymbol,
@ -115,7 +125,6 @@ pub use conn::{
InProgress,
Metadata,
Queryable,
Syncable,
Store,
};

133
src/sync.rs Normal file
View file

@ -0,0 +1,133 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use uuid::Uuid;
use conn::Store;
use errors::{
Result,
ErrorKind,
};
use mentat_core::{
Entid,
KnownEntid,
};
use mentat_db as db;
use entity_builder::BuildTerms;
use mentat_tolstoy::{
Syncer,
SyncMetadataClient,
TxMapper,
};
use mentat_tolstoy::syncer::{
Tx,
SyncResult,
};
use mentat_tolstoy::metadata::HeadTrackable;
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
}
fn within_user_partition(entid: Entid) -> bool {
entid >= db::USER0 && entid < db::TX0
}
impl Syncable for Store {
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
let mut last_tx_entid = None;
let mut last_tx_uuid = None;
// During fast-forwarding, we will insert datoms with known entids
// which, by definition, fall outside of our user partition.
// Once we've done with insertion, we need to ensure that user
// partition's next allocation will not overlap with just-inserted datoms.
// To allow for "holes" in the user partition (due to data excision),
// we track the highest incoming entid we saw, and expand our
// local partition to match.
// In absence of excision and implementation bugs, this should work
// just as if we counted number of incoming entids and expanded by
// that number instead.
let mut largest_endid_encountered = db::USER0;
for tx in txs {
let in_progress = self.begin_transaction()?;
let mut builder = in_progress.builder();
for part in tx.parts {
if part.added {
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
} else {
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
}
// Ignore datoms that fall outside of the user partition:
if within_user_partition(part.e) && part.e > largest_endid_encountered {
largest_endid_encountered = part.e;
}
}
let report = builder.commit()?;
last_tx_entid = Some(report.tx_id);
last_tx_uuid = Some(tx.tx.clone());
}
// We've just transacted a new tx, and generated a new tx entid.
// Map it to the corresponding incoming tx uuid, advance our
// "locally known remote head".
if let Some(uuid) = last_tx_uuid {
if let Some(entid) = last_tx_entid {
{
let mut db_tx = self.sqlite.transaction()?;
SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
db_tx.commit()?;
}
// only need to advance the user partition, since we're using KnownEntid and partition won't
// get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
// to create a tx and advance the partition for us.
self.fast_forward_user_partition(largest_endid_encountered)?;
}
}
Ok(())
}
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid)?;
let sync_result;
{
let mut db_tx = self.sqlite.transaction()?;
sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;
// TODO this should be done _after_ all of the operations below conclude!
// Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc)
db_tx.commit()?;
}
// TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above,
// and so for now we split up sync into multiple db transactions /o\
// Fixing this likely involves either implementing flow on InProgress, or changing flow to
// take an InProgress instead of a raw sql transaction.
match sync_result {
SyncResult::EmptyServer => Ok(()),
SyncResult::NoChanges => Ok(()),
SyncResult::ServerFastForward => Ok(()),
SyncResult::Merge => bail!(ErrorKind::NotYetImplemented(
format!("Can't sync against diverged local.")
)),
SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs)
}
}
}

View file

@ -97,12 +97,12 @@ fn test_reader() {
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
{
let db_tx = c.transaction().expect("db tx");
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
// Ensure that the first (bootstrap) transaction is skipped over.
let mut receiver = TxCountingReceiver::new();
assert_eq!(false, receiver.is_done);
Processor::process(&db_tx, None, &mut receiver).expect("processor");
assert_eq!(true, receiver.is_done);
assert_eq!(1, receiver.tx_count);
assert_eq!(0, receiver.tx_count);
}
let ids = conn.transact(&mut c, r#"[
@ -112,7 +112,7 @@ fn test_reader() {
]"#).expect("successful transaction").tempids;
let numba_entity_id = ids.get("s").unwrap();
let mut bootstrap_tx = None;
let first_tx;
{
let db_tx = c.transaction().expect("db tx");
// Expect to see one more transaction of four parts (one for tx datom itself).
@ -121,10 +121,10 @@ fn test_reader() {
println!("{:#?}", receiver);
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 4);
assert_eq!(1, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 0, 4);
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
}
let ids = conn.transact(&mut c, r#"[
@ -138,14 +138,14 @@ fn test_reader() {
// Expect to see a single two part transaction
let mut receiver = TestingReceiver::new();
// Note that we're asking for the bootstrap tx to be skipped by the processor.
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
// Note that we're asking for the first transacted tx to be skipped by the processor.
Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 2);
assert_eq!(1, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 0, 2);
// Inspect the transaction part.
let tx_id = receiver.txes.keys().nth(1).expect("tx");
let tx_id = receiver.txes.keys().nth(0).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = &datoms[0];

View file

@ -49,11 +49,6 @@ error_chain! {
display("encountered unexpected state: {}", t)
}
NotYetImplemented(t: String) {
description("not yet implemented")
display("not yet implemented: {}", t)
}
DuplicateMetadata(k: String) {
description("encountered more than one metadata value for key")
display("encountered more than one metadata value for key: {}", k)

View file

@ -39,7 +39,9 @@ pub mod tx_processor;
pub mod errors;
pub mod syncer;
pub mod tx_mapper;
pub use tx_mapper::TxMapper;
pub use syncer::Syncer;
pub use metadata::SyncMetadataClient;
pub use errors::{
Error,
ErrorKind,

View file

@ -54,16 +54,16 @@ mod tests {
#[test]
fn test_get_remote_head_default() {
let mut conn = schema::tests::setup_conn();
let tx = conn.transaction().expect("db tx");
let mut conn = schema::tests::setup_conn_bare();
let tx = schema::tests::setup_tx(&mut conn);
assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
}
#[test]
fn test_set_and_get_remote_head() {
let mut conn = schema::tests::setup_conn();
let mut conn = schema::tests::setup_conn_bare();
let tx = schema::tests::setup_tx(&mut conn);
let uuid = Uuid::new_v4();
let tx = conn.transaction().expect("db tx");
SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
}

View file

@ -24,15 +24,13 @@ lazy_static! {
};
}
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> {
let tx = conn.transaction()?;
pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> {
for statement in (&SCHEMA_STATEMENTS).iter() {
tx.execute(statement, &[])?;
}
tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?;
tx.commit().map_err(|e| e.into())
Ok(())
}
#[cfg(test)]
@ -40,7 +38,7 @@ pub mod tests {
use super::*;
use uuid::Uuid;
fn setup_conn_bare() -> rusqlite::Connection {
pub fn setup_conn_bare() -> rusqlite::Connection {
let conn = rusqlite::Connection::open_in_memory().unwrap();
conn.execute_batch("
@ -54,19 +52,24 @@ pub mod tests {
conn
}
pub fn setup_conn() -> rusqlite::Connection {
let mut conn = setup_conn_bare();
ensure_current_version(&mut conn).expect("connection setup");
conn
pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
conn.transaction().expect("tx")
}
pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
let mut tx = conn.transaction().expect("tx");
ensure_current_version(&mut tx).expect("connection setup");
tx
}
#[test]
fn test_empty() {
let mut conn = setup_conn_bare();
let mut tx = setup_tx_bare(&mut conn);
assert!(ensure_current_version(&mut conn).is_ok());
assert!(ensure_current_version(&mut tx).is_ok());
let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works");
let first: Result<String> = keys_iter.next().unwrap().map_err(|e| e.into());
@ -82,27 +85,23 @@ pub mod tests {
#[test]
fn test_non_empty() {
let mut conn = setup_conn_bare();
let mut tx = setup_tx_bare(&mut conn);
assert!(ensure_current_version(&mut conn).is_ok());
assert!(ensure_current_version(&mut tx).is_ok());
let test_uuid = Uuid::new_v4();
{
let tx = conn.transaction().unwrap();
let uuid_bytes = test_uuid.as_bytes().to_vec();
match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) {
Err(e) => panic!("Error running an update: {}", e),
_ => ()
}
match tx.commit() {
Err(e) => panic!("Error committing an update: {}", e),
_ => ()
}
}
assert!(ensure_current_version(&mut conn).is_ok());
assert!(ensure_current_version(&mut tx).is_ok());
// Check that running ensure_current_version on an initialized conn doesn't change anything.
let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap();
let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap();
let mut values_iter = stmt.query_map(&[], |r| {
let raw_uuid: Vec<u8> = r.get(0);
Uuid::from_bytes(raw_uuid.as_slice()).unwrap()

View file

@ -49,19 +49,27 @@ use tx_mapper::TxMapper;
// See https://github.com/mozilla/mentat/issues/571
// Below is some debug Android-friendly logging:
// use std::os::raw::c_char;
// use std::os::raw::c_int;
// use std::ffi::CString;
// pub const ANDROID_LOG_DEBUG: i32 = 3;
// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::ffi::CString;
pub const ANDROID_LOG_DEBUG: i32 = 3;
#[cfg(all(target_os="android", not(test)))]
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
#[cfg(all(target_os="android", not(test)))]
pub fn d(message: &str) {
println!("d: {}", message);
// let message = CString::new(message).unwrap();
// let message = message.as_ptr();
// let tag = CString::new("RustyToodle").unwrap();
// let tag = tag.as_ptr();
// unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
let tag = "mentat_tolstoy::syncer";
let message = CString::new(message).unwrap();
let message = message.as_ptr();
let tag = CString::new(tag).unwrap();
let tag = tag.as_ptr();
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}
#[cfg(all(not(target_os="android")))]
pub fn d(message: &str) {
let tag = "mentat_tolstoy::syncer";
println!("d: {}: {}", tag, message);
}
pub struct Syncer {}
@ -88,11 +96,13 @@ impl TxReceiver for InquiringTxReceiver {
fn tx<T>(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
self.last_tx = Some(tx_id);
d(&format!("got new last_tx: {:?}", self.last_tx));
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
d(&format!("done!"));
Ok(())
}
}
@ -154,7 +164,7 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
Some(parent) => {
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks));
self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?;
},
None => {
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks));
@ -174,11 +184,27 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
}
}
// For returning out of the downloader as an ordered list.
#[derive(Debug)]
pub struct Tx {
pub tx: Uuid,
pub parts: Vec<TxPart>,
}
pub enum SyncResult {
EmptyServer,
NoChanges,
ServerFastForward,
LocalFastForward(Vec<Tx>),
Merge,
}
impl Syncer {
fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
fn fast_forward_server(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
Processor::process(db_tx, from_tx, &mut uploader)?;
if !uploader.is_done {
d(&format!("upload_ours: TxProcessorUnfinished!"));
bail!(ErrorKind::TxProcessorUnfinished);
}
// Last tx uuid uploaded by the tx receiver.
@ -197,19 +223,47 @@ impl Syncer {
Ok(())
}
pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> {
fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<Vec<Tx>> {
let new_txs = remote_client.get_transactions(remote_head)?;
d(&format!("there are {} new_txs on the remote client", new_txs.len()));
let mut tx_list = Vec::new();
for tx in new_txs {
let mut tx_parts = Vec::new();
let chunks = remote_client.get_chunks(&tx)?;
d(&format!("received {} chunks from remote client", chunks.len()));
// We pass along all of the downloaded parts, including transaction's
// metadata datom. Transactor is expected to do the right thing, and
// use txInstant from one of our datoms.
for chunk in chunks {
let part = remote_client.get_chunk(&chunk)?;
tx_parts.push(part);
}
tx_list.push(Tx {
tx: tx,
parts: tx_parts
});
}
d(&format!("got tx list: {:?}", &tx_list));
Ok(tx_list)
}
pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result<SyncResult> {
d(&format!("sync flowing"));
ensure_current_version(sqlite)?;
ensure_current_version(db_tx)?;
// TODO configure this sync with some auth data
let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
let mut db_tx = sqlite.transaction()?;
let remote_head = remote_client.get_head()?;
d(&format!("remote head {:?}", remote_head));
let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
let locally_known_remote_head = SyncMetadataClient::remote_head(db_tx)?;
d(&format!("local head {:?}", locally_known_remote_head));
// Local head: latest transaction that we have in the store,
@ -220,61 +274,75 @@ impl Syncer {
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
// TODO don't just start from the beginning... but then again, we should do this
// without walking the table at all, and use the tx index.
Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?;
let inq_res = Processor::process(db_tx, None, &mut inquiring_tx_receiver);
match inq_res {
Ok(_) => d(&format!("inquiry ok")),
Err(e) => {
d(&format!("inquiry err: {:?}", e));
return Err(e);
}
}
d(&format!("After Processor::process inquiring_tx_receiver"));
if !inquiring_tx_receiver.is_done {
d(&format!("!inquiring_tx_receiver.is_done"));
bail!(ErrorKind::TxProcessorUnfinished);
}
let have_local_changes = match inquiring_tx_receiver.last_tx {
d(&format!("TxMapper::get... local head"));
let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx {
Some(tx) => {
match TxMapper::get(&db_tx, tx)? {
Some(_) => false,
None => true
match TxMapper::get(db_tx, tx)? {
Some(_) => (false, false),
None => (true, false)
}
},
None => false
None => (false, true)
};
d(&format!("has_local_changes {:?}, local_empty {:?}", have_local_changes, local_store_empty));
// Check if the server is empty - populate it.
if remote_head == Uuid::nil() {
d(&format!("empty server!"));
Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?;
Syncer::fast_forward_server(db_tx, None, &remote_client, &remote_head)?;
return Ok(SyncResult::EmptyServer);
// Check if the server is the same as us, and if our HEAD moved.
} else if locally_known_remote_head == remote_head {
d(&format!("server unchanged since last sync."));
if !have_local_changes {
d(&format!("local HEAD did not move. Nothing to do!"));
return Ok(());
return Ok(SyncResult::NoChanges);
}
d(&format!("local HEAD moved."));
// TODO it's possible that we've successfully advanced remote head previously,
// but failed to advance our own local head. If that's the case, and we can recognize it,
// our sync becomes just bumping our local head. AFAICT below would currently fail.
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? {
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(db_tx, &locally_known_remote_head)? {
d(&format!("Fast-forwarding the server."));
Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
Syncer::fast_forward_server(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
return Ok(SyncResult::ServerFastForward);
} else {
d(&format!("Unable to fast-forward the server; missing local tx mapping"));
bail!(ErrorKind::TxIncorrectlyMapped(0));
}
// We diverged from the server.
// We'll need to rebase/merge ourselves on top of it.
// We diverged from the server. If we're lucky, we can just fast-forward local.
// Otherwise, a merge (or a rebase) is required.
} else {
d(&format!("server changed since last sync."));
bail!(ErrorKind::NotYetImplemented(
format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
// TODO local store moved forward since we last synced. Need to merge or rebase.
if !local_store_empty && have_local_changes {
return Ok(SyncResult::Merge);
}
d(&format!("fast-forwarding local store."));
return Ok(SyncResult::LocalFastForward(
Syncer::download_theirs(db_tx, &remote_client, &locally_known_remote_head)?
));
}
// Commit everything, if there's anything to commit!
// Any new tx->uuid mappings and the new HEAD. We're synced!
db_tx.commit()?;
Ok(())
// Our caller will commit the tx with our changes when it's done.
}
}
@ -289,9 +357,24 @@ struct SerializedTransaction<'a> {
chunks: &'a Vec<Uuid>
}
#[derive(Deserialize)]
struct DeserializableTransaction {
parent: Uuid,
chunks: Vec<Uuid>,
id: Uuid,
seq: i64,
}
#[derive(Deserialize)]
struct SerializedTransactions {
limit: i64,
from: Uuid,
transactions: Vec<Uuid>,
}
struct RemoteClient {
base_uri: String,
user_uuid: Uuid
user_uuid: Uuid,
}
@ -308,9 +391,14 @@ impl RemoteClient {
format!("{}/{}", self.base_uri, self.user_uuid)
}
// TODO what we want is a method that returns a deserialized json structure.
// It'll need a type T so that consumers can specify what downloaded json will
// map to. I ran into borrow issues doing that - probably need to restructure
// this and use PhantomData markers or somesuch.
// But for now, we get code duplication.
fn get_uuid(&self, uri: String) -> Result<Uuid> {
let mut core = Core::new()?;
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
@ -321,29 +409,28 @@ impl RemoteClient {
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(head_json)
Ok(json)
})
});
d(&format!("running..."));
let head_json = core.run(work)?;
d(&format!("got head: {:?}", &head_json.head));
Ok(head_json.head)
}
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
where hyper::Body: std::convert::From<T>, {
let mut core = Core::new()?;
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
@ -372,6 +459,105 @@ impl RemoteClient {
Ok(())
}
fn get_transactions(&self, parent_uuid: &Uuid) -> Result<Vec<Uuid>> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let transactions_json = core.run(work)?;
d(&format!("got transactions: {:?}", &transactions_json.transactions));
Ok(transactions_json.transactions)
}
fn get_chunks(&self, transaction_uuid: &Uuid) -> Result<Vec<Uuid>> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let transaction_json = core.run(work)?;
d(&format!("got transaction chunks: {:?}", &transaction_json.chunks));
Ok(transaction_json.chunks)
}
fn get_chunk(&self, chunk_uuid: &Uuid) -> Result<TxPart> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: TxPart = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let chunk = core.run(work)?;
d(&format!("got transaction chunk: {:?}", &chunk));
Ok(chunk)
}
fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
// {"parent": uuid, "chunks": [chunk1, chunk2...]}
let transaction = SerializedTransaction {

View file

@ -33,6 +33,13 @@ impl TxMapper {
Ok(())
}
// TODO upsert...? error checking..?
pub fn set_tx_uuid(db_tx: &mut rusqlite::Transaction, tx: Entid, uuid: &Uuid) -> Result<()> {
let uuid_bytes = uuid.as_bytes().to_vec();
db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?;
Ok(())
}
// TODO for when we're downloading, right?
pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
match TxMapper::get(db_tx, tx)? {
@ -92,8 +99,8 @@ pub mod tests {
#[test]
fn test_getters() {
let mut conn = schema::tests::setup_conn();
let mut tx = conn.transaction().expect("db tx");
let mut conn = schema::tests::setup_conn_bare();
let mut tx = schema::tests::setup_tx(&mut conn);
assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
@ -101,8 +108,8 @@ pub mod tests {
#[test]
fn test_bulk_setter() {
let mut conn = schema::tests::setup_conn();
let mut tx = conn.transaction().expect("db tx");
let mut conn = schema::tests::setup_conn_bare();
let mut tx = schema::tests::setup_tx(&mut conn);
let mut map = HashMap::new();
TxMapper::set_bulk(&mut tx, &map).expect("empty map success");

View file

@ -49,6 +49,29 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
rows: &'t mut Peekable<T>,
}
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::ffi::CString;
pub const ANDROID_LOG_DEBUG: i32 = 3;
#[cfg(all(target_os="android", not(test)))]
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
#[cfg(all(target_os="android", not(test)))]
pub fn d(message: &str) {
let tag = "mentat_db::tx_processor";
let message = CString::new(message).unwrap();
let message = message.as_ptr();
let tag = CString::new(tag).unwrap();
let tag = tag.as_ptr();
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}
#[cfg(all(not(target_os="android")))]
pub fn d(message: &str) {
let tag = "mentat_db::tx_processor";
println!("d: {}: {}", tag, message);
}
impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T>
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
fn new(first: &'dbtx TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'dbtx, 't, T>
@ -130,29 +153,52 @@ impl Processor {
pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
where R: TxReceiver {
let tx_filter = match from_tx {
Some(tx) => format!(" WHERE tx > {} ", tx),
Some(tx) => format!("WHERE tx > {}", tx),
None => format!("")
};
// If no 'from_tx' is provided, get everything but skip over the first (bootstrap) transaction.
let skip_first_tx = match from_tx {
Some(_) => false,
None => true
};
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
let mut stmt = sqlite.prepare(&select_query)?;
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
let mut at_tx = 1;
let mut current_tx = None;
while let Some(row) = rows.next() {
let datom = row?;
d(&format!("datom! {:?}", datom));
match current_tx {
Some(tx) => {
if tx != datom.tx {
d(&format!("new tx! {:?}", datom.tx));
at_tx = at_tx + 1;
d(&format!("at_tx! {:?}", at_tx));
current_tx = Some(datom.tx);
if at_tx <= 2 && skip_first_tx {
d(&format!("skipping subsequent"));
continue;
}
d(&format!("Some: calling receiver.tx"));
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
)?;
d(&format!("Some: returned from receiver.tx"));
} else {
d(&format!("skipping over datom in current tx block"));
}
},
None => {
current_tx = Some(datom.tx);
if at_tx <= 3 && skip_first_tx {
d(&format!("skipping first"));
continue;
}
d(&format!("None: calling receiver.tx"));
receiver.tx(
datom.tx,
&mut DatomsIterator::new(&datom, &mut rows)
@ -160,7 +206,9 @@ impl Processor {
}
}
}
d(&format!("calling receiver.done"));
receiver.done()?;
Ok(())
}
}

View file

@ -32,11 +32,12 @@ use mentat::{
QueryOutput,
QueryResults,
Store,
Syncable,
TxReport,
TypedValue,
};
use mentat::sync::Syncable;
use command_parser::{
Command,
HELP_COMMAND,