Replication syncing
This commit is contained in:
parent
36d455150d
commit
d9d2b3a89a
16 changed files with 431 additions and 96 deletions
|
@ -16,8 +16,9 @@ version = "0.6.1"
|
||||||
build = "build/version.rs"
|
build = "build/version.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["bundled_sqlite3"]
|
default = ["bundled_sqlite3", "syncable"]
|
||||||
bundled_sqlite3 = ["rusqlite/bundled"]
|
bundled_sqlite3 = ["rusqlite/bundled"]
|
||||||
|
syncable = ["mentat_tolstoy"]
|
||||||
|
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["tools/cli"]
|
members = ["tools/cli"]
|
||||||
|
@ -78,6 +79,7 @@ path = "tx-parser"
|
||||||
|
|
||||||
[dependencies.mentat_tolstoy]
|
[dependencies.mentat_tolstoy]
|
||||||
path = "tolstoy"
|
path = "tolstoy"
|
||||||
|
optional = true
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
debug = true
|
debug = true
|
||||||
|
|
18
db/src/db.rs
18
db/src/db.rs
|
@ -1115,6 +1115,7 @@ pub trait PartitionMapping {
|
||||||
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
|
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
|
||||||
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
|
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
|
||||||
fn contains_entid(&self, entid: Entid) -> bool;
|
fn contains_entid(&self, entid: Entid) -> bool;
|
||||||
|
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartitionMapping for PartitionMap {
|
impl PartitionMapping for PartitionMap {
|
||||||
|
@ -1136,6 +1137,23 @@ impl PartitionMapping for PartitionMap {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn expand_up_to<S: ?Sized + Ord + Display>(&mut self, partition: &S, entid: i64) where String: Borrow<S> {
|
||||||
|
match self.get_mut(partition) {
|
||||||
|
Some(partition) => {
|
||||||
|
// Don't honour requests to shrink the partition.
|
||||||
|
if partition.index > entid {
|
||||||
|
return ()
|
||||||
|
}
|
||||||
|
let new_index = entid + 1;
|
||||||
|
if partition.index != new_index {
|
||||||
|
partition.index = new_index;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// This is a programming error.
|
||||||
|
None => panic!("Cannot expand unknown partition: {}", partition),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn contains_entid(&self, entid: Entid) -> bool {
|
fn contains_entid(&self, entid: Entid) -> bool {
|
||||||
self.values().any(|partition| partition.contains_entid(entid))
|
self.values().any(|partition| partition.contains_entid(entid))
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,8 +47,7 @@ mod tx;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
mod upsert_resolution;
|
mod upsert_resolution;
|
||||||
|
|
||||||
// Export these for reference from tests. cfg(test) should work, but doesn't.
|
// Export these for reference from sync code and tests.
|
||||||
// #[cfg(test)]
|
|
||||||
pub use bootstrap::{
|
pub use bootstrap::{
|
||||||
TX0,
|
TX0,
|
||||||
USER0,
|
USER0,
|
||||||
|
|
24
src/conn.rs
24
src/conn.rs
|
@ -52,6 +52,7 @@ use mentat_db::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use mentat_db::internal_types::TermWithTempIds;
|
use mentat_db::internal_types::TermWithTempIds;
|
||||||
|
use mentat_db::db::PartitionMapping;
|
||||||
|
|
||||||
use mentat_tx;
|
use mentat_tx;
|
||||||
|
|
||||||
|
@ -59,10 +60,6 @@ use mentat_tx::entities::TempId;
|
||||||
|
|
||||||
use mentat_tx_parser;
|
use mentat_tx_parser;
|
||||||
|
|
||||||
use mentat_tolstoy::Syncer;
|
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use entity_builder::{
|
use entity_builder::{
|
||||||
InProgressBuilder,
|
InProgressBuilder,
|
||||||
};
|
};
|
||||||
|
@ -129,8 +126,8 @@ pub struct Conn {
|
||||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||||
/// for applications that don't require complex connection management.
|
/// for applications that don't require complex connection management.
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
|
pub sqlite: rusqlite::Connection,
|
||||||
conn: Conn,
|
conn: Conn,
|
||||||
sqlite: rusqlite::Connection,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
|
@ -157,6 +154,12 @@ impl Store {
|
||||||
sqlite: connection,
|
sqlite: connection,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn fast_forward_user_partition(&mut self, new_head: Entid) -> Result<()> {
|
||||||
|
let mut metadata = self.conn.metadata.lock().unwrap();
|
||||||
|
metadata.partition_map.expand_up_to(":db.part/user", new_head);
|
||||||
|
db::update_partition_map(&mut self.sqlite, &metadata.partition_map).map_err(|e| e.into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Queryable {
|
pub trait Queryable {
|
||||||
|
@ -172,10 +175,6 @@ pub trait Queryable {
|
||||||
where E: Into<Entid>;
|
where E: Into<Entid>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Syncable {
|
|
||||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Represents an in-progress, not yet committed, set of changes to the store.
|
/// Represents an in-progress, not yet committed, set of changes to the store.
|
||||||
/// Call `commit` to commit your changes, or `rollback` to discard them.
|
/// Call `commit` to commit your changes, or `rollback` to discard them.
|
||||||
/// A transaction is held open until you do so.
|
/// A transaction is held open until you do so.
|
||||||
|
@ -493,13 +492,6 @@ pub enum CacheAction {
|
||||||
Deregister,
|
Deregister,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Syncable for Store {
|
|
||||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
|
|
||||||
let uuid = Uuid::parse_str(&user_uuid)?;
|
|
||||||
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Conn {
|
impl Conn {
|
||||||
// Intentionally not public.
|
// Intentionally not public.
|
||||||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||||
|
|
|
@ -108,5 +108,10 @@ error_chain! {
|
||||||
description("provided value doesn't match value type")
|
description("provided value doesn't match value type")
|
||||||
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
|
display("provided value of type {} doesn't match attribute value type {}", provided, expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NotYetImplemented(t: String) {
|
||||||
|
description("not yet implemented")
|
||||||
|
display("not yet implemented: {}", t)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
12
src/lib.rs
12
src/lib.rs
|
@ -29,10 +29,12 @@ extern crate mentat_query_parser;
|
||||||
extern crate mentat_query_projector;
|
extern crate mentat_query_projector;
|
||||||
extern crate mentat_query_translator;
|
extern crate mentat_query_translator;
|
||||||
extern crate mentat_sql;
|
extern crate mentat_sql;
|
||||||
extern crate mentat_tolstoy;
|
|
||||||
extern crate mentat_tx;
|
extern crate mentat_tx;
|
||||||
extern crate mentat_tx_parser;
|
extern crate mentat_tx_parser;
|
||||||
|
|
||||||
|
#[cfg(feature = "syncable")]
|
||||||
|
extern crate mentat_tolstoy;
|
||||||
|
|
||||||
pub use mentat_core::{
|
pub use mentat_core::{
|
||||||
Attribute,
|
Attribute,
|
||||||
Entid,
|
Entid,
|
||||||
|
@ -95,6 +97,13 @@ pub mod conn;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod entity_builder;
|
pub mod entity_builder;
|
||||||
|
|
||||||
|
#[cfg(feature = "syncable")]
|
||||||
|
pub mod sync;
|
||||||
|
|
||||||
|
pub fn get_name() -> String {
|
||||||
|
return String::from("mentat");
|
||||||
|
}
|
||||||
|
|
||||||
pub use query::{
|
pub use query::{
|
||||||
IntoResult,
|
IntoResult,
|
||||||
PlainSymbol,
|
PlainSymbol,
|
||||||
|
@ -115,7 +124,6 @@ pub use conn::{
|
||||||
InProgress,
|
InProgress,
|
||||||
Metadata,
|
Metadata,
|
||||||
Queryable,
|
Queryable,
|
||||||
Syncable,
|
|
||||||
Store,
|
Store,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
133
src/sync.rs
Normal file
133
src/sync.rs
Normal file
|
@ -0,0 +1,133 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use conn::Store;
|
||||||
|
use errors::{
|
||||||
|
Result,
|
||||||
|
ErrorKind,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
KnownEntid,
|
||||||
|
};
|
||||||
|
use mentat_db as db;
|
||||||
|
|
||||||
|
use entity_builder::BuildTerms;
|
||||||
|
|
||||||
|
use mentat_tolstoy::{
|
||||||
|
Syncer,
|
||||||
|
SyncMetadataClient,
|
||||||
|
TxMapper,
|
||||||
|
};
|
||||||
|
use mentat_tolstoy::syncer::{
|
||||||
|
Tx,
|
||||||
|
SyncResult,
|
||||||
|
};
|
||||||
|
use mentat_tolstoy::metadata::HeadTrackable;
|
||||||
|
|
||||||
|
pub trait Syncable {
|
||||||
|
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
|
||||||
|
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn within_user_partition(entid: Entid) -> bool {
|
||||||
|
entid >= db::USER0 && entid < db::TX0
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Syncable for Store {
|
||||||
|
fn fast_forward_local(&mut self, txs: Vec<Tx>) -> Result<()> {
|
||||||
|
let mut last_tx_entid = None;
|
||||||
|
let mut last_tx_uuid = None;
|
||||||
|
|
||||||
|
// During fast-forwarding, we will insert datoms with known entids
|
||||||
|
// which, by definition, fall outside of our user partition.
|
||||||
|
// Once we've done with insertion, we need to ensure that user
|
||||||
|
// partition's next allocation will not overlap with just-inserted datoms.
|
||||||
|
// To allow for "holes" in the user partition (due to data excision),
|
||||||
|
// we track the highest incoming entid we saw, and expand our
|
||||||
|
// local partition to match.
|
||||||
|
// In absence of excision and implementation bugs, this should work
|
||||||
|
// just as if we counted number of incoming entids and expanded by
|
||||||
|
// that number instead.
|
||||||
|
let mut largest_endid_encountered = db::USER0;
|
||||||
|
|
||||||
|
for tx in txs {
|
||||||
|
let in_progress = self.begin_transaction()?;
|
||||||
|
let mut builder = in_progress.builder();
|
||||||
|
for part in tx.parts {
|
||||||
|
if part.added {
|
||||||
|
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
|
||||||
|
} else {
|
||||||
|
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v.clone())?;
|
||||||
|
}
|
||||||
|
// Ignore datoms that fall outside of the user partition:
|
||||||
|
if within_user_partition(part.e) && part.e > largest_endid_encountered {
|
||||||
|
largest_endid_encountered = part.e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let report = builder.commit()?;
|
||||||
|
last_tx_entid = Some(report.tx_id);
|
||||||
|
last_tx_uuid = Some(tx.tx.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// We've just transacted a new tx, and generated a new tx entid.
|
||||||
|
// Map it to the corresponding incoming tx uuid, advance our
|
||||||
|
// "locally known remote head".
|
||||||
|
if let Some(uuid) = last_tx_uuid {
|
||||||
|
if let Some(entid) = last_tx_entid {
|
||||||
|
{
|
||||||
|
let mut db_tx = self.sqlite.transaction()?;
|
||||||
|
SyncMetadataClient::set_remote_head(&mut db_tx, &uuid)?;
|
||||||
|
TxMapper::set_tx_uuid(&mut db_tx, entid, &uuid)?;
|
||||||
|
db_tx.commit()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// only need to advance the user partition, since we're using KnownEntid and partition won't
|
||||||
|
// get auto-updated; shouldn't be a problem for tx partition, since we're relying on the builder
|
||||||
|
// to create a tx and advance the partition for us.
|
||||||
|
self.fast_forward_user_partition(largest_endid_encountered)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
|
||||||
|
let uuid = Uuid::parse_str(&user_uuid)?;
|
||||||
|
|
||||||
|
let sync_result;
|
||||||
|
{
|
||||||
|
let mut db_tx = self.sqlite.transaction()?;
|
||||||
|
sync_result = Syncer::flow(&mut db_tx, server_uri, &uuid)?;
|
||||||
|
|
||||||
|
// TODO this should be done _after_ all of the operations below conclude!
|
||||||
|
// Commits any changes Syncer made (schema, updated heads, tu mappings during an upload, etc)
|
||||||
|
db_tx.commit()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO These operations need to borrow self as mutable; but we already borrow it for db_tx above,
|
||||||
|
// and so for now we split up sync into multiple db transactions /o\
|
||||||
|
// Fixing this likely involves either implementing flow on InProgress, or changing flow to
|
||||||
|
// take an InProgress instead of a raw sql transaction.
|
||||||
|
|
||||||
|
match sync_result {
|
||||||
|
SyncResult::EmptyServer => Ok(()),
|
||||||
|
SyncResult::NoChanges => Ok(()),
|
||||||
|
SyncResult::ServerFastForward => Ok(()),
|
||||||
|
SyncResult::Merge => bail!(ErrorKind::NotYetImplemented(
|
||||||
|
format!("Can't sync against diverged local.")
|
||||||
|
)),
|
||||||
|
SyncResult::LocalFastForward(txs) => self.fast_forward_local(txs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -97,12 +97,12 @@ fn test_reader() {
|
||||||
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
|
let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
|
||||||
{
|
{
|
||||||
let db_tx = c.transaction().expect("db tx");
|
let db_tx = c.transaction().expect("db tx");
|
||||||
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
|
// Ensure that the first (bootstrap) transaction is skipped over.
|
||||||
let mut receiver = TxCountingReceiver::new();
|
let mut receiver = TxCountingReceiver::new();
|
||||||
assert_eq!(false, receiver.is_done);
|
assert_eq!(false, receiver.is_done);
|
||||||
Processor::process(&db_tx, None, &mut receiver).expect("processor");
|
Processor::process(&db_tx, None, &mut receiver).expect("processor");
|
||||||
assert_eq!(true, receiver.is_done);
|
assert_eq!(true, receiver.is_done);
|
||||||
assert_eq!(1, receiver.tx_count);
|
assert_eq!(0, receiver.tx_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
let ids = conn.transact(&mut c, r#"[
|
let ids = conn.transact(&mut c, r#"[
|
||||||
|
@ -112,7 +112,7 @@ fn test_reader() {
|
||||||
]"#).expect("successful transaction").tempids;
|
]"#).expect("successful transaction").tempids;
|
||||||
let numba_entity_id = ids.get("s").unwrap();
|
let numba_entity_id = ids.get("s").unwrap();
|
||||||
|
|
||||||
let mut bootstrap_tx = None;
|
let first_tx;
|
||||||
{
|
{
|
||||||
let db_tx = c.transaction().expect("db tx");
|
let db_tx = c.transaction().expect("db tx");
|
||||||
// Expect to see one more transaction of four parts (one for tx datom itself).
|
// Expect to see one more transaction of four parts (one for tx datom itself).
|
||||||
|
@ -121,10 +121,10 @@ fn test_reader() {
|
||||||
|
|
||||||
println!("{:#?}", receiver);
|
println!("{:#?}", receiver);
|
||||||
|
|
||||||
assert_eq!(2, receiver.txes.keys().count());
|
assert_eq!(1, receiver.txes.keys().count());
|
||||||
assert_tx_datoms_count(&receiver, 1, 4);
|
assert_tx_datoms_count(&receiver, 0, 4);
|
||||||
|
|
||||||
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
|
first_tx = Some(*receiver.txes.keys().nth(0).expect("first tx"));
|
||||||
}
|
}
|
||||||
|
|
||||||
let ids = conn.transact(&mut c, r#"[
|
let ids = conn.transact(&mut c, r#"[
|
||||||
|
@ -138,14 +138,14 @@ fn test_reader() {
|
||||||
// Expect to see a single two part transaction
|
// Expect to see a single two part transaction
|
||||||
let mut receiver = TestingReceiver::new();
|
let mut receiver = TestingReceiver::new();
|
||||||
|
|
||||||
// Note that we're asking for the bootstrap tx to be skipped by the processor.
|
// Note that we're asking for the first transacted tx to be skipped by the processor.
|
||||||
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
|
Processor::process(&db_tx, first_tx, &mut receiver).expect("processor");
|
||||||
|
|
||||||
assert_eq!(2, receiver.txes.keys().count());
|
assert_eq!(1, receiver.txes.keys().count());
|
||||||
assert_tx_datoms_count(&receiver, 1, 2);
|
assert_tx_datoms_count(&receiver, 0, 2);
|
||||||
|
|
||||||
// Inspect the transaction part.
|
// Inspect the transaction part.
|
||||||
let tx_id = receiver.txes.keys().nth(1).expect("tx");
|
let tx_id = receiver.txes.keys().nth(0).expect("tx");
|
||||||
let datoms = receiver.txes.get(tx_id).expect("datoms");
|
let datoms = receiver.txes.get(tx_id).expect("datoms");
|
||||||
let part = &datoms[0];
|
let part = &datoms[0];
|
||||||
|
|
||||||
|
|
|
@ -49,11 +49,6 @@ error_chain! {
|
||||||
display("encountered unexpected state: {}", t)
|
display("encountered unexpected state: {}", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
NotYetImplemented(t: String) {
|
|
||||||
description("not yet implemented")
|
|
||||||
display("not yet implemented: {}", t)
|
|
||||||
}
|
|
||||||
|
|
||||||
DuplicateMetadata(k: String) {
|
DuplicateMetadata(k: String) {
|
||||||
description("encountered more than one metadata value for key")
|
description("encountered more than one metadata value for key")
|
||||||
display("encountered more than one metadata value for key: {}", k)
|
display("encountered more than one metadata value for key: {}", k)
|
||||||
|
|
|
@ -39,7 +39,9 @@ pub mod tx_processor;
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
pub mod syncer;
|
pub mod syncer;
|
||||||
pub mod tx_mapper;
|
pub mod tx_mapper;
|
||||||
|
pub use tx_mapper::TxMapper;
|
||||||
pub use syncer::Syncer;
|
pub use syncer::Syncer;
|
||||||
|
pub use metadata::SyncMetadataClient;
|
||||||
pub use errors::{
|
pub use errors::{
|
||||||
Error,
|
Error,
|
||||||
ErrorKind,
|
ErrorKind,
|
||||||
|
|
|
@ -54,16 +54,16 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_get_remote_head_default() {
|
fn test_get_remote_head_default() {
|
||||||
let mut conn = schema::tests::setup_conn();
|
let mut conn = schema::tests::setup_conn_bare();
|
||||||
let tx = conn.transaction().expect("db tx");
|
let tx = schema::tests::setup_tx(&mut conn);
|
||||||
assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
|
assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_set_and_get_remote_head() {
|
fn test_set_and_get_remote_head() {
|
||||||
let mut conn = schema::tests::setup_conn();
|
let mut conn = schema::tests::setup_conn_bare();
|
||||||
|
let tx = schema::tests::setup_tx(&mut conn);
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let tx = conn.transaction().expect("db tx");
|
|
||||||
SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
|
SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
|
||||||
assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
|
assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,15 +24,13 @@ lazy_static! {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> {
|
pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> {
|
||||||
let tx = conn.transaction()?;
|
|
||||||
|
|
||||||
for statement in (&SCHEMA_STATEMENTS).iter() {
|
for statement in (&SCHEMA_STATEMENTS).iter() {
|
||||||
tx.execute(statement, &[])?;
|
tx.execute(statement, &[])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?;
|
tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?;
|
||||||
tx.commit().map_err(|e| e.into())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -40,7 +38,7 @@ pub mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
fn setup_conn_bare() -> rusqlite::Connection {
|
pub fn setup_conn_bare() -> rusqlite::Connection {
|
||||||
let conn = rusqlite::Connection::open_in_memory().unwrap();
|
let conn = rusqlite::Connection::open_in_memory().unwrap();
|
||||||
|
|
||||||
conn.execute_batch("
|
conn.execute_batch("
|
||||||
|
@ -54,19 +52,24 @@ pub mod tests {
|
||||||
conn
|
conn
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setup_conn() -> rusqlite::Connection {
|
pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
|
||||||
let mut conn = setup_conn_bare();
|
conn.transaction().expect("tx")
|
||||||
ensure_current_version(&mut conn).expect("connection setup");
|
}
|
||||||
conn
|
|
||||||
|
pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
|
||||||
|
let mut tx = conn.transaction().expect("tx");
|
||||||
|
ensure_current_version(&mut tx).expect("connection setup");
|
||||||
|
tx
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_empty() {
|
fn test_empty() {
|
||||||
let mut conn = setup_conn_bare();
|
let mut conn = setup_conn_bare();
|
||||||
|
let mut tx = setup_tx_bare(&mut conn);
|
||||||
|
|
||||||
assert!(ensure_current_version(&mut conn).is_ok());
|
assert!(ensure_current_version(&mut tx).is_ok());
|
||||||
|
|
||||||
let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
|
let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
|
||||||
let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works");
|
let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works");
|
||||||
|
|
||||||
let first: Result<String> = keys_iter.next().unwrap().map_err(|e| e.into());
|
let first: Result<String> = keys_iter.next().unwrap().map_err(|e| e.into());
|
||||||
|
@ -82,27 +85,23 @@ pub mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_non_empty() {
|
fn test_non_empty() {
|
||||||
let mut conn = setup_conn_bare();
|
let mut conn = setup_conn_bare();
|
||||||
|
let mut tx = setup_tx_bare(&mut conn);
|
||||||
|
|
||||||
assert!(ensure_current_version(&mut conn).is_ok());
|
assert!(ensure_current_version(&mut tx).is_ok());
|
||||||
|
|
||||||
let test_uuid = Uuid::new_v4();
|
let test_uuid = Uuid::new_v4();
|
||||||
{
|
{
|
||||||
let tx = conn.transaction().unwrap();
|
|
||||||
let uuid_bytes = test_uuid.as_bytes().to_vec();
|
let uuid_bytes = test_uuid.as_bytes().to_vec();
|
||||||
match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) {
|
match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) {
|
||||||
Err(e) => panic!("Error running an update: {}", e),
|
Err(e) => panic!("Error running an update: {}", e),
|
||||||
_ => ()
|
_ => ()
|
||||||
}
|
}
|
||||||
match tx.commit() {
|
|
||||||
Err(e) => panic!("Error committing an update: {}", e),
|
|
||||||
_ => ()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(ensure_current_version(&mut conn).is_ok());
|
assert!(ensure_current_version(&mut tx).is_ok());
|
||||||
|
|
||||||
// Check that running ensure_current_version on an initialized conn doesn't change anything.
|
// Check that running ensure_current_version on an initialized conn doesn't change anything.
|
||||||
let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap();
|
let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap();
|
||||||
let mut values_iter = stmt.query_map(&[], |r| {
|
let mut values_iter = stmt.query_map(&[], |r| {
|
||||||
let raw_uuid: Vec<u8> = r.get(0);
|
let raw_uuid: Vec<u8> = r.get(0);
|
||||||
Uuid::from_bytes(raw_uuid.as_slice()).unwrap()
|
Uuid::from_bytes(raw_uuid.as_slice()).unwrap()
|
||||||
|
|
|
@ -174,8 +174,23 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For returning out of the downloader as an ordered list.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Tx {
|
||||||
|
pub tx: Uuid,
|
||||||
|
pub parts: Vec<TxPart>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum SyncResult {
|
||||||
|
EmptyServer,
|
||||||
|
NoChanges,
|
||||||
|
ServerFastForward,
|
||||||
|
LocalFastForward(Vec<Tx>),
|
||||||
|
Merge,
|
||||||
|
}
|
||||||
|
|
||||||
impl Syncer {
|
impl Syncer {
|
||||||
fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
|
fn fast_forward_server(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
|
||||||
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
|
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
|
||||||
Processor::process(db_tx, from_tx, &mut uploader)?;
|
Processor::process(db_tx, from_tx, &mut uploader)?;
|
||||||
if !uploader.is_done {
|
if !uploader.is_done {
|
||||||
|
@ -197,19 +212,45 @@ impl Syncer {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> {
|
fn download_theirs(_db_tx: &mut rusqlite::Transaction, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<Vec<Tx>> {
|
||||||
|
let new_txs = remote_client.get_transactions(remote_head)?;
|
||||||
|
let mut tx_list = Vec::new();
|
||||||
|
|
||||||
|
for tx in new_txs {
|
||||||
|
let mut tx_parts = Vec::new();
|
||||||
|
let chunks = remote_client.get_chunks(&tx)?;
|
||||||
|
|
||||||
|
// We pass along all of the downloaded parts, including transaction's
|
||||||
|
// metadata datom. Transactor is expected to do the right thing, and
|
||||||
|
// use txInstant from one of our datoms.
|
||||||
|
for chunk in chunks {
|
||||||
|
let part = remote_client.get_chunk(&chunk)?;
|
||||||
|
tx_parts.push(part);
|
||||||
|
}
|
||||||
|
|
||||||
|
tx_list.push(Tx {
|
||||||
|
tx: tx,
|
||||||
|
parts: tx_parts
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
d(&format!("got tx list: {:?}", &tx_list));
|
||||||
|
|
||||||
|
Ok(tx_list)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn flow(db_tx: &mut rusqlite::Transaction, server_uri: &String, user_uuid: &Uuid) -> Result<SyncResult> {
|
||||||
d(&format!("sync flowing"));
|
d(&format!("sync flowing"));
|
||||||
|
|
||||||
ensure_current_version(sqlite)?;
|
ensure_current_version(db_tx)?;
|
||||||
|
|
||||||
// TODO configure this sync with some auth data
|
// TODO configure this sync with some auth data
|
||||||
let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
|
let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
|
||||||
let mut db_tx = sqlite.transaction()?;
|
|
||||||
|
|
||||||
let remote_head = remote_client.get_head()?;
|
let remote_head = remote_client.get_head()?;
|
||||||
d(&format!("remote head {:?}", remote_head));
|
d(&format!("remote head {:?}", remote_head));
|
||||||
|
|
||||||
let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
|
let locally_known_remote_head = SyncMetadataClient::remote_head(db_tx)?;
|
||||||
d(&format!("local head {:?}", locally_known_remote_head));
|
d(&format!("local head {:?}", locally_known_remote_head));
|
||||||
|
|
||||||
// Local head: latest transaction that we have in the store,
|
// Local head: latest transaction that we have in the store,
|
||||||
|
@ -220,24 +261,25 @@ impl Syncer {
|
||||||
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
|
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
|
||||||
// TODO don't just start from the beginning... but then again, we should do this
|
// TODO don't just start from the beginning... but then again, we should do this
|
||||||
// without walking the table at all, and use the tx index.
|
// without walking the table at all, and use the tx index.
|
||||||
Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?;
|
Processor::process(db_tx, None, &mut inquiring_tx_receiver)?;
|
||||||
if !inquiring_tx_receiver.is_done {
|
if !inquiring_tx_receiver.is_done {
|
||||||
bail!(ErrorKind::TxProcessorUnfinished);
|
bail!(ErrorKind::TxProcessorUnfinished);
|
||||||
}
|
}
|
||||||
let have_local_changes = match inquiring_tx_receiver.last_tx {
|
let (have_local_changes, local_store_empty) = match inquiring_tx_receiver.last_tx {
|
||||||
Some(tx) => {
|
Some(tx) => {
|
||||||
match TxMapper::get(&db_tx, tx)? {
|
match TxMapper::get(db_tx, tx)? {
|
||||||
Some(_) => false,
|
Some(_) => (false, false),
|
||||||
None => true
|
None => (true, false)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => false
|
None => (false, true)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check if the server is empty - populate it.
|
// Check if the server is empty - populate it.
|
||||||
if remote_head == Uuid::nil() {
|
if remote_head == Uuid::nil() {
|
||||||
d(&format!("empty server!"));
|
d(&format!("empty server!"));
|
||||||
Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?;
|
Syncer::fast_forward_server(db_tx, None, &remote_client, &remote_head)?;
|
||||||
|
return Ok(SyncResult::EmptyServer);
|
||||||
|
|
||||||
// Check if the server is the same as us, and if our HEAD moved.
|
// Check if the server is the same as us, and if our HEAD moved.
|
||||||
} else if locally_known_remote_head == remote_head {
|
} else if locally_known_remote_head == remote_head {
|
||||||
|
@ -245,36 +287,38 @@ impl Syncer {
|
||||||
|
|
||||||
if !have_local_changes {
|
if !have_local_changes {
|
||||||
d(&format!("local HEAD did not move. Nothing to do!"));
|
d(&format!("local HEAD did not move. Nothing to do!"));
|
||||||
return Ok(());
|
return Ok(SyncResult::NoChanges);
|
||||||
}
|
}
|
||||||
|
|
||||||
d(&format!("local HEAD moved."));
|
d(&format!("local HEAD moved."));
|
||||||
// TODO it's possible that we've successfully advanced remote head previously,
|
// TODO it's possible that we've successfully advanced remote head previously,
|
||||||
// but failed to advance our own local head. If that's the case, and we can recognize it,
|
// but failed to advance our own local head. If that's the case, and we can recognize it,
|
||||||
// our sync becomes just bumping our local head. AFAICT below would currently fail.
|
// our sync becomes just bumping our local head. AFAICT below would currently fail.
|
||||||
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? {
|
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(db_tx, &locally_known_remote_head)? {
|
||||||
d(&format!("Fast-forwarding the server."));
|
d(&format!("Fast-forwarding the server."));
|
||||||
Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
|
Syncer::fast_forward_server(db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
|
||||||
|
return Ok(SyncResult::ServerFastForward);
|
||||||
} else {
|
} else {
|
||||||
d(&format!("Unable to fast-forward the server; missing local tx mapping"));
|
d(&format!("Unable to fast-forward the server; missing local tx mapping"));
|
||||||
bail!(ErrorKind::TxIncorrectlyMapped(0));
|
bail!(ErrorKind::TxIncorrectlyMapped(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
// We diverged from the server.
|
// We diverged from the server. If we're lucky, we can just fast-forward local.
|
||||||
// We'll need to rebase/merge ourselves on top of it.
|
// Otherwise, a merge (or a rebase) is required.
|
||||||
} else {
|
} else {
|
||||||
d(&format!("server changed since last sync."));
|
d(&format!("server changed since last sync."));
|
||||||
|
|
||||||
bail!(ErrorKind::NotYetImplemented(
|
// TODO local store moved forward since we last synced. Need to merge or rebase.
|
||||||
format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
|
if !local_store_empty && have_local_changes {
|
||||||
|
return Ok(SyncResult::Merge);
|
||||||
|
}
|
||||||
|
|
||||||
|
d(&format!("fast-forwarding local store."));
|
||||||
|
return Ok(SyncResult::LocalFastForward(
|
||||||
|
Syncer::download_theirs(db_tx, &remote_client, &locally_known_remote_head)?
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
// Our caller will commit the tx with our changes when it's done.
|
||||||
// Commit everything, if there's anything to commit!
|
|
||||||
// Any new tx->uuid mappings and the new HEAD. We're synced!
|
|
||||||
db_tx.commit()?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -289,9 +333,24 @@ struct SerializedTransaction<'a> {
|
||||||
chunks: &'a Vec<Uuid>
|
chunks: &'a Vec<Uuid>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct DeserializableTransaction {
|
||||||
|
parent: Uuid,
|
||||||
|
chunks: Vec<Uuid>,
|
||||||
|
id: Uuid,
|
||||||
|
seq: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct SerializedTransactions {
|
||||||
|
limit: i64,
|
||||||
|
from: Uuid,
|
||||||
|
transactions: Vec<Uuid>,
|
||||||
|
}
|
||||||
|
|
||||||
struct RemoteClient {
|
struct RemoteClient {
|
||||||
base_uri: String,
|
base_uri: String,
|
||||||
user_uuid: Uuid
|
user_uuid: Uuid,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -308,9 +367,14 @@ impl RemoteClient {
|
||||||
format!("{}/{}", self.base_uri, self.user_uuid)
|
format!("{}/{}", self.base_uri, self.user_uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO what we want is a method that returns a deserialized json structure.
|
||||||
|
// It'll need a type T so that consumers can specify what downloaded json will
|
||||||
|
// map to. I ran into borrow issues doing that - probably need to restructure
|
||||||
|
// this and use PhantomData markers or somesuch.
|
||||||
|
// But for now, we get code duplication.
|
||||||
fn get_uuid(&self, uri: String) -> Result<Uuid> {
|
fn get_uuid(&self, uri: String) -> Result<Uuid> {
|
||||||
let mut core = Core::new()?;
|
let mut core = Core::new()?;
|
||||||
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
|
// TODO https://github.com/mozilla/mentat/issues/569
|
||||||
// let client = hyper::Client::configure()
|
// let client = hyper::Client::configure()
|
||||||
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
||||||
// .build(&core.handle());
|
// .build(&core.handle());
|
||||||
|
@ -326,10 +390,10 @@ impl RemoteClient {
|
||||||
println!("Response: {}", res.status());
|
println!("Response: {}", res.status());
|
||||||
|
|
||||||
res.body().concat2().and_then(move |body| {
|
res.body().concat2().and_then(move |body| {
|
||||||
let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
|
let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
|
||||||
std::io::Error::new(std::io::ErrorKind::Other, e)
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
||||||
})?;
|
})?;
|
||||||
Ok(head_json)
|
Ok(json)
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -343,7 +407,7 @@ impl RemoteClient {
|
||||||
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
|
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
|
||||||
where hyper::Body: std::convert::From<T>, {
|
where hyper::Body: std::convert::From<T>, {
|
||||||
let mut core = Core::new()?;
|
let mut core = Core::new()?;
|
||||||
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
|
// TODO https://github.com/mozilla/mentat/issues/569
|
||||||
// let client = hyper::Client::configure()
|
// let client = hyper::Client::configure()
|
||||||
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
||||||
// .build(&core.handle());
|
// .build(&core.handle());
|
||||||
|
@ -372,6 +436,105 @@ impl RemoteClient {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_transactions(&self, parent_uuid: &Uuid) -> Result<Vec<Uuid>> {
|
||||||
|
let mut core = Core::new()?;
|
||||||
|
// TODO https://github.com/mozilla/mentat/issues/569
|
||||||
|
// let client = hyper::Client::configure()
|
||||||
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
||||||
|
// .build(&core.handle());
|
||||||
|
let client = hyper::Client::new(&core.handle());
|
||||||
|
|
||||||
|
d(&format!("client"));
|
||||||
|
|
||||||
|
let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid);
|
||||||
|
let uri = uri.parse()?;
|
||||||
|
|
||||||
|
d(&format!("parsed uri {:?}", uri));
|
||||||
|
|
||||||
|
let work = client.get(uri).and_then(|res| {
|
||||||
|
println!("Response: {}", res.status());
|
||||||
|
|
||||||
|
res.body().concat2().and_then(move |body| {
|
||||||
|
let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| {
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
||||||
|
})?;
|
||||||
|
Ok(json)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
d(&format!("running..."));
|
||||||
|
|
||||||
|
let transactions_json = core.run(work)?;
|
||||||
|
d(&format!("got transactions: {:?}", &transactions_json.transactions));
|
||||||
|
Ok(transactions_json.transactions)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_chunks(&self, transaction_uuid: &Uuid) -> Result<Vec<Uuid>> {
|
||||||
|
let mut core = Core::new()?;
|
||||||
|
// TODO https://github.com/mozilla/mentat/issues/569
|
||||||
|
// let client = hyper::Client::configure()
|
||||||
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
||||||
|
// .build(&core.handle());
|
||||||
|
let client = hyper::Client::new(&core.handle());
|
||||||
|
|
||||||
|
d(&format!("client"));
|
||||||
|
|
||||||
|
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
|
||||||
|
let uri = uri.parse()?;
|
||||||
|
|
||||||
|
d(&format!("parsed uri {:?}", uri));
|
||||||
|
|
||||||
|
let work = client.get(uri).and_then(|res| {
|
||||||
|
println!("Response: {}", res.status());
|
||||||
|
|
||||||
|
res.body().concat2().and_then(move |body| {
|
||||||
|
let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| {
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
||||||
|
})?;
|
||||||
|
Ok(json)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
d(&format!("running..."));
|
||||||
|
|
||||||
|
let transaction_json = core.run(work)?;
|
||||||
|
d(&format!("got transaction chunks: {:?}", &transaction_json.chunks));
|
||||||
|
Ok(transaction_json.chunks)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_chunk(&self, chunk_uuid: &Uuid) -> Result<TxPart> {
|
||||||
|
let mut core = Core::new()?;
|
||||||
|
// TODO https://github.com/mozilla/mentat/issues/569
|
||||||
|
// let client = hyper::Client::configure()
|
||||||
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
||||||
|
// .build(&core.handle());
|
||||||
|
let client = hyper::Client::new(&core.handle());
|
||||||
|
|
||||||
|
d(&format!("client"));
|
||||||
|
|
||||||
|
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
|
||||||
|
let uri = uri.parse()?;
|
||||||
|
|
||||||
|
d(&format!("parsed uri {:?}", uri));
|
||||||
|
|
||||||
|
let work = client.get(uri).and_then(|res| {
|
||||||
|
println!("Response: {}", res.status());
|
||||||
|
|
||||||
|
res.body().concat2().and_then(move |body| {
|
||||||
|
let json: TxPart = serde_json::from_slice(&body).map_err(|e| {
|
||||||
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
||||||
|
})?;
|
||||||
|
Ok(json)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
d(&format!("running..."));
|
||||||
|
|
||||||
|
let chunk = core.run(work)?;
|
||||||
|
d(&format!("got transaction chunk: {:?}", &chunk));
|
||||||
|
Ok(chunk)
|
||||||
|
}
|
||||||
|
|
||||||
fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
|
fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
|
||||||
// {"parent": uuid, "chunks": [chunk1, chunk2...]}
|
// {"parent": uuid, "chunks": [chunk1, chunk2...]}
|
||||||
let transaction = SerializedTransaction {
|
let transaction = SerializedTransaction {
|
||||||
|
|
|
@ -33,6 +33,13 @@ impl TxMapper {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO upsert...? error checking..?
|
||||||
|
pub fn set_tx_uuid(db_tx: &mut rusqlite::Transaction, tx: Entid, uuid: &Uuid) -> Result<()> {
|
||||||
|
let uuid_bytes = uuid.as_bytes().to_vec();
|
||||||
|
db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// TODO for when we're downloading, right?
|
// TODO for when we're downloading, right?
|
||||||
pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
|
pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
|
||||||
match TxMapper::get(db_tx, tx)? {
|
match TxMapper::get(db_tx, tx)? {
|
||||||
|
@ -92,8 +99,8 @@ pub mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_getters() {
|
fn test_getters() {
|
||||||
let mut conn = schema::tests::setup_conn();
|
let mut conn = schema::tests::setup_conn_bare();
|
||||||
let mut tx = conn.transaction().expect("db tx");
|
let mut tx = schema::tests::setup_tx(&mut conn);
|
||||||
assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
|
assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
|
||||||
let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
|
let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
|
||||||
assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
|
assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
|
||||||
|
@ -101,8 +108,8 @@ pub mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bulk_setter() {
|
fn test_bulk_setter() {
|
||||||
let mut conn = schema::tests::setup_conn();
|
let mut conn = schema::tests::setup_conn_bare();
|
||||||
let mut tx = conn.transaction().expect("db tx");
|
let mut tx = schema::tests::setup_tx(&mut conn);
|
||||||
let mut map = HashMap::new();
|
let mut map = HashMap::new();
|
||||||
|
|
||||||
TxMapper::set_bulk(&mut tx, &map).expect("empty map success");
|
TxMapper::set_bulk(&mut tx, &map).expect("empty map success");
|
||||||
|
|
|
@ -130,14 +130,21 @@ impl Processor {
|
||||||
pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
|
pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
|
||||||
where R: TxReceiver {
|
where R: TxReceiver {
|
||||||
let tx_filter = match from_tx {
|
let tx_filter = match from_tx {
|
||||||
Some(tx) => format!(" WHERE tx > {} ", tx),
|
Some(tx) => format!("WHERE tx > {}", tx),
|
||||||
None => format!("")
|
None => format!("")
|
||||||
};
|
};
|
||||||
|
// If no 'from_tx' is provided, get everything but skip over the first (bootstrap) transaction.
|
||||||
|
let skip_first_tx = match from_tx {
|
||||||
|
Some(_) => false,
|
||||||
|
None => true
|
||||||
|
};
|
||||||
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
|
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
|
||||||
let mut stmt = sqlite.prepare(&select_query)?;
|
let mut stmt = sqlite.prepare(&select_query)?;
|
||||||
|
|
||||||
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
|
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
|
||||||
|
let mut at_first_tx = true;
|
||||||
let mut current_tx = None;
|
let mut current_tx = None;
|
||||||
|
|
||||||
while let Some(row) = rows.next() {
|
while let Some(row) = rows.next() {
|
||||||
let datom = row?;
|
let datom = row?;
|
||||||
|
|
||||||
|
@ -153,6 +160,10 @@ impl Processor {
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
current_tx = Some(datom.tx);
|
current_tx = Some(datom.tx);
|
||||||
|
if at_first_tx && skip_first_tx {
|
||||||
|
at_first_tx = false;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
receiver.tx(
|
receiver.tx(
|
||||||
datom.tx,
|
datom.tx,
|
||||||
&mut DatomsIterator::new(&datom, &mut rows)
|
&mut DatomsIterator::new(&datom, &mut rows)
|
||||||
|
|
|
@ -32,11 +32,12 @@ use mentat::{
|
||||||
QueryOutput,
|
QueryOutput,
|
||||||
QueryResults,
|
QueryResults,
|
||||||
Store,
|
Store,
|
||||||
Syncable,
|
|
||||||
TxReport,
|
TxReport,
|
||||||
TypedValue,
|
TypedValue,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use mentat::sync::Syncable;
|
||||||
|
|
||||||
use command_parser::{
|
use command_parser::{
|
||||||
Command,
|
Command,
|
||||||
HELP_COMMAND,
|
HELP_COMMAND,
|
||||||
|
|
Loading…
Reference in a new issue