From 84f29676e8758a50cd48741679dfff77efb5d318 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Fri, 9 Feb 2018 09:55:19 -0800 Subject: [PATCH] "Unchanged server" uploader flow (#543) r=rnewman * Remove unused struct from tx_processor * Derive serialize & deserialize for TypedValue * First pass of uploader flow + feedback --- core/Cargo.toml | 7 +- core/src/lib.rs | 6 +- edn/Cargo.toml | 5 + edn/src/lib.rs | 7 + edn/src/symbols.rs | 1 + query-translator/src/translate.rs | 2 +- tests/tolstoy.rs | 63 ++++--- tolstoy/Cargo.toml | 1 + tolstoy/src/errors.rs | 28 ++- tolstoy/src/lib.rs | 11 +- tolstoy/src/metadata.rs | 54 +++--- tolstoy/src/schema.rs | 2 +- tolstoy/src/syncer.rs | 286 ++++++++++++++++++++++++++++++ tolstoy/src/tx_mapper.rs | 110 ++++++++++++ tolstoy/src/tx_processor.rs | 22 +-- 15 files changed, 528 insertions(+), 77 deletions(-) create mode 100644 tolstoy/src/syncer.rs create mode 100644 tolstoy/src/tx_mapper.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 02ecfef1..5b768226 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -4,12 +4,15 @@ version = "0.0.1" workspace = ".." [dependencies] -chrono = "0.4" +chrono = { version = "0.4", features = ["serde"] } enum-set = { git = "https://github.com/rnewman/enum-set" } lazy_static = "0.2" num = "0.1" -ordered-float = "0.5" +ordered-float = { version = "0.5", features = ["serde"] } uuid = "0.5" +serde = { version = "1.0", features = ["rc"] } +serde_derive = "1.0" [dependencies.edn] path = "../edn" +features = ["serde_support"] diff --git a/core/src/lib.rs b/core/src/lib.rs index 695fbc2c..107c49c1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -12,10 +12,14 @@ extern crate chrono; extern crate enum_set; extern crate ordered_float; extern crate uuid; +extern crate serde; #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate serde_derive; + extern crate edn; pub mod values; @@ -175,7 +179,7 @@ impl fmt::Display for ValueType { /// Represents a Mentat value in a particular value set. // TODO: expand to include :db.type/{instant,url,uuid}. // TODO: BigInt? -#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq,Serialize,Deserialize)] pub enum TypedValue { Ref(Entid), Boolean(bool), diff --git a/edn/Cargo.toml b/edn/Cargo.toml index 758eaae6..99e1a57e 100644 --- a/edn/Cargo.toml +++ b/edn/Cargo.toml @@ -17,6 +17,11 @@ num = "0.1" ordered-float = "0.5" pretty = "0.2" uuid = "0.5" +serde = { version = "1.0", optional = true } +serde_derive = { version = "1.0", optional = true } + +[features] +serde_support = ["serde", "serde_derive"] [build-dependencies] peg = "0.5" diff --git a/edn/src/lib.rs b/edn/src/lib.rs index 37041c4c..53f461d7 100644 --- a/edn/src/lib.rs +++ b/edn/src/lib.rs @@ -15,6 +15,13 @@ extern crate ordered_float; extern crate pretty; extern crate uuid; +#[cfg(feature = "serde_support")] +extern crate serde; + +#[cfg(feature = "serde_support")] +#[macro_use] +extern crate serde_derive; + pub mod symbols; pub mod types; pub mod pretty_print; diff --git a/edn/src/symbols.rs b/edn/src/symbols.rs index cd14ab99..032c4741 100644 --- a/edn/src/symbols.rs +++ b/edn/src/symbols.rs @@ -70,6 +70,7 @@ pub struct NamespacedSymbol { pub struct Keyword(pub String); #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))] pub struct NamespacedKeyword { // We derive PartialOrd, which implements a lexicographic order based // on the order of members, so put namespace first. diff --git a/query-translator/src/translate.rs b/query-translator/src/translate.rs index c183b0ee..75a51f00 100644 --- a/query-translator/src/translate.rs +++ b/query-translator/src/translate.rs @@ -139,7 +139,7 @@ fn possible_affinities(value_types: ValueTypeSet) -> HashMap Result; - fn set_remote_head(&mut self, uuid: &Uuid) -> Result<()>; +pub trait HeadTrackable { + fn remote_head(tx: &rusqlite::Transaction) -> Result; + fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()>; } -struct SyncMetadataClient { - conn: rusqlite::Connection -} - -impl SyncMetadataClient { - fn new(conn: rusqlite::Connection) -> Self { - SyncMetadataClient { - conn: conn - } - } -} +pub struct SyncMetadataClient {} impl HeadTrackable for SyncMetadataClient { - fn remote_head(&self) -> Result { - self.conn.query_row( + fn remote_head(tx: &rusqlite::Transaction) -> Result { + tx.query_row( "SELECT value FROM tolstoy_metadata WHERE key = ?", &[&schema::REMOTE_HEAD_KEY], |r| { let bytes: Vec = r.get(0); @@ -44,11 +37,14 @@ impl HeadTrackable for SyncMetadataClient { )?.map_err(|e| e.into()) } - fn set_remote_head(&mut self, uuid: &Uuid) -> Result<()> { - let tx = self.conn.transaction()?; + fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()> { let uuid_bytes = uuid.as_bytes().to_vec(); - tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &schema::REMOTE_HEAD_KEY])?; - tx.commit().map_err(|e| e.into()) + let updated = tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", + &[&uuid_bytes, &schema::REMOTE_HEAD_KEY])?; + if updated != 1 { + bail!(ErrorKind::DuplicateMetadata(schema::REMOTE_HEAD_KEY.into())); + } + Ok(()) } } @@ -58,17 +54,17 @@ mod tests { #[test] fn test_get_remote_head_default() { - let conn = schema::tests::setup_conn(); - let metadata_client: SyncMetadataClient = SyncMetadataClient::new(conn); - assert_eq!(Uuid::nil(), metadata_client.remote_head().expect("fetch succeeded")); + let mut conn = schema::tests::setup_conn(); + let tx = conn.transaction().expect("db tx"); + assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } #[test] fn test_set_and_get_remote_head() { - let conn = schema::tests::setup_conn(); + let mut conn = schema::tests::setup_conn(); let uuid = Uuid::new_v4(); - let mut metadata_client: SyncMetadataClient = SyncMetadataClient::new(conn); - metadata_client.set_remote_head(&uuid).expect("update succeeded"); - assert_eq!(uuid, metadata_client.remote_head().expect("fetch succeeded")); + let tx = conn.transaction().expect("db tx"); + SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded"); + assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded")); } } diff --git a/tolstoy/src/schema.rs b/tolstoy/src/schema.rs index c6075b3f..9bd175ff 100644 --- a/tolstoy/src/schema.rs +++ b/tolstoy/src/schema.rs @@ -1,4 +1,4 @@ -// Copyright 2016 Mozilla +// Copyright 2018 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs new file mode 100644 index 00000000..ff5281f2 --- /dev/null +++ b/tolstoy/src/syncer.rs @@ -0,0 +1,286 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std; +use std::str::FromStr; +use std::collections::HashMap; + +use futures::{future, Future, Stream}; +use hyper; +use hyper::Client; +use hyper::{Method, Request, StatusCode, Error as HyperError}; +use hyper::header::{ContentType}; +use rusqlite; +use serde_cbor; +use serde_json; +use tokio_core::reactor::Core; +use uuid::Uuid; + +use mentat_core::Entid; +use metadata::SyncMetadataClient; +use metadata::HeadTrackable; + +use errors::{ + ErrorKind, + Result, +}; + +use tx_processor::{ + Processor, + TxReceiver, + TxPart, +}; + +use tx_mapper::TxMapper; + +static API_VERSION: &str = "0.1"; +static BASE_URL: &str = "https://mentat.dev.lcip.org/mentatsync/"; + +pub struct Syncer {} + +struct UploadingTxReceiver<'c> { + pub tx_temp_uuids: HashMap, + pub is_done: bool, + remote_client: &'c RemoteClient, + remote_head: &'c Uuid, + rolling_temp_head: Option, +} + +impl<'c> UploadingTxReceiver<'c> { + fn new(client: &'c RemoteClient, remote_head: &'c Uuid) -> UploadingTxReceiver<'c> { + UploadingTxReceiver { + tx_temp_uuids: HashMap::new(), + remote_client: client, + remote_head: remote_head, + rolling_temp_head: None, + is_done: false + } + } +} + +impl<'c> TxReceiver for UploadingTxReceiver<'c> { + fn tx(&mut self, tx_id: Entid, d: &mut T) -> Result<()> + where T: Iterator { + // Yes, we generate a new UUID for a given Tx, even if we might + // already have one mapped locally. Pre-existing local mapping will + // be replaced if this sync succeeds entirely. + // If we're seeing this tx again, it implies that previous attempt + // to sync didn't update our local head. Something went wrong last time, + // and it's unwise to try to re-use these remote tx mappings. + // We just leave garbage txs to be GC'd on the server. + let tx_uuid = Uuid::new_v4(); + self.tx_temp_uuids.insert(tx_id, tx_uuid); + let mut tx_chunks = vec![]; + + // TODO separate bits of network work should be combined into single 'future' + + // Upload all chunks. + for datom in d { + let datom_uuid = Uuid::new_v4(); + tx_chunks.push(datom_uuid); + self.remote_client.put_chunk(&datom_uuid, serde_cbor::to_vec(&datom)?)? + } + + // Upload tx. + // NB: At this point, we may choose to update remote & local heads. + // Depending on how much we're uploading, and how unreliable our connection + // is, this might be a good thing to do to ensure we make at least some progress. + // Comes at a cost of possibly increasing racing against other clients. + match self.rolling_temp_head { + Some(parent) => { + self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?; + self.rolling_temp_head = Some(tx_uuid.clone()); + }, + None => self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)? + } + + Ok(()) + } + + fn done(&mut self) -> Result<()> { + self.is_done = true; + Ok(()) + } +} + +impl Syncer { + pub fn flow(sqlite: &mut rusqlite::Connection, username: String) -> Result<()> { + // Sketch of an upload flow: + // get remote head + // compare with local head + // if the same: + // - upload any local chunks, transactions + // - move server remote head + // - move local remote head + + // TODO configure this sync with some auth data + let remote_client = RemoteClient::new(BASE_URL.into(), username); + + let mut db_tx = sqlite.transaction()?; + + let remote_head = remote_client.get_head()?; + let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?; + + // TODO it's possible that we've successfully advanced remote head previously, + // but failed to advance our own local head. If that's the case, and we can recognize it, + // our sync becomes much cheaper. + + // Don't know how to download, merge, resolve conflicts, etc yet. + if locally_known_remote_head != remote_head { + bail!(ErrorKind::NotYetImplemented( + format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head) + )); + } + + // Local and remote heads agree. + // In theory, it should be safe to upload our stuff now. + let mut uploader = UploadingTxReceiver::new(&remote_client, &remote_head); + Processor::process(&db_tx, &mut uploader)?; + if !uploader.is_done { + bail!(ErrorKind::UploadingProcessorUnfinished); + } + // Last tx uuid uploaded by the tx receiver. + // It's going to be our new head. + if let Some(last_tx_uploaded) = uploader.rolling_temp_head { + // Upload remote head. + remote_client.put_head(&last_tx_uploaded)?; + + // On succes: + // - persist local mappings from the receiver + // - update our local "remote head". + TxMapper::set_bulk(&mut db_tx, &uploader.tx_temp_uuids)?; + SyncMetadataClient::set_remote_head(&db_tx, &last_tx_uploaded)?; + + // Commit everything: tx->uuid mappings and the new HEAD. We're synced! + db_tx.commit()?; + } + + Ok(()) + } +} + +#[derive(Serialize)] +struct SerializedHead<'a> { + head: &'a Uuid +} + +#[derive(Serialize)] +struct SerializedTransaction<'a> { + parent: &'a Uuid, + chunks: &'a Vec +} + +struct RemoteClient { + base_uri: String, + user_id: String +} + +impl RemoteClient { + fn new(base_uri: String, user_id: String) -> Self { + RemoteClient { + base_uri: base_uri, + user_id: user_id + } + } + + fn bound_base_uri(&self) -> String { + // TODO escaping + format!("{}/{}/{}", self.base_uri, API_VERSION, self.user_id) + } + + fn get_uuid(&self, uri: String) -> Result { + let mut core = Core::new()?; + let client = Client::new(&core.handle()); + + let uri = uri.parse()?; + let get = client.get(uri).and_then(|res| { + res.body().concat2() + }); + + let got = core.run(get)?; + Ok(Uuid::from_str(std::str::from_utf8(&got)?)?) + } + + fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> + where hyper::Body: std::convert::From, + T: { + let mut core = Core::new()?; + let client = Client::new(&core.handle()); + + let uri = uri.parse()?; + + let mut req = Request::new(Method::Put, uri); + req.headers_mut().set(ContentType::json()); + req.set_body(payload); + + let put = client.request(req).and_then(|res| { + let status_code = res.status(); + + if status_code != expected { + future::err(HyperError::Status) + } else { + // body will be empty... + future::ok(()) + } + }); + + core.run(put)?; + Ok(()) + } + + fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec) -> Result<()> { + // {"parent": uuid, "chunks": [chunk1, chunk2...]} + let transaction = SerializedTransaction { + parent: parent_uuid, + chunks: chunks + }; + + let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); + let json = serde_json::to_string(&transaction)?; + self.put(uri, json, StatusCode::Created) + } + + fn get_head(&self) -> Result { + let uri = format!("{}/head", self.bound_base_uri()); + self.get_uuid(uri) + } + + fn put_head(&self, uuid: &Uuid) -> Result<()> { + // {"head": uuid} + let head = SerializedHead { + head: uuid + }; + + let uri = format!("{}/head", self.bound_base_uri()); + + let json = serde_json::to_string(&head)?; + self.put(uri, json, StatusCode::NoContent) + } + + fn put_chunk(&self, chunk_uuid: &Uuid, payload: Vec) -> Result<()> { + let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); + self.put(uri, payload, StatusCode::Created) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_remote_client(uri: &str, user_id: &str) -> RemoteClient { + RemoteClient::new(uri.into(), user_id.into()) + } + + #[test] + fn test_remote_client_bound_uri() { + let remote_client = test_remote_client("https://example.com/api", "test-user"); + assert_eq!("https://example.com/api/0.1/test-user", remote_client.bound_base_uri()); + } +} diff --git a/tolstoy/src/tx_mapper.rs b/tolstoy/src/tx_mapper.rs new file mode 100644 index 00000000..570a1cab --- /dev/null +++ b/tolstoy/src/tx_mapper.rs @@ -0,0 +1,110 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::collections::HashMap; +use rusqlite; +use uuid::Uuid; +use mentat_core::Entid; + +use errors::{ + ErrorKind, + Result, +}; + +// Exposes a tx<->uuid mapping interface. +pub struct TxMapper {} + +impl TxMapper { + pub fn set_bulk(db_tx: &mut rusqlite::Transaction, tx_uuid_map: &HashMap) -> Result<()> { + let mut stmt = db_tx.prepare_cached( + "INSERT OR REPLACE INTO tolstoy_tu (tx, uuid) VALUES (?, ?)" + )?; + for (tx, uuid) in tx_uuid_map.iter() { + let uuid_bytes = uuid.as_bytes().to_vec(); + stmt.execute(&[tx, &uuid_bytes])?; + } + Ok(()) + } + + // TODO for when we're downloading, right? + pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result { + match TxMapper::get(db_tx, tx)? { + Some(uuid) => Ok(uuid), + None => { + let uuid = Uuid::new_v4(); + let uuid_bytes = uuid.as_bytes().to_vec(); + db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?; + return Ok(uuid); + } + } + } + + fn get(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result> { + let mut stmt = db_tx.prepare_cached( + "SELECT uuid FROM tolstoy_tu WHERE tx = ?" + )?; + + let results = stmt.query_and_then(&[&tx], |r| -> Result{ + let bytes: Vec = r.get(0); + Uuid::from_bytes(bytes.as_slice()).map_err(|e| e.into()) + })?.peekable(); + + let mut uuids = vec![]; + uuids.extend(results); + if uuids.len() == 0 { + return Ok(None); + } else if uuids.len() > 1 { + bail!(ErrorKind::TxIncorrectlyMapped(uuids.len())); + } + Ok(Some(uuids.remove(0)?)) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use schema; + + #[test] + fn test_getters() { + let mut conn = schema::tests::setup_conn(); + let mut tx = conn.transaction().expect("db tx"); + assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success")); + let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success"); + assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success")); + } + + #[test] + fn test_bulk_setter() { + let mut conn = schema::tests::setup_conn(); + let mut tx = conn.transaction().expect("db tx"); + let mut map = HashMap::new(); + + TxMapper::set_bulk(&mut tx, &map).expect("empty map success"); + + let uuid1 = Uuid::new_v4(); + let uuid2 = Uuid::new_v4(); + map.insert(1, uuid1); + map.insert(2, uuid2); + + TxMapper::set_bulk(&mut tx, &map).expect("map success"); + assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success")); + assert_eq!(Some(uuid2), TxMapper::get(&mut tx, 2).expect("success")); + + // Now let's replace one of mappings. + map.remove(&1); + let new_uuid2 = Uuid::new_v4(); + map.insert(2, new_uuid2); + + TxMapper::set_bulk(&mut tx, &map).expect("map success"); + assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success")); + assert_eq!(Some(new_uuid2), TxMapper::get(&mut tx, 2).expect("success")); + } +} diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs index 5065c674..f9b08f61 100644 --- a/tolstoy/src/tx_processor.rs +++ b/tolstoy/src/tx_processor.rs @@ -1,4 +1,4 @@ -// Copyright 2016 Mozilla +// Copyright 2018 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. You may obtain a copy of the @@ -24,7 +24,7 @@ use mentat_core::{ TypedValue, }; -#[derive(Debug, Clone)] +#[derive(Debug,Clone,Serialize,Deserialize)] pub struct TxPart { pub e: Entid, pub a: Entid, @@ -33,12 +33,6 @@ pub struct TxPart { pub added: bool, } -#[derive(Debug, Clone)] -pub struct Tx { - pub tx: Entid, - pub tx_instant: TypedValue, -} - pub trait TxReceiver { fn tx(&mut self, tx_id: Entid, d: &mut T) -> Result<()> where T: Iterator; @@ -47,17 +41,17 @@ pub trait TxReceiver { pub struct Processor {} -pub struct DatomsIterator<'conn, 't, T> +pub struct DatomsIterator<'dbtx, 't, T> where T: Sized + Iterator> + 't { at_first: bool, at_last: bool, - first: &'conn TxPart, + first: &'dbtx TxPart, rows: &'t mut Peekable, } -impl<'conn, 't, T> DatomsIterator<'conn, 't, T> +impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T> where T: Sized + Iterator> + 't { - fn new(first: &'conn TxPart, rows: &'t mut Peekable) -> DatomsIterator<'conn, 't, T> + fn new(first: &'dbtx TxPart, rows: &'t mut Peekable) -> DatomsIterator<'dbtx, 't, T> { DatomsIterator { at_first: true, @@ -68,7 +62,7 @@ where T: Sized + Iterator> + 't { } } -impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T> +impl<'dbtx, 't, T> Iterator for DatomsIterator<'dbtx, 't, T> where T: Sized + Iterator> + 't { type Item = TxPart; @@ -133,7 +127,7 @@ fn to_tx_part(row: &rusqlite::Row) -> Result { } impl Processor { - pub fn process(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()> + pub fn process(sqlite: &rusqlite::Transaction, receiver: &mut R) -> Result<()> where R: TxReceiver { let mut stmt = sqlite.prepare( "SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx"