[tx] Start implementing bulk SQL insertion algorithms (#214). r=rnewman,jsantell
* Pre: Add some value conversion tests. This is follow-up to earlier work. Turn TypedValue::Keyword into edn::Value::NamespacedKeyword. Don't take a reference to value_type_tag. * Pre: Add repeat_values. Requires itertools, so this commit is not stand-alone. * Pre: Expose the first transaction ID as bootstrap::TX0. This is handy for testing. * Pre: Improve debug module. * Pre: Bump rusqlite version for https://github.com/jgallagher/rusqlite/issues/211. * Pre: Use itertools. * Start implementing bulk SQL insertion algorithms. (#214) This is slightly simpler re-expression of the existing Clojure implementation. * Post: Start generic data-driven transaction testing. (#188) * Review comment: `use ::{SYMBOL}` instead of `use {SYMBOL}`. * Review comment: Prefer bindings_per_statement to values_per_statement.
This commit is contained in:
parent
c585715224
commit
afafcd64a0
9 changed files with 862 additions and 80 deletions
|
@ -9,9 +9,10 @@ nickel = "0.9.0"
|
|||
slog = "1.4.0"
|
||||
slog-scope = "0.2.2"
|
||||
slog-term = "1.3.4"
|
||||
time = "0.1.35"
|
||||
|
||||
[dependencies.rusqlite]
|
||||
version = "0.9.3"
|
||||
version = "0.9.5"
|
||||
# System sqlite might be very old.
|
||||
features = ["bundled"]
|
||||
|
||||
|
|
|
@ -4,10 +4,13 @@ version = "0.0.1"
|
|||
|
||||
[dependencies]
|
||||
error-chain = "0.8.0"
|
||||
itertools = "0.5.9"
|
||||
lazy_static = "0.2.2"
|
||||
ordered-float = "0.4.0"
|
||||
time = "0.1.35"
|
||||
|
||||
[dependencies.rusqlite]
|
||||
version = "0.9.3"
|
||||
version = "0.9.5"
|
||||
# System sqlite might be very old.
|
||||
features = ["bundled"]
|
||||
|
||||
|
@ -22,3 +25,7 @@ path = "../tx"
|
|||
|
||||
[dependencies.mentat_tx_parser]
|
||||
path = "../tx-parser"
|
||||
|
||||
# Should be dev-dependencies.
|
||||
[dependencies.tabwriter]
|
||||
version = "1.0.3"
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use {to_namespaced_keyword};
|
||||
use ::{to_namespaced_keyword};
|
||||
use edn;
|
||||
use edn::types::Value;
|
||||
use entids;
|
||||
|
@ -20,6 +20,11 @@ use mentat_tx_parser;
|
|||
use types::{IdentMap, Partition, PartitionMap, Schema, TypedValue};
|
||||
use values;
|
||||
|
||||
/// The first transaction ID applied to the knowledge base.
|
||||
///
|
||||
/// This is the start of the :db.part/tx partition.
|
||||
pub const TX0: i64 = 0x10000000;
|
||||
|
||||
lazy_static! {
|
||||
static ref V1_IDENTS: Vec<(&'static str, i64)> = {
|
||||
vec![(":db/ident", entids::DB_IDENT),
|
||||
|
@ -70,14 +75,14 @@ lazy_static! {
|
|||
static ref V1_PARTS: Vec<(&'static str, i64, i64)> = {
|
||||
vec![(":db.part/db", 0, (1 + V1_IDENTS.len()) as i64),
|
||||
(":db.part/user", 0x10000, 0x10000),
|
||||
(":db.part/tx", 0x10000000, 0x10000000),
|
||||
(":db.part/tx", TX0, TX0),
|
||||
]
|
||||
};
|
||||
|
||||
static ref V2_PARTS: Vec<(&'static str, i64, i64)> = {
|
||||
vec![(":db.part/db", 0, (1 + V2_IDENTS.len()) as i64),
|
||||
(":db.part/user", 0x10000, 0x10000),
|
||||
(":db.part/tx", 0x10000000, 0x10000000),
|
||||
(":db.part/tx", TX0, TX0),
|
||||
]
|
||||
};
|
||||
|
||||
|
|
510
db/src/db.rs
510
db/src/db.rs
|
@ -10,11 +10,18 @@
|
|||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::iter::once;
|
||||
|
||||
use itertools;
|
||||
use itertools::Itertools;
|
||||
use rusqlite;
|
||||
use rusqlite::types::{ToSql, ToSqlOutput};
|
||||
use time;
|
||||
|
||||
use ::{repeat_values, to_namespaced_keyword};
|
||||
use bootstrap;
|
||||
use edn::types::Value;
|
||||
use entids;
|
||||
use errors::*;
|
||||
use mentat_tx::entities as entmod;
|
||||
use mentat_tx::entities::Entity;
|
||||
|
@ -166,7 +173,7 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<i32> {
|
|||
}
|
||||
|
||||
let bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema());
|
||||
bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..])?;
|
||||
bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0)?;
|
||||
|
||||
set_user_version(&tx, CURRENT_VERSION)?;
|
||||
let user_version = get_user_version(&tx)?;
|
||||
|
@ -285,8 +292,8 @@ pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<i32> {
|
|||
|
||||
impl TypedValue {
|
||||
/// Given a SQLite `value` and a `value_type_tag`, return the corresponding `TypedValue`.
|
||||
pub fn from_sql_value_pair(value: rusqlite::types::Value, value_type_tag: &i32) -> Result<TypedValue> {
|
||||
match (*value_type_tag, value) {
|
||||
pub fn from_sql_value_pair(value: rusqlite::types::Value, value_type_tag: i32) -> Result<TypedValue> {
|
||||
match (value_type_tag, value) {
|
||||
(0, rusqlite::types::Value::Integer(x)) => Ok(TypedValue::Ref(x)),
|
||||
(1, rusqlite::types::Value::Integer(x)) => Ok(TypedValue::Boolean(0 != x)),
|
||||
// SQLite distinguishes integral from decimal types, allowing long and double to
|
||||
|
@ -295,7 +302,7 @@ impl TypedValue {
|
|||
(5, rusqlite::types::Value::Real(x)) => Ok(TypedValue::Double(x.into())),
|
||||
(10, rusqlite::types::Value::Text(x)) => Ok(TypedValue::String(x)),
|
||||
(13, rusqlite::types::Value::Text(x)) => Ok(TypedValue::Keyword(x)),
|
||||
(_, value) => bail!(ErrorKind::BadSQLValuePair(value, *value_type_tag)),
|
||||
(_, value) => bail!(ErrorKind::BadSQLValuePair(value, value_type_tag)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,7 +345,7 @@ impl TypedValue {
|
|||
&TypedValue::Long(x) => (Value::Integer(x), ValueType::Long),
|
||||
&TypedValue::Double(x) => (Value::Float(x), ValueType::Double),
|
||||
&TypedValue::String(ref x) => (Value::Text(x.clone()), ValueType::String),
|
||||
&TypedValue::Keyword(ref x) => (Value::Text(x.clone()), ValueType::Keyword),
|
||||
&TypedValue::Keyword(ref x) => (Value::NamespacedKeyword(to_namespaced_keyword(&x).unwrap()), ValueType::Keyword),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -373,7 +380,7 @@ pub fn read_schema(conn: &rusqlite::Connection, ident_map: &IdentMap) -> Result<
|
|||
let symbolic_attr: String = row.get_checked(1)?;
|
||||
let v: rusqlite::types::Value = row.get_checked(2)?;
|
||||
let value_type_tag: i32 = row.get_checked(3)?;
|
||||
let typed_value = TypedValue::from_sql_value_pair(v, &value_type_tag)?;
|
||||
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
|
||||
|
||||
Ok((symbolic_ident, symbolic_attr, typed_value))
|
||||
})?.collect();
|
||||
|
@ -390,6 +397,15 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result<DB> {
|
|||
Ok(DB::new(partition_map, schema))
|
||||
}
|
||||
|
||||
/// Internal representation of an [e a v added] datom, ready to be transacted against the store.
|
||||
type ReducedEntity = (i64, i64, TypedValue, bool);
|
||||
|
||||
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
|
||||
pub enum SearchType {
|
||||
Exact,
|
||||
Inexact,
|
||||
}
|
||||
|
||||
impl DB {
|
||||
/// Do schema-aware typechecking and coercion.
|
||||
///
|
||||
|
@ -416,45 +432,372 @@ impl DB {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create empty temporary tables for search parameters and search results.
|
||||
fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> {
|
||||
// We can't do this in one shot, since we can't prepare a batch statement.
|
||||
let statements = [
|
||||
r#"DROP TABLE IF EXISTS temp.exact_searches"#,
|
||||
// TODO: compress bit flags into a single bit field, and expand when inserting into
|
||||
// `datoms` and `transactions`.
|
||||
// TODO: drop tx0 entirely.
|
||||
r#"CREATE TABLE temp.exact_searches (
|
||||
e0 INTEGER NOT NULL,
|
||||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
index_avet0 TINYINT NOT NULL,
|
||||
index_vaet0 TINYINT NOT NULL,
|
||||
index_fulltext0 TINYINT NOT NULL,
|
||||
unique_value0 TINYINT NOT NULL)"#,
|
||||
// There's no real need to split exact and inexact searches, so long as we keep things
|
||||
// in the correct place and performant. Splitting has the advantage of being explicit
|
||||
// and slightly easier to read, so we'll do that to start.
|
||||
r#"DROP TABLE IF EXISTS temp.inexact_searches"#,
|
||||
r#"CREATE TABLE temp.inexact_searches (
|
||||
e0 INTEGER NOT NULL,
|
||||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
index_avet0 TINYINT NOT NULL,
|
||||
index_vaet0 TINYINT NOT NULL,
|
||||
index_fulltext0 TINYINT NOT NULL,
|
||||
unique_value0 TINYINT NOT NULL)"#,
|
||||
r#"DROP TABLE IF EXISTS temp.search_results"#,
|
||||
// TODO: don't encode search_type as a STRING. This is explicit and much easier to read
|
||||
// than another flag, so we'll do it to start, and optimize later.
|
||||
r#"CREATE TABLE temp.search_results (
|
||||
e0 INTEGER NOT NULL,
|
||||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
index_avet0 TINYINT NOT NULL,
|
||||
index_vaet0 TINYINT NOT NULL,
|
||||
index_fulltext0 TINYINT NOT NULL,
|
||||
unique_value0 TINYINT NOT NULL,
|
||||
search_type STRING NOT NULL,
|
||||
rid INTEGER,
|
||||
v BLOB)"#,
|
||||
// It is an error to transact the same [e a v] twice in one transaction. This index will
|
||||
// cause insertion to fail if a transaction tries to do that. (Sadly, the failure is
|
||||
// opaque.)
|
||||
//
|
||||
// N.b.: temp goes on index name, not table name. See http://stackoverflow.com/a/22308016.
|
||||
r#"CREATE UNIQUE INDEX IF NOT EXISTS temp.search_results_unique ON search_results (e0, a0, v0, value_type_tag0)"#,
|
||||
];
|
||||
|
||||
for statement in &statements {
|
||||
let mut stmt = conn.prepare_cached(statement)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Failed to create temporary tables")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert search rows into temporary search tables.
|
||||
///
|
||||
/// Eventually, the details of this approach will be captured in
|
||||
/// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> {
|
||||
let bindings_per_statement = 10;
|
||||
|
||||
let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement);
|
||||
|
||||
// We'd like to flat_map here, but it's not obvious how to flat_map across Result.
|
||||
let results: Result<Vec<()>> = chunks.into_iter().map(|chunk| -> Result<()> {
|
||||
let mut count = 0;
|
||||
|
||||
// We must keep these computed values somewhere to reference them later, so we can't
|
||||
// combine this map and the subsequent flat_map.
|
||||
// (e0, a0, v0, value_type_tag0, added0, index_avet0, index_vaet0, index_fulltext0, unique_value0)
|
||||
let block: Result<Vec<(i64 /* e */, i64 /* a */,
|
||||
ToSqlOutput<'a> /* value */, /* value_type_tag */ i32,
|
||||
/* added0 */ bool,
|
||||
/* index_avet0 */ bool,
|
||||
/* index_vaet0 */ bool,
|
||||
/* index_fulltext0 */ bool,
|
||||
/* unique_value0 */ bool)>> = chunk.map(|&(e, a, ref typed_value, added)| {
|
||||
count += 1;
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(&a)?;
|
||||
|
||||
// Now we can represent the typed value as an SQL value.
|
||||
let (value, value_type_tag): (ToSqlOutput, i32) = typed_value.to_sql_value_pair();
|
||||
|
||||
Ok((e, a, value, value_type_tag,
|
||||
added,
|
||||
attribute.index,
|
||||
attribute.value_type == ValueType::Ref,
|
||||
attribute.fulltext,
|
||||
attribute.unique_value))
|
||||
}).collect();
|
||||
let block = block?;
|
||||
|
||||
// `params` reference computed values in `block`.
|
||||
let params: Vec<&ToSql> = block.iter().flat_map(|&(ref e, ref a, ref value, ref value_type_tag, added, index_avet, index_vaet, index_fulltext, unique_value)| {
|
||||
// Avoid inner heap allocation.
|
||||
// TODO: extract some finite length iterator to make this less indented!
|
||||
once(e as &ToSql)
|
||||
.chain(once(a as &ToSql)
|
||||
.chain(once(value as &ToSql)
|
||||
.chain(once(value_type_tag as &ToSql)
|
||||
.chain(once(&tx as &ToSql)
|
||||
.chain(once(to_bool_ref(added) as &ToSql)
|
||||
.chain(once(to_bool_ref(index_avet) as &ToSql)
|
||||
.chain(once(to_bool_ref(index_vaet) as &ToSql)
|
||||
.chain(once(to_bool_ref(index_fulltext) as &ToSql)
|
||||
.chain(once(to_bool_ref(unique_value) as &ToSql))))))))))
|
||||
}).collect();
|
||||
|
||||
// TODO: cache this for selected values of count.
|
||||
let values: String = repeat_values(bindings_per_statement, count);
|
||||
let s: String = if search_type == SearchType::Exact {
|
||||
format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, tx0, added0, index_avet0, index_vaet0, index_fulltext0, unique_value0) VALUES {}", values)
|
||||
} else {
|
||||
format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, tx0, added0, index_avet0, index_vaet0, index_fulltext0, unique_value0) VALUES {}", values)
|
||||
};
|
||||
|
||||
// TODO: consider ensuring we inserted the expected number of rows.
|
||||
let mut stmt = conn.prepare_cached(s.as_str())?;
|
||||
stmt.execute(¶ms)
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert non-fts one statements into temporary search table!")
|
||||
}).collect::<Result<Vec<()>>>();
|
||||
|
||||
results.map(|_| ())
|
||||
}
|
||||
|
||||
/// Take search rows and complete `temp.search_results`.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
fn search(&self, conn: &rusqlite::Connection) -> Result<()> {
|
||||
// First is fast, only one table walk: lookup by exact eav.
|
||||
// Second is slower, but still only one table walk: lookup old value by ea.
|
||||
let s = r#"
|
||||
INSERT INTO temp.search_results
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, ':db.cardinality/many', d.rowid, d.v
|
||||
FROM temp.exact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a AND
|
||||
t.value_type_tag0 = d.value_type_tag AND
|
||||
t.v0 = d.v
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, ':db.cardinality/one', d.rowid, d.v
|
||||
FROM temp.inexact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not search!")
|
||||
}
|
||||
|
||||
/// Insert the new transaction into the `transactions` table.
|
||||
///
|
||||
/// This turns the contents of `search_results` into a new transaction.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
// TODO: capture `conn` in a `TxInternal` structure.
|
||||
fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v0, ?, 1, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to add datoms not already present")?;
|
||||
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v, ?, 0, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to retract datoms already present")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the contents of the `datoms` materialized view with the new transaction.
|
||||
///
|
||||
/// This applies the contents of `search_results` to the `datoms` table (in place).
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
// TODO: capture `conn` in a `TxInternal` structure.
|
||||
fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
// Delete datoms that were retracted, or those that were :db.cardinality/one and will be
|
||||
// replaced.
|
||||
let s = r#"
|
||||
WITH ids AS (SELECT rid
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v)))
|
||||
DELETE FROM datoms WHERE rowid IN ids"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to retract datoms already present")?;
|
||||
|
||||
// Insert datoms that were added and not already present.
|
||||
let s = r#"
|
||||
INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value)
|
||||
SELECT e0, a0, v0, ?, value_type_tag0,
|
||||
index_avet0, index_vaet0, index_fulltext0, unique_value0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to add datoms not already present")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
|
||||
/// `self.DB`.
|
||||
///
|
||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||
// TODO: move this to the transactor layer.
|
||||
pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity]) -> Result<()>{
|
||||
// TODO: manage :db/tx, write :db/txInstant.
|
||||
let tx = 1;
|
||||
pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx: Entid) -> Result<()>{
|
||||
// TODO: push these into an internal transaction report?
|
||||
|
||||
/// Assertions that are :db.cardinality/one and not :db.fulltext.
|
||||
let mut non_fts_one: Vec<ReducedEntity> = vec![];
|
||||
|
||||
/// Assertions that are :db.cardinality/many and not :db.fulltext.
|
||||
let mut non_fts_many: Vec<ReducedEntity> = vec![];
|
||||
|
||||
// Transact [:db/add :db/txInstant NOW :db/tx].
|
||||
// TODO: allow this to be present in the transaction data.
|
||||
let now = time::get_time();
|
||||
let tx_instant = (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000));
|
||||
non_fts_one.push((tx,
|
||||
entids::DB_TX_INSTANT,
|
||||
TypedValue::Long(tx_instant),
|
||||
true));
|
||||
|
||||
// Right now, this could be a for loop, saving some mapping, collecting, and type
|
||||
// annotations. However, I expect it to be a multi-stage map as we start to transform the
|
||||
// underlying entities, in which case this expression is more natural than for loops.
|
||||
let r: Vec<Result<()>> = entities.into_iter().map(|entity: &Entity| -> Result<()> {
|
||||
match *entity {
|
||||
Entity::Add {
|
||||
e: entmod::EntidOrLookupRef::Entid(entmod::Entid::Ident(ref e_)),
|
||||
a: entmod::Entid::Ident(ref a_),
|
||||
e: entmod::EntidOrLookupRef::Entid(ref e_),
|
||||
a: ref a_,
|
||||
v: entmod::ValueOrLookupRef::Value(ref v_),
|
||||
tx: _ } => {
|
||||
|
||||
// TODO: prepare and cache all these statements outside the transaction loop.
|
||||
// XXX: Error types.
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("INSERT INTO datoms(e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)")?;
|
||||
let e: i64 = *self.schema.require_entid(&e_.to_string())?;
|
||||
let a: i64 = *self.schema.require_entid(&a_.to_string())?;
|
||||
let e: i64 = match e_ {
|
||||
&entmod::Entid::Entid(ref e__) => *e__,
|
||||
&entmod::Entid::Ident(ref e__) => *self.schema.require_entid(&e__.to_string())?,
|
||||
};
|
||||
|
||||
let a: i64 = match a_ {
|
||||
&entmod::Entid::Entid(ref a__) => *a__,
|
||||
&entmod::Entid::Ident(ref a__) => *self.schema.require_entid(&a__.to_string())?,
|
||||
};
|
||||
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(&a)?;
|
||||
if attribute.fulltext {
|
||||
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity)))
|
||||
}
|
||||
|
||||
// This is our chance to do schema-aware typechecking: to either assert that the
|
||||
// given value is in the attribute's value set, or (in limited cases) to coerce
|
||||
// the value into the attribute's value set.
|
||||
let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?;
|
||||
|
||||
// Now we can represent the typed value as an SQL value.
|
||||
let (value, value_type_tag): (ToSqlOutput, i32) = typed_value.to_sql_value_pair();
|
||||
|
||||
// Fun times, type signatures.
|
||||
let values: [&ToSql; 9] = [&e, &a, &value, &tx, &value_type_tag, &attribute.index, to_bool_ref(attribute.value_type == ValueType::Ref), &attribute.fulltext, &attribute.unique_value];
|
||||
stmt.insert(&values[..])?;
|
||||
let added = true;
|
||||
if attribute.multival {
|
||||
non_fts_many.push((e, a, typed_value, added));
|
||||
} else {
|
||||
non_fts_one.push((e, a, typed_value, added));
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
// TODO: find a better error type for this.
|
||||
_ => panic!(format!("Transacting entity not yet supported: {:?}", entity))
|
||||
|
||||
Entity::Retract {
|
||||
e: entmod::EntidOrLookupRef::Entid(ref e_),
|
||||
a: ref a_,
|
||||
v: entmod::ValueOrLookupRef::Value(ref v_) } => {
|
||||
|
||||
let e: i64 = match e_ {
|
||||
&entmod::Entid::Entid(ref e__) => *e__,
|
||||
&entmod::Entid::Ident(ref e__) => *self.schema.require_entid(&e__.to_string())?,
|
||||
};
|
||||
|
||||
let a: i64 = match a_ {
|
||||
&entmod::Entid::Entid(ref a__) => *a__,
|
||||
&entmod::Entid::Ident(ref a__) => *self.schema.require_entid(&a__.to_string())?,
|
||||
};
|
||||
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(&a)?;
|
||||
if attribute.fulltext {
|
||||
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity)))
|
||||
}
|
||||
|
||||
// This is our chance to do schema-aware typechecking: to either assert that the
|
||||
// given value is in the attribute's value set, or (in limited cases) to coerce
|
||||
// the value into the attribute's value set.
|
||||
let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?;
|
||||
|
||||
let added = false;
|
||||
|
||||
if attribute.multival {
|
||||
non_fts_many.push((e, a, typed_value, added));
|
||||
} else {
|
||||
non_fts_one.push((e, a, typed_value, added));
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
|
||||
_ => bail!(ErrorKind::NotYetImplemented(format!("Transacting this entity is not yet implemented: {:?}", entity)))
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let x: Result<Vec<()>> = r.into_iter().collect();
|
||||
x.map(|_| ())
|
||||
let r: Result<Vec<()>> = r.into_iter().collect();
|
||||
r?;
|
||||
|
||||
self.create_temp_tables(conn)?;
|
||||
|
||||
if !non_fts_one.is_empty() {
|
||||
self.insert_non_fts_searches(conn, &non_fts_one[..], tx, SearchType::Inexact)?;
|
||||
}
|
||||
|
||||
if !non_fts_many.is_empty() {
|
||||
self.insert_non_fts_searches(conn, &non_fts_many[..], tx, SearchType::Exact)?;
|
||||
}
|
||||
|
||||
self.search(conn)?;
|
||||
|
||||
self.insert_transaction(conn, tx)?;
|
||||
self.update_datoms(conn, tx)?;
|
||||
|
||||
// TODO: update parts, idents, schema materialized views.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -463,6 +806,9 @@ mod tests {
|
|||
use super::*;
|
||||
use bootstrap;
|
||||
use debug;
|
||||
use edn;
|
||||
use edn::symbols;
|
||||
use mentat_tx_parser;
|
||||
use rusqlite;
|
||||
use types::*;
|
||||
|
||||
|
@ -471,46 +817,110 @@ mod tests {
|
|||
// TODO: figure out how to reference the fixtures directory for real. For now, assume we're
|
||||
// executing `cargo test` in `db/`.
|
||||
let conn = rusqlite::Connection::open("../fixtures/v2empty.db").unwrap();
|
||||
// assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION);
|
||||
|
||||
// TODO: write :db/txInstant, bump :db.part/tx.
|
||||
// let partition_map = read_partition_map(&conn).unwrap();
|
||||
// assert_eq!(partition_map, bootstrap::bootstrap_partition_map());
|
||||
|
||||
let ident_map = read_ident_map(&conn).unwrap();
|
||||
assert_eq!(ident_map, bootstrap::bootstrap_ident_map());
|
||||
|
||||
let schema = read_schema(&conn, &ident_map).unwrap();
|
||||
assert_eq!(schema, bootstrap::bootstrap_schema()); // Schema::default());
|
||||
assert_eq!(schema, bootstrap::bootstrap_schema());
|
||||
|
||||
let db = read_db(&conn).unwrap();
|
||||
|
||||
let datoms = debug::datoms_after(&conn, &db, &0).unwrap();
|
||||
assert_eq!(datoms.len(), 89); // The 89th is the :db/txInstant value.
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// // TODO: fewer magic numbers!
|
||||
// assert_eq!(debug::datoms_after(&conn, &db, &0x10000001).unwrap(), vec![]);
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
}
|
||||
|
||||
/// Assert that a sequence of transactions meets expectations.
|
||||
///
|
||||
/// The transactions, expectations, and optional labels, are given in a simple EDN format; see
|
||||
/// https://github.com/mozilla/mentat/wiki/Transacting:-EDN-test-format.
|
||||
///
|
||||
/// There is some magic here about transaction numbering that I don't want to commit to or
|
||||
/// document just yet. The end state might be much more general pattern matching syntax, rather
|
||||
/// than the targeted transaction ID and timestamp replacement we have right now.
|
||||
fn assert_transactions(conn: &rusqlite::Connection, db: &DB, transactions: &Vec<edn::Value>) {
|
||||
for (index, transaction) in transactions.into_iter().enumerate() {
|
||||
let index = index as i64;
|
||||
let transaction = transaction.as_map().unwrap();
|
||||
let label: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "label"))).unwrap().clone();
|
||||
let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone();
|
||||
// TODO: use hyphenated keywords, like :test/expected-transaction -- when the EDN parser
|
||||
// supports them!
|
||||
let expected_transaction: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expectedtransaction")));
|
||||
let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expecteddatoms")));
|
||||
|
||||
let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap();
|
||||
db.transact_internal(&conn, &entities[..], bootstrap::TX0 + index + 1).unwrap();
|
||||
|
||||
if let Some(expected_transaction) = expected_transaction {
|
||||
let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap();
|
||||
assert_eq!(transactions.0[0].into_edn(),
|
||||
*expected_transaction,
|
||||
"\n{} - expected transaction:\n{}\n{}", label, transactions.0[0].into_edn(), *expected_transaction);
|
||||
}
|
||||
|
||||
if let Some(expected_datoms) = expected_datoms {
|
||||
let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap();
|
||||
assert_eq!(datoms.into_edn(),
|
||||
*expected_datoms,
|
||||
"\n{} - expected datoms:\n{}\n{}", label, datoms.into_edn(), *expected_datoms);
|
||||
}
|
||||
|
||||
// Don't allow empty tests. This will need to change if we allow transacting schema
|
||||
// fragments in a preamble, but for now it might catch malformed tests.
|
||||
assert_ne!((expected_transaction, expected_datoms), (None, None),
|
||||
"Transaction test must include at least one of :test/expectedtransaction or :test/expecteddatoms");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_current_version() {
|
||||
// // assert_eq!(bootstrap_schema().unwrap(), Schema::default());
|
||||
|
||||
// // Ignore result.
|
||||
// use std::fs;
|
||||
// let _ = fs::remove_file("/Users/nalexander/Mozilla/mentat/test.db");
|
||||
// let mut conn = rusqlite::Connection::open("file:///Users/nalexander/Mozilla/mentat/test.db").unwrap();
|
||||
|
||||
fn test_add() {
|
||||
let mut conn = new_connection();
|
||||
|
||||
assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION);
|
||||
|
||||
let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema());
|
||||
// TODO: write materialized view of bootstrapped schema to SQL store.
|
||||
// let db = read_db(&conn).unwrap();
|
||||
// assert_eq!(db, bootstrap_db);
|
||||
|
||||
let datoms = debug::datoms_after(&conn, &bootstrap_db, &0).unwrap();
|
||||
assert_eq!(datoms.len(), 88);
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
|
||||
// TODO: extract a test macro simplifying this boilerplate yet further.
|
||||
let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap();
|
||||
|
||||
let transactions = value.as_vector().unwrap();
|
||||
assert_transactions(&conn, &bootstrap_db, transactions);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_retract() {
|
||||
let mut conn = new_connection();
|
||||
assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION);
|
||||
|
||||
let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema());
|
||||
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
|
||||
let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap();
|
||||
|
||||
let transactions = value.as_vector().unwrap();
|
||||
assert_transactions(&conn, &bootstrap_db, transactions);
|
||||
}
|
||||
}
|
||||
|
|
198
db/src/debug.rs
198
db/src/debug.rs
|
@ -12,55 +12,203 @@
|
|||
|
||||
/// Low-level functions for testing.
|
||||
|
||||
use rusqlite;
|
||||
use std::collections::{BTreeSet};
|
||||
use std::io::{Write};
|
||||
|
||||
use {to_namespaced_keyword};
|
||||
use edn::types::{Value};
|
||||
use itertools::Itertools;
|
||||
use rusqlite;
|
||||
use rusqlite::types::{ToSql};
|
||||
use tabwriter::TabWriter;
|
||||
|
||||
use ::{to_namespaced_keyword};
|
||||
use bootstrap;
|
||||
use edn;
|
||||
use edn::symbols;
|
||||
use entids;
|
||||
use mentat_tx::entities::{Entid};
|
||||
use types::{DB, TypedValue};
|
||||
use errors::Result;
|
||||
|
||||
/// Represents an assertion (*datom*) in the store.
|
||||
/// Represents a *datom* (assertion) in the store.
|
||||
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
|
||||
pub struct Datom {
|
||||
// TODO: generalize this.
|
||||
e: Entid,
|
||||
a: Entid,
|
||||
v: Value,
|
||||
tx: Option<i64>,
|
||||
v: edn::Value,
|
||||
tx: i64,
|
||||
added: Option<bool>,
|
||||
}
|
||||
|
||||
/// Return the complete set of datoms in the store, ordered by (e, a, v).
|
||||
pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result<Vec<Datom>> {
|
||||
// TODO: fewer magic numbers!
|
||||
datoms_after(conn, db, &0x10000000)
|
||||
/// Represents a set of datoms (assertions) in the store.
|
||||
pub struct Datoms(pub BTreeSet<Datom>);
|
||||
|
||||
/// Represents an ordered sequence of transactions in the store.
|
||||
pub struct Transactions(pub Vec<Datoms>);
|
||||
|
||||
fn label_tx_id(tx: i64) -> edn::Value {
|
||||
edn::Value::PlainSymbol(symbols::PlainSymbol::new(format!("?tx{}", tx - bootstrap::TX0)))
|
||||
}
|
||||
|
||||
/// Return the set of datoms in the store with transaction ID strictly
|
||||
/// greater than the given `tx`, ordered by (tx, e, a, v).
|
||||
pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: &i32) -> Result<Vec<Datom>> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag FROM datoms WHERE tx > ? ORDER BY tx, e, a, v")?;
|
||||
fn label_tx_instant(tx: i64) -> edn::Value {
|
||||
edn::Value::PlainSymbol(symbols::PlainSymbol::new(format!("?ms{}", tx - bootstrap::TX0)))
|
||||
}
|
||||
|
||||
// Convert numeric entid to entity Entid.
|
||||
let to_entid = |x| {
|
||||
db.schema.get_ident(&x).and_then(|y| to_namespaced_keyword(&y)).map(Entid::Ident).unwrap_or(Entid::Entid(x))
|
||||
};
|
||||
impl Datom {
|
||||
pub fn into_edn<T, U>(&self, tx_id: T, tx_instant: &U) -> edn::Value
|
||||
where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value {
|
||||
let f = |entid: &Entid| -> edn::Value {
|
||||
match *entid {
|
||||
Entid::Entid(ref y) => edn::Value::Integer(y.clone()),
|
||||
Entid::Ident(ref y) => edn::Value::NamespacedKeyword(y.clone()),
|
||||
}
|
||||
};
|
||||
|
||||
let datoms = stmt.query_and_then(&[tx], |row| {
|
||||
// Rewrite [E :db/txInstant V] to [?txN :db/txInstant ?t0].
|
||||
let mut v = if self.a == Entid::Entid(entids::DB_TX_INSTANT) || self.a == Entid::Ident(to_namespaced_keyword(":db/txInstant").unwrap()) {
|
||||
vec![tx_id(self.tx),
|
||||
f(&self.a),
|
||||
tx_instant(self.tx)]
|
||||
} else {
|
||||
vec![f(&self.e), f(&self.a), self.v.clone()]
|
||||
};
|
||||
if let Some(added) = self.added {
|
||||
v.push(tx_id(self.tx));
|
||||
v.push(edn::Value::Boolean(added));
|
||||
}
|
||||
|
||||
edn::Value::Vector(v)
|
||||
}
|
||||
}
|
||||
|
||||
impl Datoms {
|
||||
pub fn into_edn_raw<T, U>(&self, tx_id: &T, tx_instant: &U) -> edn::Value
|
||||
where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value {
|
||||
edn::Value::Set((&self.0).into_iter().map(|x| x.into_edn(tx_id, tx_instant)).collect())
|
||||
}
|
||||
|
||||
pub fn into_edn(&self) -> edn::Value {
|
||||
self.into_edn_raw(&label_tx_id, &label_tx_instant)
|
||||
}
|
||||
}
|
||||
|
||||
impl Transactions {
|
||||
pub fn into_edn_raw<T, U>(&self, tx_id: &T, tx_instant: &U) -> edn::Value
|
||||
where T: Fn(i64) -> edn::Value, U: Fn(i64) -> edn::Value {
|
||||
edn::Value::Vector((&self.0).into_iter().map(|x| x.into_edn_raw(tx_id, tx_instant)).collect())
|
||||
}
|
||||
|
||||
pub fn into_edn(&self) -> edn::Value {
|
||||
self.into_edn_raw(&label_tx_id, &label_tx_instant)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`.
|
||||
fn to_entid(db: &DB, entid: i64) -> Entid {
|
||||
db.schema.get_ident(&entid).and_then(|ident| to_namespaced_keyword(&ident)).map_or(Entid::Entid(entid), Entid::Ident)
|
||||
}
|
||||
|
||||
/// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of
|
||||
/// the form [... :db/txInstant ...].
|
||||
pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result<Datoms> {
|
||||
datoms_after(conn, db, bootstrap::TX0 - 1)
|
||||
}
|
||||
|
||||
/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`,
|
||||
/// ordered by (e, a, v, tx).
|
||||
///
|
||||
/// The datom set returned does not include any datoms of the form [... :db/txInstant ...].
|
||||
pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Datoms> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, v ASC, tx ASC")?;
|
||||
|
||||
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
|
||||
let e: i64 = row.get_checked(0)?;
|
||||
let a: i64 = row.get_checked(1)?;
|
||||
|
||||
if a == entids::DB_TX_INSTANT {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let v: rusqlite::types::Value = row.get_checked(2)?;
|
||||
let value_type_tag: i32 = row.get_checked(3)?;
|
||||
|
||||
let typed_value = TypedValue::from_sql_value_pair(v, &value_type_tag)?;
|
||||
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
|
||||
let (value, _) = typed_value.to_edn_value_pair();
|
||||
|
||||
Ok(Datom {
|
||||
e: to_entid(e),
|
||||
a: to_entid(a),
|
||||
let tx: i64 = row.get_checked(4)?;
|
||||
|
||||
Ok(Some(Datom {
|
||||
e: to_entid(db, e),
|
||||
a: to_entid(db, a),
|
||||
v: value,
|
||||
tx: None,
|
||||
tx: tx,
|
||||
added: None,
|
||||
}))
|
||||
})?.collect();
|
||||
|
||||
Ok(Datoms(r?.into_iter().filter_map(|x| x).collect()))
|
||||
}
|
||||
|
||||
/// Return the sequence of transactions in the store with transaction ID strictly greater than the
|
||||
/// given `tx`, ordered by (tx, e, a, v).
|
||||
///
|
||||
/// Each transaction returned includes the [:db/tx :db/txInstant ...] datom.
|
||||
pub fn transactions_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Transactions> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, v ASC, added ASC")?;
|
||||
|
||||
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
|
||||
let e: i64 = row.get_checked(0)?;
|
||||
let a: i64 = row.get_checked(1)?;
|
||||
|
||||
let v: rusqlite::types::Value = row.get_checked(2)?;
|
||||
let value_type_tag: i32 = row.get_checked(3)?;
|
||||
|
||||
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
|
||||
let (value, _) = typed_value.to_edn_value_pair();
|
||||
|
||||
let tx: i64 = row.get_checked(4)?;
|
||||
let added: bool = row.get_checked(5)?;
|
||||
|
||||
Ok(Datom {
|
||||
e: to_entid(db, e),
|
||||
a: to_entid(db, a),
|
||||
v: value,
|
||||
tx: tx,
|
||||
added: Some(added),
|
||||
})
|
||||
})?.collect();
|
||||
datoms
|
||||
|
||||
// Group by tx.
|
||||
let r: Vec<Datoms> = r?.into_iter().group_by(|x| x.tx).into_iter().map(|(_key, group)| Datoms(group.collect())).collect();
|
||||
Ok(Transactions(r))
|
||||
}
|
||||
|
||||
/// Execute the given `sql` query with the given `params` and format the results as a
|
||||
/// tab-and-newline formatted string suitable for debug printing.
|
||||
///
|
||||
/// The query is printed followed by a newline, then the returned columns followed by a newline, and
|
||||
/// then the data rows and columns. All columns are aligned.
|
||||
pub fn dump_sql_query(conn: &rusqlite::Connection, sql: &str, params: &[&ToSql]) -> Result<String> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare(sql)?;
|
||||
|
||||
let mut tw = TabWriter::new(Vec::new()).padding(2);
|
||||
write!(&mut tw, "{}\n", sql).unwrap();
|
||||
|
||||
for column_name in stmt.column_names() {
|
||||
write!(&mut tw, "{}\t", column_name).unwrap();
|
||||
}
|
||||
write!(&mut tw, "\n").unwrap();
|
||||
|
||||
let r: Result<Vec<_>> = stmt.query_and_then(params, |row| {
|
||||
for i in 0..row.column_count() {
|
||||
let value: rusqlite::types::Value = row.get_checked(i)?;
|
||||
write!(&mut tw, "{:?}\t", value).unwrap();
|
||||
}
|
||||
write!(&mut tw, "\n").unwrap();
|
||||
Ok(())
|
||||
})?.collect();
|
||||
r?;
|
||||
|
||||
let dump = String::from_utf8(tw.into_inner().unwrap()).unwrap();
|
||||
Ok(dump)
|
||||
}
|
||||
|
|
|
@ -10,14 +10,21 @@
|
|||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
extern crate itertools;
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
extern crate rusqlite;
|
||||
extern crate time;
|
||||
|
||||
extern crate tabwriter;
|
||||
|
||||
extern crate edn;
|
||||
extern crate mentat_tx;
|
||||
extern crate mentat_tx_parser;
|
||||
|
||||
use itertools::Itertools;
|
||||
use std::iter::repeat;
|
||||
|
||||
pub use errors::*;
|
||||
pub use schema::*;
|
||||
pub use types::*;
|
||||
|
@ -33,6 +40,8 @@ mod values;
|
|||
|
||||
use edn::symbols;
|
||||
|
||||
pub const SQLITE_MAX_VARIABLE_NUMBER: usize = 999;
|
||||
|
||||
pub fn to_namespaced_keyword(s: &str) -> Option<symbols::NamespacedKeyword> {
|
||||
let splits = [':', '/'];
|
||||
let mut i = s.split(&splits[..]);
|
||||
|
@ -41,3 +50,26 @@ pub fn to_namespaced_keyword(s: &str) -> Option<symbols::NamespacedKeyword> {
|
|||
_ => None
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare an SQL `VALUES` block, like (?, ?, ?), (?, ?, ?).
|
||||
///
|
||||
/// The number of values per tuple determines `(?, ?, ?)`. The number of tuples determines `(...), (...)`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// # use mentat_db::{repeat_values};
|
||||
/// assert_eq!(repeat_values(1, 3), "(?), (?), (?)".to_string());
|
||||
/// assert_eq!(repeat_values(3, 1), "(?, ?, ?)".to_string());
|
||||
/// assert_eq!(repeat_values(2, 2), "(?, ?), (?, ?)".to_string());
|
||||
/// ```
|
||||
pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String {
|
||||
assert!(values_per_tuple >= 1);
|
||||
assert!(tuples >= 1);
|
||||
assert!(values_per_tuple * tuples < SQLITE_MAX_VARIABLE_NUMBER, "Too many values: {} * {} >= {}", values_per_tuple, tuples, SQLITE_MAX_VARIABLE_NUMBER);
|
||||
// Like "(?, ?, ?)".
|
||||
let inner = format!("({})", repeat("?").take(values_per_tuple).join(", "));
|
||||
// Like "(?, ?, ?), (?, ?, ?)".
|
||||
let values: String = repeat(inner).take(tuples).join(", ");
|
||||
values
|
||||
}
|
||||
|
|
54
db/tests/value_tests.rs
Normal file
54
db/tests/value_tests.rs
Normal file
|
@ -0,0 +1,54 @@
|
|||
// Copyright 2016 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
extern crate edn;
|
||||
extern crate mentat_db;
|
||||
extern crate ordered_float;
|
||||
extern crate rusqlite;
|
||||
|
||||
use mentat_db::{TypedValue, ValueType};
|
||||
use ordered_float::OrderedFloat;
|
||||
use edn::symbols;
|
||||
|
||||
// It's not possible to test to_sql_value_pair since rusqlite::ToSqlOutput doesn't implement
|
||||
// PartialEq.
|
||||
#[test]
|
||||
fn test_from_sql_value_pair() {
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Integer(1234), 0).unwrap(), TypedValue::Ref(1234));
|
||||
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Integer(0), 1).unwrap(), TypedValue::Boolean(false));
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Integer(1), 1).unwrap(), TypedValue::Boolean(true));
|
||||
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Integer(0), 5).unwrap(), TypedValue::Long(0));
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Integer(1234), 5).unwrap(), TypedValue::Long(1234));
|
||||
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Real(0.0), 5).unwrap(), TypedValue::Double(OrderedFloat(0.0)));
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Real(0.5), 5).unwrap(), TypedValue::Double(OrderedFloat(0.5)));
|
||||
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Text(":db/keyword".into()), 10).unwrap(), TypedValue::String(":db/keyword".into()));
|
||||
assert_eq!(TypedValue::from_sql_value_pair(rusqlite::types::Value::Text(":db/keyword".into()), 13).unwrap(), TypedValue::Keyword(":db/keyword".into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_to_edn_value_pair() {
|
||||
assert_eq!(TypedValue::Ref(1234).to_edn_value_pair(), (edn::Value::Integer(1234), ValueType::Ref));
|
||||
|
||||
assert_eq!(TypedValue::Boolean(false).to_edn_value_pair(), (edn::Value::Boolean(false), ValueType::Boolean));
|
||||
assert_eq!(TypedValue::Boolean(true).to_edn_value_pair(), (edn::Value::Boolean(true), ValueType::Boolean));
|
||||
|
||||
assert_eq!(TypedValue::Long(0).to_edn_value_pair(), (edn::Value::Integer(0), ValueType::Long));
|
||||
assert_eq!(TypedValue::Long(1234).to_edn_value_pair(), (edn::Value::Integer(1234), ValueType::Long));
|
||||
|
||||
assert_eq!(TypedValue::Double(OrderedFloat(0.0)).to_edn_value_pair(), (edn::Value::Float(OrderedFloat(0.0)), ValueType::Double));
|
||||
assert_eq!(TypedValue::Double(OrderedFloat(0.5)).to_edn_value_pair(), (edn::Value::Float(OrderedFloat(0.5)), ValueType::Double));
|
||||
|
||||
assert_eq!(TypedValue::String(":db/keyword".into()).to_edn_value_pair(), (edn::Value::Text(":db/keyword".into()), ValueType::String));
|
||||
assert_eq!(TypedValue::Keyword(":db/keyword".into()).to_edn_value_pair(), (edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("db", "keyword")), ValueType::Keyword));
|
||||
}
|
66
tx/fixtures/test_add.edn
Normal file
66
tx/fixtures/test_add.edn
Normal file
|
@ -0,0 +1,66 @@
|
|||
[{:test/label ":db.cardinality/one, insert"
|
||||
:test/assertions
|
||||
[[:db/add 100 :db/ident :keyword/value1]
|
||||
[:db/add 101 :db/ident :keyword/value2]]
|
||||
:test/expectedtransaction
|
||||
#{[100 :db/ident :keyword/value1 ?tx1 true]
|
||||
[101 :db/ident :keyword/value2 ?tx1 true]
|
||||
[?tx1 :db/txInstant ?ms1 ?tx1 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value1]
|
||||
[101 :db/ident :keyword/value2]}}
|
||||
|
||||
{:test/label ":db.cardinality/many, insert"
|
||||
:test/assertions
|
||||
[[:db/add 200 :db.schema/attribute 100]
|
||||
[:db/add 200 :db.schema/attribute 101]]
|
||||
:test/expectedtransaction
|
||||
#{[200 :db.schema/attribute 100 ?tx2 true]
|
||||
[200 :db.schema/attribute 101 ?tx2 true]
|
||||
[?tx2 :db/txInstant ?ms2 ?tx2 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value1]
|
||||
[101 :db/ident :keyword/value2]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
|
||||
{:test/label ":db.cardinality/one, replace"
|
||||
:test/assertions
|
||||
[[:db/add 100 :db/ident :keyword/value11]
|
||||
[:db/add 101 :db/ident :keyword/value22]]
|
||||
:test/expectedtransaction
|
||||
#{[100 :db/ident :keyword/value1 ?tx3 false]
|
||||
[100 :db/ident :keyword/value11 ?tx3 true]
|
||||
[101 :db/ident :keyword/value2 ?tx3 false]
|
||||
[101 :db/ident :keyword/value22 ?tx3 true]
|
||||
[?tx3 :db/txInstant ?ms3 ?tx3 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value11]
|
||||
[101 :db/ident :keyword/value22]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
|
||||
{:test/label ":db.cardinality/one, already present"
|
||||
:test/assertions
|
||||
[[:db/add 100 :db/ident :keyword/value11]
|
||||
[:db/add 101 :db/ident :keyword/value22]]
|
||||
:test/expectedtransaction
|
||||
#{[?tx4 :db/txInstant ?ms4 ?tx4 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value11]
|
||||
[101 :db/ident :keyword/value22]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
|
||||
{:test/label ":db.cardinality/many, already present"
|
||||
:test/assertions
|
||||
[[:db/add 200 :db.schema/attribute 100]
|
||||
[:db/add 200 :db.schema/attribute 101]]
|
||||
:test/expectedtransaction
|
||||
#{[?tx5 :db/txInstant ?ms5 ?tx5 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value11]
|
||||
[101 :db/ident :keyword/value22]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
]
|
59
tx/fixtures/test_retract.edn
Normal file
59
tx/fixtures/test_retract.edn
Normal file
|
@ -0,0 +1,59 @@
|
|||
[{:test/label ":db.cardinality/one, insert"
|
||||
:test/assertions
|
||||
[[:db/add 100 :db/ident :keyword/value1]
|
||||
[:db/add 101 :db/ident :keyword/value2]]
|
||||
:test/expectedtransaction
|
||||
#{[100 :db/ident :keyword/value1 ?tx1 true]
|
||||
[101 :db/ident :keyword/value2 ?tx1 true]
|
||||
[?tx1 :db/txInstant ?ms1 ?tx1 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value1]
|
||||
[101 :db/ident :keyword/value2]}}
|
||||
|
||||
{:test/label ":db.cardinality/many, insert"
|
||||
:test/assertions
|
||||
[[:db/add 200 :db.schema/attribute 100]
|
||||
[:db/add 200 :db.schema/attribute 101]]
|
||||
:test/expectedtransaction
|
||||
#{[200 :db.schema/attribute 100 ?tx2 true]
|
||||
[200 :db.schema/attribute 101 ?tx2 true]
|
||||
[?tx2 :db/txInstant ?ms2 ?tx2 true]}
|
||||
:test/expecteddatoms
|
||||
#{[100 :db/ident :keyword/value1]
|
||||
[101 :db/ident :keyword/value2]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
|
||||
{:test/label ":db.cardinality/one, retract"
|
||||
:test/assertions
|
||||
[[:db/retract 100 :db/ident :keyword/value1]]
|
||||
:test/expectedtransaction
|
||||
#{[100 :db/ident :keyword/value1 ?tx3 false]
|
||||
[?tx3 :db/txInstant ?ms3 ?tx3 true]}
|
||||
:test/expecteddatoms
|
||||
#{[101 :db/ident :keyword/value2]
|
||||
[200 :db.schema/attribute 100]
|
||||
[200 :db.schema/attribute 101]}}
|
||||
|
||||
{:test/label ":db.cardinality/many, retract"
|
||||
:test/assertions
|
||||
[[:db/retract 200 :db.schema/attribute 100]]
|
||||
:test/expectedtransaction
|
||||
#{[200 :db.schema/attribute 100 ?tx4 false]
|
||||
[?tx4 :db/txInstant ?ms4 ?tx4 true]}
|
||||
:test/expecteddatoms
|
||||
#{[101 :db/ident :keyword/value2]
|
||||
[200 :db.schema/attribute 101]}
|
||||
}
|
||||
|
||||
{:test/label ":db.cardinality/{one,many}, not present."
|
||||
:test/assertions
|
||||
[[:db/retract 100 :db/ident :keyword/value1]
|
||||
[:db/retract 200 :db.schema/attribute 100]]
|
||||
:test/expectedtransaction
|
||||
#{[?tx5 :db/txInstant ?ms5 ?tx5 true]}
|
||||
:test/expecteddatoms
|
||||
#{[101 :db/ident :keyword/value2]
|
||||
[200 :db.schema/attribute 101]}
|
||||
}
|
||||
]
|
Loading…
Reference in a new issue