diff --git a/src/errors.rs b/src/errors.rs index 337aad52..74be29e9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -15,6 +15,7 @@ use std; // To refer to std::result::Result. use std::collections::BTreeSet; use rusqlite; +use uuid; use edn; @@ -30,6 +31,9 @@ use mentat_query_projector; use mentat_query_pull; use mentat_sql; +#[cfg(feature = "syncable")] +use mentat_tolstoy; + pub type Result = std::result::Result; #[macro_export] @@ -80,6 +84,11 @@ pub enum MentatError { #[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)] ValueTypeMismatch(ValueType, ValueType), + /// We're just not done yet. Message that the feature is recognized but not yet + /// implemented. + #[fail(display = "not yet implemented: {}", _0)] + NotYetImplemented(String), + #[fail(display = "{}", _0)] IoError(#[cause] std::io::Error), @@ -103,8 +112,15 @@ pub enum MentatError { #[fail(display = "{}", _0)] PullError(#[cause] mentat_query_pull::PullError), + #[fail(display = "{}", _0)] + UuidError(#[cause] uuid::ParseError), + #[fail(display = "{}", _0)] SQLError(#[cause] mentat_sql::SQLError), + + #[cfg(feature = "syncable")] + #[fail(display = "{}", _0)] + TolstoyError(#[cause] mentat_tolstoy::TolstoyError), } impl From for MentatError { @@ -154,3 +170,16 @@ impl From for MentatError { MentatError::SQLError(error) } } + +#[cfg(feature = "syncable")] +impl From for MentatError { + fn from(error: mentat_tolstoy::TolstoyError) -> MentatError { + MentatError::TolstoyError(error) + } +} + +impl From for MentatError { + fn from(error: uuid::ParseError) -> MentatError { + MentatError::UuidError(error) + } +} diff --git a/src/store.rs b/src/store.rs index 52a37d5c..437b7db4 100644 --- a/src/store.rs +++ b/src/store.rs @@ -234,7 +234,7 @@ impl Pullable for Store { } impl Syncable for Store { - fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::failure::Error> { + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> { let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?; Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?) } diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 00000000..612ced28 --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,142 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use uuid::Uuid; + +use rusqlite; + +use conn::{ + Conn, + InProgress, +}; + +use errors::{ + Result, +}; + +use mentat_core::{ + KnownEntid, +}; +use mentat_db::{ + renumber, + PartitionMap, +}; + +use entity_builder::{ + BuildTerms, + TermBuilder, +}; + +use mentat_tolstoy::{ + Syncer, + SyncMetadataClient, + SyncResult, + Tx, + TxMapper, + TolstoyError, +}; + +pub trait Syncable { + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError>; +} + +fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec) -> Result> { + let mut last_tx = None; + + // During fast-forwarding, we will insert datoms with known entids + // which, by definition, fall outside of our user partition. + // Once we've done with insertion, we need to ensure that user + // partition's next allocation will not overlap with just-inserted datoms. + // To allow for "holes" in the user partition (due to data excision), + // we track the highest incoming entid we saw, and expand our + // local partition to match. + // In absence of excision and implementation bugs, this should work + // just as if we counted number of incoming entids and expanded by + // that number instead. + let mut last_encountered_partition_map = None; + + for tx in txs { + let mut builder = TermBuilder::new(); + + last_encountered_partition_map = match tx.parts[0].partitions.clone() { + Some(parts) => Some(parts), + None => bail!(TolstoyError::BadServerState("Missing partition map in incoming transaction".to_string())) + }; + + for part in tx.parts { + if part.added { + builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v)?; + } else { + builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v)?; + } + } + + let report = in_progress.transact_builder(builder)?; + + last_tx = Some((report.tx_id, tx.tx.clone())); + } + + // We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding + // incoming tx uuid, advance our "locally known remote head". + if let Some((entid, uuid)) = last_tx { + SyncMetadataClient::set_remote_head(&mut in_progress.transaction, &uuid)?; + TxMapper::set_tx_uuid(&mut in_progress.transaction, entid, &uuid)?; + } + + Ok(last_encountered_partition_map) +} + +impl Conn { + pub(crate) fn sync(&mut self, + sqlite: &mut rusqlite::Connection, + server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> { + let uuid = Uuid::parse_str(&user_uuid)?; + + // Take an IMMEDIATE transaction right away. We have an SQL transaction, and complete + // control over the `Conn` metadata at this point, just like `transact()`. + let mut in_progress = self.begin_transaction(sqlite)?; + + let sync_result = Syncer::flow(&mut in_progress.transaction, server_uri, &uuid)?; + let mut incoming_partition = None; + + match sync_result { + SyncResult::EmptyServer => (), + SyncResult::NoChanges => (), + SyncResult::ServerFastForward => (), + SyncResult::Merge => bail!(TolstoyError::NotYetImplemented( + format!("Can't sync against diverged local.") + )), + SyncResult::LocalFastForward(txs) => { + incoming_partition = fast_forward_local(&mut in_progress, txs)?; + () + }, + SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented( + format!("Bad server state.") + )), + SyncResult::AdoptedRemoteOnFirstSync => (), + SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented( + format!("IncompatibleBootstrapSchema.") + )), + } + + match incoming_partition { + Some(incoming) => { + let root = SyncMetadataClient::get_partitions(&in_progress.transaction, true)?; + let current = SyncMetadataClient::get_partitions(&in_progress.transaction, false)?; + let updated_db = renumber(&in_progress.transaction, &root, ¤t, &incoming)?; + in_progress.partition_map = updated_db.partition_map; + () + }, + None => () + } + + in_progress.commit() + } +} diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs index 3b0a2e3c..5c788aba 100644 --- a/tolstoy/src/errors.rs +++ b/tolstoy/src/errors.rs @@ -10,7 +10,13 @@ #![allow(dead_code)] -use failure::Error; +use std; +use rusqlite; +use uuid; +use hyper; +use serde_json; + +use mentat_db; #[macro_export] macro_rules! bail { @@ -19,7 +25,7 @@ macro_rules! bail { ) } -pub type Result = ::std::result::Result; +pub type Result = ::std::result::Result; #[derive(Debug, Fail)] pub enum TolstoyError { @@ -40,4 +46,70 @@ pub enum TolstoyError { #[fail(display = "not yet implemented: {}", _0)] NotYetImplemented(String), + + #[fail(display = "{}", _0)] + DbError(#[cause] mentat_db::DbError), + + #[fail(display = "{}", _0)] + SerializationError(#[cause] serde_json::Error), + + // It would be better to capture the underlying `rusqlite::Error`, but that type doesn't + // implement many useful traits, including `Clone`, `Eq`, and `PartialEq`. + #[fail(display = "SQL error: _0")] + RusqliteError(String), + + #[fail(display = "{}", _0)] + IoError(#[cause] std::io::Error), + + #[fail(display = "{}", _0)] + UuidError(#[cause] uuid::ParseError), + + #[fail(display = "{}", _0)] + NetworkError(#[cause] hyper::Error), + + #[fail(display = "{}", _0)] + UriError(#[cause] hyper::error::UriError), } + +impl From for TolstoyError { + fn from(error: mentat_db::DbError) -> TolstoyError { + TolstoyError::DbError(error) + } +} + +impl From for TolstoyError { + fn from(error: serde_json::Error) -> TolstoyError { + TolstoyError::SerializationError(error) + } +} + +impl From for TolstoyError { + fn from(error: rusqlite::Error) -> TolstoyError { + TolstoyError::RusqliteError(error.to_string()) + } +} + +impl From for TolstoyError { + fn from(error: std::io::Error) -> TolstoyError { + TolstoyError::IoError(error) + } +} + +impl From for TolstoyError { + fn from(error: uuid::ParseError) -> TolstoyError { + TolstoyError::UuidError(error) + } +} + +impl From for TolstoyError { + fn from(error: hyper::Error) -> TolstoyError { + TolstoyError::NetworkError(error) + } +} + +impl From for TolstoyError { + fn from(error: hyper::error::UriError) -> TolstoyError { + TolstoyError::UriError(error) + } +} +