Feedback pass 1
This commit is contained in:
parent
5c2c29bf26
commit
14e28de2f5
5 changed files with 74 additions and 47 deletions
|
@ -1,3 +1,13 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
extern crate mentat;
|
extern crate mentat;
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
extern crate mentat_tolstoy;
|
extern crate mentat_tolstoy;
|
||||||
|
@ -6,7 +16,6 @@ use mentat::conn::Conn;
|
||||||
|
|
||||||
use mentat::new_connection;
|
use mentat::new_connection;
|
||||||
use mentat_tolstoy::tx_client::{
|
use mentat_tolstoy::tx_client::{
|
||||||
Tx,
|
|
||||||
TxReader,
|
TxReader,
|
||||||
TxClient
|
TxClient
|
||||||
};
|
};
|
||||||
|
@ -60,7 +69,7 @@ fn test_reader() {
|
||||||
assert_eq!(numba_entity_id, &part.a);
|
assert_eq!(numba_entity_id, &part.a);
|
||||||
assert!(part.v.matches_type(ValueType::Long));
|
assert!(part.v.matches_type(ValueType::Long));
|
||||||
assert_eq!(TypedValue::Long(123), part.v);
|
assert_eq!(TypedValue::Long(123), part.v);
|
||||||
assert_eq!(1, part.added);
|
assert_eq!(true, part.added);
|
||||||
|
|
||||||
// TODO retractions
|
// TODO retractions
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,6 @@ path = "../core"
|
||||||
[dependencies.mentat_db]
|
[dependencies.mentat_db]
|
||||||
path = "../db"
|
path = "../db"
|
||||||
|
|
||||||
[dependencies.edn]
|
|
||||||
path = "../edn"
|
|
||||||
|
|
||||||
[dependencies.rusqlite]
|
[dependencies.rusqlite]
|
||||||
version = "0.12"
|
version = "0.12"
|
||||||
# System sqlite might be very old.
|
# System sqlite might be very old.
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
use std;
|
use std;
|
||||||
use hyper;
|
use hyper;
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
use edn;
|
use uuid;
|
||||||
use mentat_db;
|
use mentat_db;
|
||||||
|
|
||||||
error_chain! {
|
error_chain! {
|
||||||
|
@ -25,7 +25,7 @@ error_chain! {
|
||||||
IOError(std::io::Error);
|
IOError(std::io::Error);
|
||||||
HttpError(hyper::Error);
|
HttpError(hyper::Error);
|
||||||
SqlError(rusqlite::Error);
|
SqlError(rusqlite::Error);
|
||||||
UuidParseError(edn::UuidParseError);
|
UuidParseError(uuid::ParseError);
|
||||||
}
|
}
|
||||||
|
|
||||||
links {
|
links {
|
||||||
|
|
|
@ -22,7 +22,6 @@ extern crate serde_json;
|
||||||
extern crate mentat_db;
|
extern crate mentat_db;
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
extern crate edn;
|
|
||||||
extern crate uuid;
|
extern crate uuid;
|
||||||
|
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
|
|
|
@ -10,48 +10,61 @@
|
||||||
|
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::collections::btree_map::Entry;
|
use std::collections::btree_map::Entry;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
|
|
||||||
use errors::{
|
use errors::{
|
||||||
Result,
|
Result,
|
||||||
ErrorKind
|
ErrorKind,
|
||||||
};
|
|
||||||
|
|
||||||
use mentat_db::types::{
|
|
||||||
Entid
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use mentat_db::{
|
use mentat_db::{
|
||||||
entids,
|
entids,
|
||||||
TypedSQLValue
|
TypedSQLValue,
|
||||||
};
|
};
|
||||||
|
|
||||||
use mentat_core::{
|
use mentat_core::{
|
||||||
TypedValue
|
TypedValue,
|
||||||
|
Entid,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TxPart {
|
pub struct TxPart {
|
||||||
pub e: Entid,
|
pub e: Entid,
|
||||||
pub a: i64,
|
pub a: Entid,
|
||||||
pub v: TypedValue,
|
pub v: TypedValue,
|
||||||
pub added: i32
|
pub added: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notes on 'parts' representation:
|
||||||
|
// Currently it's suitable for uses which necessitate pulling in the entire tx into memory,
|
||||||
|
// and don't require efficient querying/monitoring by attributes of parts.
|
||||||
|
//
|
||||||
|
// Example: streaming transactions to/from the server.
|
||||||
|
//
|
||||||
|
// In the future, consider:
|
||||||
|
// - A structure that makes typical tx-listener questions — "does this transaction mention
|
||||||
|
// an attribute or entity I care about?" — efficient. That might be a trie, it might be a
|
||||||
|
// bunch of extra data structures (e.g., a set of mentioned attributes), or something else.
|
||||||
|
// With an unsorted Vec<TxPart>, looking for a mentioned attribute requires linear search of the entire vector.
|
||||||
|
// - A structure that doesn't require pulling the entire tx into memory. This might be a cursor,
|
||||||
|
// a rowid range, or something else that's scoped to the lifetime of a particular database transaction,
|
||||||
|
// in order to preserve isolation.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Tx {
|
pub struct Tx {
|
||||||
pub tx: Entid,
|
pub tx: Entid,
|
||||||
pub tx_instant: TypedValue,
|
pub tx_instant: TypedValue,
|
||||||
pub parts: Vec<TxPart>
|
pub parts: Vec<TxPart>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RawTx {
|
struct RawDatom {
|
||||||
e: Entid,
|
e: Entid,
|
||||||
a: i64,
|
a: Entid,
|
||||||
v: TypedValue,
|
v: TypedValue, // composite of 'v' and 'value_type_tag'
|
||||||
tx: Entid,
|
tx: Entid,
|
||||||
added: i32
|
added: bool,
|
||||||
|
is_transaction: bool
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait TxReader {
|
pub trait TxReader {
|
||||||
|
@ -62,52 +75,61 @@ pub struct TxClient {}
|
||||||
|
|
||||||
impl TxReader for TxClient {
|
impl TxReader for TxClient {
|
||||||
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
|
fn all(sqlite: &rusqlite::Connection) -> Result<Vec<Tx>> {
|
||||||
// Make sure a=txInstant rows are first, so that we process
|
|
||||||
// all transactions before we process any transaction parts.
|
|
||||||
let mut stmt = sqlite.prepare(
|
let mut stmt = sqlite.prepare(
|
||||||
"SELECT
|
"SELECT
|
||||||
e, a, v, tx, added, value_type_tag,
|
e, a, v, tx, added, value_type_tag,
|
||||||
CASE a WHEN :txInstant THEN 1 ELSE 0 END is_transaction
|
CASE a WHEN :txInstant THEN 1 ELSE 0 END is_transaction
|
||||||
FROM transactions ORDER BY is_transaction DESC, tx ASC"
|
FROM transactions"
|
||||||
)?;
|
)?;
|
||||||
let rows: Vec<Result<RawTx>> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result<RawTx> {
|
let datoms: Vec<Result<RawDatom>> = stmt.query_and_then_named(&[(":txInstant", &entids::DB_TX_INSTANT)], |row| -> Result<RawDatom> {
|
||||||
Ok(RawTx {
|
Ok(RawDatom {
|
||||||
e: row.get(0),
|
e: row.get(0),
|
||||||
a: row.get(1),
|
a: row.get(1),
|
||||||
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
|
v: TypedValue::from_sql_value_pair(row.get(2), row.get(5))?,
|
||||||
tx: row.get(3),
|
tx: row.get(3),
|
||||||
added: row.get(4)
|
added: row.get(4),
|
||||||
|
is_transaction: row.get(6),
|
||||||
})
|
})
|
||||||
})?.collect();
|
})?.collect();
|
||||||
|
|
||||||
// It's convenient to have a consistently ordered set of results,
|
// It's convenient to have a consistently ordered set of results,
|
||||||
// so we use a sorting map.
|
// so we use a sorting map.
|
||||||
let mut txes_by_tx = BTreeMap::new();
|
let mut txes_by_tx = BTreeMap::new();
|
||||||
for row_result in rows {
|
let mut tx_parts_by_tx = HashMap::new();
|
||||||
let row = row_result?;
|
|
||||||
// Row represents a transaction.
|
// On first pass, build our Txes and TxParts for each.
|
||||||
if row.a == entids::DB_TX_INSTANT {
|
for datom_result in datoms {
|
||||||
txes_by_tx.insert(row.tx, Tx {
|
let datom = datom_result?;
|
||||||
tx: row.tx,
|
// Datom represents a transaction.
|
||||||
tx_instant: row.v,
|
if datom.is_transaction {
|
||||||
parts: Vec::new()
|
txes_by_tx.insert(datom.tx, Tx {
|
||||||
|
tx: datom.tx,
|
||||||
|
tx_instant: datom.v,
|
||||||
|
parts: Vec::new(),
|
||||||
});
|
});
|
||||||
// Row represents part of a transaction. Our query statement above guarantees
|
// Datom represents part of a transaction. Our query statement above guarantees
|
||||||
// that we've already processed corresponding transaction at this point.
|
// that we've already processed corresponding transaction at this point.
|
||||||
} else {
|
} else {
|
||||||
if let Entry::Occupied(mut t) = txes_by_tx.entry(row.tx) {
|
let parts = tx_parts_by_tx.entry(datom.tx).or_insert(Vec::new());
|
||||||
t.get_mut().parts.push(TxPart {
|
parts.push(TxPart {
|
||||||
e: row.e,
|
e: datom.e,
|
||||||
a: row.a,
|
a: datom.a,
|
||||||
v: row.v,
|
v: datom.v,
|
||||||
added: row.added
|
added: datom.added,
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
bail!(ErrorKind::UnexpectedState(format!("Encountered transaction part before transaction {:?}", row.tx)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// On second pass, consume TxParts map and associate parts with corresponding Txes.
|
||||||
|
for (e, tx_parts) in tx_parts_by_tx.into_iter() {
|
||||||
|
if let Entry::Occupied(mut tx) = txes_by_tx.entry(e) {
|
||||||
|
tx.get_mut().parts = tx_parts;
|
||||||
|
} else {
|
||||||
|
bail!(ErrorKind::UnexpectedState(format!("Missing transactions datoms for tx={:?}", e)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, consume the Tx map and a Vec of its values.
|
||||||
Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect())
|
Ok(txes_by_tx.into_iter().map(|(_, tx)| tx).collect())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue