"Unchanged server" uploader flow (#543) r=rnewman

* Remove unused struct from tx_processor

* Derive serialize & deserialize for TypedValue

* First pass of uploader flow + feedback
Grisha Kruglov 2018-02-09 09:55:19 -08:00 committed by GitHub
parent d11810dca7
commit 84f29676e8
15 changed files with 528 additions and 77 deletions


@@ -4,12 +4,15 @@ version = "0.0.1"
 workspace = ".."

 [dependencies]
-chrono = "0.4"
+chrono = { version = "0.4", features = ["serde"] }
 enum-set = { git = "https://github.com/rnewman/enum-set" }
 lazy_static = "0.2"
 num = "0.1"
-ordered-float = "0.5"
+ordered-float = { version = "0.5", features = ["serde"] }
 uuid = "0.5"
+serde = { version = "1.0", features = ["rc"] }
+serde_derive = "1.0"

 [dependencies.edn]
 path = "../edn"
+features = ["serde_support"]


@@ -12,10 +12,14 @@ extern crate chrono;
 extern crate enum_set;
 extern crate ordered_float;
 extern crate uuid;
+extern crate serde;

 #[macro_use]
 extern crate lazy_static;

+#[macro_use]
+extern crate serde_derive;
+
 extern crate edn;

 pub mod values;
@@ -175,7 +179,7 @@ impl fmt::Display for ValueType {
 /// Represents a Mentat value in a particular value set.
 // TODO: expand to include :db.type/{instant,url,uuid}.
 // TODO: BigInt?
-#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
+#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq,Serialize,Deserialize)]
 pub enum TypedValue {
     Ref(Entid),
     Boolean(bool),
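With Serialize and Deserialize derived, a TypedValue can round-trip through any serde format. A minimal sketch, assuming serde_cbor (the codec the tolstoy crate adopts below) is in scope:

    // Round-trip a TypedValue through CBOR bytes using the new derives.
    fn roundtrip_typed_value() {
        let v = TypedValue::Long(123);
        let bytes = serde_cbor::to_vec(&v).expect("serialize");
        let back: TypedValue = serde_cbor::from_slice(&bytes).expect("deserialize");
        assert_eq!(v, back);
    }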


@@ -17,6 +17,11 @@ num = "0.1"
 ordered-float = "0.5"
 pretty = "0.2"
 uuid = "0.5"
+serde = { version = "1.0", optional = true }
+serde_derive = { version = "1.0", optional = true }
+
+[features]
+serde_support = ["serde", "serde_derive"]

 [build-dependencies]
 peg = "0.5"


@@ -15,6 +15,13 @@ extern crate ordered_float;
 extern crate pretty;
 extern crate uuid;

+#[cfg(feature = "serde_support")]
+extern crate serde;
+
+#[cfg(feature = "serde_support")]
+#[macro_use]
+extern crate serde_derive;
+
 pub mod symbols;
 pub mod types;
 pub mod pretty_print;


@@ -70,6 +70,7 @@ pub struct NamespacedSymbol {
 pub struct Keyword(pub String);

 #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
+#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
 pub struct NamespacedKeyword {
     // We derive PartialOrd, which implements a lexicographic order based
     // on the order of members, so put namespace first.


@@ -139,7 +139,7 @@ fn possible_affinities(value_types: ValueTypeSet) -> HashMap<ValueTypeTag, Vec<S
     let mut result = HashMap::with_capacity(value_types.len());
     for ty in value_types {
         let (tag, affinity_to_check) = ty.sql_representation();
-        let mut affinities = result.entry(tag).or_insert_with(Vec::new);
+        let affinities = result.entry(tag).or_insert_with(Vec::new);
         if let Some(affinity) = affinity_to_check {
             affinities.push(affinity);
         }


@@ -95,13 +95,15 @@ fn assert_tx_datoms_count(receiver: &TestingReceiver, tx_num: usize, expected_da
 fn test_reader() {
     let mut c = new_connection("").expect("Couldn't open conn.");
     let mut conn = Conn::connect(&mut c).expect("Couldn't open DB.");
-    // Don't inspect the bootstrap transaction, but we'd like to see it's there.
-    let mut receiver = TxCountingReceiver::new();
-    assert_eq!(false, receiver.is_done);
-    Processor::process(&c, &mut receiver).expect("processor");
-    assert_eq!(true, receiver.is_done);
-    assert_eq!(1, receiver.tx_count);
+    {
+        let db_tx = c.transaction().expect("db tx");
+        // Don't inspect the bootstrap transaction, but we'd like to see it's there.
+        let mut receiver = TxCountingReceiver::new();
+        assert_eq!(false, receiver.is_done);
+        Processor::process(&db_tx, &mut receiver).expect("processor");
+        assert_eq!(true, receiver.is_done);
+        assert_eq!(1, receiver.tx_count);
+    }

     let ids = conn.transact(&mut c, r#"[
         [:db/add "s" :db/ident :foo/numba]
@@ -110,35 +112,42 @@ fn test_reader() {
     ]"#).expect("successful transaction").tempids;
     let numba_entity_id = ids.get("s").unwrap();

-    // Expect to see one more transaction of four parts (one for tx datom itself).
-    let mut receiver = TestingReceiver::new();
-    Processor::process(&c, &mut receiver).expect("processor");
+    {
+        let db_tx = c.transaction().expect("db tx");
+        // Expect to see one more transaction of four parts (one for tx datom itself).
+        let mut receiver = TestingReceiver::new();
+        Processor::process(&db_tx, &mut receiver).expect("processor");

-    println!("{:#?}", receiver);
+        println!("{:#?}", receiver);

-    assert_eq!(2, receiver.txes.keys().count());
-    assert_tx_datoms_count(&receiver, 1, 4);
+        assert_eq!(2, receiver.txes.keys().count());
+        assert_tx_datoms_count(&receiver, 1, 4);
+    }

     let ids = conn.transact(&mut c, r#"[
         [:db/add "b" :foo/numba 123]
     ]"#).expect("successful transaction").tempids;
     let asserted_e = ids.get("b").unwrap();

-    // Expect to see a single two part transaction
-    let mut receiver = TestingReceiver::new();
-    Processor::process(&c, &mut receiver).expect("processor");
+    {
+        let db_tx = c.transaction().expect("db tx");

-    assert_eq!(3, receiver.txes.keys().count());
-    assert_tx_datoms_count(&receiver, 2, 2);
+        // Expect to see a single two part transaction
+        let mut receiver = TestingReceiver::new();
+        Processor::process(&db_tx, &mut receiver).expect("processor");

-    // Inspect the transaction part.
-    let tx_id = receiver.txes.keys().nth(2).expect("tx");
-    let datoms = receiver.txes.get(tx_id).expect("datoms");
-    let part = &datoms[0];
+        assert_eq!(3, receiver.txes.keys().count());
+        assert_tx_datoms_count(&receiver, 2, 2);

-    assert_eq!(asserted_e, &part.e);
-    assert_eq!(numba_entity_id, &part.a);
-    assert!(part.v.matches_type(ValueType::Long));
-    assert_eq!(TypedValue::Long(123), part.v);
-    assert_eq!(true, part.added);
+        // Inspect the transaction part.
+        let tx_id = receiver.txes.keys().nth(2).expect("tx");
+        let datoms = receiver.txes.get(tx_id).expect("datoms");
+        let part = &datoms[0];
+
+        assert_eq!(asserted_e, &part.e);
+        assert_eq!(numba_entity_id, &part.a);
+        assert!(part.v.matches_type(ValueType::Long));
+        assert_eq!(TypedValue::Long(123), part.v);
+        assert_eq!(true, part.added);
+    }
 }


@@ -10,6 +10,7 @@ hyper = "0.11"
 tokio-core = "0.1"
 serde = "1.0"
 serde_json = "1.0"
+serde_cbor = "0.8.2"
 serde_derive = "1.0"
 lazy_static = "0.2"
 uuid = { version = "0.5", features = ["v4", "serde"] }


@@ -1,4 +1,4 @@
-// Copyright 2016 Mozilla
+// Copyright 2018 Mozilla
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
 // this file except in compliance with the License. You may obtain a copy of the
@@ -15,6 +15,8 @@ use hyper;
 use rusqlite;
 use uuid;
 use mentat_db;
+use serde_cbor;
+use serde_json;

 error_chain! {
     types {
@@ -24,8 +26,12 @@ error_chain! {
     foreign_links {
         IOError(std::io::Error);
         HttpError(hyper::Error);
+        HyperUriError(hyper::error::UriError);
         SqlError(rusqlite::Error);
         UuidParseError(uuid::ParseError);
+        Utf8Error(std::str::Utf8Error);
+        JsonError(serde_json::Error);
+        CborError(serde_cbor::error::Error);
     }

     links {
@@ -33,9 +39,29 @@
     }

     errors {
+        TxIncorrectlyMapped(n: usize) {
+            description("encountered more than one uuid mapping for tx")
+            display("expected one, found {} uuid mappings for tx", n)
+        }
+
+        UnexpectedState(t: String) {
+            description("encountered unexpected state")
+            display("encountered unexpected state: {}", t)
+        }
+
+        NotYetImplemented(t: String) {
+            description("not yet implemented")
+            display("not yet implemented: {}", t)
+        }
+
+        DuplicateMetadata(k: String) {
+            description("encountered more than one metadata value for key")
+            display("encountered more than one metadata value for key: {}", k)
+        }
+
+        UploadingProcessorUnfinished {
+            description("Uploading Tx processor couldn't finish")
+            display("Uploading Tx processor couldn't finish")
+        }
     }
 }
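These variants surface later in the syncer. A sketch of how a caller might branch on them (hypothetical call site; error_chain generates the Error(kind, state) tuple being matched here):

    // Hypothetical: treat "not yet implemented" as a soft failure, propagate the rest.
    match Syncer::flow(&mut conn, "some-user-id".to_string()) {
        Ok(()) => println!("synced!"),
        Err(Error(ErrorKind::NotYetImplemented(msg), _)) => println!("sync skipped: {}", msg),
        Err(e) => return Err(e),
    }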


@@ -1,4 +1,4 @@
-// Copyright 2016 Mozilla
+// Copyright 2018 Mozilla
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
 // this file except in compliance with the License. You may obtain a copy of the
@@ -8,16 +8,23 @@
 // CONDITIONS OF ANY KIND, either express or implied. See the License for the
 // specific language governing permissions and limitations under the License.

+// For error_chain:
+#![recursion_limit="128"]
+
 #[macro_use]
 extern crate error_chain;
 #[macro_use]
 extern crate lazy_static;
+#[macro_use]
+extern crate serde_derive;

 extern crate hyper;
 extern crate tokio_core;
 extern crate futures;
 extern crate serde;
+extern crate serde_cbor;
 extern crate serde_json;

 extern crate mentat_db;
 extern crate mentat_core;
@@ -28,3 +35,5 @@ pub mod schema;
 pub mod metadata;
 pub mod tx_processor;
 pub mod errors;
+pub mod syncer;
+pub mod tx_mapper;


@@ -1,4 +1,4 @@
-// Copyright 2016 Mozilla
+// Copyright 2018 Mozilla
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
 // this file except in compliance with the License. You may obtain a copy of the
@@ -14,28 +14,21 @@ use rusqlite;
 use uuid::Uuid;

 use schema;
-use errors::Result;
+use errors::{
+    ErrorKind,
+    Result,
+};

-trait HeadTrackable {
-    fn remote_head(&self) -> Result<Uuid>;
-    fn set_remote_head(&mut self, uuid: &Uuid) -> Result<()>;
+pub trait HeadTrackable {
+    fn remote_head(tx: &rusqlite::Transaction) -> Result<Uuid>;
+    fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()>;
 }

-struct SyncMetadataClient {
-    conn: rusqlite::Connection
-}
-
-impl SyncMetadataClient {
-    fn new(conn: rusqlite::Connection) -> Self {
-        SyncMetadataClient {
-            conn: conn
-        }
-    }
-}
+pub struct SyncMetadataClient {}

 impl HeadTrackable for SyncMetadataClient {
-    fn remote_head(&self) -> Result<Uuid> {
-        self.conn.query_row(
+    fn remote_head(tx: &rusqlite::Transaction) -> Result<Uuid> {
+        tx.query_row(
             "SELECT value FROM tolstoy_metadata WHERE key = ?",
             &[&schema::REMOTE_HEAD_KEY], |r| {
                 let bytes: Vec<u8> = r.get(0);
@@ -44,11 +37,14 @@ impl HeadTrackable for SyncMetadataClient {
         )?.map_err(|e| e.into())
     }

-    fn set_remote_head(&mut self, uuid: &Uuid) -> Result<()> {
-        let tx = self.conn.transaction()?;
+    fn set_remote_head(tx: &rusqlite::Transaction, uuid: &Uuid) -> Result<()> {
         let uuid_bytes = uuid.as_bytes().to_vec();
-        tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?", &[&uuid_bytes, &schema::REMOTE_HEAD_KEY])?;
-        tx.commit().map_err(|e| e.into())
+        let updated = tx.execute("UPDATE tolstoy_metadata SET value = ? WHERE key = ?",
+                                 &[&uuid_bytes, &schema::REMOTE_HEAD_KEY])?;
+        if updated != 1 {
+            bail!(ErrorKind::DuplicateMetadata(schema::REMOTE_HEAD_KEY.into()));
+        }
+        Ok(())
     }
 }

@@ -58,17 +54,17 @@ mod tests {
     #[test]
     fn test_get_remote_head_default() {
-        let conn = schema::tests::setup_conn();
-        let metadata_client: SyncMetadataClient = SyncMetadataClient::new(conn);
-        assert_eq!(Uuid::nil(), metadata_client.remote_head().expect("fetch succeeded"));
+        let mut conn = schema::tests::setup_conn();
+        let tx = conn.transaction().expect("db tx");
+        assert_eq!(Uuid::nil(), SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
     }

     #[test]
     fn test_set_and_get_remote_head() {
-        let conn = schema::tests::setup_conn();
+        let mut conn = schema::tests::setup_conn();
         let uuid = Uuid::new_v4();
-        let mut metadata_client: SyncMetadataClient = SyncMetadataClient::new(conn);
-        metadata_client.set_remote_head(&uuid).expect("update succeeded");
-        assert_eq!(uuid, metadata_client.remote_head().expect("fetch succeeded"));
+        let tx = conn.transaction().expect("db tx");
+        SyncMetadataClient::set_remote_head(&tx, &uuid).expect("update succeeded");
+        assert_eq!(uuid, SyncMetadataClient::remote_head(&tx).expect("fetch succeeded"));
     }
 }
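For orientation: remote_head and set_remote_head assume a metadata table holding exactly one row for REMOTE_HEAD_KEY. A hypothetical sketch of its shape (the real DDL lives in schema.rs, which this diff doesn't touch); the schema setup seeds the head with a nil UUID, which is why test_get_remote_head_default expects Uuid::nil():

    // Hypothetical DDL sketch; key is unique, value stores the head UUID's bytes.
    tx.execute("CREATE TABLE IF NOT EXISTS tolstoy_metadata (
                    key TEXT NOT NULL UNIQUE,
                    value BLOB NOT NULL
                )", &[])?;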


@@ -1,4 +1,4 @@
-// Copyright 2016 Mozilla
+// Copyright 2018 Mozilla
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
 // this file except in compliance with the License. You may obtain a copy of the

tolstoy/src/syncer.rs (new file, +286)

@@ -0,0 +1,286 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use std;
use std::str::FromStr;
use std::collections::HashMap;

use futures::{future, Future, Stream};
use hyper;
use hyper::Client;
use hyper::{Method, Request, StatusCode, Error as HyperError};
use hyper::header::{ContentType};
use rusqlite;
use serde_cbor;
use serde_json;
use tokio_core::reactor::Core;
use uuid::Uuid;

use mentat_core::Entid;
use metadata::SyncMetadataClient;
use metadata::HeadTrackable;

use errors::{
    ErrorKind,
    Result,
};

use tx_processor::{
    Processor,
    TxReceiver,
    TxPart,
};

use tx_mapper::TxMapper;

static API_VERSION: &str = "0.1";
static BASE_URL: &str = "https://mentat.dev.lcip.org/mentatsync/";

pub struct Syncer {}

struct UploadingTxReceiver<'c> {
    pub tx_temp_uuids: HashMap<Entid, Uuid>,
    pub is_done: bool,
    remote_client: &'c RemoteClient,
    remote_head: &'c Uuid,
    rolling_temp_head: Option<Uuid>,
}

impl<'c> UploadingTxReceiver<'c> {
    fn new(client: &'c RemoteClient, remote_head: &'c Uuid) -> UploadingTxReceiver<'c> {
        UploadingTxReceiver {
            tx_temp_uuids: HashMap::new(),
            remote_client: client,
            remote_head: remote_head,
            rolling_temp_head: None,
            is_done: false
        }
    }
}

impl<'c> TxReceiver for UploadingTxReceiver<'c> {
    fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
    where T: Iterator<Item=TxPart> {
        // Yes, we generate a new UUID for a given Tx, even if we might
        // already have one mapped locally. A pre-existing local mapping will
        // be replaced if this sync succeeds entirely.
        // If we're seeing this tx again, it implies that the previous attempt
        // to sync didn't update our local head. Something went wrong last time,
        // and it's unwise to try to re-use these remote tx mappings.
        // We just leave garbage txs to be GC'd on the server.
        let tx_uuid = Uuid::new_v4();
        self.tx_temp_uuids.insert(tx_id, tx_uuid);
        let mut tx_chunks = vec![];

        // TODO separate bits of network work should be combined into a single 'future'.

        // Upload all chunks.
        for datom in d {
            let datom_uuid = Uuid::new_v4();
            tx_chunks.push(datom_uuid);
            self.remote_client.put_chunk(&datom_uuid, serde_cbor::to_vec(&datom)?)?
        }

        // Upload tx.
        // NB: At this point, we may choose to update remote & local heads.
        // Depending on how much we're uploading, and how unreliable our connection
        // is, this might be a good thing to do to ensure we make at least some progress.
        // It comes at the cost of possibly racing more against other clients.
        match self.rolling_temp_head {
            Some(parent) => self.remote_client.put_transaction(&tx_uuid, &parent, &tx_chunks)?,
            None => self.remote_client.put_transaction(&tx_uuid, self.remote_head, &tx_chunks)?
        }
        // Advance the rolling head to the tx we just uploaded.
        self.rolling_temp_head = Some(tx_uuid.clone());

        Ok(())
    }

    fn done(&mut self) -> Result<()> {
        self.is_done = true;
        Ok(())
    }
}

impl Syncer {
    pub fn flow(sqlite: &mut rusqlite::Connection, username: String) -> Result<()> {
        // Sketch of an upload flow:
        // get remote head
        // compare with local head
        // if the same:
        // - upload any local chunks, transactions
        // - move server remote head
        // - move local remote head

        // TODO configure this sync with some auth data
        let remote_client = RemoteClient::new(BASE_URL.into(), username);
        let mut db_tx = sqlite.transaction()?;

        let remote_head = remote_client.get_head()?;
        let locally_known_remote_head = SyncMetadataClient::remote_head(&db_tx)?;

        // TODO it's possible that we've successfully advanced remote head previously,
        // but failed to advance our own local head. If that's the case, and we can recognize it,
        // our sync becomes much cheaper.

        // Don't know how to download, merge, resolve conflicts, etc yet.
        if locally_known_remote_head != remote_head {
            bail!(ErrorKind::NotYetImplemented(
                format!("Can't yet sync against a changed server. Local head {:?}, remote head {:?}", locally_known_remote_head, remote_head)
            ));
        }

        // Local and remote heads agree.
        // In theory, it should be safe to upload our stuff now.
        let mut uploader = UploadingTxReceiver::new(&remote_client, &remote_head);
        Processor::process(&db_tx, &mut uploader)?;
        if !uploader.is_done {
            bail!(ErrorKind::UploadingProcessorUnfinished);
        }

        // Last tx uuid uploaded by the tx receiver.
        // It's going to be our new head.
        if let Some(last_tx_uploaded) = uploader.rolling_temp_head {
            // Upload remote head.
            remote_client.put_head(&last_tx_uploaded)?;

            // On success:
            // - persist local mappings from the receiver
            // - update our local "remote head".
            TxMapper::set_bulk(&mut db_tx, &uploader.tx_temp_uuids)?;
            SyncMetadataClient::set_remote_head(&db_tx, &last_tx_uploaded)?;

            // Commit everything: tx->uuid mappings and the new HEAD. We're synced!
            db_tx.commit()?;
        }

        Ok(())
    }
}

#[derive(Serialize)]
struct SerializedHead<'a> {
    head: &'a Uuid
}

#[derive(Serialize)]
struct SerializedTransaction<'a> {
    parent: &'a Uuid,
    chunks: &'a Vec<Uuid>
}

struct RemoteClient {
    base_uri: String,
    user_id: String
}

impl RemoteClient {
    fn new(base_uri: String, user_id: String) -> Self {
        RemoteClient {
            base_uri: base_uri,
            user_id: user_id
        }
    }

    fn bound_base_uri(&self) -> String {
        // TODO escaping
        format!("{}/{}/{}", self.base_uri, API_VERSION, self.user_id)
    }

    fn get_uuid(&self, uri: String) -> Result<Uuid> {
        let mut core = Core::new()?;
        let client = Client::new(&core.handle());

        let uri = uri.parse()?;
        let get = client.get(uri).and_then(|res| {
            res.body().concat2()
        });
        let got = core.run(get)?;
        Ok(Uuid::from_str(std::str::from_utf8(&got)?)?)
    }

    fn put<T>(&self, uri: String, payload: T, expected: StatusCode) -> Result<()>
    where hyper::Body: std::convert::From<T> {
        let mut core = Core::new()?;
        let client = Client::new(&core.handle());

        let uri = uri.parse()?;
        let mut req = Request::new(Method::Put, uri);
        req.headers_mut().set(ContentType::json());
        req.set_body(payload);

        let put = client.request(req).and_then(|res| {
            let status_code = res.status();
            if status_code != expected {
                future::err(HyperError::Status)
            } else {
                // body will be empty...
                future::ok(())
            }
        });
        core.run(put)?;
        Ok(())
    }

    fn put_transaction(&self, transaction_uuid: &Uuid, parent_uuid: &Uuid, chunks: &Vec<Uuid>) -> Result<()> {
        // {"parent": uuid, "chunks": [chunk1, chunk2...]}
        let transaction = SerializedTransaction {
            parent: parent_uuid,
            chunks: chunks
        };

        let uri = format!("{}/transactions/{}", self.bound_base_uri(), transaction_uuid);
        let json = serde_json::to_string(&transaction)?;
        self.put(uri, json, StatusCode::Created)
    }

    fn get_head(&self) -> Result<Uuid> {
        let uri = format!("{}/head", self.bound_base_uri());
        self.get_uuid(uri)
    }

    fn put_head(&self, uuid: &Uuid) -> Result<()> {
        // {"head": uuid}
        let head = SerializedHead {
            head: uuid
        };

        let uri = format!("{}/head", self.bound_base_uri());
        let json = serde_json::to_string(&head)?;
        self.put(uri, json, StatusCode::NoContent)
    }

    fn put_chunk(&self, chunk_uuid: &Uuid, payload: Vec<u8>) -> Result<()> {
        let uri = format!("{}/chunks/{}", self.bound_base_uri(), chunk_uuid);
        self.put(uri, payload, StatusCode::Created)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_remote_client(uri: &str, user_id: &str) -> RemoteClient {
        RemoteClient::new(uri.into(), user_id.into())
    }

    #[test]
    fn test_remote_client_bound_uri() {
        let remote_client = test_remote_client("https://example.com/api", "test-user");
        assert_eq!("https://example.com/api/0.1/test-user", remote_client.bound_base_uri());
    }
}
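Driving the whole flow end to end is a single call. A minimal sketch of a hypothetical call site (the database path and user id are invented for illustration):

    // Open the local store and attempt to upload anything the server
    // hasn't seen. Bails with NotYetImplemented if the remote head moved.
    let mut conn = rusqlite::Connection::open("mentat.db")?;
    Syncer::flow(&mut conn, "some-user-id".to_string())?;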

tolstoy/src/tx_mapper.rs (new file, +110)

@@ -0,0 +1,110 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use std::collections::HashMap;

use rusqlite;
use uuid::Uuid;

use mentat_core::Entid;

use errors::{
    ErrorKind,
    Result,
};

// Exposes a tx<->uuid mapping interface.
pub struct TxMapper {}

impl TxMapper {
    pub fn set_bulk(db_tx: &mut rusqlite::Transaction, tx_uuid_map: &HashMap<Entid, Uuid>) -> Result<()> {
        let mut stmt = db_tx.prepare_cached(
            "INSERT OR REPLACE INTO tolstoy_tu (tx, uuid) VALUES (?, ?)"
        )?;
        for (tx, uuid) in tx_uuid_map.iter() {
            let uuid_bytes = uuid.as_bytes().to_vec();
            stmt.execute(&[tx, &uuid_bytes])?;
        }
        Ok(())
    }

    // TODO for when we're downloading, right?
    pub fn get_or_set_uuid_for_tx(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Uuid> {
        match TxMapper::get(db_tx, tx)? {
            Some(uuid) => Ok(uuid),
            None => {
                let uuid = Uuid::new_v4();
                let uuid_bytes = uuid.as_bytes().to_vec();
                db_tx.execute("INSERT INTO tolstoy_tu (tx, uuid) VALUES (?, ?)", &[&tx, &uuid_bytes])?;
                return Ok(uuid);
            }
        }
    }

    fn get(db_tx: &mut rusqlite::Transaction, tx: Entid) -> Result<Option<Uuid>> {
        let mut stmt = db_tx.prepare_cached(
            "SELECT uuid FROM tolstoy_tu WHERE tx = ?"
        )?;

        let results = stmt.query_and_then(&[&tx], |r| -> Result<Uuid> {
            let bytes: Vec<u8> = r.get(0);
            Uuid::from_bytes(bytes.as_slice()).map_err(|e| e.into())
        })?.peekable();

        let mut uuids = vec![];
        uuids.extend(results);
        if uuids.len() == 0 {
            return Ok(None);
        } else if uuids.len() > 1 {
            bail!(ErrorKind::TxIncorrectlyMapped(uuids.len()));
        }
        Ok(Some(uuids.remove(0)?))
    }
}

#[cfg(test)]
pub mod tests {
    use super::*;
    use schema;

    #[test]
    fn test_getters() {
        let mut conn = schema::tests::setup_conn();
        let mut tx = conn.transaction().expect("db tx");
        assert_eq!(None, TxMapper::get(&mut tx, 1).expect("success"));
        let set_uuid = TxMapper::get_or_set_uuid_for_tx(&mut tx, 1).expect("success");
        assert_eq!(Some(set_uuid), TxMapper::get(&mut tx, 1).expect("success"));
    }

    #[test]
    fn test_bulk_setter() {
        let mut conn = schema::tests::setup_conn();
        let mut tx = conn.transaction().expect("db tx");
        let mut map = HashMap::new();

        TxMapper::set_bulk(&mut tx, &map).expect("empty map success");

        let uuid1 = Uuid::new_v4();
        let uuid2 = Uuid::new_v4();
        map.insert(1, uuid1);
        map.insert(2, uuid2);
        TxMapper::set_bulk(&mut tx, &map).expect("map success");
        assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success"));
        assert_eq!(Some(uuid2), TxMapper::get(&mut tx, 2).expect("success"));

        // Now let's replace one of the mappings.
        map.remove(&1);
        let new_uuid2 = Uuid::new_v4();
        map.insert(2, new_uuid2);
        TxMapper::set_bulk(&mut tx, &map).expect("map success");
        assert_eq!(Some(uuid1), TxMapper::get(&mut tx, 1).expect("success"));
        assert_eq!(Some(new_uuid2), TxMapper::get(&mut tx, 2).expect("success"));
    }
}


@@ -1,4 +1,4 @@
-// Copyright 2016 Mozilla
+// Copyright 2018 Mozilla
 //
 // Licensed under the Apache License, Version 2.0 (the "License"); you may not use
 // this file except in compliance with the License. You may obtain a copy of the
@@ -24,7 +24,7 @@ use mentat_core::{
     TypedValue,
 };

-#[derive(Debug, Clone)]
+#[derive(Debug,Clone,Serialize,Deserialize)]
 pub struct TxPart {
     pub e: Entid,
     pub a: Entid,
@@ -33,12 +33,6 @@ pub struct TxPart {
     pub added: bool,
 }

-#[derive(Debug, Clone)]
-pub struct Tx {
-    pub tx: Entid,
-    pub tx_instant: TypedValue,
-}
-
 pub trait TxReceiver {
     fn tx<T>(&mut self, tx_id: Entid, d: &mut T) -> Result<()>
         where T: Iterator<Item=TxPart>;
@@ -47,17 +41,17 @@ pub trait TxReceiver {

 pub struct Processor {}

-pub struct DatomsIterator<'conn, 't, T>
+pub struct DatomsIterator<'dbtx, 't, T>
 where T: Sized + Iterator<Item=Result<TxPart>> + 't {
     at_first: bool,
     at_last: bool,
-    first: &'conn TxPart,
+    first: &'dbtx TxPart,
     rows: &'t mut Peekable<T>,
 }

-impl<'conn, 't, T> DatomsIterator<'conn, 't, T>
+impl<'dbtx, 't, T> DatomsIterator<'dbtx, 't, T>
 where T: Sized + Iterator<Item=Result<TxPart>> + 't {
-    fn new(first: &'conn TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'conn, 't, T>
+    fn new(first: &'dbtx TxPart, rows: &'t mut Peekable<T>) -> DatomsIterator<'dbtx, 't, T>
     {
         DatomsIterator {
             at_first: true,
@@ -68,7 +62,7 @@ where T: Sized + Iterator<Item=Result<TxPart>> + 't {
     }
 }

-impl<'conn, 't, T> Iterator for DatomsIterator<'conn, 't, T>
+impl<'dbtx, 't, T> Iterator for DatomsIterator<'dbtx, 't, T>
 where T: Sized + Iterator<Item=Result<TxPart>> + 't {
     type Item = TxPart;

@@ -133,7 +127,7 @@ fn to_tx_part(row: &rusqlite::Row) -> Result<TxPart> {
 }

 impl Processor {
-    pub fn process<R>(sqlite: &rusqlite::Connection, receiver: &mut R) -> Result<()>
+    pub fn process<R>(sqlite: &rusqlite::Transaction, receiver: &mut R) -> Result<()>
     where R: TxReceiver {
         let mut stmt = sqlite.prepare(
             "SELECT e, a, v, value_type_tag, tx, added FROM transactions ORDER BY tx"