diff --git a/Cargo.toml b/Cargo.toml index 1e6101bf..5c00dcea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ chrono = "0.4" error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } lazy_static = "0.2" time = "0.1" +uuid = "0.5" [dependencies.rusqlite] version = "0.12" diff --git a/src/conn.rs b/src/conn.rs index 5e14259d..798d278d 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -48,6 +48,9 @@ use mentat_tx::entities::TempId; use mentat_tx_parser; +use mentat_tolstoy::Syncer; + +use uuid::Uuid; use cache::{ AttributeCacher, }; @@ -148,6 +151,10 @@ pub trait Queryable { where E: Into; } +pub trait Syncable { + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>; +} + /// Represents an in-progress, not yet committed, set of changes to the store. /// Call `commit` to commit your changes, or `rollback` to discard them. /// A transaction is held open until you do so. @@ -418,6 +425,13 @@ impl Queryable for Store { } } +impl Syncable for Store { + fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> { + let uuid = Uuid::parse_str(&user_uuid)?; + Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?) + } +} + impl Conn { // Intentionally not public. fn new(partition_map: PartitionMap, schema: Schema) -> Conn { diff --git a/src/errors.rs b/src/errors.rs index 256f914f..35910e02 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,6 +12,8 @@ use rusqlite; +use uuid; + use std::collections::BTreeSet; use edn; @@ -26,6 +28,7 @@ use mentat_query_parser; use mentat_query_projector; use mentat_query_translator; use mentat_sql; +use mentat_tolstoy; use mentat_tx_parser; error_chain! { @@ -36,6 +39,7 @@ error_chain! { foreign_links { EdnParseError(edn::ParseError); Rusqlite(rusqlite::Error); + UuidParseError(uuid::ParseError); } links { @@ -46,6 +50,7 @@ error_chain! { TranslatorError(mentat_query_translator::Error, mentat_query_translator::ErrorKind); SqlError(mentat_sql::Error, mentat_sql::ErrorKind); TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind); + SyncError(mentat_tolstoy::Error, mentat_tolstoy::ErrorKind); } errors { diff --git a/src/lib.rs b/src/lib.rs index abb6130f..79540f55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,8 @@ extern crate lazy_static; extern crate rusqlite; +extern crate uuid; + pub extern crate edn; extern crate mentat_core; extern crate mentat_db; @@ -27,6 +29,7 @@ extern crate mentat_query_parser; extern crate mentat_query_projector; extern crate mentat_query_translator; extern crate mentat_sql; +extern crate mentat_tolstoy; extern crate mentat_tx; extern crate mentat_tx_parser; @@ -114,6 +117,7 @@ pub use conn::{ InProgress, Metadata, Queryable, + Syncable, Store, }; diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index 4f36a276..a6e29fd8 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -100,7 +100,7 @@ fn test_reader() { // Don't inspect the bootstrap transaction, but we'd like to see it's there. let mut receiver = TxCountingReceiver::new(); assert_eq!(false, receiver.is_done); - Processor::process(&db_tx, &mut receiver).expect("processor"); + Processor::process(&db_tx, None, &mut receiver).expect("processor"); assert_eq!(true, receiver.is_done); assert_eq!(1, receiver.tx_count); } @@ -112,16 +112,19 @@ fn test_reader() { ]"#).expect("successful transaction").tempids; let numba_entity_id = ids.get("s").unwrap(); + let mut bootstrap_tx = None; { let db_tx = c.transaction().expect("db tx"); // Expect to see one more transaction of four parts (one for tx datom itself). let mut receiver = TestingReceiver::new(); - Processor::process(&db_tx, &mut receiver).expect("processor"); + Processor::process(&db_tx, None, &mut receiver).expect("processor"); println!("{:#?}", receiver); assert_eq!(2, receiver.txes.keys().count()); assert_tx_datoms_count(&receiver, 1, 4); + + bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx")); } let ids = conn.transact(&mut c, r#"[ @@ -134,13 +137,15 @@ fn test_reader() { // Expect to see a single two part transaction let mut receiver = TestingReceiver::new(); - Processor::process(&db_tx, &mut receiver).expect("processor"); - assert_eq!(3, receiver.txes.keys().count()); - assert_tx_datoms_count(&receiver, 2, 2); + // Note that we're asking for the bootstrap tx to be skipped by the processor. + Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor"); + + assert_eq!(2, receiver.txes.keys().count()); + assert_tx_datoms_count(&receiver, 1, 2); // Inspect the transaction part. - let tx_id = receiver.txes.keys().nth(2).expect("tx"); + let tx_id = receiver.txes.keys().nth(1).expect("tx"); let datoms = receiver.txes.get(tx_id).expect("datoms"); let part = &datoms[0]; diff --git a/tolstoy/Cargo.toml b/tolstoy/Cargo.toml index 2d0bbb3f..0d5fdc4e 100644 --- a/tolstoy/Cargo.toml +++ b/tolstoy/Cargo.toml @@ -7,6 +7,7 @@ authors = ["Grisha Kruglov "] [dependencies] futures = "0.1" hyper = "0.11" +hyper-tls = "0.1.2" tokio-core = "0.1" serde = "1.0" serde_json = "1.0" diff --git a/tolstoy/src/errors.rs b/tolstoy/src/errors.rs index 74b6a469..de5dfdba 100644 --- a/tolstoy/src/errors.rs +++ b/tolstoy/src/errors.rs @@ -59,9 +59,14 @@ error_chain! { display("encountered more than one metadata value for key: {}", k) } - UploadingProcessorUnfinished { - description("Uploading Tx processor couldn't finish") - display("Uploading Tx processor couldn't finish") + TxProcessorUnfinished { + description("Tx processor couldn't finish") + display("Tx processor couldn't finish") + } + + BadServerResponse(s: String) { + description("Received bad response from the server") + display("Received bad response from the server: {}", s) } } } diff --git a/tolstoy/src/lib.rs b/tolstoy/src/lib.rs index 68234519..ce20aeed 100644 --- a/tolstoy/src/lib.rs +++ b/tolstoy/src/lib.rs @@ -21,6 +21,7 @@ extern crate lazy_static; extern crate serde_derive; extern crate hyper; +extern crate hyper_tls; extern crate tokio_core; extern crate futures; extern crate serde; @@ -37,3 +38,10 @@ pub mod tx_processor; pub mod errors; pub mod syncer; pub mod tx_mapper; +pub use syncer::Syncer; +pub use errors::{ + Error, + ErrorKind, + Result, + ResultExt, +}; diff --git a/tolstoy/src/syncer.rs b/tolstoy/src/syncer.rs index ff5281f2..e423cf32 100644 --- a/tolstoy/src/syncer.rs +++ b/tolstoy/src/syncer.rs @@ -9,16 +9,16 @@ // specific language governing permissions and limitations under the License. use std; -use std::str::FromStr; use std::collections::HashMap; use futures::{future, Future, Stream}; use hyper; -use hyper::Client; +use hyper_tls; use hyper::{Method, Request, StatusCode, Error as HyperError}; use hyper::header::{ContentType}; use rusqlite; -use serde_cbor; +// TODO: +// use serde_cbor; use serde_json; use tokio_core::reactor::Core; use uuid::Uuid; @@ -26,6 +26,7 @@ use uuid::Uuid; use mentat_core::Entid; use metadata::SyncMetadataClient; use metadata::HeadTrackable; +use schema::ensure_current_version; use errors::{ ErrorKind, @@ -40,11 +41,58 @@ use tx_processor::{ use tx_mapper::TxMapper; -static API_VERSION: &str = "0.1"; -static BASE_URL: &str = "https://mentat.dev.lcip.org/mentatsync/"; +// TODO it would be nice to be able to pass +// in a logger into Syncer::flow; would allow for a "debug mode" +// and getting useful logs out of clients. +// Below is some debug Android-friendly logging: + +// use std::os::raw::c_char; +// use std::os::raw::c_int; +// use std::ffi::CString; +// pub const ANDROID_LOG_DEBUG: i32 = 3; +// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; } + +pub fn d(message: &str) { + println!("d: {}", message); + // let message = CString::new(message).unwrap(); + // let message = message.as_ptr(); + // let tag = CString::new("RustyToodle").unwrap(); + // let tag = tag.as_ptr(); + // unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) }; +} pub struct Syncer {} +// TODO this is sub-optimal, we don't need to walk the table +// to query the last thing in it w/ an index on tx!! +// but it's the hammer at hand! +struct InquiringTxReceiver { + pub last_tx: Option, + pub is_done: bool, +} + +impl InquiringTxReceiver { + fn new() -> InquiringTxReceiver { + InquiringTxReceiver { + last_tx: None, + is_done: false, + } + } +} + +impl TxReceiver for InquiringTxReceiver { + fn tx(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()> + where T: Iterator { + self.last_tx = Some(tx_id); + Ok(()) + } + + fn done(&mut self) -> Result<()> { + self.is_done = true; + Ok(()) + } +} + struct UploadingTxReceiver<'c> { pub tx_temp_uuids: HashMap, pub is_done: bool, @@ -66,7 +114,7 @@ impl<'c> UploadingTxReceiver<'c> { } impl<'c> TxReceiver for UploadingTxReceiver<'c> { - fn tx(&mut self, tx_id: Entid, d: &mut T) -> Result<()> + fn tx(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()> where T: Iterator { // Yes, we generate a new UUID for a given Tx, even if we might // already have one mapped locally. Pre-existing local mapping will @@ -82,10 +130,14 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { // TODO separate bits of network work should be combined into single 'future' // Upload all chunks. - for datom in d { + for datom in datoms { let datom_uuid = Uuid::new_v4(); tx_chunks.push(datom_uuid); - self.remote_client.put_chunk(&datom_uuid, serde_cbor::to_vec(&datom)?)? + d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom)); + // TODO switch over to CBOR once we're past debugging stuff. + // let cbor_val = serde_cbor::to_value(&datom)?; + // self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?; + self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?; } // Upload tx. @@ -95,12 +147,19 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { // Comes at a cost of possibly increasing racing against other clients. match self.rolling_temp_head { Some(parent) => { + d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks)); self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?; - self.rolling_temp_head = Some(tx_uuid.clone()); + }, - None => self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)? + None => { + d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks)); + self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?; + } } + d(&format!("updating rolling head: {:?}", tx_uuid)); + self.rolling_temp_head = Some(tx_uuid.clone()); + Ok(()) } @@ -111,40 +170,11 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> { } impl Syncer { - pub fn flow(sqlite: &mut rusqlite::Connection, username: String) -> Result<()> { - // Sketch of an upload flow: - // get remote head - // compare with local head - // if the same: - // - upload any local chunks, transactions - // - move server remote head - // - move local remote head - - // TODO configure this sync with some auth data - let remote_client = RemoteClient::new(BASE_URL.into(), username); - - let mut db_tx = sqlite.transaction()?; - - let remote_head = remote_client.get_head()?; - let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?; - - // TODO it's possible that we've successfully advanced remote head previously, - // but failed to advance our own local head. If that's the case, and we can recognize it, - // our sync becomes much cheaper. - - // Don't know how to download, merge, resolve conflicts, etc yet. - if locally_known_remote_head != remote_head { - bail!(ErrorKind::NotYetImplemented( - format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head) - )); - } - - // Local and remote heads agree. - // In theory, it should be safe to upload our stuff now. - let mut uploader = UploadingTxReceiver::new(&remote_client, &remote_head); - Processor::process(&db_tx, &mut uploader)?; + fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> { + let mut uploader = UploadingTxReceiver::new(remote_client, remote_head); + Processor::process(db_tx, from_tx, &mut uploader)?; if !uploader.is_done { - bail!(ErrorKind::UploadingProcessorUnfinished); + bail!(ErrorKind::TxProcessorUnfinished); } // Last tx uuid uploaded by the tx receiver. // It's going to be our new head. @@ -155,20 +185,97 @@ impl Syncer { // On succes: // - persist local mappings from the receiver // - update our local "remote head". - TxMapper::set_bulk(&mut db_tx, &uploader.tx_temp_uuids)?; - SyncMetadataClient::set_remote_head(&db_tx, &last_tx_uploaded)?; - - // Commit everything: tx->uuid mappings and the new HEAD. We're synced! - db_tx.commit()?; + TxMapper::set_bulk(db_tx, &uploader.tx_temp_uuids)?; + SyncMetadataClient::set_remote_head(db_tx, &last_tx_uploaded)?; } Ok(()) } + + pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> { + d(&format!("sync flowing")); + + ensure_current_version(sqlite)?; + + // TODO configure this sync with some auth data + let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone()); + let mut db_tx = sqlite.transaction()?; + + let remote_head = remote_client.get_head()?; + d(&format!("remote head {:?}", remote_head)); + + let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?; + d(&format!("local head {:?}", locally_known_remote_head)); + + // Local head: latest transaction that we have in the store, + // but with one caveat: its tx might will not be mapped if it's + // never been synced successfully. + // In other words: if latest tx isn't mapped, then HEAD moved + // since last sync and server needs to be updated. + let mut inquiring_tx_receiver = InquiringTxReceiver::new(); + // TODO don't just start from the beginning... but then again, we should do this + // without walking the table at all, and use the tx index. + Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?; + if !inquiring_tx_receiver.is_done { + bail!(ErrorKind::TxProcessorUnfinished); + } + let have_local_changes = match inquiring_tx_receiver.last_tx { + Some(tx) => { + match TxMapper::get(&db_tx, tx)? { + Some(_) => false, + None => true + } + }, + None => false + }; + + // Check if the server is empty - populate it. + if remote_head == Uuid::nil() { + d(&format!("empty server!")); + Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?; + + // Check if the server is the same as us, and if our HEAD moved. + } else if locally_known_remote_head == remote_head { + d(&format!("server unchanged since last sync.")); + + if !have_local_changes { + d(&format!("local HEAD did not move. Nothing to do!")); + return Ok(()); + } + + d(&format!("local HEAD moved.")); + // TODO it's possible that we've successfully advanced remote head previously, + // but failed to advance our own local head. If that's the case, and we can recognize it, + // our sync becomes just bumping our local head. AFAICT below would currently fail. + if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? { + d(&format!("Fast-forwarding the server.")); + Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?; + } else { + d(&format!("Unable to fast-forward the server; missing local tx mapping")); + bail!(ErrorKind::TxIncorrectlyMapped(0)); + } + + // We diverged from the server. + // We'll need to rebase/merge ourselves on top of it. + } else { + d(&format!("server changed since last sync.")); + + bail!(ErrorKind::NotYetImplemented( + format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head) + )); + } + + // Commit everything, if there's anything to commit! + // Any new tx->uuid mappings and the new HEAD. We're synced! + db_tx.commit()?; + + Ok(()) + } } -#[derive(Serialize)] -struct SerializedHead<'a> { - head: &'a Uuid +#[derive(Serialize,Deserialize)] +struct SerializedHead { + head: Uuid } #[derive(Serialize)] @@ -179,43 +286,66 @@ struct SerializedTransaction<'a> { struct RemoteClient { base_uri: String, - user_id: String + user_uuid: Uuid } + impl RemoteClient { - fn new(base_uri: String, user_id: String) -> Self { + fn new(base_uri: String, user_uuid: Uuid) -> Self { RemoteClient { base_uri: base_uri, - user_id: user_id + user_uuid: user_uuid } } fn bound_base_uri(&self) -> String { // TODO escaping - format!("{}/{}/{}", self.base_uri, API_VERSION, self.user_id) + format!("{}/{}", self.base_uri, self.user_uuid) } fn get_uuid(&self, uri: String) -> Result { let mut core = Core::new()?; - let client = Client::new(&core.handle()); + let client = hyper::Client::configure() + .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + .build(&core.handle()); + // let client = hyper::Client::new(&core.handle()); + + d(&format!("client")); let uri = uri.parse()?; - let get = client.get(uri).and_then(|res| { - res.body().concat2() + + d(&format!("parsed uri {:?}", uri)); + + let work = client.get(uri).and_then(|res| { + println!("Response: {}", res.status()); + + res.body().concat2().and_then(move |body| { + let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + Ok(head_json) + }) }); - let got = core.run(get)?; - Ok(Uuid::from_str(std::str::from_utf8(&got)?)?) + d(&format!("running...")); + + let head_json = core.run(work)?; + d(&format!("got head: {:?}", &head_json.head)); + Ok(head_json.head) } fn put(&self, uri: String, payload: T, expected: StatusCode) -> Result<()> - where hyper::Body: std::convert::From, - T: { + where hyper::Body: std::convert::From, { let mut core = Core::new()?; - let client = Client::new(&core.handle()); + let client = hyper::Client::configure() + .connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap()) + .build(&core.handle()); + // let client = hyper::Client::new(&core.handle()); let uri = uri.parse()?; + d(&format!("PUT {:?}", uri)); + let mut req = Request::new(Method::Put, uri); req.headers_mut().set(ContentType::json()); req.set_body(payload); @@ -224,9 +354,9 @@ impl RemoteClient { let status_code = res.status(); if status_code != expected { + d(&format!("bad put response: {:?}", status_code)); future::err(HyperError::Status) } else { - // body will be empty... future::ok(()) } }); @@ -244,6 +374,7 @@ impl RemoteClient { let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid); let json = serde_json::to_string(&transaction)?; + d(&format!("serialized transaction: {:?}", json)); self.put(uri, json, StatusCode::Created) } @@ -255,32 +386,33 @@ impl RemoteClient { fn put_head(&self, uuid: &Uuid) -> Result<()> { // {"head": uuid} let head = SerializedHead { - head: uuid + head: uuid.clone() }; let uri = format!("{}/head", self.bound_base_uri()); - let json = serde_json::to_string(&head)?; + d(&format!("serialized head: {:?}", json)); self.put(uri, json, StatusCode::NoContent) } - fn put_chunk(&self, chunk_uuid: &Uuid, payload: Vec) -> Result<()> { + fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> { let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid); - self.put(uri, payload, StatusCode::Created) + d(&format!("serialized chunk: {:?}", payload)); + // TODO don't want to clone every datom! + self.put(uri, payload.clone(), StatusCode::Created) } } #[cfg(test)] mod tests { use super::*; - - fn test_remote_client(uri: &str, user_id: &str) -> RemoteClient { - RemoteClient::new(uri.into(), user_id.into()) - } + use std::str::FromStr; #[test] fn test_remote_client_bound_uri() { - let remote_client = test_remote_client("https://example.com/api", "test-user"); - assert_eq!("https://example.com/api/0.1/test-user", remote_client.bound_base_uri()); + let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid"); + let server_uri = String::from("https://example.com/api/0.1"); + let remote_client = RemoteClient::new(server_uri, user_uuid); + assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri()); } } diff --git a/tolstoy/src/tx_mapper.rs b/tolstoy/src/tx_mapper.rs index 570a1cab..a5a84405 100644 --- a/tolstoy/src/tx_mapper.rs +++ b/tolstoy/src/tx_mapper.rs @@ -46,7 +46,25 @@ impl TxMapper { } } - fn get(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result> { + pub fn get_tx_for_uuid(db_tx: &rusqlite::Transaction, uuid: &Uuid) -> Result> { + let mut stmt = db_tx.prepare_cached( + "SELECT tx FROM tolstoy_tu WHERE uuid = ?" + )?; + + let uuid_bytes = uuid.as_bytes().to_vec(); + let results = stmt.query_map(&[&uuid_bytes], |r| r.get(0))?; + + let mut txs = vec![]; + txs.extend(results); + if txs.len() == 0 { + return Ok(None); + } else if txs.len() > 1 { + bail!(ErrorKind::TxIncorrectlyMapped(txs.len())); + } + Ok(Some(txs.remove(0)?)) + } + + pub fn get(db_tx: &rusqlite::Transaction, tx: Entid) -> Result> { let mut stmt = db_tx.prepare_cached( "SELECT uuid FROM tolstoy_tu WHERE tx = ?" )?; @@ -54,7 +72,7 @@ impl TxMapper { let results = stmt.query_and_then(&[&tx], |r| -> Result{ let bytes: Vec = r.get(0); Uuid::from_bytes(bytes.as_slice()).map_err(|e| e.into()) - })?.peekable(); + })?; let mut uuids = vec![]; uuids.extend(results); diff --git a/tolstoy/src/tx_processor.rs b/tolstoy/src/tx_processor.rs index f9b08f61..ebc12cae 100644 --- a/tolstoy/src/tx_processor.rs +++ b/tolstoy/src/tx_processor.rs @@ -127,11 +127,14 @@ fn to_tx_part(row: &rusqlite::Row) -> Result { } impl Processor { - pub fn process(sqlite: &rusqlite::Transaction, receiver: &mut R) -> Result<()> + pub fn process(sqlite: &rusqlite::Transaction, from_tx: Option, receiver: &mut R) -> Result<()> where R: TxReceiver { - let mut stmt = sqlite.prepare( - "SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx" - )?; + let tx_filter = match from_tx { + Some(tx) => format!(" WHERE tx > {} ", tx), + None => format!("") + }; + let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter); + let mut stmt = sqlite.prepare(&select_query)?; let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable(); let mut current_tx = None; diff --git a/tools/cli/src/mentat_cli/command_parser.rs b/tools/cli/src/mentat_cli/command_parser.rs index 5cb64338..8a91bc31 100644 --- a/tools/cli/src/mentat_cli/command_parser.rs +++ b/tools/cli/src/mentat_cli/command_parser.rs @@ -45,6 +45,7 @@ pub static LONG_EXIT_COMMAND: &'static str = &"exit"; pub static SHORT_EXIT_COMMAND: &'static str = &"e"; pub static LONG_QUERY_EXPLAIN_COMMAND: &'static str = &"explain_query"; pub static SHORT_QUERY_EXPLAIN_COMMAND: &'static str = &"eq"; +pub static SYNC_COMMAND: &'static str = &"sync"; #[derive(Clone, Debug, Eq, PartialEq)] pub enum Command { @@ -54,6 +55,7 @@ pub enum Command { Open(String), Query(String), Schema, + Sync(Vec), Timer(bool), Transact(String), QueryExplain(String), @@ -76,6 +78,7 @@ impl Command { &Command::Open(_) | &Command::Close | &Command::Exit | + &Command::Sync(_) | &Command::Schema => true } } @@ -90,6 +93,7 @@ impl Command { &Command::Open(_) | &Command::Close | &Command::Exit | + &Command::Sync(_) | &Command::Schema => false } } @@ -120,6 +124,9 @@ impl Command { &Command::Schema => { format!(".{}", SCHEMA_COMMAND) }, + &Command::Sync(ref args) => { + format!(".{} {:?}", SYNC_COMMAND, args) + }, &Command::QueryExplain(ref args) => { format!(".{} {}", LONG_QUERY_EXPLAIN_COMMAND, args) }, @@ -179,6 +186,19 @@ pub fn command(s: &str) -> Result { Ok(Command::Schema) }); + let sync_parser = string(SYNC_COMMAND) + .with(spaces()) + .with(arguments()) + .map(|args| { + if args.len() < 1 { + bail!(cli::ErrorKind::CommandParse("Missing required argument".to_string())); + } + if args.len() > 2 { + bail!(cli::ErrorKind::CommandParse(format!("Unrecognized argument {:?}", args[2]))); + } + Ok(Command::Sync(args.clone())) + }); + let exit_parser = try(string(LONG_EXIT_COMMAND)).or(try(string(SHORT_EXIT_COMMAND))) .with(no_arg_parser()) .map(|args| { @@ -216,7 +236,7 @@ pub fn command(s: &str) -> Result { }); spaces() .skip(token('.')) - .with(choice::<[&mut Parser>; 9], _> + .with(choice::<[&mut Parser>; 10], _> ([&mut try(help_parser), &mut try(timer_parser), &mut try(open_parser), @@ -225,6 +245,7 @@ pub fn command(s: &str) -> Result { &mut try(exit_parser), &mut try(query_parser), &mut try(schema_parser), + &mut try(sync_parser), &mut try(transact_parser)])) .parse(s) .unwrap_or((Err(cli::ErrorKind::CommandParse(format!("Invalid command {:?}", s)).into()), "")).0 @@ -315,6 +336,19 @@ mod tests { } } + #[test] + fn test_sync_parser_path_arg() { + let input = ".sync https://example.com/api/ 316ea470-ce35-4adf-9c61-e0de6e289c59"; + let cmd = command(&input).expect("Expected open command"); + match cmd { + Command::Sync(args) => { + assert_eq!(args[0], "https://example.com/api/".to_string()); + assert_eq!(args[1], "316ea470-ce35-4adf-9c61-e0de6e289c59".to_string()); + }, + _ => assert!(false) + } + } + #[test] fn test_open_parser_file_arg() { let input = ".open my.db"; diff --git a/tools/cli/src/mentat_cli/repl.rs b/tools/cli/src/mentat_cli/repl.rs index cfb77c6b..d23c8ce7 100644 --- a/tools/cli/src/mentat_cli/repl.rs +++ b/tools/cli/src/mentat_cli/repl.rs @@ -30,6 +30,7 @@ use mentat::{ QueryOutput, QueryResults, Store, + Syncable, TxReport, TypedValue, }; @@ -41,6 +42,7 @@ use command_parser::{ LONG_QUERY_COMMAND, SHORT_QUERY_COMMAND, SCHEMA_COMMAND, + SYNC_COMMAND, LONG_TRANSACT_COMMAND, SHORT_TRANSACT_COMMAND, LONG_EXIT_COMMAND, @@ -67,6 +69,7 @@ lazy_static! { map.insert(LONG_QUERY_COMMAND, "Execute a query against the current open database."); map.insert(SHORT_QUERY_COMMAND, "Shortcut for `.query`. Execute a query against the current open database."); map.insert(SCHEMA_COMMAND, "Output the schema for the current open database."); + map.insert(SYNC_COMMAND, "Synchronize database against a Sync Server URL for a provided user UUID."); map.insert(LONG_TRANSACT_COMMAND, "Execute a transact against the current open database."); map.insert(SHORT_TRANSACT_COMMAND, "Shortcut for `.transact`. Execute a transact against the current open database."); map.insert(LONG_QUERY_EXPLAIN_COMMAND, "Show the SQL and query plan that would be executed for a given query."); @@ -180,6 +183,12 @@ impl Repl { Err(e) => eprintln!("{}", e.to_string()), }; }, + Command::Sync(args) => { + match self.store.sync(&args[0], &args[1]) { + Ok(_) => println!("Synced!"), + Err(e) => eprintln!("{:?}", e) + }; + } Command::Close => self.close(), Command::Query(query) => self.execute_query(query), Command::QueryExplain(query) => self.explain_query(query),