Compare commits
1 commit
master
...
grisha/tol
Author | SHA1 | Date | |
---|---|---|---|
|
815eb1fd62 |
4 changed files with 246 additions and 3 deletions
|
@ -15,6 +15,7 @@ use std; // To refer to std::result::Result.
|
|||
use std::collections::BTreeSet;
|
||||
|
||||
use rusqlite;
|
||||
use uuid;
|
||||
|
||||
use edn;
|
||||
|
||||
|
@ -30,6 +31,9 @@ use mentat_query_projector;
|
|||
use mentat_query_pull;
|
||||
use mentat_sql;
|
||||
|
||||
#[cfg(feature = "syncable")]
|
||||
use mentat_tolstoy;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, MentatError>;
|
||||
|
||||
#[macro_export]
|
||||
|
@ -80,6 +84,11 @@ pub enum MentatError {
|
|||
#[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)]
|
||||
ValueTypeMismatch(ValueType, ValueType),
|
||||
|
||||
/// We're just not done yet. Message that the feature is recognized but not yet
|
||||
/// implemented.
|
||||
#[fail(display = "not yet implemented: {}", _0)]
|
||||
NotYetImplemented(String),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
IoError(#[cause] std::io::Error),
|
||||
|
||||
|
@ -103,8 +112,15 @@ pub enum MentatError {
|
|||
#[fail(display = "{}", _0)]
|
||||
PullError(#[cause] mentat_query_pull::PullError),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
UuidError(#[cause] uuid::ParseError),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
SQLError(#[cause] mentat_sql::SQLError),
|
||||
|
||||
#[cfg(feature = "syncable")]
|
||||
#[fail(display = "{}", _0)]
|
||||
TolstoyError(#[cause] mentat_tolstoy::TolstoyError),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for MentatError {
|
||||
|
@ -154,3 +170,16 @@ impl From<mentat_sql::SQLError> for MentatError {
|
|||
MentatError::SQLError(error)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "syncable")]
|
||||
impl From<mentat_tolstoy::TolstoyError> for MentatError {
|
||||
fn from(error: mentat_tolstoy::TolstoyError) -> MentatError {
|
||||
MentatError::TolstoyError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<uuid::ParseError> for MentatError {
|
||||
fn from(error: uuid::ParseError) -> MentatError {
|
||||
MentatError::UuidError(error)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -234,7 +234,7 @@ impl Pullable for Store {
|
|||
}
|
||||
|
||||
impl Syncable for Store {
|
||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::failure::Error> {
|
||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
|
||||
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;
|
||||
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
||||
}
|
||||
|
|
142
src/sync.rs
Normal file
142
src/sync.rs
Normal file
|
@ -0,0 +1,142 @@
|
|||
// Copyright 2018 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use rusqlite;
|
||||
|
||||
use conn::{
|
||||
Conn,
|
||||
InProgress,
|
||||
};
|
||||
|
||||
use errors::{
|
||||
Result,
|
||||
};
|
||||
|
||||
use mentat_core::{
|
||||
KnownEntid,
|
||||
};
|
||||
use mentat_db::{
|
||||
renumber,
|
||||
PartitionMap,
|
||||
};
|
||||
|
||||
use entity_builder::{
|
||||
BuildTerms,
|
||||
TermBuilder,
|
||||
};
|
||||
|
||||
use mentat_tolstoy::{
|
||||
Syncer,
|
||||
SyncMetadataClient,
|
||||
SyncResult,
|
||||
Tx,
|
||||
TxMapper,
|
||||
TolstoyError,
|
||||
};
|
||||
|
||||
pub trait Syncable {
|
||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError>;
|
||||
}
|
||||
|
||||
fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>) -> Result<Option<PartitionMap>> {
|
||||
let mut last_tx = None;
|
||||
|
||||
// During fast-forwarding, we will insert datoms with known entids
|
||||
// which, by definition, fall outside of our user partition.
|
||||
// Once we've done with insertion, we need to ensure that user
|
||||
// partition's next allocation will not overlap with just-inserted datoms.
|
||||
// To allow for "holes" in the user partition (due to data excision),
|
||||
// we track the highest incoming entid we saw, and expand our
|
||||
// local partition to match.
|
||||
// In absence of excision and implementation bugs, this should work
|
||||
// just as if we counted number of incoming entids and expanded by
|
||||
// that number instead.
|
||||
let mut last_encountered_partition_map = None;
|
||||
|
||||
for tx in txs {
|
||||
let mut builder = TermBuilder::new();
|
||||
|
||||
last_encountered_partition_map = match tx.parts[0].partitions.clone() {
|
||||
Some(parts) => Some(parts),
|
||||
None => bail!(TolstoyError::BadServerState("Missing partition map in incoming transaction".to_string()))
|
||||
};
|
||||
|
||||
for part in tx.parts {
|
||||
if part.added {
|
||||
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
|
||||
} else {
|
||||
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
|
||||
}
|
||||
}
|
||||
|
||||
let report = in_progress.transact_builder(builder)?;
|
||||
|
||||
last_tx = Some((report.tx_id, tx.tx.clone()));
|
||||
}
|
||||
|
||||
// We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding
|
||||
// incoming tx uuid, advance our "locally known remote head".
|
||||
if let Some((entid, uuid)) = last_tx {
|
||||
SyncMetadataClient::set_remote_head(&mut in_progress.transaction, &uuid)?;
|
||||
TxMapper::set_tx_uuid(&mut in_progress.transaction, entid, &uuid)?;
|
||||
}
|
||||
|
||||
Ok(last_encountered_partition_map)
|
||||
}
|
||||
|
||||
impl Conn {
|
||||
pub(crate) fn sync(&mut self,
|
||||
sqlite: &mut rusqlite::Connection,
|
||||
server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
|
||||
let uuid = Uuid::parse_str(&user_uuid)?;
|
||||
|
||||
// Take an IMMEDIATE transaction right away. We have an SQL transaction, and complete
|
||||
// control over the `Conn` metadata at this point, just like `transact()`.
|
||||
let mut in_progress = self.begin_transaction(sqlite)?;
|
||||
|
||||
let sync_result = Syncer::flow(&mut in_progress.transaction, server_uri, &uuid)?;
|
||||
let mut incoming_partition = None;
|
||||
|
||||
match sync_result {
|
||||
SyncResult::EmptyServer => (),
|
||||
SyncResult::NoChanges => (),
|
||||
SyncResult::ServerFastForward => (),
|
||||
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
|
||||
format!("Can't sync against diverged local.")
|
||||
)),
|
||||
SyncResult::LocalFastForward(txs) => {
|
||||
incoming_partition = fast_forward_local(&mut in_progress, txs)?;
|
||||
()
|
||||
},
|
||||
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
|
||||
format!("Bad server state.")
|
||||
)),
|
||||
SyncResult::AdoptedRemoteOnFirstSync => (),
|
||||
SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented(
|
||||
format!("IncompatibleBootstrapSchema.")
|
||||
)),
|
||||
}
|
||||
|
||||
match incoming_partition {
|
||||
Some(incoming) => {
|
||||
let root = SyncMetadataClient::get_partitions(&in_progress.transaction, true)?;
|
||||
let current = SyncMetadataClient::get_partitions(&in_progress.transaction, false)?;
|
||||
let updated_db = renumber(&in_progress.transaction, &root, ¤t, &incoming)?;
|
||||
in_progress.partition_map = updated_db.partition_map;
|
||||
()
|
||||
},
|
||||
None => ()
|
||||
}
|
||||
|
||||
in_progress.commit()
|
||||
}
|
||||
}
|
|
@ -10,7 +10,13 @@
|
|||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use failure::Error;
|
||||
use std;
|
||||
use rusqlite;
|
||||
use uuid;
|
||||
use hyper;
|
||||
use serde_json;
|
||||
|
||||
use mentat_db;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! bail {
|
||||
|
@ -19,7 +25,7 @@ macro_rules! bail {
|
|||
)
|
||||
}
|
||||
|
||||
pub type Result<T> = ::std::result::Result<T, Error>;
|
||||
pub type Result<T> = ::std::result::Result<T, TolstoyError>;
|
||||
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum TolstoyError {
|
||||
|
@ -40,4 +46,70 @@ pub enum TolstoyError {
|
|||
|
||||
#[fail(display = "not yet implemented: {}", _0)]
|
||||
NotYetImplemented(String),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
DbError(#[cause] mentat_db::DbError),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
SerializationError(#[cause] serde_json::Error),
|
||||
|
||||
// It would be better to capture the underlying `rusqlite::Error`, but that type doesn't
|
||||
// implement many useful traits, including `Clone`, `Eq`, and `PartialEq`.
|
||||
#[fail(display = "SQL error: _0")]
|
||||
RusqliteError(String),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
IoError(#[cause] std::io::Error),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
UuidError(#[cause] uuid::ParseError),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
NetworkError(#[cause] hyper::Error),
|
||||
|
||||
#[fail(display = "{}", _0)]
|
||||
UriError(#[cause] hyper::error::UriError),
|
||||
}
|
||||
|
||||
impl From<mentat_db::DbError> for TolstoyError {
|
||||
fn from(error: mentat_db::DbError) -> TolstoyError {
|
||||
TolstoyError::DbError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for TolstoyError {
|
||||
fn from(error: serde_json::Error) -> TolstoyError {
|
||||
TolstoyError::SerializationError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<rusqlite::Error> for TolstoyError {
|
||||
fn from(error: rusqlite::Error) -> TolstoyError {
|
||||
TolstoyError::RusqliteError(error.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for TolstoyError {
|
||||
fn from(error: std::io::Error) -> TolstoyError {
|
||||
TolstoyError::IoError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<uuid::ParseError> for TolstoyError {
|
||||
fn from(error: uuid::ParseError) -> TolstoyError {
|
||||
TolstoyError::UuidError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::Error> for TolstoyError {
|
||||
fn from(error: hyper::Error) -> TolstoyError {
|
||||
TolstoyError::NetworkError(error)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<hyper::error::UriError> for TolstoyError {
|
||||
fn from(error: hyper::error::UriError) -> TolstoyError {
|
||||
TolstoyError::UriError(error)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue