501 lines
18 KiB
Rust
501 lines
18 KiB
Rust
// Copyright 2018 Mozilla
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
|
// this file except in compliance with the License. You may obtain a copy of the
|
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
|
// Unless required by applicable law or agreed to in writing, software distributed
|
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations under the License.
|
|
|
|
use std;
|
|
use std::collections::HashMap;
|
|
|
|
use futures::{future, Future, Stream};
|
|
use hyper;
|
|
// TODO: enable TLS support; hurdle is cross-compiling openssl for Android.
|
|
// See https://github.com/mozilla/mentat/issues/569
|
|
// use hyper_tls;
|
|
use hyper::{Method, Request, StatusCode, Error as HyperError};
|
|
use hyper::header::{ContentType};
|
|
use rusqlite;
|
|
// TODO: https://github.com/mozilla/mentat/issues/570
|
|
// use serde_cbor;
|
|
use serde_json;
|
|
use tokio_core::reactor::Core;
|
|
use uuid::Uuid;
|
|
|
|
use core_traits::{
|
|
Entid,
|
|
};
|
|
|
|
use metadata::SyncMetadataClient;
|
|
use metadata::HeadTrackable;
|
|
use schema::ensure_current_version;
|
|
|
|
use tolstoy_traits::errors::{
|
|
TolstoyError,
|
|
Result,
|
|
};
|
|
|
|
use tx_processor::{
|
|
Processor,
|
|
TxReceiver,
|
|
TxPart,
|
|
};
|
|
|
|
use tx_mapper::TxMapper;
|
|
|
|
// TODO it would be nice to be able to pass
|
|
// in a logger into Syncer::flow; would allow for a "debug mode"
|
|
// and getting useful logs out of clients.
|
|
// See https://github.com/mozilla/mentat/issues/571
|
|
// Below is some debug Android-friendly logging:
|
|
|
|
// use std::os::raw::c_char;
|
|
// use std::os::raw::c_int;
|
|
// use std::ffi::CString;
|
|
// pub const ANDROID_LOG_DEBUG: i32 = 3;
|
|
// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
|
|
|
|
pub fn d(message: &str) {
|
|
println!("d: {}", message);
|
|
// let message = CString::new(message).unwrap();
|
|
// let message = message.as_ptr();
|
|
// let tag = CString::new("RustyToodle").unwrap();
|
|
// let tag = tag.as_ptr();
|
|
// unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
|
|
}
|
|
|
|
pub struct Syncer {}
|
|
|
|
// TODO this is sub-optimal, we don't need to walk the table
|
|
// to query the last thing in it w/ an index on tx!!
|
|
// but it's the hammer at hand!
|
|
// See https://github.com/mozilla/mentat/issues/572
|
|
struct InquiringTxReceiver {
|
|
pub last_tx: Option<Entid>,
|
|
pub is_done: bool,
|
|
}
|
|
|
|
impl InquiringTxReceiver {
|
|
fn new() -> InquiringTxReceiver {
|
|
InquiringTxReceiver {
|
|
last_tx: None,
|
|
is_done: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl TxReceiver for InquiringTxReceiver {
|
|
fn tx<T>(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()>
|
|
where T: Iterator<Item=TxPart> {
|
|
self.last_tx = Some(tx_id);
|
|
Ok(())
|
|
}
|
|
|
|
fn done(&mut self) -> Result<()> {
|
|
self.is_done = true;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
struct UploadingTxReceiver<'c> {
|
|
pub tx_temp_uuids: HashMap<Entid, Uuid>,
|
|
pub is_done: bool,
|
|
remote_client: &'c RemoteClient,
|
|
remote_head: &'c Uuid,
|
|
rolling_temp_head: Option<Uuid>,
|
|
}
|
|
|
|
impl<'c> UploadingTxReceiver<'c> {
|
|
fn new(client: &'c RemoteClient, remote_head: &'c Uuid) -> UploadingTxReceiver<'c> {
|
|
UploadingTxReceiver {
|
|
tx_temp_uuids: HashMap::new(),
|
|
remote_client: client,
|
|
remote_head: remote_head,
|
|
rolling_temp_head: None,
|
|
is_done: false
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'c> TxReceiver for UploadingTxReceiver<'c> {
|
|
fn tx<T>(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()>
|
|
where T: Iterator<Item=TxPart> {
|
|
// Yes, we generate a new UUID for a given Tx, even if we might
|
|
// already have one mapped locally. Pre-existing local mapping will
|
|
// be replaced if this sync succeeds entirely.
|
|
// If we're seeing this tx again, it implies that previous attempt
|
|
// to sync didn't update our local head. Something went wrong last time,
|
|
// and it's unwise to try to re-use these remote tx mappings.
|
|
// We just leave garbage txs to be GC'd on the server.
|
|
let tx_uuid = Uuid::new_v4();
|
|
self.tx_temp_uuids.insert(tx_id, tx_uuid);
|
|
let mut tx_chunks = vec![];
|
|
|
|
// TODO separate bits of network work should be combined into single 'future'
|
|
|
|
// Upload all chunks.
|
|
for datom in datoms {
|
|
let datom_uuid = Uuid::new_v4();
|
|
tx_chunks.push(datom_uuid);
|
|
d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom));
|
|
// TODO switch over to CBOR once we're past debugging stuff.
|
|
// See https://github.com/mozilla/mentat/issues/570
|
|
// let cbor_val = serde_cbor::to_value(&datom)?;
|
|
// self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?;
|
|
self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?;
|
|
}
|
|
|
|
// Upload tx.
|
|
// NB: At this point, we may choose to update remote & local heads.
|
|
// Depending on how much we're uploading, and how unreliable our connection
|
|
// is, this might be a good thing to do to ensure we make at least some progress.
|
|
// Comes at a cost of possibly increasing racing against other clients.
|
|
match self.rolling_temp_head {
|
|
Some(parent) => {
|
|
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks));
|
|
self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?;
|
|
|
|
},
|
|
None => {
|
|
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks));
|
|
self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?;
|
|
}
|
|
}
|
|
|
|
d(&format!("updating rolling head: {:?}", tx_uuid));
|
|
self.rolling_temp_head = Some(tx_uuid.clone());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn done(&mut self) -> Result<()> {
|
|
self.is_done = true;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Syncer {
|
|
fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
|
|
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
|
|
Processor::process(db_tx, from_tx, &mut uploader)?;
|
|
if !uploader.is_done {
|
|
bail!(TolstoyError::TxProcessorUnfinished);
|
|
}
|
|
// Last tx uuid uploaded by the tx receiver.
|
|
// It's going to be our new head.
|
|
if let Some(last_tx_uploaded) = uploader.rolling_temp_head {
|
|
// Upload remote head.
|
|
remote_client.put_head(&last_tx_uploaded)?;
|
|
|
|
// On succes:
|
|
// - persist local mappings from the receiver
|
|
// - update our local "remote head".
|
|
TxMapper::set_bulk(db_tx, &uploader.tx_temp_uuids)?;
|
|
SyncMetadataClient::set_remote_head(db_tx, &last_tx_uploaded)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> {
|
|
d(&format!("sync flowing"));
|
|
|
|
ensure_current_version(sqlite)?;
|
|
|
|
// TODO configure this sync with some auth data
|
|
let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
|
|
let mut db_tx = sqlite.transaction()?;
|
|
|
|
let remote_head = remote_client.get_head()?;
|
|
d(&format!("remote head {:?}", remote_head));
|
|
|
|
let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
|
|
d(&format!("local head {:?}", locally_known_remote_head));
|
|
|
|
// Local head: latest transaction that we have in the store,
|
|
// but with one caveat: its tx might will not be mapped if it's
|
|
// never been synced successfully.
|
|
// In other words: if latest tx isn't mapped, then HEAD moved
|
|
// since last sync and server needs to be updated.
|
|
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
|
|
// TODO don't just start from the beginning... but then again, we should do this
|
|
// without walking the table at all, and use the tx index.
|
|
Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?;
|
|
if !inquiring_tx_receiver.is_done {
|
|
bail!(TolstoyError::TxProcessorUnfinished);
|
|
}
|
|
let have_local_changes = match inquiring_tx_receiver.last_tx {
|
|
Some(tx) => {
|
|
match TxMapper::get(&db_tx, tx)? {
|
|
Some(_) => false,
|
|
None => true
|
|
}
|
|
},
|
|
None => false
|
|
};
|
|
|
|
// Check if the server is empty - populate it.
|
|
if remote_head == Uuid::nil() {
|
|
d(&format!("empty server!"));
|
|
Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?;
|
|
|
|
// Check if the server is the same as us, and if our HEAD moved.
|
|
} else if locally_known_remote_head == remote_head {
|
|
d(&format!("server unchanged since last sync."));
|
|
|
|
if !have_local_changes {
|
|
d(&format!("local HEAD did not move. Nothing to do!"));
|
|
return Ok(());
|
|
}
|
|
|
|
d(&format!("local HEAD moved."));
|
|
// TODO it's possible that we've successfully advanced remote head previously,
|
|
// but failed to advance our own local head. If that's the case, and we can recognize it,
|
|
// our sync becomes just bumping our local head. AFAICT below would currently fail.
|
|
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? {
|
|
d(&format!("Fast-forwarding the server."));
|
|
Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
|
|
} else {
|
|
d(&format!("Unable to fast-forward the server; missing local tx mapping"));
|
|
bail!(TolstoyError::TxIncorrectlyMapped(0));
|
|
}
|
|
|
|
// We diverged from the server.
|
|
// We'll need to rebase/merge ourselves on top of it.
|
|
} else {
|
|
d(&format!("server changed since last sync."));
|
|
|
|
bail!(TolstoyError::NotYetImplemented(
|
|
format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
|
|
));
|
|
}
|
|
|
|
// Commit everything, if there's anything to commit!
|
|
// Any new tx->uuid mappings and the new HEAD. We're synced!
|
|
db_tx.commit()?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize,Deserialize)]
|
|
struct SerializedHead {
|
|
head: Uuid
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
struct SerializedTransaction<'a> {
|
|
parent: &'a Uuid,
|
|
chunks: &'a Vec<Uuid>
|
|
}
|
|
|
|
struct RemoteClient {
|
|
base_uri: String,
|
|
user_uuid: Uuid
|
|
}
|
|
|
|
|
|
impl RemoteClient {
|
|
fn new(base_uri: String, user_uuid: Uuid) -> Self {
|
|
RemoteClient {
|
|
base_uri: base_uri,
|
|
user_uuid: user_uuid
|
|
}
|
|
}
|
|
|
|
fn bound_base_uri(&self) -> String {
|
|
// TODO escaping
|
|
format!("{}/{}", self.base_uri, self.user_uuid)
|
|
}
|
|
|
|
fn get_uuid(&self, uri: String) -> Result<Uuid> {
|
|
let mut core = Core::new()?;
|
|
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
|
|
// let client = hyper::Client::configure()
|
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
|
// .build(&core.handle());
|
|
let client = hyper::Client::new(&core.handle());
|
|
|
|
d(&format!("client"));
|
|
|
|
let uri = uri.parse()?;
|
|
|
|
d(&format!("parsed uri {:?}", uri));
|
|
|
|
let work = client.get(uri).and_then(|res| {
|
|
println!("Response: {}", res.status());
|
|
|
|
res.body().concat2().and_then(move |body| {
|
|
let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
|
|
std::io::Error::new(std::io::ErrorKind::Other, e)
|
|
})?;
|
|
Ok(head_json)
|
|
})
|
|
});
|
|
|
|
d(&format!("running..."));
|
|
|
|
let head_json = core.run(work)?;
|
|
d(&format!("got head: {:?}", &head_json.head));
|
|
Ok(head_json.head)
|
|
}
|
|
|
|
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
|
|
where hyper::Body: std::convert::From<T>, {
|
|
let mut core = Core::new()?;
|
|
// TODO enable TLS, see https://github.com/mozilla/mentat/issues/569
|
|
// let client = hyper::Client::configure()
|
|
// .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
|
|
// .build(&core.handle());
|
|
let client = hyper::Client::new(&core.handle());
|
|
|
|
let uri = uri.parse()?;
|
|
|
|
d(&format!("PUT {:?}", uri));
|
|
|
|
let mut req = Request::new(Method::Put, uri);
|
|
req.headers_mut().set(ContentType::json());
|
|
req.set_body(payload);
|
|
|
|
let put = client.request(req).and_then(|res| {
|
|
let status_code = res.status();
|
|
|
|
if status_code != expected {
|
|
d(&format!("bad put response: {:?}", status_code));
|
|
future::err(HyperError::Status)
|
|
} else {
|
|
future::ok(())
|
|
}
|
|
});
|
|
|
|
core.run(put)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
|
|
// {"parent": uuid, "chunks": [chunk1, chunk2...]}
|
|
let transaction = SerializedTransaction {
|
|
parent: parent_uuid,
|
|
chunks: chunks
|
|
};
|
|
|
|
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
|
|
let json = serde_json::to_string(&transaction)?;
|
|
d(&format!("serialized transaction: {:?}", json));
|
|
self.put(uri, json, StatusCode::Created)
|
|
}
|
|
|
|
fn get_head(&self) -> Result<Uuid> {
|
|
let uri = format!("{}/head", self.bound_base_uri());
|
|
self.get_uuid(uri)
|
|
}
|
|
|
|
fn put_head(&self, uuid: &Uuid) -> Result<()> {
|
|
// {"head": uuid}
|
|
let head = SerializedHead {
|
|
head: uuid.clone()
|
|
};
|
|
|
|
let uri = format!("{}/head", self.bound_base_uri());
|
|
let json = serde_json::to_string(&head)?;
|
|
d(&format!("serialized head: {:?}", json));
|
|
self.put(uri, json, StatusCode::NoContent)
|
|
}
|
|
|
|
fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> {
|
|
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
|
|
d(&format!("serialized chunk: {:?}", payload));
|
|
// TODO don't want to clone every datom!
|
|
self.put(uri, payload.clone(), StatusCode::Created)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use std::borrow::Borrow;
|
|
use std::str::FromStr;
|
|
|
|
use edn;
|
|
|
|
use mentat_db::debug::{TestConn};
|
|
|
|
#[test]
|
|
fn test_remote_client_bound_uri() {
|
|
let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid");
|
|
let server_uri = String::from("https://example.com/api/0.1");
|
|
let remote_client = RemoteClient::new(server_uri, user_uuid);
|
|
assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri());
|
|
}
|
|
|
|
#[test]
|
|
fn test_add() {
|
|
let mut conn = TestConn::default();
|
|
|
|
// Test inserting :db.cardinality/one elements.
|
|
assert_transact!(conn, "[[:db/add 100 :db.schema/version 1]
|
|
[:db/add 101 :db.schema/version 2]]");
|
|
assert_matches!(conn.last_transaction(),
|
|
"[[100 :db.schema/version 1 ?tx true]
|
|
[101 :db.schema/version 2 ?tx true]
|
|
[?tx :db/txInstant ?ms ?tx true]]");
|
|
assert_matches!(conn.datoms(),
|
|
"[[100 :db.schema/version 1]
|
|
[101 :db.schema/version 2]]");
|
|
|
|
// Test inserting :db.cardinality/many elements.
|
|
assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100]
|
|
[:db/add 200 :db.schema/attribute 101]]");
|
|
assert_matches!(conn.last_transaction(),
|
|
"[[200 :db.schema/attribute 100 ?tx true]
|
|
[200 :db.schema/attribute 101 ?tx true]
|
|
[?tx :db/txInstant ?ms ?tx true]]");
|
|
assert_matches!(conn.datoms(),
|
|
"[[100 :db.schema/version 1]
|
|
[101 :db.schema/version 2]
|
|
[200 :db.schema/attribute 100]
|
|
[200 :db.schema/attribute 101]]");
|
|
|
|
// Test replacing existing :db.cardinality/one elements.
|
|
assert_transact!(conn, "[[:db/add 100 :db.schema/version 11]
|
|
[:db/add 101 :db.schema/version 22]]");
|
|
assert_matches!(conn.last_transaction(),
|
|
"[[100 :db.schema/version 1 ?tx false]
|
|
[100 :db.schema/version 11 ?tx true]
|
|
[101 :db.schema/version 2 ?tx false]
|
|
[101 :db.schema/version 22 ?tx true]
|
|
[?tx :db/txInstant ?ms ?tx true]]");
|
|
assert_matches!(conn.datoms(),
|
|
"[[100 :db.schema/version 11]
|
|
[101 :db.schema/version 22]
|
|
[200 :db.schema/attribute 100]
|
|
[200 :db.schema/attribute 101]]");
|
|
|
|
|
|
// Test that asserting existing :db.cardinality/one elements doesn't change the store.
|
|
assert_transact!(conn, "[[:db/add 100 :db.schema/version 11]
|
|
[:db/add 101 :db.schema/version 22]]");
|
|
assert_matches!(conn.last_transaction(),
|
|
"[[?tx :db/txInstant ?ms ?tx true]]");
|
|
assert_matches!(conn.datoms(),
|
|
"[[100 :db.schema/version 11]
|
|
[101 :db.schema/version 22]
|
|
[200 :db.schema/attribute 100]
|
|
[200 :db.schema/attribute 101]]");
|
|
|
|
|
|
// Test that asserting existing :db.cardinality/many elements doesn't change the store.
|
|
assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100]
|
|
[:db/add 200 :db.schema/attribute 101]]");
|
|
assert_matches!(conn.last_transaction(),
|
|
"[[?tx :db/txInstant ?ms ?tx true]]");
|
|
assert_matches!(conn.datoms(),
|
|
"[[100 :db.schema/version 11]
|
|
[101 :db.schema/version 22]
|
|
[200 :db.schema/attribute 100]
|
|
[200 :db.schema/attribute 101]]");
|
|
}
|
|
}
|