Transaction observation
- Creation of transaction observer service that takes `TxObserver`s and registers them against keys and for sets of attributes. - Observer service called when InProgress commits and filters observers that are affected by the tx's that occured and notifies them of what changed - InProgress batches up tx's as it goes along so granular notification can be provided.
This commit is contained in:
parent
d9d2b3a89a
commit
58716ae22e
13 changed files with 1001 additions and 40 deletions
|
@ -21,7 +21,7 @@ bundled_sqlite3 = ["rusqlite/bundled"]
|
||||||
syncable = ["mentat_tolstoy"]
|
syncable = ["mentat_tolstoy"]
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["tools/cli"]
|
members = ["tools/cli", "ffi"]
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
rustc_version = "0.1.7"
|
rustc_version = "0.1.7"
|
||||||
|
|
|
@ -75,9 +75,12 @@ pub use db::{
|
||||||
pub use tx::{
|
pub use tx::{
|
||||||
transact,
|
transact,
|
||||||
transact_terms,
|
transact_terms,
|
||||||
|
TxObservationService,
|
||||||
|
TxObserver,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use types::{
|
pub use types::{
|
||||||
|
AttributeSet,
|
||||||
DB,
|
DB,
|
||||||
PartitionMap,
|
PartitionMap,
|
||||||
TxReport,
|
TxReport,
|
||||||
|
|
119
db/src/tx.rs
119
db/src/tx.rs
|
@ -51,7 +51,9 @@ use std::collections::{
|
||||||
BTreeSet,
|
BTreeSet,
|
||||||
VecDeque,
|
VecDeque,
|
||||||
};
|
};
|
||||||
use std::rc::Rc;
|
use std::rc::{
|
||||||
|
Rc,
|
||||||
|
};
|
||||||
|
|
||||||
use db;
|
use db;
|
||||||
use db::{
|
use db::{
|
||||||
|
@ -105,6 +107,7 @@ use schema::{
|
||||||
};
|
};
|
||||||
use types::{
|
use types::{
|
||||||
Attribute,
|
Attribute,
|
||||||
|
AttributeSet,
|
||||||
AVPair,
|
AVPair,
|
||||||
AVMap,
|
AVMap,
|
||||||
Entid,
|
Entid,
|
||||||
|
@ -115,6 +118,30 @@ use types::{
|
||||||
};
|
};
|
||||||
use upsert_resolution::Generation;
|
use upsert_resolution::Generation;
|
||||||
|
|
||||||
|
use std::os::raw::c_char;
|
||||||
|
use std::os::raw::c_int;
|
||||||
|
use std::ffi::CString;
|
||||||
|
|
||||||
|
pub const ANDROID_LOG_DEBUG: i32 = 3;
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat_db::tx";
|
||||||
|
let message = CString::new(message).unwrap();
|
||||||
|
let message = message.as_ptr();
|
||||||
|
let tag = CString::new(tag).unwrap();
|
||||||
|
let tag = tag.as_ptr();
|
||||||
|
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(target_os="android")))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat_db::tx";
|
||||||
|
println!("d: {}: {}", tag, message);
|
||||||
|
}
|
||||||
|
|
||||||
/// A transaction on its way to being applied.
|
/// A transaction on its way to being applied.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Tx<'conn, 'a> {
|
pub struct Tx<'conn, 'a> {
|
||||||
|
@ -600,6 +627,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
final_populations.allocated,
|
final_populations.allocated,
|
||||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||||
|
|
||||||
|
let affected_attrs: AttributeSet = final_terms.iter().filter_map(|t| {
|
||||||
|
match t {
|
||||||
|
&Term::AddOrRetract(_, _, attrid, _) => Some(attrid),
|
||||||
|
}
|
||||||
|
}). collect();
|
||||||
let tx_instant;
|
let tx_instant;
|
||||||
|
|
||||||
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
||||||
|
@ -689,7 +721,6 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
if !fts_many.is_empty() {
|
if !fts_many.is_empty() {
|
||||||
self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?;
|
self.store.insert_fts_searches(&fts_many[..], db::SearchType::Exact)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.store.commit_transaction(self.tx_id)?;
|
self.store.commit_transaction(self.tx_id)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,6 +749,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
tx_instant,
|
tx_instant,
|
||||||
tempids: tempids,
|
tempids: tempids,
|
||||||
|
changeset: affected_attrs,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -749,7 +781,7 @@ fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Opti
|
||||||
///
|
///
|
||||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||||
// TODO: move this to the transactor layer.
|
// TODO: move this to the transactor layer.
|
||||||
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
|
pub fn transact<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection,
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema_for_mutation: &'a Schema,
|
schema_for_mutation: &'a Schema,
|
||||||
schema: &'a Schema,
|
schema: &'a Schema,
|
||||||
|
@ -762,15 +794,92 @@ pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface.
|
/// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface.
|
||||||
pub fn transact_terms<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
|
pub fn transact_terms<'conn, 'a, 'id, I>(conn: &'conn rusqlite::Connection,
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema_for_mutation: &'a Schema,
|
schema_for_mutation: &'a Schema,
|
||||||
schema: &'a Schema,
|
schema: &'a Schema,
|
||||||
terms: I,
|
terms: I,
|
||||||
tempid_set: InternSet<TempId>) -> Result<(TxReport, PartitionMap, Option<Schema>)>
|
tempid_set: InternSet<TempId>) -> Result<(TxReport, PartitionMap, Option<Schema>)>
|
||||||
where I: IntoIterator<Item=TermWithTempIds> {
|
where I: IntoIterator<Item=TermWithTempIds> {
|
||||||
|
|
||||||
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?;
|
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?;
|
||||||
let report = tx.transact_simple_terms(terms, tempid_set)?;
|
let report = tx.transact_simple_terms(terms, tempid_set)?;
|
||||||
conclude_tx(tx, report)
|
conclude_tx(tx, report)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct TxObserver {
|
||||||
|
notify_fn: Option<Box<FnMut(String, Vec<TxReport>)>>,
|
||||||
|
attributes: AttributeSet,
|
||||||
|
registration_time: Option<DateTime<Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxObserver {
|
||||||
|
pub fn new<F>(attributes: AttributeSet, notify_fn: F) -> TxObserver where F: FnMut(String, Vec<TxReport>) + 'static {
|
||||||
|
TxObserver {
|
||||||
|
notify_fn: Some(Box::new(notify_fn)),
|
||||||
|
attributes,
|
||||||
|
registration_time: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_registered(&mut self) {
|
||||||
|
self.registration_time = Some(now());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn notify(&mut self, key: String, reports: &Vec<TxReport>) {
|
||||||
|
let mut matching_reports: Vec<TxReport> = vec![];
|
||||||
|
// if let Some(registration_time) = self.registration_time {
|
||||||
|
// if registration_time.le(&reports[0].tx_instant) {
|
||||||
|
matching_reports = reports.iter().filter_map( |report| {
|
||||||
|
if self.attributes.intersection(&report.changeset).next().is_some(){
|
||||||
|
Some(report.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
if !matching_reports.is_empty() {
|
||||||
|
if let Some(ref mut notify_fn) = self.notify_fn {
|
||||||
|
d(&format!("Notifying {:?} about tx {:?}", key, matching_reports));
|
||||||
|
(notify_fn)(key, matching_reports);
|
||||||
|
} else {
|
||||||
|
d("no notify function specified for TxObserver");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
d(&format!("No matching reports for observer {:?} registered for updates on attributes {:?}: {:?}", key, reports, self.attributes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct TxObservationService {
|
||||||
|
observers: BTreeMap<String, TxObserver>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TxObservationService {
|
||||||
|
// For testing purposes
|
||||||
|
pub fn is_registered(&self, key: &String) -> bool {
|
||||||
|
self.observers.contains_key(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(&mut self, key: String, mut observer: TxObserver) {
|
||||||
|
observer.set_registered();
|
||||||
|
self.observers.insert(key.clone(), observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deregister(&mut self, key: &String) {
|
||||||
|
self.observers.remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_observers(&self) -> bool {
|
||||||
|
!self.observers.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn transaction_did_commit(&mut self, reports: &Vec<TxReport>) {
|
||||||
|
// notify all observers about their relevant transactions
|
||||||
|
for (key, observer) in self.observers.iter_mut() {
|
||||||
|
d(&format!("Notifying observer registered for key {:?}", key));
|
||||||
|
observer.notify(key.clone(), reports);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -11,19 +11,22 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
|
|
||||||
pub use self::mentat_core::{
|
pub use self::mentat_core::{
|
||||||
DateTime,
|
|
||||||
Entid,
|
|
||||||
ValueType,
|
|
||||||
TypedValue,
|
|
||||||
Attribute,
|
Attribute,
|
||||||
AttributeBitFlags,
|
AttributeBitFlags,
|
||||||
|
DateTime,
|
||||||
|
Entid,
|
||||||
Schema,
|
Schema,
|
||||||
|
TypedValue,
|
||||||
Utc,
|
Utc,
|
||||||
|
ValueType,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Represents one partition of the entid space.
|
/// Represents one partition of the entid space.
|
||||||
|
@ -82,6 +85,9 @@ pub type AVPair = (Entid, TypedValue);
|
||||||
/// Used to resolve lookup-refs and upserts.
|
/// Used to resolve lookup-refs and upserts.
|
||||||
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
|
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
|
||||||
|
|
||||||
|
// represents a set of entids that are correspond to attributes
|
||||||
|
pub type AttributeSet = BTreeSet<Entid>;
|
||||||
|
|
||||||
/// A transaction report summarizes an applied transaction.
|
/// A transaction report summarizes an applied transaction.
|
||||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
||||||
pub struct TxReport {
|
pub struct TxReport {
|
||||||
|
@ -97,4 +103,7 @@ pub struct TxReport {
|
||||||
/// existing entid, or is allocated a new entid. (It is possible for multiple distinct string
|
/// existing entid, or is allocated a new entid. (It is possible for multiple distinct string
|
||||||
/// literal tempids to all unify to a single freshly allocated entid.)
|
/// literal tempids to all unify to a single freshly allocated entid.)
|
||||||
pub tempids: BTreeMap<String, Entid>,
|
pub tempids: BTreeMap<String, Entid>,
|
||||||
|
|
||||||
|
// A set of entids for attributes that were affected inside this transaction
|
||||||
|
pub changeset: AttributeSet,
|
||||||
}
|
}
|
||||||
|
|
265
db/tests/tx_observer_tests.rs
Normal file
265
db/tests/tx_observer_tests.rs
Normal file
|
@ -0,0 +1,265 @@
|
||||||
|
// Copyright 2018 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
extern crate mentat_db;
|
||||||
|
extern crate mentat_core;
|
||||||
|
|
||||||
|
use std::cell::{
|
||||||
|
RefCell
|
||||||
|
};
|
||||||
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::rc::{
|
||||||
|
Rc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
now,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_db::{
|
||||||
|
TxObserver,
|
||||||
|
TxObservationService,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_db::types::TxReport;
|
||||||
|
|
||||||
|
fn get_registered_observer_attributes() -> BTreeSet<i64> {
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(100);
|
||||||
|
registered_attrs.insert(200);
|
||||||
|
registered_attrs.insert(300);
|
||||||
|
registered_attrs
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tx_report(tx_id: i64, changes: BTreeSet<i64>) -> TxReport {
|
||||||
|
TxReport {
|
||||||
|
tx_id: tx_id,
|
||||||
|
tx_instant: now(),
|
||||||
|
tempids: BTreeMap::new(),
|
||||||
|
changeset: changes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_register_observer() {
|
||||||
|
let mut observer_service = TxObservationService::default();
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
let registered_attrs = BTreeSet::new();
|
||||||
|
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
observer_service.register(key.clone(), tx_observer);
|
||||||
|
assert!(observer_service.is_registered(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deregister_observer() {
|
||||||
|
let mut observer_service = TxObservationService::default();
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
let registered_attrs = BTreeSet::new();
|
||||||
|
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
observer_service.register(key.clone(), tx_observer);
|
||||||
|
assert!(observer_service.is_registered(&key));
|
||||||
|
|
||||||
|
observer_service.deregister(&key);
|
||||||
|
|
||||||
|
assert!(!observer_service.is_registered(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_notified_on_registered_change() {
|
||||||
|
let mut observer_service = TxObservationService::default();
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
let registered_attrs = get_registered_observer_attributes();
|
||||||
|
|
||||||
|
let txids = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids = Rc::clone(&txids);
|
||||||
|
let mut_changes = Rc::clone(&changes);
|
||||||
|
let mut_key = Rc::clone(&called_key);
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
let mut k = mut_key.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids.borrow_mut();
|
||||||
|
let mut c = mut_changes.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
|
||||||
|
observer_service.register(key.clone(), tx_observer);
|
||||||
|
assert!(observer_service.is_registered(&key));
|
||||||
|
|
||||||
|
let mut tx_set_1 = BTreeSet::new();
|
||||||
|
tx_set_1.insert(100);
|
||||||
|
tx_set_1.insert(400);
|
||||||
|
tx_set_1.insert(700);
|
||||||
|
let mut tx_set_2 = BTreeSet::new();
|
||||||
|
tx_set_2.insert(200);
|
||||||
|
tx_set_2.insert(300);
|
||||||
|
let mut tx_set_3 = BTreeSet::new();
|
||||||
|
tx_set_3.insert(600);
|
||||||
|
let mut batch = Vec::new();
|
||||||
|
batch.push(tx_report(10, tx_set_1.clone()));
|
||||||
|
batch.push(tx_report(11, tx_set_2.clone()));
|
||||||
|
batch.push(tx_report(12, tx_set_3));
|
||||||
|
observer_service.transaction_did_commit(&batch);
|
||||||
|
|
||||||
|
let val = called_key.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(Some(key.clone())));
|
||||||
|
let t = txids.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(vec![10, 11]));
|
||||||
|
|
||||||
|
let c = changes.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(vec![tx_set_1, tx_set_2]));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_not_notified_on_unregistered_change() {
|
||||||
|
let mut observer_service = TxObservationService::default();
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
let registered_attrs = get_registered_observer_attributes();
|
||||||
|
|
||||||
|
let txids = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids = Rc::clone(&txids);
|
||||||
|
let mut_changes = Rc::clone(&changes);
|
||||||
|
let mut_key = Rc::clone(&called_key);
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
let mut k = mut_key.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids.borrow_mut();
|
||||||
|
let mut c = mut_changes.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
|
||||||
|
observer_service.register(key.clone(), tx_observer);
|
||||||
|
assert!(observer_service.is_registered(&key));
|
||||||
|
|
||||||
|
let mut tx_set_1 = BTreeSet::new();
|
||||||
|
tx_set_1.insert(101);
|
||||||
|
tx_set_1.insert(401);
|
||||||
|
tx_set_1.insert(701);
|
||||||
|
let mut tx_set_2 = BTreeSet::new();
|
||||||
|
tx_set_2.insert(201);
|
||||||
|
tx_set_2.insert(301);
|
||||||
|
let mut tx_set_3 = BTreeSet::new();
|
||||||
|
tx_set_3.insert(601);
|
||||||
|
let mut batch = Vec::new();
|
||||||
|
batch.push(tx_report(10, tx_set_1));
|
||||||
|
batch.push(tx_report(11, tx_set_2));
|
||||||
|
batch.push(tx_report(12, tx_set_3));
|
||||||
|
observer_service.transaction_did_commit(&batch);
|
||||||
|
|
||||||
|
let val = called_key.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(None));
|
||||||
|
let t = txids.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(vec![]));
|
||||||
|
let c = changes.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(vec![]));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_only_notifies_observers_registered_at_transact_start() {
|
||||||
|
let mut observer_service = TxObservationService::default();
|
||||||
|
let key_1 = "Test Observing 1".to_string();
|
||||||
|
let registered_attrs = get_registered_observer_attributes();
|
||||||
|
|
||||||
|
let txids_1 = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes_1 = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key_1: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids_1 = Rc::clone(&txids_1);
|
||||||
|
let mut_changes_1 = Rc::clone(&changes_1);
|
||||||
|
let mut_key_1 = Rc::clone(&called_key_1);
|
||||||
|
|
||||||
|
let tx_observer_1 = TxObserver::new(registered_attrs.clone(), move |obs_key, batch| {
|
||||||
|
let mut k = mut_key_1.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids_1.borrow_mut();
|
||||||
|
let mut c = mut_changes_1.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
|
||||||
|
observer_service.register(key_1.clone(), tx_observer_1);
|
||||||
|
assert!(observer_service.is_registered(&key_1));
|
||||||
|
|
||||||
|
let mut tx_set_1 = BTreeSet::new();
|
||||||
|
tx_set_1.insert(100);
|
||||||
|
tx_set_1.insert(400);
|
||||||
|
tx_set_1.insert(700);
|
||||||
|
|
||||||
|
let mut batch = Vec::new();
|
||||||
|
batch.push(tx_report(10, tx_set_1.clone()));
|
||||||
|
|
||||||
|
// register second observer after one transact has occured
|
||||||
|
let key_2 = "Test Observing 2".to_string();
|
||||||
|
let txids_2 = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes_2 = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key_2: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids_2 = Rc::clone(&txids_2);
|
||||||
|
let mut_changes_2 = Rc::clone(&changes_2);
|
||||||
|
let mut_key_2 = Rc::clone(&called_key_2);
|
||||||
|
|
||||||
|
let tx_observer_2 = TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
let mut k = mut_key_2.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids_2.borrow_mut();
|
||||||
|
let mut c = mut_changes_2.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
observer_service.register(key_2.clone(), tx_observer_2);
|
||||||
|
assert!(observer_service.is_registered(&key_2));
|
||||||
|
|
||||||
|
let mut tx_set_2 = BTreeSet::new();
|
||||||
|
tx_set_2.insert(200);
|
||||||
|
tx_set_2.insert(300);
|
||||||
|
batch.push(tx_report(11, tx_set_2.clone()));
|
||||||
|
observer_service.transaction_did_commit(&batch);
|
||||||
|
|
||||||
|
let val = called_key_1.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(Some(key_1.clone())));
|
||||||
|
let t = txids_1.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(vec![10, 11]));
|
||||||
|
let c = changes_1.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(vec![tx_set_1.clone(), tx_set_2.clone()]));
|
||||||
|
|
||||||
|
let val = called_key_2.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(None));
|
||||||
|
let t = txids_2.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(vec![]));
|
||||||
|
let c = changes_2.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(vec![]));
|
||||||
|
}
|
7
ffi/Cargo.toml
Normal file
7
ffi/Cargo.toml
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
[package]
|
||||||
|
name = "mentat_ffi"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["Emily Toop <etoop@mozilla.com>"]
|
||||||
|
|
||||||
|
[dependencies.mentat]
|
||||||
|
path = ".."
|
11
ffi/src/android.rs
Normal file
11
ffi/src/android.rs
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
// TODO just use https://github.com/tomaka/android-rs-glue somehow?
|
||||||
|
|
||||||
|
use std::os::raw::c_char;
|
||||||
|
use std::os::raw::c_int;
|
||||||
|
|
||||||
|
// Logging
|
||||||
|
pub const ANDROID_LOG_DEBUG: i32 = 3;
|
||||||
|
pub const ANDROID_LOG_INFO: i32 = 4;
|
||||||
|
pub const ANDROID_LOG_WARN: i32 = 5;
|
||||||
|
pub const ANDROID_LOG_ERROR: i32 = 6;
|
||||||
|
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
150
ffi/src/lib.rs
Normal file
150
ffi/src/lib.rs
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
// Copyright 2018 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
extern crate mentat;
|
||||||
|
|
||||||
|
use std::collections::{
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
use std::os::raw::{
|
||||||
|
c_char,
|
||||||
|
c_int,
|
||||||
|
};
|
||||||
|
use std::slice;
|
||||||
|
|
||||||
|
pub use mentat::{
|
||||||
|
NamespacedKeyword,
|
||||||
|
HasSchema,
|
||||||
|
Store,
|
||||||
|
TxObserver,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub mod utils;
|
||||||
|
pub mod android;
|
||||||
|
|
||||||
|
pub use utils::strings::{
|
||||||
|
c_char_to_string,
|
||||||
|
string_to_c_char,
|
||||||
|
};
|
||||||
|
|
||||||
|
use utils::log;
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ExternTxReport {
|
||||||
|
pub txid: i64,
|
||||||
|
pub changes: Box<[i64]>,
|
||||||
|
pub changes_len: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ExternTxReportList {
|
||||||
|
pub reports: Box<[ExternTxReport]>,
|
||||||
|
pub len: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub extern "C" fn new_store(uri: *const c_char) -> *mut Store {
|
||||||
|
let uri = c_char_to_string(uri);
|
||||||
|
let store = Store::open(&uri).expect("expected a store");
|
||||||
|
Box::into_raw(Box::new(store))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn store_destroy(store: *mut Store) {
|
||||||
|
let _ = Box::from_raw(store);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn store_register_observer(store: *mut Store,
|
||||||
|
key: *const c_char,
|
||||||
|
attributes: *const i64,
|
||||||
|
attributes_len: usize,
|
||||||
|
callback: extern fn(key: *const c_char)) {//, reports: &ExternTxReportList)) {
|
||||||
|
let store = &mut*store;
|
||||||
|
let mut attribute_set = BTreeSet::new();
|
||||||
|
let slice = slice::from_raw_parts(attributes, attributes_len);
|
||||||
|
log::d(&format!("Observer attribute slice: {:?}", slice));
|
||||||
|
for i in 0..attributes_len {
|
||||||
|
let attr = slice[i].into();
|
||||||
|
attribute_set.insert(attr);
|
||||||
|
}
|
||||||
|
log::d(&format!("Observer attribute set: {:?}", attribute_set));
|
||||||
|
let key = c_char_to_string(key);
|
||||||
|
let tx_observer = TxObserver::new(attribute_set, move |obs_key, batch| {
|
||||||
|
log::d(&format!("Calling observer registered for {:?}", obs_key));
|
||||||
|
let extern_reports: Vec<ExternTxReport> = batch.iter().map(|report| {
|
||||||
|
let changes: Vec<i64> = report.changeset.iter().map(|i|i.clone()).collect();
|
||||||
|
let len = changes.len();
|
||||||
|
ExternTxReport {
|
||||||
|
txid: report.tx_id.clone(),
|
||||||
|
changes: changes.into_boxed_slice(),
|
||||||
|
changes_len: len,
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
let len = extern_reports.len();
|
||||||
|
let reports = ExternTxReportList {
|
||||||
|
reports: extern_reports.into_boxed_slice(),
|
||||||
|
len: len,
|
||||||
|
};
|
||||||
|
callback(string_to_c_char(obs_key));//, &reports);
|
||||||
|
});
|
||||||
|
log::d(&format!("Registering observer for key: {:?}", key));
|
||||||
|
store.register_observer(key, tx_observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn store_unregister_observer(store: *mut Store, key: *const c_char) {
|
||||||
|
let store = &mut*store;
|
||||||
|
let key = c_char_to_string(key);
|
||||||
|
log::d(&format!("Unregistering observer for key: {:?}", key));
|
||||||
|
store.unregister_observer(&key);
|
||||||
|
}
|
||||||
|
|
||||||
|
use std::panic;
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn store_entid_for_attribute(store: *mut Store, attr: *const c_char) -> i64 {
|
||||||
|
let store = &mut*store;
|
||||||
|
log::d(&format!("store_entid_for_attribute got store"));
|
||||||
|
let mut keyword_string = c_char_to_string(attr);
|
||||||
|
log::d(&format!("store_entid_for_attribute keyword_string {:?}", keyword_string));
|
||||||
|
let attr_name = keyword_string.split_off(1);
|
||||||
|
log::d(&format!("store_entid_for_attribute attr_name {:?}", attr_name));
|
||||||
|
let parts: Vec<&str> = attr_name.split("/").collect();
|
||||||
|
log::d(&format!("store_entid_for_attribute parts {:?}", parts));
|
||||||
|
let kw = NamespacedKeyword::new(parts[0], parts[1]);
|
||||||
|
log::d(&format!("store_entid_for_attribute kw {:?}", kw));
|
||||||
|
let conn = store.conn();
|
||||||
|
log::d(&format!("store_entid_for_attribute conn"));
|
||||||
|
let current_schema = conn.current_schema();
|
||||||
|
log::d(&format!("store_entid_for_attribute current_schema {:?}", current_schema));
|
||||||
|
let got_entid = current_schema.get_entid(&kw);
|
||||||
|
log::d(&format!("store_entid_for_attribute got_entid {:?}", got_entid));
|
||||||
|
let entid = got_entid.unwrap();
|
||||||
|
log::d(&format!("store_entid_for_attribute entid {:?}", entid));
|
||||||
|
entid.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn tx_report_list_entry_at(tx_report_list: *mut ExternTxReportList, index: c_int) -> *const ExternTxReport {
|
||||||
|
let tx_report_list = &*tx_report_list;
|
||||||
|
let index = index as usize;
|
||||||
|
let report = Box::new(tx_report_list.reports[index].clone());
|
||||||
|
Box::into_raw(report)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn changelist_entry_at(tx_report: *mut ExternTxReport, index: c_int) -> i64 {
|
||||||
|
let tx_report = &*tx_report;
|
||||||
|
let index = index as usize;
|
||||||
|
tx_report.changes[index].clone()
|
||||||
|
}
|
56
ffi/src/utils.rs
Normal file
56
ffi/src/utils.rs
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
pub mod strings {
|
||||||
|
use std::os::raw::c_char;
|
||||||
|
use std::ffi::{
|
||||||
|
CString,
|
||||||
|
CStr
|
||||||
|
};
|
||||||
|
|
||||||
|
pub fn c_char_to_string(cchar: *const c_char) -> String {
|
||||||
|
let c_str = unsafe { CStr::from_ptr(cchar) };
|
||||||
|
let r_str = match c_str.to_str() {
|
||||||
|
Err(_) => "",
|
||||||
|
Ok(string) => string,
|
||||||
|
};
|
||||||
|
r_str.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn string_to_c_char(r_string: String) -> *mut c_char {
|
||||||
|
CString::new(r_string).unwrap().into_raw()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod log {
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
use std::ffi::CString;
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
use android;
|
||||||
|
|
||||||
|
// TODO far from ideal. And, we might actually want to println in tests.
|
||||||
|
#[cfg(all(not(target_os="android"), not(target_os="ios")))]
|
||||||
|
pub fn d(_: &str) {}
|
||||||
|
|
||||||
|
#[cfg(all(target_os="ios", not(test)))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
eprintln!("{}", message);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let message = CString::new(message).unwrap();
|
||||||
|
let message = message.as_ptr();
|
||||||
|
let tag = CString::new("mentat_ffi::utils").unwrap();
|
||||||
|
let tag = tag.as_ptr();
|
||||||
|
unsafe { android::__android_log_write(android::ANDROID_LOG_DEBUG, tag, message) };
|
||||||
|
}
|
||||||
|
}
|
298
src/conn.rs
298
src/conn.rs
|
@ -48,6 +48,8 @@ use mentat_db::{
|
||||||
transact,
|
transact,
|
||||||
transact_terms,
|
transact_terms,
|
||||||
PartitionMap,
|
PartitionMap,
|
||||||
|
TxObservationService,
|
||||||
|
TxObserver,
|
||||||
TxReport,
|
TxReport,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -119,8 +121,9 @@ pub struct Conn {
|
||||||
|
|
||||||
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
||||||
// the schema changes. #315.
|
// the schema changes. #315.
|
||||||
|
|
||||||
attribute_cache: RwLock<SQLiteAttributeCache>,
|
attribute_cache: RwLock<SQLiteAttributeCache>,
|
||||||
|
|
||||||
|
tx_observer_service: Mutex<TxObservationService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||||
|
@ -186,8 +189,9 @@ pub struct InProgress<'a, 'c> {
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema: Schema,
|
schema: Schema,
|
||||||
cache: RwLockWriteGuard<'a, SQLiteAttributeCache>,
|
cache: RwLockWriteGuard<'a, SQLiteAttributeCache>,
|
||||||
|
|
||||||
use_caching: bool,
|
use_caching: bool,
|
||||||
|
tx_reports: Vec<TxReport>,
|
||||||
|
observer_service: Option<&'a Mutex<TxObservationService>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
||||||
|
@ -336,6 +340,29 @@ impl<'a, 'c> HasSchema for InProgress<'a, 'c> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::os::raw::c_char;
|
||||||
|
use std::os::raw::c_int;
|
||||||
|
use std::ffi::CString;
|
||||||
|
|
||||||
|
pub const ANDROID_LOG_DEBUG: i32 = 3;
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat::conn";
|
||||||
|
let message = CString::new(message).unwrap();
|
||||||
|
let message = message.as_ptr();
|
||||||
|
let tag = CString::new(tag).unwrap();
|
||||||
|
let tag = tag.as_ptr();
|
||||||
|
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(target_os="android")))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat::conn";
|
||||||
|
println!("d: {}: {}", tag, message);
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, 'c> InProgress<'a, 'c> {
|
impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
pub fn builder(self) -> InProgressBuilder<'a, 'c> {
|
pub fn builder(self) -> InProgressBuilder<'a, 'c> {
|
||||||
|
@ -354,6 +381,7 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
&self.schema,
|
&self.schema,
|
||||||
terms,
|
terms,
|
||||||
tempid_set)?;
|
tempid_set)?;
|
||||||
|
self.tx_reports.push(report.clone());
|
||||||
self.partition_map = next_partition_map;
|
self.partition_map = next_partition_map;
|
||||||
if let Some(schema) = next_schema {
|
if let Some(schema) = next_schema {
|
||||||
self.schema = schema;
|
self.schema = schema;
|
||||||
|
@ -371,6 +399,8 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
// `Default::default` in those situations to extract the partition map, and so there
|
// `Default::default` in those situations to extract the partition map, and so there
|
||||||
// would still be some cost.
|
// would still be some cost.
|
||||||
let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
|
let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
|
||||||
|
self.tx_reports.push(report.clone());
|
||||||
|
|
||||||
self.partition_map = next_partition_map;
|
self.partition_map = next_partition_map;
|
||||||
if let Some(schema) = next_schema {
|
if let Some(schema) = next_schema {
|
||||||
self.schema = schema;
|
self.schema = schema;
|
||||||
|
@ -403,6 +433,14 @@ impl<'a, 'c> InProgress<'a, 'c> {
|
||||||
// Commit the SQLite transaction while we hold the mutex.
|
// Commit the SQLite transaction while we hold the mutex.
|
||||||
self.transaction.commit()?;
|
self.transaction.commit()?;
|
||||||
|
|
||||||
|
// notify any observers about a transaction that has been made.
|
||||||
|
if let Some(observer_service) = self.observer_service {
|
||||||
|
d(&format!("got an observer!"));
|
||||||
|
observer_service.lock().unwrap().transaction_did_commit(&self.tx_reports);
|
||||||
|
} else {
|
||||||
|
d(&format!("don't got no observer!"));
|
||||||
|
}
|
||||||
|
|
||||||
metadata.generation += 1;
|
metadata.generation += 1;
|
||||||
metadata.partition_map = self.partition_map;
|
metadata.partition_map = self.partition_map;
|
||||||
|
|
||||||
|
@ -450,6 +488,16 @@ impl Store {
|
||||||
direction,
|
direction,
|
||||||
CacheAction::Register)
|
CacheAction::Register)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn register_observer(&mut self, key: String, observer: TxObserver) {
|
||||||
|
self.conn.register_observer(key, observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister_observer(&mut self, key: &String) {
|
||||||
|
println!("Store: unregistering observer for key {:?}", key);
|
||||||
|
self.conn.unregister_observer(key);
|
||||||
|
println!("Store: unregistered observer for key {:?}", key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Queryable for Store {
|
impl Queryable for Store {
|
||||||
|
@ -497,10 +545,16 @@ impl Conn {
|
||||||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||||
Conn {
|
Conn {
|
||||||
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))),
|
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))),
|
||||||
attribute_cache: Default::default()
|
attribute_cache: Default::default(),
|
||||||
|
tx_observer_service: Mutex::new(TxObservationService::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn is_registered_as_observer(&self, key: &String) -> bool {
|
||||||
|
self.tx_observer_service.lock().unwrap().is_registered(key)
|
||||||
|
}
|
||||||
|
|
||||||
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
|
||||||
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
/// _does not_ write the bootstrap schema. This constructor should only be used by
|
||||||
/// consumers that expect to populate raw transaction data themselves.
|
/// consumers that expect to populate raw transaction data themselves.
|
||||||
|
@ -648,6 +702,8 @@ impl Conn {
|
||||||
schema: (*current_schema).clone(),
|
schema: (*current_schema).clone(),
|
||||||
cache: self.attribute_cache.write().unwrap(),
|
cache: self.attribute_cache.write().unwrap(),
|
||||||
use_caching: true,
|
use_caching: true,
|
||||||
|
tx_reports: Vec::new(),
|
||||||
|
observer_service: if self.tx_observer_service.lock().unwrap().has_observers() { Some(&self.tx_observer_service) } else { None },
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -731,6 +787,16 @@ impl Conn {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn register_observer(&mut self, key: String, observer: TxObserver) {
|
||||||
|
self.tx_observer_service.lock().unwrap().register(key, observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unregister_observer(&mut self, key: &String) {
|
||||||
|
println!("Conn: unregistering observer for key {:?}", key);
|
||||||
|
self.tx_observer_service.lock().unwrap().deregister(key);
|
||||||
|
println!("Conn: unregistered observer for key {:?}", key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -739,17 +805,42 @@ mod tests {
|
||||||
|
|
||||||
extern crate mentat_parser_utils;
|
extern crate mentat_parser_utils;
|
||||||
|
|
||||||
|
use std::cell::{
|
||||||
|
RefCell
|
||||||
|
};
|
||||||
|
use std::collections::{
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::rc::{
|
||||||
|
Rc,
|
||||||
|
};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use mentat_core::{
|
use mentat_core::{
|
||||||
TypedValue,
|
TypedValue,
|
||||||
};
|
};
|
||||||
use query::{
|
|
||||||
|
use ::entity_builder::{
|
||||||
|
BuildTerms,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::query::{
|
||||||
Variable,
|
Variable,
|
||||||
};
|
};
|
||||||
|
|
||||||
use ::QueryResults;
|
use ::QueryResults;
|
||||||
|
|
||||||
|
use ::vocabulary::{
|
||||||
|
AttributeBuilder,
|
||||||
|
Definition,
|
||||||
|
VersionedStore,
|
||||||
|
};
|
||||||
|
|
||||||
|
use ::vocabulary::attribute::{
|
||||||
|
Unique
|
||||||
|
};
|
||||||
|
|
||||||
use mentat_db::USER0;
|
use mentat_db::USER0;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -1057,4 +1148,203 @@ mod tests {
|
||||||
assert!(cached_elapsed_time < uncached_elapsed_time);
|
assert!(cached_elapsed_time < uncached_elapsed_time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_register_observer() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), tx_observer);
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deregister_observer() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
|
||||||
|
let key = "Test Observer".to_string();
|
||||||
|
|
||||||
|
let tx_observer = TxObserver::new(BTreeSet::new(), move |_obs_key, _batch| {});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), tx_observer);
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
conn.unregister_observer(&key);
|
||||||
|
|
||||||
|
assert!(!conn.is_registered_as_observer(&key));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_schema(conn: &mut Conn, mut sqlite: &mut rusqlite::Connection) {
|
||||||
|
// transact some schema
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected in progress");
|
||||||
|
in_progress.ensure_vocabulary(&Definition {
|
||||||
|
name: kw!(:todo/items),
|
||||||
|
version: 1,
|
||||||
|
attributes: vec![
|
||||||
|
(kw!(:todo/uuid),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::Uuid)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/name),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.fulltext(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:todo/completion_date),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::Instant)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/name),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.unique(Unique::Value)
|
||||||
|
.fulltext(true)
|
||||||
|
.index(true)
|
||||||
|
.build()),
|
||||||
|
(kw!(:label/color),
|
||||||
|
AttributeBuilder::new()
|
||||||
|
.value_type(ValueType::String)
|
||||||
|
.multival(false)
|
||||||
|
.build()),
|
||||||
|
],
|
||||||
|
}).expect("expected vocubulary");
|
||||||
|
in_progress.commit().expect("Expected vocabulary committed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_notified_on_registered_change() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
add_schema(&mut conn, &mut sqlite);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let txids = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids = Rc::clone(&txids);
|
||||||
|
let mut_changes = Rc::clone(&changes);
|
||||||
|
let mut_key = Rc::clone(&called_key);
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
let mut k = mut_key.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids.borrow_mut();
|
||||||
|
let mut c = mut_changes.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), tx_observer);
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
let mut tx_ids = Vec::new();
|
||||||
|
let mut changesets = Vec::new();
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let name = format!("todo{}", i);
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw( &kw!(:todo/uuid), TypedValue::Uuid(uuid)).expect("Expected added uuid");
|
||||||
|
builder.add_kw(&kw!(:todo/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
if i % 2 == 0 {
|
||||||
|
builder.add_kw(&kw!(:todo/completion_date), TypedValue::current_instant()).expect("Expected added date");
|
||||||
|
}
|
||||||
|
let (ip, r) = builder.transact();
|
||||||
|
let report = r.expect("expected a report");
|
||||||
|
tx_ids.push(report.tx_id.clone());
|
||||||
|
changesets.push(report.changeset.clone());
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
let mut builder = in_progress.builder().describe_tempid("Label");
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string("Label 1")).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
builder.commit().expect("expect transaction to occur");
|
||||||
|
}
|
||||||
|
|
||||||
|
let val = called_key.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(Some(key.clone())));
|
||||||
|
let t = txids.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(tx_ids));
|
||||||
|
|
||||||
|
let c = changes.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(changesets));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_observer_not_notified_on_unregistered_change() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
add_schema(&mut conn, &mut sqlite);
|
||||||
|
|
||||||
|
let name_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/name)).expect("entid to exist for name").into();
|
||||||
|
let date_entid: Entid = conn.current_schema().get_entid(&kw!(:todo/completion_date)).expect("entid to exist for completion_date").into();
|
||||||
|
let mut registered_attrs = BTreeSet::new();
|
||||||
|
registered_attrs.insert(name_entid.clone());
|
||||||
|
registered_attrs.insert(date_entid.clone());
|
||||||
|
|
||||||
|
let key = "Test Observing".to_string();
|
||||||
|
|
||||||
|
let txids = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let changes = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let called_key: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
let mut_txids = Rc::clone(&txids);
|
||||||
|
let mut_changes = Rc::clone(&changes);
|
||||||
|
let mut_key = Rc::clone(&called_key);
|
||||||
|
let tx_observer = TxObserver::new(registered_attrs, move |obs_key, batch| {
|
||||||
|
let mut k = mut_key.borrow_mut();
|
||||||
|
*k = Some(obs_key.clone());
|
||||||
|
let mut t = mut_txids.borrow_mut();
|
||||||
|
let mut c = mut_changes.borrow_mut();
|
||||||
|
for report in batch.iter() {
|
||||||
|
t.push(report.tx_id.clone());
|
||||||
|
c.push(report.changeset.clone());
|
||||||
|
}
|
||||||
|
t.sort();
|
||||||
|
});
|
||||||
|
|
||||||
|
conn.register_observer(key.clone(), tx_observer);
|
||||||
|
assert!(conn.is_registered_as_observer(&key));
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("expected transaction");
|
||||||
|
for i in 0..3 {
|
||||||
|
let name = format!("label{}", i);
|
||||||
|
let mut builder = in_progress.builder().describe_tempid(&name);
|
||||||
|
builder.add_kw(&kw!(:label/name), TypedValue::typed_string(&name)).expect("Expected added name");
|
||||||
|
builder.add_kw(&kw!(:label/color), TypedValue::typed_string("blue")).expect("Expected added color");
|
||||||
|
let (ip, _) = builder.transact();
|
||||||
|
in_progress = ip;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let val = called_key.deref();
|
||||||
|
assert_eq!(val, &RefCell::new(None));
|
||||||
|
let t = txids.deref();
|
||||||
|
assert_eq!(t, &RefCell::new(vec![]));
|
||||||
|
|
||||||
|
let c = changes.deref();
|
||||||
|
assert_eq!(c, &RefCell::new(vec![]));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ pub use mentat_query::{
|
||||||
pub use mentat_db::{
|
pub use mentat_db::{
|
||||||
CORE_SCHEMA_VERSION,
|
CORE_SCHEMA_VERSION,
|
||||||
DB_SCHEMA_CORE,
|
DB_SCHEMA_CORE,
|
||||||
|
TxObserver,
|
||||||
TxReport,
|
TxReport,
|
||||||
new_connection,
|
new_connection,
|
||||||
};
|
};
|
||||||
|
|
|
@ -49,19 +49,27 @@ use tx_mapper::TxMapper;
|
||||||
// See https://github.com/mozilla/mentat/issues/571
|
// See https://github.com/mozilla/mentat/issues/571
|
||||||
// Below is some debug Android-friendly logging:
|
// Below is some debug Android-friendly logging:
|
||||||
|
|
||||||
// use std::os::raw::c_char;
|
use std::os::raw::c_char;
|
||||||
// use std::os::raw::c_int;
|
use std::os::raw::c_int;
|
||||||
// use std::ffi::CString;
|
use std::ffi::CString;
|
||||||
// pub const ANDROID_LOG_DEBUG: i32 = 3;
|
pub const ANDROID_LOG_DEBUG: i32 = 3;
|
||||||
// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
pub fn d(message: &str) {
|
pub fn d(message: &str) {
|
||||||
println!("d: {}", message);
|
let tag = "mentat_tolstoy::syncer";
|
||||||
// let message = CString::new(message).unwrap();
|
let message = CString::new(message).unwrap();
|
||||||
// let message = message.as_ptr();
|
let message = message.as_ptr();
|
||||||
// let tag = CString::new("RustyToodle").unwrap();
|
let tag = CString::new(tag).unwrap();
|
||||||
// let tag = tag.as_ptr();
|
let tag = tag.as_ptr();
|
||||||
// unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(target_os="android")))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat_tolstoy::syncer";
|
||||||
|
println!("d: {}: {}", tag, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Syncer {}
|
pub struct Syncer {}
|
||||||
|
@ -88,11 +96,13 @@ impl TxReceiver for InquiringTxReceiver {
|
||||||
fn tx<T>(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()>
|
fn tx<T>(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()>
|
||||||
where T: Iterator<Item=TxPart> {
|
where T: Iterator<Item=TxPart> {
|
||||||
self.last_tx = Some(tx_id);
|
self.last_tx = Some(tx_id);
|
||||||
|
d(&format!("got new last_tx: {:?}", self.last_tx));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn done(&mut self) -> Result<()> {
|
fn done(&mut self) -> Result<()> {
|
||||||
self.is_done = true;
|
self.is_done = true;
|
||||||
|
d(&format!("done!"));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -194,6 +204,7 @@ impl Syncer {
|
||||||
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
|
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
|
||||||
Processor::process(db_tx, from_tx, &mut uploader)?;
|
Processor::process(db_tx, from_tx, &mut uploader)?;
|
||||||
if !uploader.is_done {
|
if !uploader.is_done {
|
||||||
|
d(&format!("upload_ours: TxProcessorUnfinished!"));
|
||||||
bail!(ErrorKind::TxProcessorUnfinished);
|
bail!(ErrorKind::TxProcessorUnfinished);
|
||||||
}
|
}
|
||||||
// Last tx uuid uploaded by the tx receiver.
|
// Last tx uuid uploaded by the tx receiver.
|
||||||
|
@ -214,11 +225,13 @@ impl Syncer {
|
||||||
|
|
||||||
fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<Vec<Tx>> {
|
fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<Vec<Tx>> {
|
||||||
let new_txs = remote_client.get_transactions(remote_head)?;
|
let new_txs = remote_client.get_transactions(remote_head)?;
|
||||||
|
d(&format!("there are {} new_txs on the remote client", new_txs.len()));
|
||||||
let mut tx_list = Vec::new();
|
let mut tx_list = Vec::new();
|
||||||
|
|
||||||
for tx in new_txs {
|
for tx in new_txs {
|
||||||
let mut tx_parts = Vec::new();
|
let mut tx_parts = Vec::new();
|
||||||
let chunks = remote_client.get_chunks(&tx)?;
|
let chunks = remote_client.get_chunks(&tx)?;
|
||||||
|
d(&format!("received {} chunks from remote client", chunks.len()));
|
||||||
|
|
||||||
// We pass along all of the downloaded parts, including transaction's
|
// We pass along all of the downloaded parts, including transaction's
|
||||||
// metadata datom. Transactor is expected to do the right thing, and
|
// metadata datom. Transactor is expected to do the right thing, and
|
||||||
|
@ -261,10 +274,20 @@ impl Syncer {
|
||||||
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
|
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
|
||||||
// TODO don't just start from the beginning... but then again, we should do this
|
// TODO don't just start from the beginning... but then again, we should do this
|
||||||
// without walking the table at all, and use the tx index.
|
// without walking the table at all, and use the tx index.
|
||||||
Processor::process(db_tx, None, &mut inquiring_tx_receiver)?;
|
let inq_res = Processor::process(db_tx, None, &mut inquiring_tx_receiver);
|
||||||
|
match inq_res {
|
||||||
|
Ok(_) => d(&format!("inquiry ok")),
|
||||||
|
Err(e) => {
|
||||||
|
d(&format!("inquiry err: {:?}", e));
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
d(&format!("After Processor::process inquiring_tx_receiver"));
|
||||||
if !inquiring_tx_receiver.is_done {
|
if !inquiring_tx_receiver.is_done {
|
||||||
|
d(&format!("!inquiring_tx_receiver.is_done"));
|
||||||
bail!(ErrorKind::TxProcessorUnfinished);
|
bail!(ErrorKind::TxProcessorUnfinished);
|
||||||
}
|
}
|
||||||
|
d(&format!("TxMapper::get... local head"));
|
||||||
let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx {
|
let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx {
|
||||||
Some(tx) => {
|
Some(tx) => {
|
||||||
match TxMapper::get(db_tx, tx)? {
|
match TxMapper::get(db_tx, tx)? {
|
||||||
|
@ -274,6 +297,7 @@ impl Syncer {
|
||||||
},
|
},
|
||||||
None => (false, true)
|
None => (false, true)
|
||||||
};
|
};
|
||||||
|
d(&format!("has_local_changes {:?}, local_empty {:?}", have_local_changes, local_store_empty));
|
||||||
|
|
||||||
// Check if the server is empty - populate it.
|
// Check if the server is empty - populate it.
|
||||||
if remote_head == Uuid::nil() {
|
if remote_head == Uuid::nil() {
|
||||||
|
@ -400,7 +424,6 @@ impl RemoteClient {
|
||||||
d(&format!("running..."));
|
d(&format!("running..."));
|
||||||
|
|
||||||
let head_json = core.run(work)?;
|
let head_json = core.run(work)?;
|
||||||
d(&format!("got head: {:?}", &head_json.head));
|
|
||||||
Ok(head_json.head)
|
Ok(head_json.head)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,29 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
||||||
rows: &'t mut Peekable<T>,
|
rows: &'t mut Peekable<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::os::raw::c_char;
|
||||||
|
use std::os::raw::c_int;
|
||||||
|
use std::ffi::CString;
|
||||||
|
pub const ANDROID_LOG_DEBUG: i32 = 3;
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
||||||
|
|
||||||
|
#[cfg(all(target_os="android", not(test)))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat_db::tx_processor";
|
||||||
|
let message = CString::new(message).unwrap();
|
||||||
|
let message = message.as_ptr();
|
||||||
|
let tag = CString::new(tag).unwrap();
|
||||||
|
let tag = tag.as_ptr();
|
||||||
|
unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(target_os="android")))]
|
||||||
|
pub fn d(message: &str) {
|
||||||
|
let tag = "mentat_db::tx_processor";
|
||||||
|
println!("d: {}: {}", tag, message);
|
||||||
|
}
|
||||||
|
|
||||||
impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T>
|
impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T>
|
||||||
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
where T: Sized + Iterator<Item=Result<TxPart>> + 't {
|
||||||
fn new(first: &'dbtx TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'dbtx, 't, T>
|
fn new(first: &'dbtx TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'dbtx, 't, T>
|
||||||
|
@ -142,28 +165,40 @@ impl Processor {
|
||||||
let mut stmt = sqlite.prepare(&select_query)?;
|
let mut stmt = sqlite.prepare(&select_query)?;
|
||||||
|
|
||||||
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
|
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
|
||||||
let mut at_first_tx = true;
|
let mut at_tx = 1;
|
||||||
let mut current_tx = None;
|
let mut current_tx = None;
|
||||||
|
|
||||||
while let Some(row) = rows.next() {
|
while let Some(row) = rows.next() {
|
||||||
let datom = row?;
|
let datom = row?;
|
||||||
|
d(&format!("datom! {:?}", datom));
|
||||||
match current_tx {
|
match current_tx {
|
||||||
Some(tx) => {
|
Some(tx) => {
|
||||||
if tx != datom.tx {
|
if tx != datom.tx {
|
||||||
|
d(&format!("new tx! {:?}", datom.tx));
|
||||||
|
at_tx = at_tx + 1;
|
||||||
|
d(&format!("at_tx! {:?}", at_tx));
|
||||||
current_tx = Some(datom.tx);
|
current_tx = Some(datom.tx);
|
||||||
|
if at_tx <= 2 && skip_first_tx {
|
||||||
|
d(&format!("skipping subsequent"));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
d(&format!("Some: calling receiver.tx"));
|
||||||
receiver.tx(
|
receiver.tx(
|
||||||
datom.tx,
|
datom.tx,
|
||||||
&mut DatomsIterator::new(&datom, &mut rows)
|
&mut DatomsIterator::new(&datom, &mut rows)
|
||||||
)?;
|
)?;
|
||||||
|
d(&format!("Some: returned from receiver.tx"));
|
||||||
|
} else {
|
||||||
|
d(&format!("skipping over datom in current tx block"));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
current_tx = Some(datom.tx);
|
current_tx = Some(datom.tx);
|
||||||
if at_first_tx && skip_first_tx {
|
if at_tx <= 3 && skip_first_tx {
|
||||||
at_first_tx = false;
|
d(&format!("skipping first"));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
d(&format!("None: calling receiver.tx"));
|
||||||
receiver.tx(
|
receiver.tx(
|
||||||
datom.tx,
|
datom.tx,
|
||||||
&mut DatomsIterator::new(&datom, &mut rows)
|
&mut DatomsIterator::new(&datom, &mut rows)
|
||||||
|
@ -171,7 +206,9 @@ impl Processor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
d(&format!("calling receiver.done"));
|
||||||
receiver.done()?;
|
receiver.done()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue