Compare commits
1 commit
master
...
grisha/tol
Author | SHA1 | Date | |
---|---|---|---|
|
815eb1fd62 |
4 changed files with 246 additions and 3 deletions
|
@ -15,6 +15,7 @@ use std; // To refer to std::result::Result.
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
|
use uuid;
|
||||||
|
|
||||||
use edn;
|
use edn;
|
||||||
|
|
||||||
|
@ -30,6 +31,9 @@ use mentat_query_projector;
|
||||||
use mentat_query_pull;
|
use mentat_query_pull;
|
||||||
use mentat_sql;
|
use mentat_sql;
|
||||||
|
|
||||||
|
#[cfg(feature = "syncable")]
|
||||||
|
use mentat_tolstoy;
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, MentatError>;
|
pub type Result<T> = std::result::Result<T, MentatError>;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
|
@ -80,6 +84,11 @@ pub enum MentatError {
|
||||||
#[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)]
|
#[fail(display = "provided value of type {} doesn't match attribute value type {}", _0, _1)]
|
||||||
ValueTypeMismatch(ValueType, ValueType),
|
ValueTypeMismatch(ValueType, ValueType),
|
||||||
|
|
||||||
|
/// We're just not done yet. Message that the feature is recognized but not yet
|
||||||
|
/// implemented.
|
||||||
|
#[fail(display = "not yet implemented: {}", _0)]
|
||||||
|
NotYetImplemented(String),
|
||||||
|
|
||||||
#[fail(display = "{}", _0)]
|
#[fail(display = "{}", _0)]
|
||||||
IoError(#[cause] std::io::Error),
|
IoError(#[cause] std::io::Error),
|
||||||
|
|
||||||
|
@ -103,8 +112,15 @@ pub enum MentatError {
|
||||||
#[fail(display = "{}", _0)]
|
#[fail(display = "{}", _0)]
|
||||||
PullError(#[cause] mentat_query_pull::PullError),
|
PullError(#[cause] mentat_query_pull::PullError),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
UuidError(#[cause] uuid::ParseError),
|
||||||
|
|
||||||
#[fail(display = "{}", _0)]
|
#[fail(display = "{}", _0)]
|
||||||
SQLError(#[cause] mentat_sql::SQLError),
|
SQLError(#[cause] mentat_sql::SQLError),
|
||||||
|
|
||||||
|
#[cfg(feature = "syncable")]
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
TolstoyError(#[cause] mentat_tolstoy::TolstoyError),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<std::io::Error> for MentatError {
|
impl From<std::io::Error> for MentatError {
|
||||||
|
@ -154,3 +170,16 @@ impl From<mentat_sql::SQLError> for MentatError {
|
||||||
MentatError::SQLError(error)
|
MentatError::SQLError(error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "syncable")]
|
||||||
|
impl From<mentat_tolstoy::TolstoyError> for MentatError {
|
||||||
|
fn from(error: mentat_tolstoy::TolstoyError) -> MentatError {
|
||||||
|
MentatError::TolstoyError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<uuid::ParseError> for MentatError {
|
||||||
|
fn from(error: uuid::ParseError) -> MentatError {
|
||||||
|
MentatError::UuidError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -234,7 +234,7 @@ impl Pullable for Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Syncable for Store {
|
impl Syncable for Store {
|
||||||
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::failure::Error> {
|
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
|
||||||
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;
|
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;
|
||||||
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
|
||||||
}
|
}
|
||||||
|
|
142
src/sync.rs
Normal file
142
src/sync.rs
Normal file
|
@ -0,0 +1,142 @@
|
||||||
|
// Copyright 2018 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use rusqlite;
|
||||||
|
|
||||||
|
use conn::{
|
||||||
|
Conn,
|
||||||
|
InProgress,
|
||||||
|
};
|
||||||
|
|
||||||
|
use errors::{
|
||||||
|
Result,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
KnownEntid,
|
||||||
|
};
|
||||||
|
use mentat_db::{
|
||||||
|
renumber,
|
||||||
|
PartitionMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
use entity_builder::{
|
||||||
|
BuildTerms,
|
||||||
|
TermBuilder,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_tolstoy::{
|
||||||
|
Syncer,
|
||||||
|
SyncMetadataClient,
|
||||||
|
SyncResult,
|
||||||
|
Tx,
|
||||||
|
TxMapper,
|
||||||
|
TolstoyError,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub trait Syncable {
|
||||||
|
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fast_forward_local<'a, 'c>(in_progress: &mut InProgress<'a, 'c>, txs: Vec<Tx>) -> Result<Option<PartitionMap>> {
|
||||||
|
let mut last_tx = None;
|
||||||
|
|
||||||
|
// During fast-forwarding, we will insert datoms with known entids
|
||||||
|
// which, by definition, fall outside of our user partition.
|
||||||
|
// Once we've done with insertion, we need to ensure that user
|
||||||
|
// partition's next allocation will not overlap with just-inserted datoms.
|
||||||
|
// To allow for "holes" in the user partition (due to data excision),
|
||||||
|
// we track the highest incoming entid we saw, and expand our
|
||||||
|
// local partition to match.
|
||||||
|
// In absence of excision and implementation bugs, this should work
|
||||||
|
// just as if we counted number of incoming entids and expanded by
|
||||||
|
// that number instead.
|
||||||
|
let mut last_encountered_partition_map = None;
|
||||||
|
|
||||||
|
for tx in txs {
|
||||||
|
let mut builder = TermBuilder::new();
|
||||||
|
|
||||||
|
last_encountered_partition_map = match tx.parts[0].partitions.clone() {
|
||||||
|
Some(parts) => Some(parts),
|
||||||
|
None => bail!(TolstoyError::BadServerState("Missing partition map in incoming transaction".to_string()))
|
||||||
|
};
|
||||||
|
|
||||||
|
for part in tx.parts {
|
||||||
|
if part.added {
|
||||||
|
builder.add(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
|
||||||
|
} else {
|
||||||
|
builder.retract(KnownEntid(part.e), KnownEntid(part.a), part.v)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let report = in_progress.transact_builder(builder)?;
|
||||||
|
|
||||||
|
last_tx = Some((report.tx_id, tx.tx.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// We've just transacted a new tx, and generated a new tx entid. Map it to the corresponding
|
||||||
|
// incoming tx uuid, advance our "locally known remote head".
|
||||||
|
if let Some((entid, uuid)) = last_tx {
|
||||||
|
SyncMetadataClient::set_remote_head(&mut in_progress.transaction, &uuid)?;
|
||||||
|
TxMapper::set_tx_uuid(&mut in_progress.transaction, entid, &uuid)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(last_encountered_partition_map)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Conn {
|
||||||
|
pub(crate) fn sync(&mut self,
|
||||||
|
sqlite: &mut rusqlite::Connection,
|
||||||
|
server_uri: &String, user_uuid: &String) -> ::std::result::Result<(), ::errors::MentatError> {
|
||||||
|
let uuid = Uuid::parse_str(&user_uuid)?;
|
||||||
|
|
||||||
|
// Take an IMMEDIATE transaction right away. We have an SQL transaction, and complete
|
||||||
|
// control over the `Conn` metadata at this point, just like `transact()`.
|
||||||
|
let mut in_progress = self.begin_transaction(sqlite)?;
|
||||||
|
|
||||||
|
let sync_result = Syncer::flow(&mut in_progress.transaction, server_uri, &uuid)?;
|
||||||
|
let mut incoming_partition = None;
|
||||||
|
|
||||||
|
match sync_result {
|
||||||
|
SyncResult::EmptyServer => (),
|
||||||
|
SyncResult::NoChanges => (),
|
||||||
|
SyncResult::ServerFastForward => (),
|
||||||
|
SyncResult::Merge => bail!(TolstoyError::NotYetImplemented(
|
||||||
|
format!("Can't sync against diverged local.")
|
||||||
|
)),
|
||||||
|
SyncResult::LocalFastForward(txs) => {
|
||||||
|
incoming_partition = fast_forward_local(&mut in_progress, txs)?;
|
||||||
|
()
|
||||||
|
},
|
||||||
|
SyncResult::BadServerState => bail!(TolstoyError::NotYetImplemented(
|
||||||
|
format!("Bad server state.")
|
||||||
|
)),
|
||||||
|
SyncResult::AdoptedRemoteOnFirstSync => (),
|
||||||
|
SyncResult::IncompatibleBootstrapSchema => bail!(TolstoyError::NotYetImplemented(
|
||||||
|
format!("IncompatibleBootstrapSchema.")
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
|
||||||
|
match incoming_partition {
|
||||||
|
Some(incoming) => {
|
||||||
|
let root = SyncMetadataClient::get_partitions(&in_progress.transaction, true)?;
|
||||||
|
let current = SyncMetadataClient::get_partitions(&in_progress.transaction, false)?;
|
||||||
|
let updated_db = renumber(&in_progress.transaction, &root, ¤t, &incoming)?;
|
||||||
|
in_progress.partition_map = updated_db.partition_map;
|
||||||
|
()
|
||||||
|
},
|
||||||
|
None => ()
|
||||||
|
}
|
||||||
|
|
||||||
|
in_progress.commit()
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,7 +10,13 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use failure::Error;
|
use std;
|
||||||
|
use rusqlite;
|
||||||
|
use uuid;
|
||||||
|
use hyper;
|
||||||
|
use serde_json;
|
||||||
|
|
||||||
|
use mentat_db;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! bail {
|
macro_rules! bail {
|
||||||
|
@ -19,7 +25,7 @@ macro_rules! bail {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = ::std::result::Result<T, Error>;
|
pub type Result<T> = ::std::result::Result<T, TolstoyError>;
|
||||||
|
|
||||||
#[derive(Debug, Fail)]
|
#[derive(Debug, Fail)]
|
||||||
pub enum TolstoyError {
|
pub enum TolstoyError {
|
||||||
|
@ -40,4 +46,70 @@ pub enum TolstoyError {
|
||||||
|
|
||||||
#[fail(display = "not yet implemented: {}", _0)]
|
#[fail(display = "not yet implemented: {}", _0)]
|
||||||
NotYetImplemented(String),
|
NotYetImplemented(String),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
DbError(#[cause] mentat_db::DbError),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
SerializationError(#[cause] serde_json::Error),
|
||||||
|
|
||||||
|
// It would be better to capture the underlying `rusqlite::Error`, but that type doesn't
|
||||||
|
// implement many useful traits, including `Clone`, `Eq`, and `PartialEq`.
|
||||||
|
#[fail(display = "SQL error: _0")]
|
||||||
|
RusqliteError(String),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
IoError(#[cause] std::io::Error),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
UuidError(#[cause] uuid::ParseError),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
NetworkError(#[cause] hyper::Error),
|
||||||
|
|
||||||
|
#[fail(display = "{}", _0)]
|
||||||
|
UriError(#[cause] hyper::error::UriError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<mentat_db::DbError> for TolstoyError {
|
||||||
|
fn from(error: mentat_db::DbError) -> TolstoyError {
|
||||||
|
TolstoyError::DbError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<serde_json::Error> for TolstoyError {
|
||||||
|
fn from(error: serde_json::Error) -> TolstoyError {
|
||||||
|
TolstoyError::SerializationError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<rusqlite::Error> for TolstoyError {
|
||||||
|
fn from(error: rusqlite::Error) -> TolstoyError {
|
||||||
|
TolstoyError::RusqliteError(error.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<std::io::Error> for TolstoyError {
|
||||||
|
fn from(error: std::io::Error) -> TolstoyError {
|
||||||
|
TolstoyError::IoError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<uuid::ParseError> for TolstoyError {
|
||||||
|
fn from(error: uuid::ParseError) -> TolstoyError {
|
||||||
|
TolstoyError::UuidError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<hyper::Error> for TolstoyError {
|
||||||
|
fn from(error: hyper::Error) -> TolstoyError {
|
||||||
|
TolstoyError::NetworkError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<hyper::error::UriError> for TolstoyError {
|
||||||
|
fn from(error: hyper::error::UriError) -> TolstoyError {
|
||||||
|
TolstoyError::UriError(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue