Basic sync support (#563) r=nalexander

* Pre: remove remnants of 'open_empty'

* Pre: Clean up 'datoms' table after a timeline move

Since timeline move operations use a transactor, they generate a
"phantom" 'tx' and a 'txInstant' assertion. It is "phantom" in the sense
that it was never present in the 'transactions' table, and is entirely
synthetic as far as our database is concerned.
It's an implementation artifact, and we were not cleaning it up.

It becomes a problem when we start inserting transactions after a move.
Once the transactor clashes with the phantom 'tx', it will retract the
phantom 'txInstant' value, leaving the transaction log in an incorrect state.

This patch adds a test for this scenario and takes the easy way out: it
simply removes the offending 'txInstant' datom.

* Part 1: Sync without support for side-effects

A "side-effect" is defined here as a mutation of a remote state as part
of the sync.

If, during a sync, we determine that remote state needs to be changed, bail out.

This generally supports different variations of "baton-passing" syncing, where
clients will sync successfully as long as each change is non-conflicting.
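A sketch of Part 1's decision logic; the names here are illustrative, not the
actual Syncer internals (which derive these flags from the shared root and the
remote HEAD):
```
enum SyncAction { Noop, LocalFastForward, RemoteFastForward }

fn plan_sync_part1(incoming: bool, local: bool) -> Result<SyncAction, &'static str> {
    match (incoming, local) {
        (false, false) => Ok(SyncAction::Noop),
        (true, false)  => Ok(SyncAction::LocalFastForward),  // download & transact
        (false, true)  => Ok(SyncAction::RemoteFastForward), // upload & advance HEAD
        // Anything else would mutate remote state beyond a fast-forward
        // (a "side-effect"), so Part 1 bails out here.
        (true, true)   => Err("side-effects are not supported"),
    }
}
```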

* Part 2: Support basic "side-effects" syncing

This patch introduces the concept of a follow-up sync. If a sync generated
a "merge transaction" (a regular transaction that contains the assertions
necessary for the local and remote transaction logs to converge), then
that transaction needs to be uploaded in a follow-up sync.

The generated SyncReport indicates whether a follow-up sync is required.

A follow-up sync is itself just a regular sync. If remote state did not change,
it will result in a simple RemoteFastForward. Otherwise, we'll continue
merging and requesting follow-ups.

Schema alterations are explicitly not supported.

As local transactions are rebased on top of remote ones, the following changes happen:
- entids are changed into tempids, letting the transactor upsert :db/unique values
- entids for retractions are changed into lookup-refs if we're confident they'll succeed
-- otherwise, retractions are dropped on the floor

* Post: use a macro for more readable tests

* Tolstoy README
Grisha Kruglov 2018-09-07 19:18:20 -07:00 committed by GitHub
parent 64821079c2
commit b22b29679b
30 changed files with 3758 additions and 599 deletions


@@ -32,6 +32,7 @@ chrono = "0.4"
failure = "0.1.1"
lazy_static = "0.2"
time = "0.1"
log = "0.4"
uuid = { version = "0.5", features = ["v4", "serde"] }
[dependencies.rusqlite]


@@ -98,6 +98,11 @@ fn move_transactions_to(conn: &rusqlite::Connection, tx_ids: &[Entid], new_timel
Ok(())
}
fn remove_tx_from_datoms(conn: &rusqlite::Connection, tx_id: Entid) -> Result<()> {
conn.execute("DELETE FROM datoms WHERE e = ?", &[&tx_id])?;
Ok(())
}
fn is_timeline_empty(conn: &rusqlite::Connection, timeline: Entid) -> Result<bool> {
let mut stmt = conn.prepare("SELECT timeline FROM timelined_transactions WHERE timeline = ? GROUP BY timeline")?;
let rows = stmt.query_and_then(&[&timeline], |row| -> Result<i64> {
@@ -152,11 +157,22 @@ pub fn move_from_main_timeline(conn: &rusqlite::Connection, schema: &Schema,
let reversed_terms = reversed_terms_for(conn, *tx_id)?;
// Rewind schema and datoms.
let (_, _, new_schema, _) = transact_terms_with_action(
let (report, _, new_schema, _) = transact_terms_with_action(
conn, partition_map.clone(), schema, schema, NullWatcher(),
reversed_terms.into_iter().map(|t| t.rewrap()),
InternSet::new(), TransactorAction::Materialize
)?;
// The rewind operation generated a 'tx' and a 'txInstant' assertion, which got
// inserted into the 'datoms' table (due to TransactorAction::Materialize).
// This is problematic: if we transact a few more times, the transactor will
// generate the same 'tx', but with a different 'txInstant'.
// The end result will be a transaction which has a phantom
// retraction of a txInstant, since the transactor operates against the state of
// 'datoms', and not against the 'transactions' table.
// A quick workaround is to just remove the bad txInstant datom.
// See the test_clashing_tx_instants test case.
remove_tx_from_datoms(conn, report.tx_id)?;
last_schema = new_schema;
}
@@ -191,7 +207,7 @@ mod tests {
};
conn.partition_map = pmap.clone();
}
#[test]
fn test_pop_simple() {
let mut conn = TestConn::default();
@@ -284,7 +300,85 @@ mod tests {
"#);
}
#[test]
fn test_clashing_tx_instants() {
let mut conn = TestConn::default();
conn.sanitized_partition_map();
// Transact a basic schema.
assert_transact!(conn, r#"
[{:db/ident :person/name :db/valueType :db.type/string :db/cardinality :db.cardinality/one :db/unique :db.unique/identity :db/index true}]
"#);
// Make an assertion against our schema.
assert_transact!(conn, r#"[{:person/name "Vanya"}]"#);
// Move that assertion away from the main timeline.
let (new_schema, new_partition_map) = move_from_main_timeline(
&conn.sqlite, &conn.schema, conn.partition_map.clone(),
conn.last_tx_id().., 1
).expect("moved single tx");
update_conn(&mut conn, &new_schema, &new_partition_map);
// Assert that our datoms are now just the schema.
assert_matches!(conn.datoms(), "
[[?e :db/ident :person/name]
[?e :db/valueType :db.type/string]
[?e :db/cardinality :db.cardinality/one]
[?e :db/unique :db.unique/identity]
[?e :db/index true]]");
// Same for transactions.
assert_matches!(conn.transactions(), "
[[[?e :db/ident :person/name ?tx true]
[?e :db/valueType :db.type/string ?tx true]
[?e :db/cardinality :db.cardinality/one ?tx true]
[?e :db/unique :db.unique/identity ?tx true]
[?e :db/index true ?tx true]
[?tx :db/txInstant ?ms ?tx true]]]");
// Re-assert our initial fact against our schema.
assert_transact!(conn, r#"
[[:db/add "tempid" :person/name "Vanya"]]"#);
// Now, change that fact. This is the "clashing" transaction, if we're
// performing a timeline move using the transactor.
assert_transact!(conn, r#"
[[:db/add (lookup-ref :person/name "Vanya") :person/name "Ivan"]]"#);
// Assert that our datoms are now the schema and the final assertion.
assert_matches!(conn.datoms(), r#"
[[?e1 :db/ident :person/name]
[?e1 :db/valueType :db.type/string]
[?e1 :db/cardinality :db.cardinality/one]
[?e1 :db/unique :db.unique/identity]
[?e1 :db/index true]
[?e2 :person/name "Ivan"]]
"#);
// Assert that we have three correct looking transactions.
// This will fail if we're not cleaning up the 'datoms' table
// after the timeline move.
assert_matches!(conn.transactions(), r#"
[[
[?e1 :db/ident :person/name ?tx1 true]
[?e1 :db/valueType :db.type/string ?tx1 true]
[?e1 :db/cardinality :db.cardinality/one ?tx1 true]
[?e1 :db/unique :db.unique/identity ?tx1 true]
[?e1 :db/index true ?tx1 true]
[?tx1 :db/txInstant ?ms1 ?tx1 true]
]
[
[?e2 :person/name "Vanya" ?tx2 true]
[?tx2 :db/txInstant ?ms2 ?tx2 true]
]
[
[?e2 :person/name "Ivan" ?tx3 true]
[?e2 :person/name "Vanya" ?tx3 false]
[?tx3 :db/txInstant ?ms3 ?tx3 true]
]]
"#);
}
#[test]
fn test_pop_schema() {
let mut conn = TestConn::default();
@@ -432,7 +526,7 @@ mod tests {
assert_matches!(conn.datoms(), "[]");
assert_matches!(conn.transactions(), "[]");
assert_eq!(conn.partition_map, partition_map0);
// Assert all of schema's components individually, for some guidance in case of failures:
assert_eq!(conn.schema.entid_map, schema0.entid_map);
assert_eq!(conn.schema.ident_map, schema0.ident_map);


@@ -107,7 +107,6 @@ pub use mentat::{
QueryResults,
RelResult,
Store,
Syncable,
TypedValue,
TxObserver,
TxReport,


@@ -10,11 +10,12 @@ path = "lib.rs"
[features]
default = ["syncable"]
sqlcipher = ["rusqlite/sqlcipher"]
syncable = ["tolstoy_traits"]
syncable = ["tolstoy_traits", "hyper", "serde_json"]
[dependencies]
failure = "0.1.1"
failure_derive = "0.1.1"
uuid = "0.5"
[dependencies.rusqlite]
version = "0.13"
@@ -44,3 +45,11 @@ path = "../sql-traits"
[dependencies.tolstoy_traits]
path = "../tolstoy-traits"
optional = true
[dependencies.hyper]
version = "0.11"
optional = true
[dependencies.serde_json]
version = "1.0"
optional = true


@@ -13,8 +13,10 @@
use std; // To refer to std::result::Result.
use std::collections::BTreeSet;
use std::error::Error;
use rusqlite;
use uuid;
use edn;
@@ -44,6 +46,12 @@ use tolstoy_traits::errors::{
TolstoyError,
};
#[cfg(feature = "syncable")]
use hyper;
#[cfg(feature = "syncable")]
use serde_json;
pub type Result<T> = std::result::Result<T, MentatError>;
#[derive(Debug, Fail)]
@@ -97,8 +105,8 @@ pub enum MentatError {
// It would be better to capture the underlying `rusqlite::Error`, but that type doesn't
// implement many useful traits, including `Clone`, `Eq`, and `PartialEq`.
#[fail(display = "SQL error: {}", _0)]
RusqliteError(String),
#[fail(display = "SQL error: {}, cause: {}", _0, _1)]
RusqliteError(String, String),
#[fail(display = "{}", _0)]
EdnParseError(#[cause] edn::ParseError),
@@ -118,9 +126,24 @@ pub enum MentatError {
#[fail(display = "{}", _0)]
SQLError(#[cause] SQLError),
#[fail(display = "{}", _0)]
UuidError(#[cause] uuid::ParseError),
#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)]
TolstoyError(#[cause] TolstoyError),
#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)]
NetworkError(#[cause] hyper::Error),
#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)]
UriError(#[cause] hyper::error::UriError),
#[cfg(feature = "syncable")]
#[fail(display = "{}", _0)]
SerializationError(#[cause] serde_json::Error),
}
impl From<std::io::Error> for MentatError {
@@ -131,7 +154,17 @@ impl From<std::io::Error> for MentatError {
impl From<rusqlite::Error> for MentatError {
fn from(error: rusqlite::Error) -> MentatError {
MentatError::RusqliteError(error.to_string())
let cause = match error.cause() {
Some(e) => e.to_string(),
None => "".to_string()
};
MentatError::RusqliteError(error.to_string(), cause)
}
}
impl From<uuid::ParseError> for MentatError {
fn from(error: uuid::ParseError) -> MentatError {
MentatError::UuidError(error)
}
}
@@ -177,3 +210,24 @@ impl From<TolstoyError> for MentatError {
MentatError::TolstoyError(error)
}
}
#[cfg(feature = "syncable")]
impl From<serde_json::Error> for MentatError {
fn from(error: serde_json::Error) -> MentatError {
MentatError::SerializationError(error)
}
}
#[cfg(feature = "syncable")]
impl From<hyper::Error> for MentatError {
fn from(error: hyper::Error) -> MentatError {
MentatError::NetworkError(error)
}
}
#[cfg(feature = "syncable")]
impl From<hyper::error::UriError> for MentatError {
fn from(error: hyper::error::UriError) -> MentatError {
MentatError::UriError(error)
}
}


@@ -20,7 +20,16 @@ extern crate db_traits;
extern crate query_pull_traits;
extern crate query_projector_traits;
extern crate query_algebrizer_traits;
extern crate tolstoy_traits;
extern crate sql_traits;
extern crate uuid;
#[cfg(feature = "syncable")]
extern crate tolstoy_traits;
#[cfg(feature = "syncable")]
extern crate hyper;
#[cfg(feature = "syncable")]
extern crate serde_json;
pub mod errors;


@@ -118,10 +118,6 @@ pub struct Conn {
pub(crate) tx_observer_service: Mutex<TxObservationService>,
}
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
@@ -131,17 +127,6 @@ impl Conn {
}
}
/// Prepare the provided SQLite handle for use as a Mentat store. Creates tables but
/// _does not_ write the bootstrap schema. This constructor should only be used by
/// consumers that expect to populate raw transaction data themselves.
pub(crate) fn empty(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let (tx, db) = db::create_empty_current_version(sqlite)?;
tx.commit()?;
Ok(Conn::new(db.partition_map, db.schema))
}
pub fn connect(sqlite: &mut rusqlite::Connection) -> Result<Conn> {
let db = db::ensure_current_version(sqlite)?;
Ok(Conn::new(db.partition_map, db.schema))


@@ -178,13 +178,25 @@ pub mod query_builder;
pub mod store;
pub mod vocabulary;
#[cfg(feature = "syncable")]
mod sync;
#[cfg(feature = "syncable")]
pub use sync::{
Syncable,
};
#[cfg(feature = "syncable")]
pub use mentat_tolstoy::{
SyncReport,
};
pub use query_builder::{
QueryBuilder,
};
pub use conn::{
Conn,
Syncable,
};
pub use mentat_transaction::{


@@ -37,9 +37,6 @@ use mentat_db::{
TxObserver,
};
#[cfg(feature = "syncable")]
use mentat_tolstoy::Syncer;
use mentat_transaction::{
CacheAction,
CacheDirection,
@@ -57,11 +54,6 @@ use public_traits::errors::{
Result,
};
#[cfg(feature = "syncable")]
use public_traits::errors::{
MentatError,
};
use mentat_transaction::query::{
PreparedResult,
QueryExplanation,
@@ -69,6 +61,18 @@ use mentat_transaction::query::{
QueryOutput,
};
#[cfg(feature = "syncable")]
use mentat_tolstoy::{
SyncReport,
SyncResult,
SyncFollowup,
};
#[cfg(feature = "syncable")]
use sync::{
Syncable,
};
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
/// for applications that don't require complex connection management.
pub struct Store {
@@ -93,6 +97,32 @@ impl Store {
ip.commit()?;
Ok(report)
}
#[cfg(feature = "syncable")]
pub fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<SyncResult> {
let mut reports = vec![];
loop {
let mut ip = self.begin_transaction()?;
let report = ip.sync(server_uri, user_uuid)?;
ip.commit()?;
match report {
SyncReport::Merge(SyncFollowup::FullSync) => {
reports.push(report);
continue
},
_ => {
reports.push(report);
break
},
}
}
if reports.len() == 1 {
Ok(SyncResult::Atomic(reports[0].clone()))
} else {
Ok(SyncResult::NonAtomic(reports))
}
}
}
#[cfg(feature = "sqlcipher")]
@@ -209,26 +239,14 @@ impl Pullable for Store {
}
}
#[cfg(feature = "syncable")]
use uuid::Uuid;
#[cfg(feature = "syncable")]
use conn::Syncable;
#[cfg(feature = "syncable")]
impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid).map_err(|_| MentatError::BadUuid(user_uuid.clone()))?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
extern crate time;
use uuid::Uuid;
use std::collections::{
BTreeSet,
};
@@ -244,8 +262,6 @@ mod tests {
Duration,
};
use uuid::Uuid;
use mentat_db::cache::{
SQLiteAttributeCache,
};

src/sync.rs (new file, 46 lines)

@@ -0,0 +1,46 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use uuid::Uuid;
use mentat_transaction::{
InProgress,
};
use errors::{
Result,
};
use mentat_tolstoy::{
Syncer,
RemoteClient,
SyncReport,
};
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<SyncReport>;
}
impl<'a, 'c> Syncable for InProgress<'a, 'c> {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<SyncReport> {
// Syncer behaves as if it's part of InProgress.
// This split into a separate crate is to segment synchronization functionality
// into a single crate which can be easily disabled by consumers,
// and to separate concerns.
// But for all intents and purposes, Syncer operates over a "mentat transaction",
// which is exactly what InProgress represents.
let mut remote_client = RemoteClient::new(
server_uri.to_string(),
Uuid::parse_str(&user_uuid)?
);
Syncer::sync(self, &mut remote_client)
.map_err(|e| e.into())
}
}

File diff suppressed because it is too large.


@@ -9,19 +9,24 @@
// specific language governing permissions and limitations under the License.
use std;
use std::error::Error;
use rusqlite;
use uuid;
use hyper;
use serde_json;
use db_traits::errors::DbError;
pub type Result<T> = ::std::result::Result<T, TolstoyError>;
use db_traits::errors::{
DbError,
};
#[derive(Debug, Fail)]
pub enum TolstoyError {
#[fail(display = "Received bad response from the server: {}", _0)]
BadServerResponse(String),
#[fail(display = "Received bad response from the remote: {}", _0)]
BadRemoteResponse(String),
// TODO expand this into concrete error types
#[fail(display = "Received bad remote state: {}", _0)]
BadRemoteState(String),
#[fail(display = "encountered more than one metadata value for key: {}", _0)]
DuplicateMetadata(String),
@@ -46,8 +51,8 @@ pub enum TolstoyError {
// It would be better to capture the underlying `rusqlite::Error`, but that type doesn't
// implement many useful traits, including `Clone`, `Eq`, and `PartialEq`.
#[fail(display = "SQL error: {}", _0)]
RusqliteError(String),
#[fail(display = "SQL error: {}, cause: {}", _0, _1)]
RusqliteError(String, String),
#[fail(display = "{}", _0)]
IoError(#[cause] std::io::Error),
@@ -76,7 +81,11 @@ impl From<serde_json::Error> for TolstoyError {
impl From<rusqlite::Error> for TolstoyError {
fn from(error: rusqlite::Error) -> TolstoyError {
TolstoyError::RusqliteError(error.to_string())
let cause = match error.cause() {
Some(e) => e.to_string(),
None => "".to_string()
};
TolstoyError::RusqliteError(error.to_string(), cause)
}
}


@@ -39,6 +39,12 @@ path = "../db-traits"
[dependencies.tolstoy_traits]
path = "../tolstoy-traits"
[dependencies.public_traits]
path = "../public-traits"
[dependencies.mentat_transaction]
path = "../transaction"
[dependencies.rusqlite]
version = "0.13"
features = ["limits"]

tolstoy/README.md (new file, 138 lines)

@@ -0,0 +1,138 @@
# Tolstoy, a Mentat Sync implementation
## Current state
This work is partially a proof-of-concept, partially an alpha implementation of how a generic sync might operate on top of a log-oriented storage layer a la Mentat.
## Overview
### Very briefly
Tolstoy will synchronize a local Mentat database against a remote server, modifying local state if necessary, and uploading changes to the server if necessary. Schema additions are allowed (adding vocabulary). Schema mutations are currently not implemented (changing vocabulary). Mentat's core schema must be the same on all participating clients (i.e. core schema alterations are unsupported).
**Basic example:**
Client 1 knows about `name` and `age` of a person.
```
[
{
:db/ident :person/name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one
}
{
:db/ident :person/age
:db/valueType :db.type/long
:db/cardinality :db.cardinality/one
}
{:person/name "Grisha" :person/age 30}
]
```
Client 2 doesn't know about `age`, but knows about `ssn`:
```
[
{
:db/ident :person/name
:db/valueType :db.type/string
:db/cardinality :db.cardinality/one
}
{
:db/ident :person/ssn
:db/valueType :db.type/long
:db/cardinality :db.cardinality/one
:db/unique :db.unique/identity
:db/index true
}
{:person/name "Grisha" :person/ssn 123}
]
```
Sync Client 1, then Client 2, then Client 1 again.
Entity `Grisha` will be "duplicated", since `:person/name` is not defined as unique.
```
[
[:person/name :db/ident :person/name]
[:person/name :db/valueType :db.type/string]
[:person/name :db/cardinality :db.cardinality/one]
[:person/age :db/ident :person/age]
[:person/age :db/valueType :db.type/long]
[:person/age :db/cardinality :db.cardinality/one]
[:person/ssn :db/ident :person/ssn]
[:person/ssn :db/valueType :db.type/long]
[:person/ssn :db/cardinality :db.cardinality/one]
[:person/ssn :db/unique :db.unique/identity]
[:person/ssn :db/index true]
[?grisha_one :person/name "Grisha"]
[?grisha_one :person/age 30]
[?grisha_two :person/name "Grisha"]
[?grisha_two :person/ssn 123]
]
```
If, instead, `:person/name` was defined as `:db/unique :db.unique/identity`, then our final state will be:
```
[
[...schema datoms...]
[?united_grisha :person/name "Grisha"]
[?united_grisha :person/age 30]
[?united_grisha :person/ssn 123]
]
```
Note that in the above examples, symbols such as `?grisha_one` are meant to expand to some internal entity id (e.g. 65536).
### Try it via the CLI
In the Mentat CLI, a `.sync` operation exposes Tolstoy's functionality. Basic usage: `.sync http://path-to-server account-uuid`. Authentication, etc., is not implemented.
### In more detail...
Syncing is defined in terms of coming to an agreement between local and remote states. A local state is what's currently present on the current instance. A remote state is what's currently present on a server.
Mentat is a log-oriented store, and so "local" and "remote" are really just two transaction logs.
Internally, Tolstoy tracks the "locally known remote HEAD" and the "last-synced local transaction", which gives us three basic primitives:
- a shared root - a state which is at the root of both the local and remote logs
- incoming changes - what remote changed on top of the shared root
- local changes - what local changed on top of the shared root.
Thus, four kinds of positive-case outcomes might occur during a sync:
- a no-op - there are no incoming changes and local didn't change
- a local fast-forward - there are remote changes, but no local changes
- a remote fast-forward - there are local changes, but no remote changes
- a merge - there are both local and remote changes.
The first three cases are "trivial" - we either do nothing, or we download and transact remote transactions, or we upload local transactions and advance the remote HEAD.
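A minimal sketch of that decision, with illustrative names (the real Syncer derives these flags by comparing the shared root, the remote HEAD, and the last-synced local transaction):
```
enum SyncAction { Noop, LocalFastForward, RemoteFastForward, Merge }

fn plan_sync(incoming_changes: bool, local_changes: bool) -> SyncAction {
    match (incoming_changes, local_changes) {
        (false, false) => SyncAction::Noop,
        (true, false)  => SyncAction::LocalFastForward,  // download & transact
        (false, true)  => SyncAction::RemoteFastForward, // upload & advance HEAD
        (true, true)   => SyncAction::Merge,             // rebase local onto remote
    }
}
```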
The merge case is a little more complicated. During a merging sync, a kind of rebase is performed:
1. local transactions are moved off of the main timeline
2. remote transactions are transacted on top of the shared root
3. local transactions are transacted
Generally, intuition about the transactor's behaviour applies to reasoning about Tolstoy's sync as well. If a transaction "makes sense", it will be applied.
Remote transactions are applied "as-is", with the exception of the `txInstant`: it must be preserved, and so the datom describing it is rewritten prior to application to use the `(transaction-tx)` transaction function.
Local transactions are rewritten to use tempids instead of their entids if they are assertions, and the `(lookup-ref a v)` form in the case of retractions - but only if the `lookup-ref` is guaranteed to succeed; otherwise, retractions are dropped on the floor (see the sketch after this list). Cases where local retractions are dropped:
- we're retracting an entity which isn't `:db/unique`
- we're retracting an entity which was already retracted by one of the `remote` transactions.
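A minimal sketch of these rewrite rules, under assumed helper types (`EntidForm`, `unique_av`, and `retracted_by_remote` are illustrative, not Tolstoy's actual API):
```
type Entid = i64;

enum EntidForm {
    Tempid(String),                           // assertions: an upsertable tempid
    LookupRef { attr: Entid, value: String }, // retractions: a resolvable lookup-ref
}

// Decide how a local datom's entid is rewritten during the rebase;
// None means the retraction is dropped on the floor.
fn rewrite_entid(added: bool,
                 e: Entid,
                 unique_av: Option<(Entid, String)>, // a :db/unique (a, v) for e, if known
                 retracted_by_remote: bool) -> Option<EntidForm> {
    if added {
        // Assertions: the entid becomes a tempid, letting the transactor
        // upsert on :db/unique attributes.
        return Some(EntidForm::Tempid(e.to_string()));
    }
    match unique_av {
        // Rewrite to a lookup-ref only when it's guaranteed to resolve.
        Some((attr, value)) if !retracted_by_remote =>
            Some(EntidForm::LookupRef { attr, value }),
        // Not :db/unique, or already retracted remotely: drop it.
        _ => None,
    }
}
```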
### Sync report
A sync operation produces either a single or multiple sync reports.
A single report - internally referred to as an atomic sync report - indicates that the sync was able to finish within a single local database transaction.
Alternatively, a non-atomic report is produced: a series of regular atomic reports. This indicates that the sync required multiple "passes" to complete - e.g. a merge first, then a remote fast-forward - with each step performed within a separate local database transaction.
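For reference, the reporting types as used by `Store::sync` in this patch; payloads are inferred from usage, so treat the exact variant set as a sketch:
```
pub enum SyncFollowup {
    None,     // nothing left to do
    FullSync, // a merge transaction still needs to be uploaded
}

pub enum SyncReport {
    Merge(SyncFollowup), // local and remote logs were merged
    RemoteFastForward,   // local changes were uploaded as-is
    // ...the other, trivial outcomes (no-op, local fast-forward) elided.
}

pub enum SyncResult {
    Atomic(SyncReport),         // finished within a single local db transaction
    NonAtomic(Vec<SyncReport>), // multiple passes, one local transaction each
}
```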
## Explicitly not supported - will abort with a NotYetImplemented
This alpha implementation doesn't support some cases, but it recognizes them and gracefully aborts (leaving local and remote states untouched):
- Syncing against a Mentat instance which uses a different core schema version.
- Syncing with schema mutations. Schema additions are fine, but transactions which change the set of attributes that define a user-defined `:db/ident` will cause sync to abort.
## Misc operational properties
- All sync operations happen in the context of an `InProgress` - an internal Mentat transaction representation. If sync succeeds, all necessary operations are committed to the underlying database in a single SQLite transaction. Similarly, an aborting sync will simply drop an uncommitted transaction.
- "Follow-up" syncing is currently supported in a basic manner: if there are local changes arising from a merge operation, they are committed to the local store, and a full sync is requested. In the optimal case this fast-forwards the remote state; if we lost the race to the server, it merges the local "merged state" with further remote changes.
## Server
Tolstoy operates against an instance of [Mentat Sync Prototype Server](https://github.com/rfk/mentat-sync-prototype/tree/480d43d7001cd92455fdbbd374255db458e18b6c). That repository defines a transaction-oriented API, which is all that Tolstoy expects of the server.
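For orientation, these are the endpoints exercised by `RemoteClient` in this patch, shown relative to `{server-uri}/{user-uuid}`; this is a sketch derived from the client code, not a server specification:
```
GET  /head                      -> {"head": <uuid>}
PUT  /head                         {"head": <uuid>}                     (expects 204)
GET  /transactions?from=<uuid>  -> {"limit", "from", "transactions": [<uuid>, ...]}
GET  /transactions/<uuid>       -> {"parent", "chunks": [<uuid>, ...], "id", "seq"}
PUT  /transactions/<uuid>          {"parent": <uuid>, "chunks": [...]}  (expects 201)
GET  /chunks/<uuid>             -> a single TxPart, as JSON
PUT  /chunks/<uuid>                a single TxPart, as JSON             (expects 201)
```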

tolstoy/src/bootstrap.rs (new file, 90 lines)

@@ -0,0 +1,90 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use mentat_core::{
Keyword,
};
use mentat_db::{
CORE_SCHEMA_VERSION,
};
use public_traits::errors::{
Result,
};
use tolstoy_traits::errors::{
TolstoyError,
};
use datoms::{
DatomsHelper,
};
use types::{
Tx,
};
pub struct BootstrapHelper<'a> {
parts: DatomsHelper<'a>
}
impl<'a> BootstrapHelper<'a> {
pub fn new(assumed_bootstrap_tx: &Tx) -> BootstrapHelper {
BootstrapHelper {
parts: DatomsHelper::new(&assumed_bootstrap_tx.parts),
}
}
// TODO we could also iterate through our own bootstrap schema definition and check that everything matches
// "version" is used here as a proxy for doing that work
pub fn is_compatible(&self) -> Result<bool> {
Ok(self.core_schema_version()? == CORE_SCHEMA_VERSION as i64)
}
pub fn core_schema_version(&self) -> Result<i64> {
match self.parts.ea_lookup(
Keyword::namespaced("db.schema", "core"),
Keyword::namespaced("db.schema", "version"),
) {
Some(v) => {
// TODO v is just a type tag and a Copy value, we shouldn't need to clone.
match v.clone().into_long() {
Some(v) => Ok(v),
None => bail!(TolstoyError::BadRemoteState("incorrect type for core schema version".to_string()))
}
},
None => bail!(TolstoyError::BadRemoteState("missing core schema version".to_string()))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use mentat_db::debug::{
TestConn,
};
use debug::txs_after;
#[test]
fn test_bootstrap_version() {
let remote = TestConn::default();
let remote_txs = txs_after(&remote.sqlite, &remote.schema, remote.last_tx_id() - 1);
assert_eq!(1, remote_txs.len());
let bh = BootstrapHelper::new(&remote_txs[0]);
assert_eq!(1, bh.core_schema_version().expect("schema version"));
}
}

tolstoy/src/datoms.rs (new file, 67 lines)

@@ -0,0 +1,67 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use edn::{
Keyword,
};
use core_traits::{
Entid,
TypedValue,
};
use types::TxPart;
/// A primitive query interface geared toward processing bootstrap-like sets of datoms.
pub struct DatomsHelper<'a> {
parts: &'a Vec<TxPart>,
}
impl<'a> DatomsHelper<'a> {
pub fn new(parts: &'a Vec<TxPart>) -> DatomsHelper {
DatomsHelper {
parts: parts,
}
}
// TODO these are obviously quite inefficient
pub fn e_lookup(&self, e: Keyword) -> Option<Entid> {
// This wraps Keyword (e) in ValueRc (aliased Arc), which is rather expensive.
let kw_e = TypedValue::Keyword(e.into());
for part in self.parts {
if kw_e == part.v && part.added {
return Some(part.e);
}
}
None
}
pub fn ea_lookup(&self, e: Keyword, a: Keyword) -> Option<&TypedValue> {
let e_e = self.e_lookup(e);
let a_e = self.e_lookup(a);
if e_e.is_none() || a_e.is_none() {
return None;
}
let e_e = e_e.unwrap();
let a_e = a_e.unwrap();
for part in self.parts {
if part.e == e_e && part.a == a_e && part.added {
return Some(&part.v);
}
}
None
}
}

tolstoy/src/debug.rs (new file, 105 lines)

@@ -0,0 +1,105 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
// TODO could hide this behind #[cfg(test)], since this is only for test use.
use rusqlite;
use uuid::Uuid;
use edn::entities::{
EntidOrIdent,
};
use core_traits::{
Entid,
TypedValue,
};
use mentat_core::{
HasSchema,
Schema,
};
use mentat_db::{
TypedSQLValue,
};
use mentat_db::debug::{
Datom,
Datoms,
transactions_after,
};
use types::{
Tx,
TxPart,
};
/// A rough equivalent of mentat_db::debug::transactions_after
/// for Tolstoy's Tx type.
pub fn txs_after(sqlite: &rusqlite::Connection, schema: &Schema, after: Entid) -> Vec<Tx> {
let transactions = transactions_after(
sqlite, schema, after
).expect("remote transactions");
let mut txs = vec![];
for transaction in transactions.0 {
let mut tx = Tx {
tx: Uuid::new_v4(),
parts: vec![],
};
for datom in &transaction.0 {
let e = match datom.e {
EntidOrIdent::Entid(ref e) => *e,
_ => panic!(),
};
let a = match datom.a {
EntidOrIdent::Entid(ref a) => *a,
EntidOrIdent::Ident(ref a) => schema.get_entid(a).unwrap().0,
};
tx.parts.push(TxPart {
partitions: None,
e: e,
a: a,
v: TypedValue::from_edn_value(&datom.v).unwrap(),
tx: datom.tx,
added: datom.added.unwrap()
});
}
txs.push(tx);
}
txs
}
pub fn part_to_datom(schema: &Schema, part: &TxPart) -> Datom {
Datom {
e: match schema.get_ident(part.e) {
Some(ident) => EntidOrIdent::Ident(ident.clone()),
None => EntidOrIdent::Entid(part.e),
},
a: match schema.get_ident(part.a) {
Some(ident) => EntidOrIdent::Ident(ident.clone()),
None => EntidOrIdent::Entid(part.a),
},
v: TypedValue::to_edn_value_pair(&part.v).0,
tx: part.tx,
added: Some(part.added),
}
}
pub fn parts_to_datoms(schema: &Schema, parts: &Vec<TxPart>) -> Datoms {
Datoms(parts.iter().map(|p| part_to_datom(schema, p)).collect())
}


@@ -27,22 +27,50 @@ extern crate serde;
extern crate serde_cbor;
extern crate serde_json;
// See https://github.com/rust-lang/rust/issues/44342#issuecomment-376010077.
#[cfg_attr(test, macro_use)] extern crate log;
#[cfg_attr(test, macro_use)] extern crate mentat_db;
extern crate log;
extern crate mentat_db;
extern crate mentat_core;
extern crate db_traits;
#[macro_use]
extern crate core_traits;
extern crate public_traits;
extern crate rusqlite;
extern crate uuid;
extern crate tolstoy_traits;
extern crate mentat_transaction;
pub mod schema;
pub mod bootstrap;
pub mod metadata;
pub mod tx_processor;
pub use metadata::{
PartitionsTable,
SyncMetadata,
};
mod datoms;
pub mod debug;
pub mod remote_client;
pub use remote_client::{
RemoteClient,
};
pub mod schema;
pub mod syncer;
pub use syncer::{
Syncer,
SyncReport,
SyncResult,
SyncFollowup,
};
mod tx_uploader;
pub mod logger;
pub mod tx_mapper;
pub use syncer::Syncer;
pub use tx_mapper::{
TxMapper,
};
pub mod tx_processor;
pub mod types;
pub use types::{
Tx,
TxPart,
GlobalTransactionLog,
};

tolstoy/src/logger.rs (new file, 32 lines)

@@ -0,0 +1,32 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
// TODO: use `log` crate.
// TODO it would be nice to be able to pass
// in a logger into Syncer::flow; would allow for a "debug mode"
// and getting useful logs out of clients.
// See https://github.com/mozilla/mentat/issues/571
// Below is some debug Android-friendly logging:
// use std::os::raw::c_char;
// use std::os::raw::c_int;
// use std::ffi::CString;
// pub const ANDROID_LOG_DEBUG: i32 = 3;
// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
pub fn d(message: &str) {
println!("d: {}", message);
// let message = CString::new(message).unwrap();
// let message = message.as_ptr();
// let tag = CString::new("RustyToodle").unwrap();
// let tag = tag.as_ptr();
// unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}


@@ -13,21 +13,57 @@
use rusqlite;
use uuid::Uuid;
use core_traits::{
Entid,
};
use schema;
use tolstoy_traits::errors::{
TolstoyError,
use public_traits::errors::{
Result,
};
pub trait HeadTrackable {
fn remote_head(tx: &rusqlite::Transaction) -> Result<Uuid>;
fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()>;
use tolstoy_traits::errors::{
TolstoyError,
};
use mentat_db::{
Partition,
PartitionMap,
db,
};
use types::{
LocalGlobalTxMapping,
};
use TxMapper;
// Could be Copy, but that might change
pub struct SyncMetadata {
// Local head: the latest transaction that we have in the store,
// but with one caveat: its tx might not be mapped if it's
// never been synced successfully.
// In other words: if the latest tx isn't mapped, then HEAD moved
// since the last sync and the server needs to be updated.
pub root: Entid,
pub head: Entid,
}
pub struct SyncMetadataClient {}
pub enum PartitionsTable {
Core,
Tolstoy,
}
impl HeadTrackable for SyncMetadataClient {
fn remote_head(tx: &rusqlite::Transaction) -> Result<Uuid> {
impl SyncMetadata {
pub fn new(root: Entid, head: Entid) -> SyncMetadata {
SyncMetadata {
root: root,
head: head,
}
}
pub fn remote_head(tx: &rusqlite::Transaction) -> Result<Uuid> {
tx.query_row(
"SELECT value FROM tolstoy_metadata WHERE key = ?",
&[&schema::REMOTE_HEAD_KEY], |r| {
@@ -37,7 +73,7 @@ impl HeadTrackable for SyncMetadataClient {
)?.map_err(|e| e.into())
}
fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()> {
pub fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()> {
let uuid_bytes = uuid.as_bytes().to_vec();
let updated = tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?",
&[&uuid_bytes, &schema::REMOTE_HEAD_KEY])?;
@@ -46,25 +82,135 @@ impl HeadTrackable for SyncMetadataClient {
}
Ok(())
}
pub fn set_remote_head_and_map(tx: &mut rusqlite::Transaction, mapping: LocalGlobalTxMapping) -> Result<()> {
SyncMetadata::set_remote_head(tx, mapping.remote)?;
TxMapper::set_lg_mapping(tx, mapping)?;
Ok(())
}
// TODO Functions below start to blur the line between mentat-proper and tolstoy...
pub fn get_partitions(tx: &rusqlite::Transaction, parts_table: PartitionsTable) -> Result<PartitionMap> {
match parts_table {
PartitionsTable::Core => {
db::read_partition_map(tx).map_err(|e| e.into())
},
PartitionsTable::Tolstoy => {
let mut stmt: ::rusqlite::Statement = tx.prepare("SELECT part, start, end, idx, allow_excision FROM tolstoy_parts")?;
let m: Result<PartitionMap> = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> {
Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?, row.get_checked(3)?, row.get_checked(4)?)))
})?.collect();
m
}
}
}
pub fn root_and_head_tx(tx: &rusqlite::Transaction) -> Result<(Entid, Entid)> {
let mut stmt: ::rusqlite::Statement = tx.prepare("SELECT tx FROM timelined_transactions WHERE timeline = 0 GROUP BY tx ORDER BY tx")?;
let txs: Vec<_> = stmt.query_and_then(&[], |row| -> Result<Entid> {
Ok(row.get_checked(0)?)
})?.collect();
let mut txs = txs.into_iter();
let root_tx = match txs.nth(0) {
None => bail!(TolstoyError::UnexpectedState(format!("Could not get root tx"))),
Some(t) => t?
};
match txs.last() {
None => Ok((root_tx, root_tx)),
Some(t) => Ok((root_tx, t?))
}
}
pub fn local_txs(db_tx: &rusqlite::Transaction, after: Option<Entid>) -> Result<Vec<Entid>> {
let after_clause = match after {
Some(t) => format!("WHERE timeline = 0 AND tx > {}", t),
None => format!("WHERE timeline = 0")
};
let mut stmt: ::rusqlite::Statement = db_tx.prepare(&format!("SELECT tx FROM timelined_transactions {} GROUP BY tx ORDER BY tx", after_clause))?;
let txs: Vec<_> = stmt.query_and_then(&[], |row| -> Result<Entid> {
Ok(row.get_checked(0)?)
})?.collect();
let mut all = Vec::with_capacity(txs.len());
for tx in txs {
all.push(tx?);
}
Ok(all)
}
pub fn is_tx_empty(db_tx: &rusqlite::Transaction, tx_id: Entid) -> Result<bool> {
let count = db_tx.query_row("SELECT count(rowid) FROM timelined_transactions WHERE timeline = 0 AND tx = ? AND e != ?", &[&tx_id, &tx_id], |row| -> Result<i64> {
Ok(row.get_checked(0)?)
})?;
Ok(count? == 0)
}
pub fn has_entity_assertions_in_tx(db_tx: &rusqlite::Transaction, e: Entid, tx_id: Entid) -> Result<bool> {
let count = db_tx.query_row("SELECT count(rowid) FROM timelined_transactions WHERE timeline = 0 AND tx = ? AND e = ?", &[&tx_id, &e], |row| -> Result<i64> {
Ok(row.get_checked(0)?)
})?;
Ok(count? > 0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use mentat_db::db;
#[test]
fn test_get_remote_head_default() {
let mut conn = schema::tests::setup_conn();
let tx = conn.transaction().expect("db tx");
assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
let mut conn = schema::tests::setup_conn_bare();
let tx = schema::tests::setup_tx(&mut conn);
assert_eq!(Uuid::nil(), SyncMetadata::remote_head(&tx).expect("fetch succeeded"));
}
#[test]
fn test_set_and_get_remote_head() {
let mut conn = schema::tests::setup_conn();
let mut conn = schema::tests::setup_conn_bare();
let tx = schema::tests::setup_tx(&mut conn);
let uuid = Uuid::new_v4();
let tx = conn.transaction().expect("db tx");
SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
SyncMetadata::set_remote_head(&tx, &uuid).expect("update succeeded");
assert_eq!(uuid, SyncMetadata::remote_head(&tx).expect("fetch succeeded"));
}
#[test]
fn test_root_and_head_tx() {
let mut conn = schema::tests::setup_conn_bare();
db::ensure_current_version(&mut conn).expect("mentat db init");
let db_tx = conn.transaction().expect("transaction");
let (root_tx, last_tx) = SyncMetadata::root_and_head_tx(&db_tx).expect("last tx");
assert_eq!(268435456, root_tx);
assert_eq!(268435456, last_tx);
// These are deterministic, but brittle.
// Inserting a tx 268435457 at time 1529971773701734
// 268435457|3|1529971773701734|268435457|1|4
// ... which defines entity ':person/name'...
// 65536|1|:person/name|268435457|1|13
// ... which has valueType of string
// 65536|7|27|268435457|1|0
// ... which is unique...
// 65536|9|36|268435457|1|0
// ... ident
// 65536|11|1|268435457|1|1
// last attribute is the timeline (0).
db_tx.execute("INSERT INTO timelined_transactions VALUES (?, ?, ?, ?, ?, ?, ?)", &[&268435457, &3, &1529971773701734_i64, &268435457, &1, &4, &0]).expect("inserted");
db_tx.execute("INSERT INTO timelined_transactions VALUES (?, ?, ?, ?, ?, ?, ?)", &[&65536, &1, &":person/name", &268435457, &1, &13, &0]).expect("inserted");
db_tx.execute("INSERT INTO timelined_transactions VALUES (?, ?, ?, ?, ?, ?, ?)", &[&65536, &7, &27, &268435457, &1, &0, &0]).expect("inserted");
db_tx.execute("INSERT INTO timelined_transactions VALUES (?, ?, ?, ?, ?, ?, ?)", &[&65536, &9, &36, &268435457, &1, &0, &0]).expect("inserted");
db_tx.execute("INSERT INTO timelined_transactions VALUES (?, ?, ?, ?, ?, ?, ?)", &[&65536, &11, &1, &268435457, &1, &1, &0]).expect("inserted");
let (root_tx, last_tx) = SyncMetadata::root_and_head_tx(&db_tx).expect("last tx");
assert_eq!(268435456, root_tx);
assert_eq!(268435457, last_tx);
}
}


@@ -0,0 +1,341 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std;
use futures::{future, Future, Stream};
use hyper;
// TODO: enable TLS support; hurdle is cross-compiling openssl for Android.
// See https://github.com/mozilla/mentat/issues/569
// use hyper_tls;
use hyper::{
Method,
Request,
StatusCode,
Error as HyperError
};
use hyper::header::{
ContentType,
};
// TODO: https://github.com/mozilla/mentat/issues/570
// use serde_cbor;
use serde_json;
use tokio_core::reactor::Core;
use uuid::Uuid;
use public_traits::errors::{
Result,
};
use logger::d;
use types::{
Tx,
TxPart,
GlobalTransactionLog,
};
#[derive(Serialize,Deserialize)]
struct SerializedHead {
head: Uuid
}
#[derive(Serialize)]
struct SerializedTransaction<'a> {
parent: &'a Uuid,
chunks: &'a Vec<Uuid>
}
#[derive(Deserialize)]
struct DeserializableTransaction {
parent: Uuid,
chunks: Vec<Uuid>,
id: Uuid,
seq: i64,
}
#[derive(Deserialize)]
struct SerializedTransactions {
limit: i64,
from: Uuid,
transactions: Vec<Uuid>,
}
pub struct RemoteClient {
base_uri: String,
user_uuid: Uuid,
}
impl RemoteClient {
pub fn new(base_uri: String, user_uuid: Uuid) -> Self {
RemoteClient {
base_uri: base_uri,
user_uuid: user_uuid,
}
}
fn bound_base_uri(&self) -> String {
// TODO escaping
format!("{}/{}", self.base_uri, self.user_uuid)
}
// TODO what we want is a method that returns a deserialized json structure.
// It'll need a type T so that consumers can specify what downloaded json will
// map to. I ran into borrow issues doing that - probably need to restructure
// this and use PhantomData markers or somesuch.
// But for now, we get code duplication.
fn get_uuid(&self, uri: String) -> Result<Uuid> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let head_json = core.run(work)?;
d(&format!("got head: {:?}", &head_json.head));
Ok(head_json.head)
}
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
where hyper::Body: std::convert::From<T>, {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
let uri = uri.parse()?;
d(&format!("PUT {:?}", uri));
let mut req = Request::new(Method::Put, uri);
req.headers_mut().set(ContentType::json());
req.set_body(payload);
let put = client.request(req).and_then(|res| {
let status_code = res.status();
if status_code != expected {
d(&format!("bad put response: {:?}", status_code));
future::err(HyperError::Status)
} else {
future::ok(())
}
});
core.run(put)?;
Ok(())
}
fn get_transactions(&self, parent_uuid: &Uuid) -> Result<Vec<Uuid>> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/transactions?from={}", self.bound_base_uri(), parent_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: SerializedTransactions = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let transactions_json = core.run(work)?;
d(&format!("got transactions: {:?}", &transactions_json.transactions));
Ok(transactions_json.transactions)
}
fn get_chunks(&self, transaction_uuid: &Uuid) -> Result<Vec<Uuid>> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: DeserializableTransaction = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let transaction_json = core.run(work)?;
d(&format!("got transaction chunks: {:?}", &transaction_json.chunks));
Ok(transaction_json.chunks)
}
fn get_chunk(&self, chunk_uuid: &Uuid) -> Result<TxPart> {
let mut core = Core::new()?;
// TODO https://github.com/mozilla/mentat/issues/569
// let client = hyper::Client::configure()
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
// .build(&core.handle());
let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
let uri = uri.parse()?;
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let json: TxPart = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(json)
})
});
d(&format!("running..."));
let chunk = core.run(work)?;
d(&format!("got transaction chunk: {:?}", &chunk));
Ok(chunk)
}
}
impl GlobalTransactionLog for RemoteClient {
fn head(&self) -> Result<Uuid> {
let uri = format!("{}/head", self.bound_base_uri());
self.get_uuid(uri)
}
fn set_head(&mut self, uuid: &Uuid) -> Result<()> {
// {"head": uuid}
let head = SerializedHead {
head: uuid.clone()
};
let uri = format!("{}/head", self.bound_base_uri());
let json = serde_json::to_string(&head)?;
d(&format!("serialized head: {:?}", json));
self.put(uri, json, StatusCode::NoContent)
}
/// Slurp transactions and datoms after `tx`, returning them as owned data.
///
/// This is inefficient but convenient for development.
fn transactions_after(&self, tx: &Uuid) -> Result<Vec<Tx>> {
let new_txs = self.get_transactions(tx)?;
let mut tx_list = Vec::new();
for tx in new_txs {
let mut tx_parts = Vec::new();
let chunks = self.get_chunks(&tx)?;
// We pass along all of the downloaded parts, including the transaction's
// metadata datom. The transactor is expected to do the right thing, and
// use the txInstant from one of our datoms.
for chunk in chunks {
let part = self.get_chunk(&chunk)?;
tx_parts.push(part);
}
tx_list.push(Tx {
tx: tx.into(),
parts: tx_parts
});
}
d(&format!("got tx list: {:?}", &tx_list));
Ok(tx_list)
}
fn put_transaction(&mut self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
// {"parent": uuid, "chunks": [chunk1, chunk2...]}
let transaction = SerializedTransaction {
parent: parent_uuid,
chunks: chunks
};
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
let json = serde_json::to_string(&transaction)?;
d(&format!("serialized transaction: {:?}", json));
self.put(uri, json, StatusCode::Created)
}
fn put_chunk(&mut self, chunk_uuid: &Uuid, payload: &TxPart) -> Result<()> {
let payload: String = serde_json::to_string(payload)?;
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
d(&format!("serialized chunk: {:?}", payload));
// TODO don't want to clone every datom!
self.put(uri, payload, StatusCode::Created)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn test_remote_client_bound_uri() {
let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid");
let server_uri = String::from("https://example.com/api/0.1");
let remote_client = RemoteClient::new(server_uri, user_uuid);
assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri());
}
}


@@ -9,30 +9,44 @@
// specific language governing permissions and limitations under the License.
use rusqlite;
use tolstoy_traits::errors::Result;
pub static REMOTE_HEAD_KEY: &str = r#"remote_head"#;
use mentat_db::V1_PARTS as BOOTSTRAP_PARTITIONS;
use public_traits::errors::{
Result,
};
pub static REMOTE_HEAD_KEY: &str = r"remote_head";
pub static PARTITION_DB: &str = r":db.part/db";
pub static PARTITION_USER: &str = r":db.part/user";
pub static PARTITION_TX: &str = r":db.part/tx";
lazy_static! {
/// SQL statements to be executed, in order, to create the Tolstoy SQL schema (version 1).
/// "tolstoy_parts" records what the partitions were at the end of last sync, and is used
/// as a "root partition" during renumbering (a three-way merge of partitions).
#[cfg_attr(rustfmt, rustfmt_skip)]
static ref SCHEMA_STATEMENTS: Vec<&'static str> = { vec![
r#"CREATE TABLE IF NOT EXISTS tolstoy_tu (tx INTEGER PRIMARY KEY, uuid BLOB NOT NULL UNIQUE) WITHOUT ROWID"#,
r#"CREATE TABLE IF NOT EXISTS tolstoy_metadata (key BLOB NOT NULL UNIQUE, value BLOB NOT NULL)"#,
r#"CREATE INDEX IF NOT EXISTS idx_tolstoy_tu_ut ON tolstoy_tu (uuid, tx)"#,
"CREATE TABLE IF NOT EXISTS tolstoy_tu (tx INTEGER PRIMARY KEY, uuid BLOB NOT NULL UNIQUE) WITHOUT ROWID",
"CREATE TABLE IF NOT EXISTS tolstoy_metadata (key BLOB NOT NULL UNIQUE, value BLOB NOT NULL)",
"CREATE TABLE IF NOT EXISTS tolstoy_parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, end INTEGER NOT NULL, idx INTEGER NOT NULL, allow_excision SMALLINT NOT NULL)",
"CREATE INDEX IF NOT EXISTS idx_tolstoy_tu_ut ON tolstoy_tu (uuid, tx)",
]
};
}
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<()> {
let tx = conn.transaction()?;
pub fn ensure_current_version(tx: &mut rusqlite::Transaction) -> Result<()> {
for statement in (&SCHEMA_STATEMENTS).iter() {
tx.execute(statement, &[])?;
}
// Initial partition information is what we'd see at bootstrap, and is used during first sync.
for (name, start, end, index, allow_excision) in BOOTSTRAP_PARTITIONS.iter() {
tx.execute("INSERT OR IGNORE INTO tolstoy_parts VALUES (?, ?, ?, ?, ?)", &[&name.to_string(), start, end, index, allow_excision])?;
}
tx.execute("INSERT OR IGNORE INTO tolstoy_metadata (key, value) VALUES (?, zeroblob(16))", &[&REMOTE_HEAD_KEY])?;
tx.commit().map_err(|e| e.into())
Ok(())
}
#[cfg(test)]
@@ -40,7 +54,14 @@ pub mod tests {
use super::*;
use uuid::Uuid;
fn setup_conn_bare() -> rusqlite::Connection {
use metadata::{
PartitionsTable,
SyncMetadata,
};
use mentat_db::USER0;
pub fn setup_conn_bare() -> rusqlite::Connection {
let conn = rusqlite::Connection::open_in_memory().unwrap();
conn.execute_batch("
@@ -54,19 +75,24 @@ pub mod tests {
conn
}
pub fn setup_conn() -> rusqlite::Connection {
let mut conn = setup_conn_bare();
ensure_current_version(&mut conn).expect("connection setup");
conn
pub fn setup_tx_bare<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
conn.transaction().expect("tx")
}
pub fn setup_tx<'a>(conn: &'a mut rusqlite::Connection) -> rusqlite::Transaction<'a> {
let mut tx = conn.transaction().expect("tx");
ensure_current_version(&mut tx).expect("connection setup");
tx
}
#[test]
fn test_empty() {
let mut conn = setup_conn_bare();
let mut tx = setup_tx_bare(&mut conn);
assert!(ensure_current_version(&mut conn).is_ok());
assert!(ensure_current_version(&mut tx).is_ok());
let mut stmt = conn.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
let mut stmt = tx.prepare("SELECT key FROM tolstoy_metadata WHERE value = zeroblob(16)").unwrap();
let mut keys_iter = stmt.query_map(&[], |r| r.get(0)).expect("query works");
let first: Result<String> = keys_iter.next().unwrap().map_err(|e| e.into());
@@ -77,32 +103,46 @@ pub mod tests {
},
(_, _) => { panic!("Wrong number of results."); },
}
let partitions = SyncMetadata::get_partitions(&tx, PartitionsTable::Tolstoy).unwrap();
assert_eq!(partitions.len(), BOOTSTRAP_PARTITIONS.len());
for (name, start, end, index, allow_excision) in BOOTSTRAP_PARTITIONS.iter() {
let p = partitions.get(&name.to_string()).unwrap();
assert_eq!(p.start, *start);
assert_eq!(p.end, *end);
assert_eq!(p.next_entid(), *index);
assert_eq!(p.allow_excision, *allow_excision);
}
}
#[test]
fn test_non_empty() {
let mut conn = setup_conn_bare();
let mut tx = setup_tx_bare(&mut conn);
assert!(ensure_current_version(&mut conn).is_ok());
assert!(ensure_current_version(&mut tx).is_ok());
let test_uuid = Uuid::new_v4();
{
let tx = conn.transaction().unwrap();
let uuid_bytes = test_uuid.as_bytes().to_vec();
match tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &REMOTE_HEAD_KEY]) {
Err(e) => panic!("Error running an update: {}", e),
_ => ()
}
match tx.commit() {
Err(e) => panic!("Error committing an update: {}", e),
_ => ()
}
}
assert!(ensure_current_version(&mut conn).is_ok());
let new_idx = USER0 + 1;
match tx.execute("UPDATE tolstoy_parts SET idx = ? WHERE part = ?", &[&new_idx, &PARTITION_USER]) {
Err(e) => panic!("Error running an update: {}", e),
_ => ()
}
assert!(ensure_current_version(&mut tx).is_ok());
// Check that running ensure_current_version on an initialized conn doesn't change anything.
let mut stmt = conn.prepare("SELECT value FROM tolstoy_metadata").unwrap();
let mut stmt = tx.prepare("SELECT value FROM tolstoy_metadata").unwrap();
let mut values_iter = stmt.query_map(&[], |r| {
let raw_uuid: Vec<u8> = r.get(0);
Uuid::from_bytes(raw_uuid.as_slice()).unwrap()
@@ -116,5 +156,13 @@ pub mod tests {
},
(_, _) => { panic!("Wrong number of results."); },
}
let partitions = SyncMetadata::get_partitions(&tx, PartitionsTable::Tolstoy).unwrap();
assert_eq!(partitions.len(), BOOTSTRAP_PARTITIONS.len());
let user_partition = partitions.get(PARTITION_USER).unwrap();
assert_eq!(user_partition.start, USER0);
assert_eq!(user_partition.next_entid(), new_idx);
}
}

File diff suppressed because it is too large.


@@ -8,7 +8,6 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::collections::HashMap;
use rusqlite;
use uuid::Uuid;
@@ -16,26 +15,37 @@ use core_traits::{
Entid,
};
use public_traits::errors::{
Result,
};
use tolstoy_traits::errors::{
TolstoyError,
Result,
};
use types::{
LocalGlobalTxMapping,
};
// Exposes a tx<->uuid mapping interface.
pub struct TxMapper {}
impl TxMapper {
pub fn set_bulk(db_tx: &mut rusqlite::Transaction, tx_uuid_map: &HashMap<Entid, Uuid>) -> Result<()> {
pub fn set_lg_mappings(db_tx: &mut rusqlite::Transaction, mappings: Vec<LocalGlobalTxMapping>) -> Result<()> {
let mut stmt = db_tx.prepare_cached(
"INSERT OR REPLACE INTO tolstoy_tu (tx, uuid) VALUES (?, ?)"
)?;
for (tx, uuid) in tx_uuid_map.iter() {
let uuid_bytes = uuid.as_bytes().to_vec();
stmt.execute(&[tx, &uuid_bytes])?;
for mapping in mappings.iter() {
let uuid_bytes = mapping.remote.as_bytes().to_vec();
stmt.execute(&[&mapping.local, &uuid_bytes])?;
}
Ok(())
}
pub fn set_lg_mapping(db_tx: &mut rusqlite::Transaction, mapping: LocalGlobalTxMapping) -> Result<()> {
TxMapper::set_lg_mappings(db_tx, vec![mapping])
}
// TODO for when we're downloading, right?
pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
match TxMapper::get(db_tx, tx)? {
@@ -95,8 +105,8 @@ pub mod tests {
#[test]
fn test_getters() {
let mut conn = schema::tests::setup_conn();
let mut tx = conn.transaction().expect("db tx");
let mut conn = schema::tests::setup_conn_bare();
let mut tx = schema::tests::setup_tx(&mut conn);
assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
@ -104,27 +114,29 @@ pub mod tests {
#[test]
fn test_bulk_setter() {
let mut conn = schema::tests::setup_conn();
let mut tx = conn.transaction().expect("db tx");
let mut map = HashMap::new();
let mut conn = schema::tests::setup_conn_bare();
let mut tx = schema::tests::setup_tx(&mut conn);
TxMapper::set_bulk(&mut tx, &map).expect("empty map success");
TxMapper::set_lg_mappings(&mut tx, vec![]).expect("empty map success");
let uuid1 = Uuid::new_v4();
let uuid2 = Uuid::new_v4();
map.insert(1, uuid1);
map.insert(2, uuid2);
TxMapper::set_bulk(&mut tx, &map).expect("map success");
TxMapper::set_lg_mappings(
&mut tx,
vec![(1, &uuid1).into(), (2, &uuid2).into()]
).expect("map success");
assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success"));
assert_eq!(Some(uuid2), TxMapper::get(&mut tx, 2).expect("success"));
// Now let's replace one of mappings.
map.remove(&1);
// Now let's replace one of the mappings.
let new_uuid2 = Uuid::new_v4();
map.insert(2, new_uuid2);
TxMapper::set_bulk(&mut tx, &map).expect("map success");
TxMapper::set_lg_mappings(
&mut tx,
vec![(1, &uuid1).into(), (2, &new_uuid2).into()]
).expect("map success");
assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success"));
assert_eq!(Some(new_uuid2), TxMapper::get(&mut tx, 2).expect("success"));
}


@ -11,10 +11,6 @@ use std::iter::Peekable;
use rusqlite;
use tolstoy_traits::errors::{
Result,
};
use mentat_db::{
TypedSQLValue,
};
@ -24,19 +20,21 @@ use core_traits::{
TypedValue,
};
#[derive(Debug,Clone,Serialize,Deserialize)]
pub struct TxPart {
pub e: Entid,
pub a: Entid,
pub v: TypedValue,
pub tx: Entid,
pub added: bool,
}
use public_traits::errors::{
Result,
};
pub trait TxReceiver {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
where T: Iterator<Item=TxPart>;
fn done(&mut self) -> Result<()>;
use types::{
TxPart,
};
/// Implementors must specify the type of the "receiver report" which
/// they will produce once the processor is finished.
pub trait TxReceiver<RR> {
/// Called for each transaction, with an iterator over its datoms.
fn tx<T: Iterator<Item=TxPart>>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>;
/// Called once the processor is finished, consuming this receiver and producing a report.
fn done(self) -> RR;
}
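To make the contract concrete, here is a minimal hypothetical receiver (CountingReceiver is not part of this patch) that tallies datoms across all processed transactions and returns the total as its "receiver report":
struct CountingReceiver {
    total: usize,
}
impl TxReceiver<usize> for CountingReceiver {
    fn tx<T: Iterator<Item=TxPart>>(&mut self, _tx_id: Entid, d: &mut T) -> Result<()> {
        // Drain this transaction's datom iterator and add to the running total.
        self.total += d.count();
        Ok(())
    }
    fn done(self) -> usize {
        // Consumed by value; the "receiver report" is just the tally.
        self.total
    }
}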
pub struct Processor {}
@ -101,6 +99,7 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
Err(_) => None,
Ok(datom) => {
Some(TxPart {
partitions: None,
e: datom.e,
a: datom.a,
v: datom.v.clone(),
@ -118,25 +117,31 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
fn to_tx_part(row: &rusqlite::Row) -> Result<TxPart> {
Ok(TxPart {
e: row.get(0),
a: row.get(1),
v: TypedValue::from_sql_value_pair(row.get(2), row.get(3))?,
tx: row.get(4),
added: row.get(5),
partitions: None,
e: row.get_checked(0)?,
a: row.get_checked(1)?,
v: TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?,
tx: row.get_checked(4)?,
added: row.get_checked(5)?,
})
}
impl Processor {
pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
where R: TxReceiver {
pub fn process<RR, R: TxReceiver<RR>>
(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, mut receiver: R) -> Result<RR> {
let tx_filter = match from_tx {
Some(tx) => format!(" WHERE tx > {} ", tx),
None => format!("")
Some(tx) => format!(" WHERE timeline = 0 AND tx > {} ", tx),
None => format!("WHERE timeline = 0")
};
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM timelined_transactions {} ORDER BY tx", tx_filter);
let mut stmt = sqlite.prepare(&select_query)?;
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
// Walk the transaction table, keeping track of the current "tx".
// Whenever "tx" changes, construct a datoms iterator and pass it to the receiver.
// NB: this logic depends on the rows iterator yielding data sorted by "tx".
let mut current_tx = None;
while let Some(row) = rows.next() {
let datom = row?;
@ -160,7 +165,8 @@ impl Processor {
}
}
}
receiver.done()?;
Ok(())
// Consume the receiver, letting it produce a "receiver report"
// as defined by generic type RR.
Ok(receiver.done())
}
}
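A hypothetical call site, assuming the CountingReceiver sketched above and an open rusqlite transaction named db_tx: process every transaction on the main timeline and get the datom count back as the report.
// Pass `Some(entid)` instead of `None` to start after a known tx.
let total: usize = Processor::process(&db_tx, None, CountingReceiver { total: 0 })?;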

tolstoy/src/tx_uploader.rs (new file, 220 lines)

@ -0,0 +1,220 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::collections::HashMap;
use uuid::Uuid;
use core_traits::{
Entid,
};
use mentat_db::{
PartitionMap,
V1_PARTS,
};
use public_traits::errors::{
Result,
};
use tx_processor::{
TxReceiver,
};
use types::{
TxPart,
GlobalTransactionLog,
};
use logger::d;
pub struct UploaderReport {
pub temp_uuids: HashMap<Entid, Uuid>,
pub head: Option<Uuid>,
}
pub(crate) struct TxUploader<'c> {
tx_temp_uuids: HashMap<Entid, Uuid>,
remote_client: &'c mut GlobalTransactionLog,
remote_head: &'c Uuid,
rolling_temp_head: Option<Uuid>,
local_partitions: PartitionMap,
}
impl<'c> TxUploader<'c> {
pub fn new(client: &'c mut GlobalTransactionLog, remote_head: &'c Uuid, local_partitions: PartitionMap) -> TxUploader<'c> {
TxUploader {
tx_temp_uuids: HashMap::new(),
remote_client: client,
remote_head: remote_head,
rolling_temp_head: None,
local_partitions: local_partitions,
}
}
}
/// Given a set of entids and a partition map, returns a new PartitionMap that would result from
/// expanding the partitions to fit the entids.
fn allocate_partition_map_for_entids<T>(entids: T, local_partitions: &PartitionMap) -> PartitionMap
where T: Iterator<Item=Entid> {
let mut parts = HashMap::new();
for name in V1_PARTS.iter().map(|&(ref part, ..)| part.to_string()) {
// This shouldn't fail: locally-sourced partitions must be present within V1_PARTS.
let p = local_partitions.get(&name).unwrap();
parts.insert(name, (p, p.clone()));
}
// For a given partition, set its index to one greater than the largest encountered entid within its partition space.
for entid in entids {
for (p, new_p) in parts.values_mut() {
if p.allows_entid(entid) && entid >= new_p.next_entid() {
new_p.set_next_entid(entid + 1);
}
}
}
let mut m = PartitionMap::default();
for (name, (_, new_p)) in parts {
m.insert(name, new_p);
}
m
}
impl<'c> TxReceiver<UploaderReport> for TxUploader<'c> {
fn tx<T>(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
// Yes, we generate a new UUID for a given Tx, even if we might
// already have one mapped locally. A pre-existing local mapping will
// be replaced if this sync succeeds entirely.
// If we're seeing this tx again, it implies that a previous attempt
// to sync didn't update our local head. Something went wrong last time,
// and it's unwise to try to re-use these remote tx mappings.
// We just leave garbage txs to be GC'd on the server.
let tx_uuid = Uuid::new_v4();
self.tx_temp_uuids.insert(tx_id, tx_uuid);
let mut tx_chunks = vec![];
// TODO separate bits of network work should be combined into a single 'future'
let mut datoms: Vec<TxPart> = datoms.collect();
// TODO this should live within a transaction, once server support is in place.
// For now, we're uploading the PartitionMap in transaction's first chunk.
datoms[0].partitions = Some(allocate_partition_map_for_entids(datoms.iter().map(|d| d.e), &self.local_partitions));
// Upload all chunks.
for datom in &datoms {
let datom_uuid = Uuid::new_v4();
tx_chunks.push(datom_uuid);
d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom));
// TODO switch over to CBOR once we're past debugging stuff.
// See https://github.com/mozilla/mentat/issues/570
// let cbor_val = serde_cbor::to_value(&datom)?;
// self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?;
self.remote_client.put_chunk(&datom_uuid, &datom)?;
}
// Upload tx.
// NB: At this point, we may choose to update remote & local heads.
// Depending on how much we're uploading, and how unreliable our connection
// is, this might be a good thing to do to ensure we make at least some progress.
// This comes at the cost of possibly increased racing against other clients.
let tx_parent = match self.rolling_temp_head {
Some(p) => p,
None => *self.remote_head,
};
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &tx_parent, &tx_chunks));
self.remote_client.put_transaction(&tx_uuid, &tx_parent, &tx_chunks)?;
d(&format!("updating rolling head: {:?}", tx_uuid));
self.rolling_temp_head = Some(tx_uuid.clone());
Ok(())
}
fn done(self) -> UploaderReport {
UploaderReport {
temp_uuids: self.tx_temp_uuids,
head: self.rolling_temp_head,
}
}
}
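The intended wiring, as a sketch (the local variable names here are assumptions, not part of the patch): hand a TxUploader to Processor::process, then act on the resulting UploaderReport.
let uploader = TxUploader::new(remote_client, &remote_head, local_partitions.clone());
// Walk local transactions after the last synced tx, uploading each one.
let report = Processor::process(&db_tx, last_synced_tx, uploader)?;
if let Some(new_head) = report.head {
    // Everything uploaded; advance the remote head to the last uploaded tx.
    remote_client.set_head(&new_head)?;
}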
#[cfg(test)]
pub mod tests {
use super::*;
use mentat_db::{
Partition,
V1_PARTS,
};
use schema::{
PARTITION_USER,
PARTITION_TX,
PARTITION_DB,
};
fn bootstrap_partition_map() -> PartitionMap {
V1_PARTS.iter()
.map(|&(ref part, start, end, index, allow_excision)| (part.to_string(), Partition::new(start, end, index, allow_excision)))
.collect()
}
#[test]
fn test_allocate_partition_map_for_entids() {
let bootstrap_map = bootstrap_partition_map();
// Empty list of entids should not allocate any space in partitions.
let entids: Vec<Entid> = vec![];
let no_op_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(bootstrap_map, no_op_map);
// Only user partition.
let entids = vec![65536];
let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(65537, new_map.get(PARTITION_USER).unwrap().next_entid());
// Other partitions are untouched.
assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());
assert_eq!(268435456, new_map.get(PARTITION_TX).unwrap().next_entid());
// Only tx partition.
let entids = vec![268435666];
let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(268435667, new_map.get(PARTITION_TX).unwrap().next_entid());
// Other partitions are untouched.
assert_eq!(65536, new_map.get(PARTITION_USER).unwrap().next_entid());
assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());
// Only DB partition.
let entids = vec![41];
let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(42, new_map.get(PARTITION_DB).unwrap().next_entid());
// Other partitions are untouched.
assert_eq!(65536, new_map.get(PARTITION_USER).unwrap().next_entid());
assert_eq!(268435456, new_map.get(PARTITION_TX).unwrap().next_entid());
// User and tx partitions.
let entids = vec![65537, 268435456];
let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(65538, new_map.get(PARTITION_USER).unwrap().next_entid());
assert_eq!(268435457, new_map.get(PARTITION_TX).unwrap().next_entid());
// DB partition is untouched.
assert_eq!(41, new_map.get(PARTITION_DB).unwrap().next_entid());
// DB, user and tx partitions.
let entids = vec![41, 65666, 268435457];
let new_map = allocate_partition_map_for_entids(entids.into_iter(), &bootstrap_map);
assert_eq!(65667, new_map.get(PARTITION_USER).unwrap().next_entid());
assert_eq!(268435458, new_map.get(PARTITION_TX).unwrap().next_entid());
assert_eq!(42, new_map.get(PARTITION_DB).unwrap().next_entid());
}
}

tolstoy/src/types.rs (new file, 101 lines)

@ -0,0 +1,101 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::cmp::Ordering;
use uuid::Uuid;
use core_traits::{
Entid,
TypedValue,
};
use mentat_db::PartitionMap;
use public_traits::errors::{
Result,
};
pub struct LocalGlobalTxMapping<'a> {
pub local: Entid,
pub remote: &'a Uuid,
}
impl<'a> From<(Entid, &'a Uuid)> for LocalGlobalTxMapping<'a> {
fn from((local, remote): (Entid, &'a Uuid)) -> LocalGlobalTxMapping {
LocalGlobalTxMapping {
local: local,
remote: remote,
}
}
}
impl<'a> LocalGlobalTxMapping<'a> {
pub fn new(local: Entid, remote: &'a Uuid) -> LocalGlobalTxMapping<'a> {
LocalGlobalTxMapping {
local: local,
remote: remote
}
}
}
// TODO unite these around something like `enum TxIdentifier {Global(Uuid), Local(Entid)}`?
#[derive(Debug, Clone)]
pub struct LocalTx {
pub tx: Entid,
pub parts: Vec<TxPart>,
}
impl PartialOrd for LocalTx {
fn partial_cmp(&self, other: &LocalTx) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for LocalTx {
fn cmp(&self, other: &LocalTx) -> Ordering {
self.tx.cmp(&other.tx)
}
}
impl PartialEq for LocalTx {
fn eq(&self, other: &LocalTx) -> bool {
self.tx == other.tx
}
}
impl Eq for LocalTx {}
// For returning out of the downloader as an ordered list.
#[derive(Debug, Clone, PartialEq)]
pub struct Tx {
pub tx: Uuid,
pub parts: Vec<TxPart>,
}
#[derive(Debug,Clone,Serialize,Deserialize,PartialEq)]
pub struct TxPart {
// TODO this is temporary, for development. Only the first TxPart in a chunk series should have a non-None 'partitions'.
// 'partitions' should actually live in a transaction, but we do this for now to avoid changing the server until the dust settles.
pub partitions: Option<PartitionMap>,
pub e: Entid,
pub a: Entid,
pub v: TypedValue,
pub tx: Entid,
pub added: bool,
}
pub trait GlobalTransactionLog {
fn head(&self) -> Result<Uuid>;
fn transactions_after(&self, tx: &Uuid) -> Result<Vec<Tx>>;
fn set_head(&mut self, tx: &Uuid) -> Result<()>;
fn put_transaction(&mut self, tx: &Uuid, parent_tx: &Uuid, chunk_txs: &Vec<Uuid>) -> Result<()>;
fn put_chunk(&mut self, tx: &Uuid, payload: &TxPart) -> Result<()>;
}
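For testing, a GlobalTransactionLog can be as simple as an in-memory store. A hedged sketch (InMemoryLog is hypothetical, not part of this patch):
use std::collections::HashMap;
struct InMemoryLog {
    head: Uuid,
    chunks: HashMap<Uuid, TxPart>,
    txs: HashMap<Uuid, (Uuid, Vec<Uuid>)>, // tx -> (parent, chunk uuids)
}
impl GlobalTransactionLog for InMemoryLog {
    fn head(&self) -> Result<Uuid> {
        Ok(self.head)
    }
    fn transactions_after(&self, _tx: &Uuid) -> Result<Vec<Tx>> {
        // A real log would walk the parent chain from head back to `_tx`,
        // assembling full `Tx` values out of the stored chunks.
        Ok(vec![])
    }
    fn set_head(&mut self, tx: &Uuid) -> Result<()> {
        self.head = *tx;
        Ok(())
    }
    fn put_transaction(&mut self, tx: &Uuid, parent_tx: &Uuid, chunk_txs: &Vec<Uuid>) -> Result<()> {
        self.txs.insert(*tx, (*parent_tx, chunk_txs.clone()));
        Ok(())
    }
    fn put_chunk(&mut self, tx: &Uuid, payload: &TxPart) -> Result<()> {
        self.chunks.insert(*tx, payload.clone());
        Ok(())
    }
}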


@ -47,11 +47,6 @@ use mentat::{
TypedValue,
};
#[cfg(feature = "syncable")]
use mentat::{
Syncable,
};
use command_parser::{
Command,
};
@ -356,7 +351,7 @@ impl Repl {
#[cfg(feature = "syncable")]
Command::Sync(args) => {
match self.store.sync(&args[0], &args[1]) {
Ok(_) => println!("Synced!"),
Ok(report) => println!("Sync report: {}", report),
Err(e) => eprintln!("{:?}", e)
};
},
@ -403,7 +398,7 @@ impl Repl {
if self.path.is_empty() || path != self.path {
let next = match encryption_key {
#[cfg(not(feature = "sqlcipher"))]
Some(_) => return Err(::mentat::MentatError::RusqliteError(".open_encrypted requires the sqlcipher Mentat feature".into())),
Some(_) => return Err(::mentat::MentatError::RusqliteError(".open_encrypted requires the sqlcipher Mentat feature".into(), "".into())),
#[cfg(feature = "sqlcipher")]
Some(k) => {
Store::open_with_key(path.as_str(), k)?


@ -84,6 +84,7 @@ use public_traits::errors::{
pub type Terms = (Vec<Entity<TypedValue>>, InternSet<TempId>);
#[derive(Debug)]
pub struct TermBuilder {
tempids: InternSet<TempId>,
terms: Vec<Entity<TypedValue>>,


@ -320,6 +320,21 @@ impl<'a, 'c> InProgress<'a, 'c> {
pub fn last_tx_id(&self) -> Entid {
self.partition_map[":db.part/tx"].next_entid() - 1
}
pub fn savepoint(&self, name: &str) -> Result<()> {
self.transaction.execute(&format!("SAVEPOINT {}", name), &[])?;
Ok(())
}
pub fn rollback_savepoint(&self, name: &str) -> Result<()> {
self.transaction.execute(&format!("ROLLBACK TO {}", name), &[])?;
Ok(())
}
pub fn release_savepoint(&self, name: &str) -> Result<()> {
self.transaction.execute(&format!("RELEASE {}", name), &[])?;
Ok(())
}
}
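A usage sketch for the savepoint helpers (hypothetical; it assumes a transact call on the same InProgress). Note that the name is interpolated into the SQL directly, so it must be a valid SQL identifier.
in_progress.savepoint("speculative")?;
match in_progress.transact(terms) {
    // On success, fold the savepoint into the outer transaction.
    Ok(_) => in_progress.release_savepoint("speculative")?,
    // On failure, rewind; ROLLBACK TO leaves the savepoint on the stack,
    // so a subsequent RELEASE would discard it entirely.
    Err(_) => in_progress.rollback_savepoint("speculative")?,
};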
impl<'a, 'c> InProgressRead<'a, 'c> {