Revised uploader flow (battle-tested); CLI support for sync (#557) r=rnewman

This commit is contained in:
Grisha Kruglov 2018-02-16 01:44:28 -08:00 committed by GitHub
parent c84db82575
commit 93e5dff9c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 329 additions and 90 deletions

View file

@ -30,6 +30,7 @@ chrono = "0.4"
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
lazy_static = "0.2"
time = "0.1"
uuid = "0.5"
[dependencies.rusqlite]
version = "0.12"

View file

@ -48,6 +48,9 @@ use mentat_tx::entities::TempId;
use mentat_tx_parser;
use mentat_tolstoy::Syncer;
use uuid::Uuid;
use cache::{
AttributeCacher,
};
@ -148,6 +151,10 @@ pub trait Queryable {
where E: Into<Entid>;
}
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
}
/// Represents an in-progress, not yet committed, set of changes to the store.
/// Call `commit` to commit your changes, or `rollback` to discard them.
/// A transaction is held open until you do so.
@ -418,6 +425,13 @@ impl Queryable for Store {
}
}
impl Syncable for Store {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()> {
let uuid = Uuid::parse_str(&user_uuid)?;
Ok(Syncer::flow(&mut self.sqlite, server_uri, &uuid)?)
}
}
impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {

View file

@ -12,6 +12,8 @@
use rusqlite;
use uuid;
use std::collections::BTreeSet;
use edn;
@ -26,6 +28,7 @@ use mentat_query_parser;
use mentat_query_projector;
use mentat_query_translator;
use mentat_sql;
use mentat_tolstoy;
use mentat_tx_parser;
error_chain! {
@ -36,6 +39,7 @@ error_chain! {
foreign_links {
EdnParseError(edn::ParseError);
Rusqlite(rusqlite::Error);
UuidParseError(uuid::ParseError);
}
links {
@ -46,6 +50,7 @@ error_chain! {
TranslatorError(mentat_query_translator::Error, mentat_query_translator::ErrorKind);
SqlError(mentat_sql::Error, mentat_sql::ErrorKind);
TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind);
SyncError(mentat_tolstoy::Error, mentat_tolstoy::ErrorKind);
}
errors {

View file

@ -18,6 +18,8 @@ extern crate lazy_static;
extern crate rusqlite;
extern crate uuid;
pub extern crate edn;
extern crate mentat_core;
extern crate mentat_db;
@ -27,6 +29,7 @@ extern crate mentat_query_parser;
extern crate mentat_query_projector;
extern crate mentat_query_translator;
extern crate mentat_sql;
extern crate mentat_tolstoy;
extern crate mentat_tx;
extern crate mentat_tx_parser;
@ -114,6 +117,7 @@ pub use conn::{
InProgress,
Metadata,
Queryable,
Syncable,
Store,
};

View file

@ -100,7 +100,7 @@ fn test_reader() {
// Don't inspect the bootstrap transaction, but we'd like to see it's there.
let mut receiver = TxCountingReceiver::new();
assert_eq!(false, receiver.is_done);
Processor::process(&db_tx, &mut receiver).expect("processor");
Processor::process(&db_tx, None, &mut receiver).expect("processor");
assert_eq!(true, receiver.is_done);
assert_eq!(1, receiver.tx_count);
}
@ -112,16 +112,19 @@ fn test_reader() {
]"#).expect("successful transaction").tempids;
let numba_entity_id = ids.get("s").unwrap();
let mut bootstrap_tx = None;
{
let db_tx = c.transaction().expect("db tx");
// Expect to see one more transaction of four parts (one for tx datom itself).
let mut receiver = TestingReceiver::new();
Processor::process(&db_tx, &mut receiver).expect("processor");
Processor::process(&db_tx, None, &mut receiver).expect("processor");
println!("{:#?}", receiver);
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 4);
bootstrap_tx = Some(*receiver.txes.keys().nth(0).expect("bootstrap tx"));
}
let ids = conn.transact(&mut c, r#"[
@ -134,13 +137,15 @@ fn test_reader() {
// Expect to see a single two part transaction
let mut receiver = TestingReceiver::new();
Processor::process(&db_tx, &mut receiver).expect("processor");
assert_eq!(3, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 2, 2);
// Note that we're asking for the bootstrap tx to be skipped by the processor.
Processor::process(&db_tx, bootstrap_tx, &mut receiver).expect("processor");
assert_eq!(2, receiver.txes.keys().count());
assert_tx_datoms_count(&receiver, 1, 2);
// Inspect the transaction part.
let tx_id = receiver.txes.keys().nth(2).expect("tx");
let tx_id = receiver.txes.keys().nth(1).expect("tx");
let datoms = receiver.txes.get(tx_id).expect("datoms");
let part = &datoms[0];

View file

@ -7,6 +7,7 @@ authors = ["Grisha Kruglov <gkruglov@mozilla.com>"]
[dependencies]
futures = "0.1"
hyper = "0.11"
hyper-tls = "0.1.2"
tokio-core = "0.1"
serde = "1.0"
serde_json = "1.0"

View file

@ -59,9 +59,14 @@ error_chain! {
display("encountered more than one metadata value for key: {}", k)
}
UploadingProcessorUnfinished {
description("Uploading Tx processor couldn't finish")
display("Uploading Tx processor couldn't finish")
TxProcessorUnfinished {
description("Tx processor couldn't finish")
display("Tx processor couldn't finish")
}
BadServerResponse(s: String) {
description("Received bad response from the server")
display("Received bad response from the server: {}", s)
}
}
}

View file

@ -21,6 +21,7 @@ extern crate lazy_static;
extern crate serde_derive;
extern crate hyper;
extern crate hyper_tls;
extern crate tokio_core;
extern crate futures;
extern crate serde;
@ -37,3 +38,10 @@ pub mod tx_processor;
pub mod errors;
pub mod syncer;
pub mod tx_mapper;
pub use syncer::Syncer;
pub use errors::{
Error,
ErrorKind,
Result,
ResultExt,
};

View file

@ -9,16 +9,16 @@
// specific language governing permissions and limitations under the License.
use std;
use std::str::FromStr;
use std::collections::HashMap;
use futures::{future, Future, Stream};
use hyper;
use hyper::Client;
use hyper_tls;
use hyper::{Method, Request, StatusCode, Error as HyperError};
use hyper::header::{ContentType};
use rusqlite;
use serde_cbor;
// TODO:
// use serde_cbor;
use serde_json;
use tokio_core::reactor::Core;
use uuid::Uuid;
@ -26,6 +26,7 @@ use uuid::Uuid;
use mentat_core::Entid;
use metadata::SyncMetadataClient;
use metadata::HeadTrackable;
use schema::ensure_current_version;
use errors::{
ErrorKind,
@ -40,11 +41,58 @@ use tx_processor::{
use tx_mapper::TxMapper;
static API_VERSION: &str = "0.1";
static BASE_URL: &str = "https://mentat.dev.lcip.org/mentatsync/";
// TODO it would be nice to be able to pass
// in a logger into Syncer::flow; would allow for a "debug mode"
// and getting useful logs out of clients.
// Below is some debug Android-friendly logging:
// use std::os::raw::c_char;
// use std::os::raw::c_int;
// use std::ffi::CString;
// pub const ANDROID_LOG_DEBUG: i32 = 3;
// extern { pub fn __android_log_write(prio: c_int, tag: *const c_char, text: *const c_char) -> c_int; }
pub fn d(message: &str) {
println!("d: {}", message);
// let message = CString::new(message).unwrap();
// let message = message.as_ptr();
// let tag = CString::new("RustyToodle").unwrap();
// let tag = tag.as_ptr();
// unsafe { __android_log_write(ANDROID_LOG_DEBUG, tag, message) };
}
pub struct Syncer {}
// TODO this is sub-optimal, we don't need to walk the table
// to query the last thing in it w/ an index on tx!!
// but it's the hammer at hand!
struct InquiringTxReceiver {
pub last_tx: Option<Entid>,
pub is_done: bool,
}
impl InquiringTxReceiver {
fn new() -> InquiringTxReceiver {
InquiringTxReceiver {
last_tx: None,
is_done: false,
}
}
}
impl TxReceiver for InquiringTxReceiver {
fn tx<T>(&mut self, tx_id: Entid, _datoms: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
self.last_tx = Some(tx_id);
Ok(())
}
fn done(&mut self) -> Result<()> {
self.is_done = true;
Ok(())
}
}
struct UploadingTxReceiver<'c> {
pub tx_temp_uuids: HashMap<Entid, Uuid>,
pub is_done: bool,
@ -66,7 +114,7 @@ impl<'c> UploadingTxReceiver<'c> {
}
impl<'c> TxReceiver for UploadingTxReceiver<'c> {
fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
fn tx<T>(&mut self, tx_id: Entid, datoms: &mut T) -> Result<()>
where T: Iterator<Item=TxPart> {
// Yes, we generate a new UUID for a given Tx, even if we might
// already have one mapped locally. Pre-existing local mapping will
@ -82,10 +130,14 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
// TODO separate bits of network work should be combined into single 'future'
// Upload all chunks.
for datom in d {
for datom in datoms {
let datom_uuid = Uuid::new_v4();
tx_chunks.push(datom_uuid);
self.remote_client.put_chunk(&datom_uuid, serde_cbor::to_vec(&datom)?)?
d(&format!("putting chunk: {:?}, {:?}", &datom_uuid, &datom));
// TODO switch over to CBOR once we're past debugging stuff.
// let cbor_val = serde_cbor::to_value(&datom)?;
// self.remote_client.put_chunk(&datom_uuid, &serde_cbor::ser::to_vec_sd(&cbor_val)?)?;
self.remote_client.put_chunk(&datom_uuid, &serde_json::to_string(&datom)?)?;
}
// Upload tx.
@ -95,11 +147,18 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
// Comes at a cost of possibly increasing racing against other clients.
match self.rolling_temp_head {
Some(parent) => {
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &parent, &tx_chunks));
self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?;
self.rolling_temp_head = Some(tx_uuid.clone());
},
None => self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?
None => {
d(&format!("putting transaction: {:?}, {:?}, {:?}", &tx_uuid, &self.remote_head, &tx_chunks));
self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?;
}
}
d(&format!("updating rolling head: {:?}", tx_uuid));
self.rolling_temp_head = Some(tx_uuid.clone());
Ok(())
}
@ -111,40 +170,11 @@ impl<'c> TxReceiver for UploadingTxReceiver<'c> {
}
impl Syncer {
pub fn flow(sqlite: &mut rusqlite::Connection, username: String) -> Result<()> {
// Sketch of an upload flow:
// get remote head
// compare with local head
// if the same:
// - upload any local chunks, transactions
// - move server remote head
// - move local remote head
// TODO configure this sync with some auth data
let remote_client = RemoteClient::new(BASE_URL.into(), username);
let mut db_tx = sqlite.transaction()?;
let remote_head = remote_client.get_head()?;
let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
// TODO it's possible that we've successfully advanced remote head previously,
// but failed to advance our own local head. If that's the case, and we can recognize it,
// our sync becomes much cheaper.
// Don't know how to download, merge, resolve conflicts, etc yet.
if locally_known_remote_head != remote_head {
bail!(ErrorKind::NotYetImplemented(
format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
));
}
// Local and remote heads agree.
// In theory, it should be safe to upload our stuff now.
let mut uploader = UploadingTxReceiver::new(&remote_client, &remote_head);
Processor::process(&db_tx, &mut uploader)?;
fn upload_ours(db_tx: &mut rusqlite::Transaction, from_tx: Option<Entid>, remote_client: &RemoteClient, remote_head: &Uuid) -> Result<()> {
let mut uploader = UploadingTxReceiver::new(remote_client, remote_head);
Processor::process(db_tx, from_tx, &mut uploader)?;
if !uploader.is_done {
bail!(ErrorKind::UploadingProcessorUnfinished);
bail!(ErrorKind::TxProcessorUnfinished);
}
// Last tx uuid uploaded by the tx receiver.
// It's going to be our new head.
@ -155,20 +185,97 @@ impl Syncer {
// On succes:
// - persist local mappings from the receiver
// - update our local "remote head".
TxMapper::set_bulk(&mut db_tx, &uploader.tx_temp_uuids)?;
SyncMetadataClient::set_remote_head(&db_tx, &last_tx_uploaded)?;
// Commit everything: tx->uuid mappings and the new HEAD. We're synced!
db_tx.commit()?;
TxMapper::set_bulk(db_tx, &uploader.tx_temp_uuids)?;
SyncMetadataClient::set_remote_head(db_tx, &last_tx_uploaded)?;
}
Ok(())
}
pub fn flow(sqlite: &mut rusqlite::Connection, server_uri: &String, user_uuid: &Uuid) -> Result<()> {
d(&format!("sync flowing"));
ensure_current_version(sqlite)?;
// TODO configure this sync with some auth data
let remote_client = RemoteClient::new(server_uri.clone(), user_uuid.clone());
let mut db_tx = sqlite.transaction()?;
let remote_head = remote_client.get_head()?;
d(&format!("remote head {:?}", remote_head));
let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;
d(&format!("local head {:?}", locally_known_remote_head));
// Local head: latest transaction that we have in the store,
// but with one caveat: its tx might will not be mapped if it's
// never been synced successfully.
// In other words: if latest tx isn't mapped, then HEAD moved
// since last sync and server needs to be updated.
let mut inquiring_tx_receiver = InquiringTxReceiver::new();
// TODO don't just start from the beginning... but then again, we should do this
// without walking the table at all, and use the tx index.
Processor::process(&db_tx, None, &mut inquiring_tx_receiver)?;
if !inquiring_tx_receiver.is_done {
bail!(ErrorKind::TxProcessorUnfinished);
}
let have_local_changes = match inquiring_tx_receiver.last_tx {
Some(tx) => {
match TxMapper::get(&db_tx, tx)? {
Some(_) => false,
None => true
}
},
None => false
};
// Check if the server is empty - populate it.
if remote_head == Uuid::nil() {
d(&format!("empty server!"));
Syncer::upload_ours(&mut db_tx, None, &remote_client, &remote_head)?;
// Check if the server is the same as us, and if our HEAD moved.
} else if locally_known_remote_head == remote_head {
d(&format!("server unchanged since last sync."));
if !have_local_changes {
d(&format!("local HEAD did not move. Nothing to do!"));
return Ok(());
}
d(&format!("local HEAD moved."));
// TODO it's possible that we've successfully advanced remote head previously,
// but failed to advance our own local head. If that's the case, and we can recognize it,
// our sync becomes just bumping our local head. AFAICT below would currently fail.
if let Some(upload_from_tx) = TxMapper::get_tx_for_uuid(&db_tx, &locally_known_remote_head)? {
d(&format!("Fast-forwarding the server."));
Syncer::upload_ours(&mut db_tx, Some(upload_from_tx), &remote_client, &remote_head)?;
} else {
d(&format!("Unable to fast-forward the server; missing local tx mapping"));
bail!(ErrorKind::TxIncorrectlyMapped(0));
}
// We diverged from the server.
// We'll need to rebase/merge ourselves on top of it.
} else {
d(&format!("server changed since last sync."));
bail!(ErrorKind::NotYetImplemented(
format!("Can't yet sync against changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
));
}
// Commit everything, if there's anything to commit!
// Any new tx->uuid mappings and the new HEAD. We're synced!
db_tx.commit()?;
Ok(())
}
}
#[derive(Serialize)]
struct SerializedHead<'a> {
head: &'a Uuid
#[derive(Serialize,Deserialize)]
struct SerializedHead {
head: Uuid
}
#[derive(Serialize)]
@ -179,43 +286,66 @@ struct SerializedTransaction<'a> {
struct RemoteClient {
base_uri: String,
user_id: String
user_uuid: Uuid
}
impl RemoteClient {
fn new(base_uri: String, user_id: String) -> Self {
fn new(base_uri: String, user_uuid: Uuid) -> Self {
RemoteClient {
base_uri: base_uri,
user_id: user_id
user_uuid: user_uuid
}
}
fn bound_base_uri(&self) -> String {
// TODO escaping
format!("{}/{}/{}", self.base_uri, API_VERSION, self.user_id)
format!("{}/{}", self.base_uri, self.user_uuid)
}
fn get_uuid(&self, uri: String) -> Result<Uuid> {
let mut core = Core::new()?;
let client = Client::new(&core.handle());
let client = hyper::Client::configure()
.connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
// let client = hyper::Client::new(&core.handle());
d(&format!("client"));
let uri = uri.parse()?;
let get = client.get(uri).and_then(|res| {
res.body().concat2()
d(&format!("parsed uri {:?}", uri));
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().concat2().and_then(move |body| {
let head_json: SerializedHead = serde_json::from_slice(&body).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, e)
})?;
Ok(head_json)
})
});
let got = core.run(get)?;
Ok(Uuid::from_str(std::str::from_utf8(&got)?)?)
d(&format!("running..."));
let head_json = core.run(work)?;
d(&format!("got head: {:?}", &head_json.head));
Ok(head_json.head)
}
fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
where hyper::Body: std::convert::From<T>,
T: {
where hyper::Body: std::convert::From<T>, {
let mut core = Core::new()?;
let client = Client::new(&core.handle());
let client = hyper::Client::configure()
.connector(hyper_tls::HttpsConnector::new(4, &core.handle()).unwrap())
.build(&core.handle());
// let client = hyper::Client::new(&core.handle());
let uri = uri.parse()?;
d(&format!("PUT {:?}", uri));
let mut req = Request::new(Method::Put, uri);
req.headers_mut().set(ContentType::json());
req.set_body(payload);
@ -224,9 +354,9 @@ impl RemoteClient {
let status_code = res.status();
if status_code != expected {
d(&format!("bad put response: {:?}", status_code));
future::err(HyperError::Status)
} else {
// body will be empty...
future::ok(())
}
});
@ -244,6 +374,7 @@ impl RemoteClient {
let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
let json = serde_json::to_string(&transaction)?;
d(&format!("serialized transaction: {:?}", json));
self.put(uri, json, StatusCode::Created)
}
@ -255,32 +386,33 @@ impl RemoteClient {
fn put_head(&self, uuid: &Uuid) -> Result<()> {
// {"head": uuid}
let head = SerializedHead {
head: uuid
head: uuid.clone()
};
let uri = format!("{}/head", self.bound_base_uri());
let json = serde_json::to_string(&head)?;
d(&format!("serialized head: {:?}", json));
self.put(uri, json, StatusCode::NoContent)
}
fn put_chunk(&self, chunk_uuid: &Uuid, payload: Vec<u8>) -> Result<()> {
fn put_chunk(&self, chunk_uuid: &Uuid, payload: &String) -> Result<()> {
let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
self.put(uri, payload, StatusCode::Created)
d(&format!("serialized chunk: {:?}", payload));
// TODO don't want to clone every datom!
self.put(uri, payload.clone(), StatusCode::Created)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_remote_client(uri: &str, user_id: &str) -> RemoteClient {
RemoteClient::new(uri.into(), user_id.into())
}
use std::str::FromStr;
#[test]
fn test_remote_client_bound_uri() {
let remote_client = test_remote_client("https://example.com/api", "test-user");
assert_eq!("https://example.com/api/0.1/test-user", remote_client.bound_base_uri());
let user_uuid = Uuid::from_str(&"316ea470-ce35-4adf-9c61-e0de6e289c59").expect("uuid");
let server_uri = String::from("https://example.com/api/0.1");
let remote_client = RemoteClient::new(server_uri, user_uuid);
assert_eq!("https://example.com/api/0.1/316ea470-ce35-4adf-9c61-e0de6e289c59", remote_client.bound_base_uri());
}
}

View file

@ -46,7 +46,25 @@ impl TxMapper {
}
}
fn get(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Option<Uuid>> {
pub fn get_tx_for_uuid(db_tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<Option<Entid>> {
let mut stmt = db_tx.prepare_cached(
"SELECT tx FROM tolstoy_tu WHERE uuid = ?"
)?;
let uuid_bytes = uuid.as_bytes().to_vec();
let results = stmt.query_map(&[&uuid_bytes], |r| r.get(0))?;
let mut txs = vec![];
txs.extend(results);
if txs.len() == 0 {
return Ok(None);
} else if txs.len() > 1 {
bail!(ErrorKind::TxIncorrectlyMapped(txs.len()));
}
Ok(Some(txs.remove(0)?))
}
pub fn get(db_tx: &rusqlite::Transaction, tx: Entid) -> Result<Option<Uuid>> {
let mut stmt = db_tx.prepare_cached(
"SELECT uuid FROM tolstoy_tu WHERE tx = ?"
)?;
@ -54,7 +72,7 @@ impl TxMapper {
let results = stmt.query_and_then(&[&tx], |r| -> Result<Uuid>{
let bytes: Vec<u8> = r.get(0);
Uuid::from_bytes(bytes.as_slice()).map_err(|e| e.into())
})?.peekable();
})?;
let mut uuids = vec![];
uuids.extend(results);

View file

@ -127,11 +127,14 @@ fn to_tx_part(row: &rusqlite::Row) -> Result<TxPart> {
}
impl Processor {
pub fn process<R>(sqlite: &rusqlite::Transaction, receiver: &mut R) -> Result<()>
pub fn process<R>(sqlite: &rusqlite::Transaction, from_tx: Option<Entid>, receiver: &mut R) -> Result<()>
where R: TxReceiver {
let mut stmt = sqlite.prepare(
"SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx"
)?;
let tx_filter = match from_tx {
Some(tx) => format!(" WHERE tx > {} ", tx),
None => format!("")
};
let select_query = format!("SELECT e, a, v, value_type_tag, tx, added FROM transactions {} ORDER BY tx", tx_filter);
let mut stmt = sqlite.prepare(&select_query)?;
let mut rows = stmt.query_and_then(&[], to_tx_part)?.peekable();
let mut current_tx = None;

View file

@ -45,6 +45,7 @@ pub static LONG_EXIT_COMMAND: &'static str = &"exit";
pub static SHORT_EXIT_COMMAND: &'static str = &"e";
pub static LONG_QUERY_EXPLAIN_COMMAND: &'static str = &"explain_query";
pub static SHORT_QUERY_EXPLAIN_COMMAND: &'static str = &"eq";
pub static SYNC_COMMAND: &'static str = &"sync";
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Command {
@ -54,6 +55,7 @@ pub enum Command {
Open(String),
Query(String),
Schema,
Sync(Vec<String>),
Timer(bool),
Transact(String),
QueryExplain(String),
@ -76,6 +78,7 @@ impl Command {
&Command::Open(_) |
&Command::Close |
&Command::Exit |
&Command::Sync(_) |
&Command::Schema => true
}
}
@ -90,6 +93,7 @@ impl Command {
&Command::Open(_) |
&Command::Close |
&Command::Exit |
&Command::Sync(_) |
&Command::Schema => false
}
}
@ -120,6 +124,9 @@ impl Command {
&Command::Schema => {
format!(".{}", SCHEMA_COMMAND)
},
&Command::Sync(ref args) => {
format!(".{} {:?}", SYNC_COMMAND, args)
},
&Command::QueryExplain(ref args) => {
format!(".{} {}", LONG_QUERY_EXPLAIN_COMMAND, args)
},
@ -179,6 +186,19 @@ pub fn command(s: &str) -> Result<Command, cli::Error> {
Ok(Command::Schema)
});
let sync_parser = string(SYNC_COMMAND)
.with(spaces())
.with(arguments())
.map(|args| {
if args.len() < 1 {
bail!(cli::ErrorKind::CommandParse("Missing required argument".to_string()));
}
if args.len() > 2 {
bail!(cli::ErrorKind::CommandParse(format!("Unrecognized argument {:?}", args[2])));
}
Ok(Command::Sync(args.clone()))
});
let exit_parser = try(string(LONG_EXIT_COMMAND)).or(try(string(SHORT_EXIT_COMMAND)))
.with(no_arg_parser())
.map(|args| {
@ -216,7 +236,7 @@ pub fn command(s: &str) -> Result<Command, cli::Error> {
});
spaces()
.skip(token('.'))
.with(choice::<[&mut Parser<Input = _, Output = Result<Command, cli::Error>>; 9], _>
.with(choice::<[&mut Parser<Input = _, Output = Result<Command, cli::Error>>; 10], _>
([&mut try(help_parser),
&mut try(timer_parser),
&mut try(open_parser),
@ -225,6 +245,7 @@ pub fn command(s: &str) -> Result<Command, cli::Error> {
&mut try(exit_parser),
&mut try(query_parser),
&mut try(schema_parser),
&mut try(sync_parser),
&mut try(transact_parser)]))
.parse(s)
.unwrap_or((Err(cli::ErrorKind::CommandParse(format!("Invalid command {:?}", s)).into()), "")).0
@ -315,6 +336,19 @@ mod tests {
}
}
#[test]
fn test_sync_parser_path_arg() {
let input = ".sync https://example.com/api/ 316ea470-ce35-4adf-9c61-e0de6e289c59";
let cmd = command(&input).expect("Expected open command");
match cmd {
Command::Sync(args) => {
assert_eq!(args[0], "https://example.com/api/".to_string());
assert_eq!(args[1], "316ea470-ce35-4adf-9c61-e0de6e289c59".to_string());
},
_ => assert!(false)
}
}
#[test]
fn test_open_parser_file_arg() {
let input = ".open my.db";

View file

@ -30,6 +30,7 @@ use mentat::{
QueryOutput,
QueryResults,
Store,
Syncable,
TxReport,
TypedValue,
};
@ -41,6 +42,7 @@ use command_parser::{
LONG_QUERY_COMMAND,
SHORT_QUERY_COMMAND,
SCHEMA_COMMAND,
SYNC_COMMAND,
LONG_TRANSACT_COMMAND,
SHORT_TRANSACT_COMMAND,
LONG_EXIT_COMMAND,
@ -67,6 +69,7 @@ lazy_static! {
map.insert(LONG_QUERY_COMMAND, "Execute a query against the current open database.");
map.insert(SHORT_QUERY_COMMAND, "Shortcut for `.query`. Execute a query against the current open database.");
map.insert(SCHEMA_COMMAND, "Output the schema for the current open database.");
map.insert(SYNC_COMMAND, "Synchronize database against a Sync Server URL for a provided user UUID.");
map.insert(LONG_TRANSACT_COMMAND, "Execute a transact against the current open database.");
map.insert(SHORT_TRANSACT_COMMAND, "Shortcut for `.transact`. Execute a transact against the current open database.");
map.insert(LONG_QUERY_EXPLAIN_COMMAND, "Show the SQL and query plan that would be executed for a given query.");
@ -180,6 +183,12 @@ impl Repl {
Err(e) => eprintln!("{}", e.to_string()),
};
},
Command::Sync(args) => {
match self.store.sync(&args[0], &args[1]) {
Ok(_) => println!("Synced!"),
Err(e) => eprintln!("{:?}", e)
};
}
Command::Close => self.close(),
Command::Query(query) => self.execute_query(query),
Command::QueryExplain(query) => self.explain_query(query),