// Copyright 2018 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the // License at http://www.apache.org/licenses/LICENSE-2.0 // Unless required by applicable law or agreed to in writing, software distributed // under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. use std; use std::collections::HashMap; use futures::{future, Future, Stream}; use hyper; // TODO: enable TLS support; hurdle is cross-compiling openssl for Android. // See https://github.com/mozilla/mentat/issues/569 // use hyper_tls; use hyper::{Method, Request, StatusCode, Error as HyperError}; use hyper::header::{ContentType}; use rusqlite; // TODO: https://github.com/mozilla/mentat/issues/570 // use serde_cbor; use serde_json; use tokio_core::reactor::Core; use uuid::Uuid; use mentat_core::Entid; use metadata::SyncMetadataClient; use metadata::HeadTrackable; use schema::ensure_current_version; use errors::{ TolstoyError, Result, }; use tx_processor::{ Processor, TxReceiver, TxPart, }; use tx_mapper::TxMapper; // TODO it would be nice to be able to pass // in a logger into Syncer::flow; would allow for a "debug mode" // and getting useful logs out of clients. // See https://github.com/mozilla/mentat/issues/571 // Below is some debug Android-friendly logging: // use std::os::raw::c_char; // use std::os::raw::c_int; // use std::ffi::CString; // pub const ANDROID_LOG_DEBUG: i32 = 3; // extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } pub fn d(message: &str) { println!("d: {}", message); // let message = CString::new(message).unwrap(); // let message = message.as_ptr(); // let tag = CString::new("RustyToodle").unwrap(); // let tag = tag.as_ptr(); // unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; } pub struct Syncer {} // TODO this is sub-optimal, we don't need to walk the table // to query the last thing in it w/ an index on tx!! // but it's the hammer at hand! // See https://github.com/mozilla/mentat/issues/572 struct InquiringTxReceiver { pub last_tx: Option, pub is_done: bool, } impl InquiringTxReceiver { fn new() -> InquiringTxReceiver { InquiringTxReceiver { last_tx: None, is_done: false, } } } impl TxReceiver for InquiringTxReceiver { fn tx(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()> where T: Iterator { self.last_tx = Some(tx_id); Ok(()) } fn done(&mut self) -> Result<()> { self.is_done = true; Ok(()) } } struct UploadingTxReceiver<'c> { pub tx_temp_uuids: HashMap, pub is_done: bool, remote_client: &'c RemoteClient, remote_head: &'c Uuid, rolling_temp_head: Option, } impl<'c> UploadingTxReceiver<'c> { fn new(client: &'c RemoteClient, remote_head: &'c Uuid) -> UploadingTxReceiver<'c> { UploadingTxReceiver { tx_temp_uuids: HashMap::new(), remote_client: client, remote_head: remote_head, rolling_temp_head: None, is_done: false } } } impl<'c> TxReceiver for UploadingTxReceiver<'c> { fn tx(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()> where T: Iterator { // Yes, we generate a new UUID for a given Tx, even if we might // already have one mapped locally. Pre-existing local mapping will // be replaced if this sync succeeds entirely. // If we're seeing this tx again, it implies that previous attempt // to sync didn't update our local head. Something went wrong last time, // and it's unwise to try to re-use these remote tx mappings. // We just leave garbage txs to be GC'd on the server. let tx_uuid = Uuid::new_v4(); self.tx_temp_uuids.insert(tx_id, tx_uuid); let mut tx_chunks = vec![]; // TODO separate bits of network work should be combined into single 'future' // Upload all chunks. for datom in datoms { let datom_uuid = Uuid::new_v4(); tx_chunks.push(datom_uuid); d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom)); // TODO switch over to CBOR once we're past debugging stuff. // See https://github.com/mozilla/mentat/issues/570 // let cbor_val = serde_cbor::to_value(&datom)?; // self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?; self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?; } // Upload tx. // NB: At this point, we may choose to update remote & local heads. // Depending on how much we're uploading, and how unreliable our connection // is, this might be a good thing to do to ensure we make at least some progress. // Comes at a cost of possibly increasing racing against other clients. match self.rolling_temp_head { Some(parent) => { d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks)); self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?; }, None => { d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks)); self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?; } } d(&format!("updating rolling head: {:?}", tx_uuid)); self.rolling_temp_head = Some(tx_uuid.clone()); Ok(()) } fn done(&mut self) -> Result<()> { self.is_done = true; Ok(()) } } impl Syncer { fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> { let mut uploader = UploadingTxReceiver::new(remote_client, remote_head); Processor::process(db_tx, from_tx, &mut uploader)?; if !uploader.is_done { bail!(TolstoyError::TxProcessorUnfinished); } // Last tx uuid uploaded by the tx receiver. // It's going to be our new head. if let Some(last_tx_uploaded) = uploader.rolling_temp_head { // Upload remote head. remote_client.put_head(&last_tx_uploaded)?; // On succes: // - persist local mappings from the receiver // - update our local "remote head". TxMapper::set_bulk(db_tx, &uploader.tx_temp_uuids)?; SyncMetadataClient::set_remote_head(db_tx, &last_tx_uploaded)?; } Ok(()) } pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> { d(&format!("sync flowing")); ensure_current_version(sqlite)?; // TODO configure this sync with some auth data let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone()); let mut db_tx = sqlite.transaction()?; let remote_head = remote_client.get_head()?; d(&format!("remote head {:?}", remote_head)); let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?; d(&format!("local head {:?}", locally_known_remote_head)); // Local head: latest transaction that we have in the store, // but with one caveat: its tx might will not be mapped if it's // never been synced successfully. // In other words: if latest tx isn't mapped, then HEAD moved // since last sync and server needs to be updated. let mut inquiring_tx_receiver = InquiringTxReceiver::new(); // TODO don't just start from the beginning... but then again, we should do this // without walking the table at all, and use the tx index. Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?; if !inquiring_tx_receiver.is_done { bail!(TolstoyError::TxProcessorUnfinished); } let have_local_changes = match inquiring_tx_receiver.last_tx { Some(tx) => { match TxMapper::get(&db_tx, tx)? { Some(_) => false, None => true } }, None => false }; // Check if the server is empty - populate it. if remote_head == Uuid::nil() { d(&format!("empty server!")); Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?; // Check if the server is the same as us, and if our HEAD moved. } else if locally_known_remote_head == remote_head { d(&format!("server unchanged since last sync.")); if !have_local_changes { d(&format!("local HEAD did not move. Nothing to do!")); return Ok(()); } d(&format!("local HEAD moved.")); // TODO it's possible that we've successfully advanced remote head previously, // but failed to advance our own local head. If that's the case, and we can recognize it, // our sync becomes just bumping our local head. AFAICT below would currently fail. if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? { d(&format!("Fast-forwarding the server.")); Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; } else { d(&format!("Unable to fast-forward the server; missing local tx mapping")); bail!(TolstoyError::TxIncorrectlyMapped(0)); } // We diverged from the server. // We'll need to rebase/merge ourselves on top of it. } else { d(&format!("server changed since last sync.")); bail!(TolstoyError::NotYetImplemented( format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head) )); } // Commit everything, if there's anything to commit! // Any new tx->uuid mappings and the new HEAD. We're synced! db_tx.commit()?; Ok(()) } } #[derive(Serialize,Deserialize)] struct SerializedHead { head: Uuid } #[derive(Serialize)] struct SerializedTransaction<'a> { parent: &'a Uuid, chunks: &'a Vec } struct RemoteClient { base_uri: String, user_uuid: Uuid } impl RemoteClient { fn new(base_uri: String, user_uuid: Uuid) -> Self { RemoteClient { base_uri: base_uri, user_uuid: user_uuid } } fn bound_base_uri(&self) -> String { // TODO escaping format!("{}/{}", self.base_uri, self.user_uuid) } fn get_uuid(&self, uri: String) -> Result { let mut core = Core::new()?; // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569 // let client = hyper::Client::configure() // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) // .build(&core.handle()); let client = hyper::Client::new(&core.handle()); d(&format!("client")); let uri = uri.parse()?; d(&format!("parsed uri {:?}", uri)); let work = client.get(uri).and_then(|res| { println!("Response: {}", res.status()); res.body().concat2().and_then(move |body| { let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { std::io::Error::new(std::io::ErrorKind::Other, e) })?; Ok(head_json) }) }); d(&format!("running...")); let head_json = core.run(work)?; d(&format!("got head: {:?}", &head_json.head)); Ok(head_json.head) } fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> where hyper::Body: std::convert::From, { let mut core = Core::new()?; // TODO enable TLS, see https://github.com/mozilla/mentat/issues/569 // let client = hyper::Client::configure() // .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) // .build(&core.handle()); let client = hyper::Client::new(&core.handle()); let uri = uri.parse()?; d(&format!("PUT {:?}", uri)); let mut req = Request::new(Method::Put, uri); req.headers_mut().set(ContentType::json()); req.set_body(payload); let put = client.request(req).and_then(|res| { let status_code = res.status(); if status_code != expected { d(&format!("bad put response: {:?}", status_code)); future::err(HyperError::Status) } else { future::ok(()) } }); core.run(put)?; Ok(()) } fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec) -> Result<()> { // {"parent": uuid, "chunks": [chunk1, chunk2...]} let transaction = SerializedTransaction { parent: parent_uuid, chunks: chunks }; let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); let json = serde_json::to_string(&transaction)?; d(&format!("serialized transaction: {:?}", json)); self.put(uri, json, StatusCode::Created) } fn get_head(&self) -> Result { let uri = format!("{}/head", self.bound_base_uri()); self.get_uuid(uri) } fn put_head(&self, uuid: &Uuid) -> Result<()> { // {"head": uuid} let head = SerializedHead { head: uuid.clone() }; let uri = format!("{}/head", self.bound_base_uri()); let json = serde_json::to_string(&head)?; d(&format!("serialized head: {:?}", json)); self.put(uri, json, StatusCode::NoContent) } fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> { let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); d(&format!("serialized chunk: {:?}", payload)); // TODO don't want to clone every datom! self.put(uri, payload.clone(), StatusCode::Created) } } #[cfg(test)] mod tests { use super::*; use std::borrow::Borrow; use std::str::FromStr; use edn; use mentat_db::debug::{TestConn}; #[test] fn test_remote_client_bound_uri() { let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid"); let server_uri = String::from("https://example.com/api/0.1"); let remote_client = RemoteClient::new(server_uri, user_uuid); assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri()); } #[test] fn test_add() { let mut conn = TestConn::default(); // Test inserting :db.cardinality/one elements. assert_transact!(conn, "[[:db/add 100 :db.schema/version 1] [:db/add 101 :db.schema/version 2]]"); assert_matches!(conn.last_transaction(), "[[100 :db.schema/version 1 ?tx true] [101 :db.schema/version 2 ?tx true] [?tx :db/txInstant ?ms ?tx true]]"); assert_matches!(conn.datoms(), "[[100 :db.schema/version 1] [101 :db.schema/version 2]]"); // Test inserting :db.cardinality/many elements. assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100] [:db/add 200 :db.schema/attribute 101]]"); assert_matches!(conn.last_transaction(), "[[200 :db.schema/attribute 100 ?tx true] [200 :db.schema/attribute 101 ?tx true] [?tx :db/txInstant ?ms ?tx true]]"); assert_matches!(conn.datoms(), "[[100 :db.schema/version 1] [101 :db.schema/version 2] [200 :db.schema/attribute 100] [200 :db.schema/attribute 101]]"); // Test replacing existing :db.cardinality/one elements. assert_transact!(conn, "[[:db/add 100 :db.schema/version 11] [:db/add 101 :db.schema/version 22]]"); assert_matches!(conn.last_transaction(), "[[100 :db.schema/version 1 ?tx false] [100 :db.schema/version 11 ?tx true] [101 :db.schema/version 2 ?tx false] [101 :db.schema/version 22 ?tx true] [?tx :db/txInstant ?ms ?tx true]]"); assert_matches!(conn.datoms(), "[[100 :db.schema/version 11] [101 :db.schema/version 22] [200 :db.schema/attribute 100] [200 :db.schema/attribute 101]]"); // Test that asserting existing :db.cardinality/one elements doesn't change the store. assert_transact!(conn, "[[:db/add 100 :db.schema/version 11] [:db/add 101 :db.schema/version 22]]"); assert_matches!(conn.last_transaction(), "[[?tx :db/txInstant ?ms ?tx true]]"); assert_matches!(conn.datoms(), "[[100 :db.schema/version 11] [101 :db.schema/version 22] [200 :db.schema/attribute 100] [200 :db.schema/attribute 101]]"); // Test that asserting existing :db.cardinality/many elements doesn't change the store. assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100] [:db/add 200 :db.schema/attribute 101]]"); assert_matches!(conn.last_transaction(), "[[?tx :db/txInstant ?ms ?tx true]]"); assert_matches!(conn.datoms(), "[[100 :db.schema/version 11] [101 :db.schema/version 22] [200 :db.schema/attribute 100] [200 :db.schema/attribute 101]]"); } }