* Pre: Drop unneeded tx0 from search results. * Pre: Don't require a schema in some of the DB code. The idea is to separate the transaction applying code, which is schema-aware, from the concrete storage code, which is just concerned with getting bits onto disk. * Pre: Only reference Schema, not DB, in debug module. This is part of a larger separation of the volatile PartitionMap, which is modified every transaction, from the stable Schema, which is infrequently modified. * Pre: Fix indentation. * Extract part of DB to new SchemaTypeChecking trait. * Extract part of DB to new PartitionMapping trait. * Pre: Don't expect :db.part/tx partition to advance when tx fails. This fails right now, because we allocate tx IDs even when we shouldn't. * Sketch a db interface without DB. * Add ValueParseError; use error-chain in tx-parser. This can be simplified when https://github.com/Marwes/combine/issues/86 makes it to a published release, but this unblocks us for now. This converts the `combine` error type `ParseError<&'a [edn::Value]>` to a type with owned `Vec<edn::Value>` collections, re-using `edn::Value::Vector` for making them `Display`. * Pre: Accept Borrow<Schema> instead of just &Schema in debug module. This makes it easy to use Rc<Schema> or Arc<Schema> without inserting &* sigils throughout the code. * Use error-chain in query-parser. There are a few things to point out here: - the fine grained error types have been flattened into one crate-wide error type; it's pretty easy to regain the granularity as needed. - edn::ParseError is automatically lifted to mentat_query_parser::errors::Error; - we use mentat_parser_utils::ValueParser to maintain parsing error information from `combine`. * Patch up top-level. * Review comment: Only `borrow()` once.
This commit is contained in:
parent
5e3cdd1fc2
commit
dcd9bcb1ce
24 changed files with 657 additions and 408 deletions
|
@ -18,6 +18,7 @@ rustc_version = "0.1.7"
|
|||
|
||||
[dependencies]
|
||||
clap = "2.19.3"
|
||||
error-chain = "0.9.0"
|
||||
nickel = "0.9.0"
|
||||
slog = "1.4.0"
|
||||
slog-scope = "0.2.2"
|
||||
|
|
|
@ -4,7 +4,7 @@ version = "0.0.1"
|
|||
workspace = ".."
|
||||
|
||||
[dependencies]
|
||||
error-chain = "0.8.0"
|
||||
error-chain = "0.9.0"
|
||||
itertools = "0.5.9"
|
||||
lazy_static = "0.2.2"
|
||||
ordered-float = "0.4.0"
|
||||
|
|
|
@ -96,8 +96,8 @@ lazy_static! {
|
|||
static ref V1_SYMBOLIC_SCHEMA: Value = {
|
||||
let s = r#"
|
||||
{:db/ident {:db/valueType :db.type/keyword
|
||||
:db/cardinality :db.cardinality/one
|
||||
:db/unique :db.unique/identity}
|
||||
:db/cardinality :db.cardinality/one
|
||||
:db/unique :db.unique/identity}
|
||||
:db.install/partition {:db/valueType :db.type/ref
|
||||
:db/cardinality :db.cardinality/many}
|
||||
:db.install/valueType {:db/valueType :db.type/ref
|
||||
|
|
464
db/src/db.rs
464
db/src/db.rs
|
@ -22,7 +22,7 @@ use itertools::Itertools;
|
|||
use rusqlite;
|
||||
use rusqlite::types::{ToSql, ToSqlOutput};
|
||||
|
||||
use ::{now, repeat_values, to_namespaced_keyword};
|
||||
use ::{repeat_values, to_namespaced_keyword};
|
||||
use bootstrap;
|
||||
use edn::types::Value;
|
||||
use edn::symbols;
|
||||
|
@ -35,7 +35,6 @@ use mentat_core::{
|
|||
TypedValue,
|
||||
ValueType,
|
||||
};
|
||||
use mentat_tx::entities::Entity;
|
||||
use errors::{ErrorKind, Result, ResultExt};
|
||||
use schema::SchemaBuilding;
|
||||
use types::{
|
||||
|
@ -44,9 +43,8 @@ use types::{
|
|||
DB,
|
||||
Partition,
|
||||
PartitionMap,
|
||||
TxReport,
|
||||
};
|
||||
use tx::Tx;
|
||||
use tx::transact;
|
||||
|
||||
pub fn new_connection<T>(uri: T) -> rusqlite::Result<rusqlite::Connection> where T: AsRef<Path> {
|
||||
let conn = match uri.as_ref().to_string_lossy().len() {
|
||||
|
@ -209,13 +207,20 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
|
|||
}
|
||||
|
||||
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
|
||||
let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema());
|
||||
bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?;
|
||||
let bootstrap_schema = bootstrap::bootstrap_schema();
|
||||
let (_report, next_partition_map, next_schema) = transact(&tx, &bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?;
|
||||
if next_schema.is_some() {
|
||||
// TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117
|
||||
bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema")));
|
||||
}
|
||||
|
||||
set_user_version(&tx, CURRENT_VERSION)?;
|
||||
|
||||
// TODO: use the drop semantics to do this automagically?
|
||||
tx.commit()?;
|
||||
|
||||
// TODO: ensure that schema is not changed by bootstrap transaction.
|
||||
let bootstrap_db = DB::new(next_partition_map, bootstrap_schema);
|
||||
Ok(bootstrap_db)
|
||||
}
|
||||
|
||||
|
@ -449,7 +454,7 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result<DB> {
|
|||
}
|
||||
|
||||
/// Internal representation of an [e a v added] datom, ready to be transacted against the store.
|
||||
pub type ReducedEntity = (i64, i64, TypedValue, bool);
|
||||
pub type ReducedEntity<'a> = (Entid, Entid, &'a Attribute, TypedValue, bool);
|
||||
|
||||
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
|
||||
pub enum SearchType {
|
||||
|
@ -457,34 +462,13 @@ pub enum SearchType {
|
|||
Inexact,
|
||||
}
|
||||
|
||||
impl DB {
|
||||
/// Do schema-aware typechecking and coercion.
|
||||
///
|
||||
/// Either assert that the given value is in the attribute's value set, or (in limited cases)
|
||||
/// coerce the given value into the attribute's value set.
|
||||
pub fn to_typed_value(&self, value: &Value, attribute: &Attribute) -> Result<TypedValue> {
|
||||
// TODO: encapsulate entid-ident-attribute for better error messages.
|
||||
match TypedValue::from_edn_value(value) {
|
||||
// We don't recognize this EDN at all. Get out!
|
||||
None => bail!(ErrorKind::BadEDNValuePair(value.clone(), attribute.value_type.clone())),
|
||||
Some(typed_value) => match (&attribute.value_type, typed_value) {
|
||||
// Most types don't coerce at all.
|
||||
(&ValueType::Boolean, tv @ TypedValue::Boolean(_)) => Ok(tv),
|
||||
(&ValueType::Long, tv @ TypedValue::Long(_)) => Ok(tv),
|
||||
(&ValueType::Double, tv @ TypedValue::Double(_)) => Ok(tv),
|
||||
(&ValueType::String, tv @ TypedValue::String(_)) => Ok(tv),
|
||||
(&ValueType::Keyword, tv @ TypedValue::Keyword(_)) => Ok(tv),
|
||||
// Ref coerces a little: we interpret some things depending on the schema as a Ref.
|
||||
(&ValueType::Ref, TypedValue::Long(x)) => Ok(TypedValue::Ref(x)),
|
||||
(&ValueType::Ref, TypedValue::Keyword(ref x)) => {
|
||||
self.schema.require_entid(&x).map(|entid| TypedValue::Ref(entid))
|
||||
}
|
||||
// Otherwise, we have a type mismatch.
|
||||
(value_type, _) => bail!(ErrorKind::BadEDNValuePair(value.clone(), value_type.clone())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `MentatStoring` will be the trait that encapsulates the storage layer. It is consumed by the
|
||||
/// transaction processing layer.
|
||||
///
|
||||
/// Right now, the only implementation of `MentatStoring` is the SQLite-specific SQL schema. In the
|
||||
/// future, we might consider other SQL engines (perhaps with different fulltext indexing), or
|
||||
/// entirely different data stores, say ones shaped like key-value stores.
|
||||
pub trait MentatStoring {
|
||||
/// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples.
|
||||
///
|
||||
/// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one
|
||||
|
@ -492,8 +476,135 @@ impl DB {
|
|||
/// chosen non-deterministically, if one exists.)
|
||||
///
|
||||
/// Returns a map &(a, v) -> e, to avoid cloning potentially large values. The keys of the map
|
||||
/// are exactly those (a, v) pairs that have an assertion [e a v] in the datom store.
|
||||
pub fn resolve_avs<'a>(&self, conn: &rusqlite::Connection, avs: &'a [&'a AVPair]) -> Result<AVMap<'a>> {
|
||||
/// are exactly those (a, v) pairs that have an assertion [e a v] in the store.
|
||||
fn resolve_avs<'a>(&self, avs: &'a [&'a AVPair]) -> Result<AVMap<'a>>;
|
||||
|
||||
/// Begin (or prepare) the underlying storage layer for a new Mentat transaction.
|
||||
///
|
||||
/// Use this to create temporary tables, prepare indices, set pragmas, etc, before the initial
|
||||
/// `insert_non_fts_searches` invocation.
|
||||
fn begin_transaction(&self) -> Result<()>;
|
||||
|
||||
// TODO: this is not a reasonable abstraction, but I don't want to really consider non-SQL storage just yet.
|
||||
fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>;
|
||||
|
||||
/// Finalize the underlying storage layer after a Mentat transaction.
|
||||
///
|
||||
/// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the
|
||||
/// final `insert_non_fts_searches` invocation.
|
||||
fn commit_transaction(&self, tx_id: Entid) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Take search rows and complete `temp.search_results`.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
fn search(conn: &rusqlite::Connection) -> Result<()> {
|
||||
// First is fast, only one table walk: lookup by exact eav.
|
||||
// Second is slower, but still only one table walk: lookup old value by ea.
|
||||
let s = r#"
|
||||
INSERT INTO temp.search_results
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.added0, t.flags0, ':db.cardinality/many', d.rowid, d.v
|
||||
FROM temp.exact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a AND
|
||||
t.value_type_tag0 = d.value_type_tag AND
|
||||
t.v0 = d.v
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.added0, t.flags0, ':db.cardinality/one', d.rowid, d.v
|
||||
FROM temp.inexact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not search!")
|
||||
}
|
||||
|
||||
/// Insert the new transaction into the `transactions` table.
|
||||
///
|
||||
/// This turns the contents of `search_results` into a new transaction.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
fn insert_transaction(conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v0, ?, 1, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to add datoms not already present")?;
|
||||
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v, ?, 0, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to retract datoms already present")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the contents of the `datoms` materialized view with the new transaction.
|
||||
///
|
||||
/// This applies the contents of `search_results` to the `datoms` table (in place).
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
fn update_datoms(conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
// Delete datoms that were retracted, or those that were :db.cardinality/one and will be
|
||||
// replaced.
|
||||
let s = r#"
|
||||
WITH ids AS (SELECT rid
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v)))
|
||||
DELETE FROM datoms WHERE rowid IN ids"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to retract datoms already present")?;
|
||||
|
||||
// Insert datoms that were added and not already present. We also must
|
||||
// expand our bitfield into flags.
|
||||
let s = format!(r#"
|
||||
INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value)
|
||||
SELECT e0, a0, v0, ?, value_type_tag0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#,
|
||||
AttributeBitFlags::IndexAVET as u8,
|
||||
AttributeBitFlags::IndexVAET as u8,
|
||||
AttributeBitFlags::IndexFulltext as u8,
|
||||
AttributeBitFlags::UniqueValue as u8);
|
||||
|
||||
let mut stmt = conn.prepare_cached(&s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to add datoms not already present")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl MentatStoring for rusqlite::Connection {
|
||||
fn resolve_avs<'a>(&self, avs: &'a [&'a AVPair]) -> Result<AVMap<'a>> {
|
||||
// Start search_id's at some identifiable number.
|
||||
let initial_search_id = 2000;
|
||||
let bindings_per_statement = 4;
|
||||
|
@ -537,7 +648,7 @@ impl DB {
|
|||
FROM t, all_datoms AS d \
|
||||
WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v",
|
||||
values);
|
||||
let mut stmt: rusqlite::Statement = conn.prepare(s.as_str())?;
|
||||
let mut stmt: rusqlite::Statement = self.prepare(s.as_str())?;
|
||||
|
||||
let m: Result<Vec<(i64, Entid)>> = stmt.query_and_then(¶ms, |row| -> Result<(i64, Entid)> {
|
||||
Ok((row.get_checked(0)?, row.get_checked(1)?))
|
||||
|
@ -557,20 +668,18 @@ impl DB {
|
|||
}
|
||||
|
||||
/// Create empty temporary tables for search parameters and search results.
|
||||
fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> {
|
||||
fn begin_transaction(&self) -> Result<()> {
|
||||
// We can't do this in one shot, since we can't prepare a batch statement.
|
||||
let statements = [
|
||||
r#"DROP TABLE IF EXISTS temp.exact_searches"#,
|
||||
// Note that `flags0` is a bitfield of several flags compressed via
|
||||
// `AttributeBitFlags.flags()` in the temporary search tables, later
|
||||
// expanded in the `datoms` insertion.
|
||||
// TODO: drop tx0 entirely.
|
||||
r#"CREATE TABLE temp.exact_searches (
|
||||
e0 INTEGER NOT NULL,
|
||||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
flags0 TINYINT NOT NULL)"#,
|
||||
// There's no real need to split exact and inexact searches, so long as we keep things
|
||||
|
@ -582,7 +691,6 @@ impl DB {
|
|||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
flags0 TINYINT NOT NULL)"#,
|
||||
r#"DROP TABLE IF EXISTS temp.search_results"#,
|
||||
|
@ -593,7 +701,6 @@ impl DB {
|
|||
a0 SMALLINT NOT NULL,
|
||||
v0 BLOB NOT NULL,
|
||||
value_type_tag0 SMALLINT NOT NULL,
|
||||
tx0 INTEGER NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
flags0 TINYINT NOT NULL,
|
||||
search_type STRING NOT NULL,
|
||||
|
@ -608,7 +715,7 @@ impl DB {
|
|||
];
|
||||
|
||||
for statement in &statements {
|
||||
let mut stmt = conn.prepare_cached(statement)?;
|
||||
let mut stmt = self.prepare_cached(statement)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Failed to create temporary tables")?;
|
||||
|
@ -621,8 +728,8 @@ impl DB {
|
|||
///
|
||||
/// Eventually, the details of this approach will be captured in
|
||||
/// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
pub fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> {
|
||||
let bindings_per_statement = 7;
|
||||
fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity<'a>], search_type: SearchType) -> Result<()> {
|
||||
let bindings_per_statement = 6;
|
||||
|
||||
let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement);
|
||||
|
||||
|
@ -633,21 +740,18 @@ impl DB {
|
|||
// We must keep these computed values somewhere to reference them later, so we can't
|
||||
// combine this map and the subsequent flat_map.
|
||||
// (e0, a0, v0, value_type_tag0, added0, flags0)
|
||||
let block: Result<Vec<(i64 /* e */, i64 /* a */,
|
||||
ToSqlOutput<'a> /* value */, /* value_type_tag */ i32,
|
||||
/* added0 */ bool,
|
||||
/* flags0 */ u8)>> = chunk.map(|&(e, a, ref typed_value, added)| {
|
||||
let block: Result<Vec<(i64 /* e */,
|
||||
i64 /* a */,
|
||||
ToSqlOutput<'a> /* value */,
|
||||
i32 /* value_type_tag */,
|
||||
bool, /* added0 */
|
||||
u8 /* flags0 */)>> = chunk.map(|&(e, a, ref attribute, ref typed_value, added)| {
|
||||
count += 1;
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
|
||||
|
||||
// Now we can represent the typed value as an SQL value.
|
||||
let (value, value_type_tag): (ToSqlOutput, i32) = typed_value.to_sql_value_pair();
|
||||
|
||||
let flags = attribute.flags();
|
||||
|
||||
Ok((e, a, value, value_type_tag,
|
||||
added,
|
||||
flags))
|
||||
Ok((e, a, value, value_type_tag, added, attribute.flags()))
|
||||
}).collect();
|
||||
let block = block?;
|
||||
|
||||
|
@ -659,21 +763,20 @@ impl DB {
|
|||
.chain(once(a as &ToSql)
|
||||
.chain(once(value as &ToSql)
|
||||
.chain(once(value_type_tag as &ToSql)
|
||||
.chain(once(&tx as &ToSql)
|
||||
.chain(once(to_bool_ref(added) as &ToSql)
|
||||
.chain(once(flags as &ToSql)))))))
|
||||
.chain(once(to_bool_ref(added) as &ToSql)
|
||||
.chain(once(flags as &ToSql))))))
|
||||
}).collect();
|
||||
|
||||
// TODO: cache this for selected values of count.
|
||||
let values: String = repeat_values(bindings_per_statement, count);
|
||||
let s: String = if search_type == SearchType::Exact {
|
||||
format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, tx0, added0, flags0) VALUES {}", values)
|
||||
format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values)
|
||||
} else {
|
||||
format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, tx0, added0, flags0) VALUES {}", values)
|
||||
format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values)
|
||||
};
|
||||
|
||||
// TODO: consider ensuring we inserted the expected number of rows.
|
||||
let mut stmt = conn.prepare_cached(s.as_str())?;
|
||||
let mut stmt = self.prepare_cached(s.as_str())?;
|
||||
stmt.execute(¶ms)
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert non-fts one statements into temporary search table!")
|
||||
|
@ -682,151 +785,55 @@ impl DB {
|
|||
results.map(|_| ())
|
||||
}
|
||||
|
||||
/// Take search rows and complete `temp.search_results`.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
pub fn search(&self, conn: &rusqlite::Connection) -> Result<()> {
|
||||
// First is fast, only one table walk: lookup by exact eav.
|
||||
// Second is slower, but still only one table walk: lookup old value by ea.
|
||||
let s = r#"
|
||||
INSERT INTO temp.search_results
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.flags0, ':db.cardinality/many', d.rowid, d.v
|
||||
FROM temp.exact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a AND
|
||||
t.value_type_tag0 = d.value_type_tag AND
|
||||
t.v0 = d.v
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.flags0, ':db.cardinality/one', d.rowid, d.v
|
||||
FROM temp.inexact_searches AS t
|
||||
LEFT JOIN datoms AS d
|
||||
ON t.e0 = d.e AND
|
||||
t.a0 = d.a"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not search!")
|
||||
}
|
||||
|
||||
/// Insert the new transaction into the `transactions` table.
|
||||
///
|
||||
/// This turns the contents of `search_results` into a new transaction.
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
// TODO: capture `conn` in a `TxInternal` structure.
|
||||
pub fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v0, ?, 1, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to add datoms not already present")?;
|
||||
|
||||
let s = r#"
|
||||
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||
SELECT e0, a0, v, ?, 0, value_type_tag0
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not insert transaction: failed to retract datoms already present")?;
|
||||
|
||||
fn commit_transaction(&self, tx_id: Entid) -> Result<()> {
|
||||
search(&self)?;
|
||||
insert_transaction(&self, tx_id)?;
|
||||
update_datoms(&self, tx_id)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the contents of the `datoms` materialized view with the new transaction.
|
||||
///
|
||||
/// This applies the contents of `search_results` to the `datoms` table (in place).
|
||||
///
|
||||
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
|
||||
// TODO: capture `conn` in a `TxInternal` structure.
|
||||
pub fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
|
||||
// Delete datoms that were retracted, or those that were :db.cardinality/one and will be
|
||||
// replaced.
|
||||
let s = r#"
|
||||
WITH ids AS (SELECT rid
|
||||
FROM temp.search_results
|
||||
WHERE rid IS NOT NULL AND
|
||||
((added0 IS 0) OR
|
||||
(added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v)))
|
||||
DELETE FROM datoms WHERE rowid IN ids"#;
|
||||
|
||||
let mut stmt = conn.prepare_cached(s)?;
|
||||
stmt.execute(&[])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to retract datoms already present")?;
|
||||
|
||||
// Insert datoms that were added and not already present. We also must
|
||||
// expand our bitfield into flags.
|
||||
let s = format!(r#"
|
||||
INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value)
|
||||
SELECT e0, a0, v0, ?, value_type_tag0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0,
|
||||
flags0 & {} IS NOT 0
|
||||
FROM temp.search_results
|
||||
WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#,
|
||||
AttributeBitFlags::IndexAVET as u8,
|
||||
AttributeBitFlags::IndexVAET as u8,
|
||||
AttributeBitFlags::IndexFulltext as u8,
|
||||
AttributeBitFlags::UniqueValue as u8);
|
||||
|
||||
let mut stmt = conn.prepare_cached(&s)?;
|
||||
stmt.execute(&[&tx])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update datoms: failed to add datoms not already present")?;
|
||||
|
||||
Ok(())
|
||||
/// Update the current partition map materialized view.
|
||||
// TODO: only update changed partitions.
|
||||
pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &PartitionMap) -> Result<()> {
|
||||
let values_per_statement = 2;
|
||||
let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement;
|
||||
if partition_map.len() > max_partitions {
|
||||
bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions)));
|
||||
}
|
||||
|
||||
/// Update the current partition map materialized view.
|
||||
// TODO: only update changed partitions.
|
||||
pub fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> {
|
||||
let values_per_statement = 2;
|
||||
let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement;
|
||||
if self.partition_map.len() > max_partitions {
|
||||
bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions)));
|
||||
}
|
||||
// Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END".
|
||||
let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END",
|
||||
repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" "));
|
||||
|
||||
// Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END".
|
||||
let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END",
|
||||
repeat("WHEN part = ? THEN ?").take(self.partition_map.len()).join(" "));
|
||||
let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| {
|
||||
once(name as &ToSql)
|
||||
.chain(once(&partition.index as &ToSql))
|
||||
}).collect();
|
||||
|
||||
let params: Vec<&ToSql> = self.partition_map.iter().flat_map(|(name, partition)| {
|
||||
once(name as &ToSql)
|
||||
.chain(once(&partition.index as &ToSql))
|
||||
}).collect();
|
||||
// TODO: only cache the latest of these statements. Changing the set of partitions isn't
|
||||
// supported in the Clojure implementation at all, and might not be supported in Mentat soon,
|
||||
// so this is very low priority.
|
||||
let mut stmt = conn.prepare_cached(s.as_str())?;
|
||||
stmt.execute(¶ms[..])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update partition map")
|
||||
}
|
||||
|
||||
// TODO: only cache the latest of these statements. Changing the set of partitions isn't
|
||||
// supported in the Clojure implementation at all, and might not be supported in Mentat soon,
|
||||
// so this is very low priority.
|
||||
let mut stmt = conn.prepare_cached(s.as_str())?;
|
||||
stmt.execute(¶ms[..])
|
||||
.map(|_c| ())
|
||||
.chain_err(|| "Could not update partition map")
|
||||
}
|
||||
pub trait PartitionMapping {
|
||||
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
|
||||
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
|
||||
}
|
||||
|
||||
impl PartitionMapping for PartitionMap {
|
||||
/// Allocate a single fresh entid in the given `partition`.
|
||||
pub fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S> {
|
||||
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S> {
|
||||
self.allocate_entids(partition, 1).start
|
||||
}
|
||||
|
||||
/// Allocate `n` fresh entids in the given `partition`.
|
||||
pub fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S> {
|
||||
match self.partition_map.get_mut(partition) {
|
||||
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S> {
|
||||
match self.get_mut(partition) {
|
||||
Some(mut partition) => {
|
||||
let idx = partition.index;
|
||||
partition.index += n as i64;
|
||||
|
@ -836,24 +843,6 @@ impl DB {
|
|||
None => panic!("Cannot allocate entid from unknown partition: {}", partition),
|
||||
}
|
||||
}
|
||||
|
||||
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
|
||||
/// `self.DB`.
|
||||
///
|
||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||
// TODO: move this to the transactor layer.
|
||||
pub fn transact<I>(&mut self, conn: &rusqlite::Connection, entities: I) -> Result<TxReport> where I: IntoIterator<Item=Entity> {
|
||||
// Eventually, this function will be responsible for managing a SQLite transaction. For
|
||||
// now, it's just about the tx details.
|
||||
|
||||
let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge.
|
||||
let tx_id = self.allocate_entid(":db.part/tx");
|
||||
|
||||
self.create_temp_tables(conn)?;
|
||||
|
||||
let mut tx = Tx::new(self, conn, tx_id, tx_instant);
|
||||
tx.transact_entities(entities)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -865,6 +854,7 @@ mod tests {
|
|||
use edn::symbols;
|
||||
use mentat_tx_parser;
|
||||
use rusqlite;
|
||||
use tx::transact;
|
||||
|
||||
#[test]
|
||||
fn test_open_current_version() {
|
||||
|
@ -881,11 +871,11 @@ mod tests {
|
|||
let db = read_db(&conn).unwrap();
|
||||
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
|
||||
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
|
||||
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
}
|
||||
|
@ -898,9 +888,11 @@ mod tests {
|
|||
/// There is some magic here about transaction numbering that I don't want to commit to or
|
||||
/// document just yet. The end state might be much more general pattern matching syntax, rather
|
||||
/// than the targeted transaction ID and timestamp replacement we have right now.
|
||||
fn assert_transactions(conn: &rusqlite::Connection, db: &mut DB, transactions: &Vec<edn::Value>) {
|
||||
for (index, transaction) in transactions.into_iter().enumerate() {
|
||||
let index = index as i64;
|
||||
// TODO: accept a `Conn`.
|
||||
fn assert_transactions<'a>(conn: &rusqlite::Connection, partition_map: &mut PartitionMap, schema: &mut Schema, transactions: &Vec<edn::Value>) {
|
||||
let mut index: i64 = bootstrap::TX0;
|
||||
|
||||
for transaction in transactions {
|
||||
let transaction = transaction.as_map().unwrap();
|
||||
let label: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "label"))).unwrap().clone();
|
||||
let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone();
|
||||
|
@ -910,10 +902,12 @@ mod tests {
|
|||
|
||||
let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap();
|
||||
|
||||
let maybe_report = db.transact(&conn, entities);
|
||||
let maybe_report = transact(&conn, partition_map, schema, entities);
|
||||
|
||||
if let Some(expected_transaction) = expected_transaction {
|
||||
if expected_transaction.is_nil() {
|
||||
if !expected_transaction.is_nil() {
|
||||
index += 1;
|
||||
} else {
|
||||
assert!(maybe_report.is_err());
|
||||
|
||||
if let Some(expected_error_message) = expected_error_message {
|
||||
|
@ -925,10 +919,17 @@ mod tests {
|
|||
continue
|
||||
}
|
||||
|
||||
let report = maybe_report.unwrap();
|
||||
assert_eq!(report.tx_id, bootstrap::TX0 + index + 1);
|
||||
// TODO: test schema changes in the EDN format.
|
||||
let (report, next_partition_map, next_schema) = maybe_report.unwrap();
|
||||
*partition_map = next_partition_map;
|
||||
if let Some(next_schema) = next_schema {
|
||||
*schema = next_schema;
|
||||
}
|
||||
|
||||
let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap();
|
||||
assert_eq!(index, report.tx_id,
|
||||
"\n{} - expected tx_id {} but got tx_id {}", label, index, report.tx_id);
|
||||
|
||||
let transactions = debug::transactions_after(&conn, &schema, index - 1).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(*expected_transaction,
|
||||
transactions.0[0].into_edn(),
|
||||
|
@ -936,7 +937,7 @@ mod tests {
|
|||
}
|
||||
|
||||
if let Some(expected_datoms) = expected_datoms {
|
||||
let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap();
|
||||
let datoms = debug::datoms_after(&conn, &schema, bootstrap::TX0).unwrap();
|
||||
assert_eq!(*expected_datoms,
|
||||
datoms.into_edn(),
|
||||
"\n{} - expected datoms:\n{}\nbut got datoms:\n{}", label, *expected_datoms, datoms.into_edn())
|
||||
|
@ -955,11 +956,11 @@ mod tests {
|
|||
let mut db = ensure_current_version(&mut conn).unwrap();
|
||||
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
|
||||
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
|
||||
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
|
||||
|
@ -967,7 +968,8 @@ mod tests {
|
|||
let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap().without_spans();
|
||||
|
||||
let transactions = value.as_vector().unwrap();
|
||||
assert_transactions(&conn, &mut db, transactions);
|
||||
|
||||
assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -976,18 +978,18 @@ mod tests {
|
|||
let mut db = ensure_current_version(&mut conn).unwrap();
|
||||
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
|
||||
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
|
||||
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
|
||||
let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap().without_spans();
|
||||
|
||||
let transactions = value.as_vector().unwrap();
|
||||
assert_transactions(&conn, &mut db, transactions);
|
||||
assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -996,17 +998,17 @@ mod tests {
|
|||
let mut db = ensure_current_version(&mut conn).unwrap();
|
||||
|
||||
// Does not include :db/txInstant.
|
||||
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
|
||||
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(datoms.0.len(), 88);
|
||||
|
||||
// Includes :db/txInstant.
|
||||
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
|
||||
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
|
||||
assert_eq!(transactions.0.len(), 1);
|
||||
assert_eq!(transactions.0[0].0.len(), 89);
|
||||
|
||||
let value = edn::parse::value(include_str!("../../tx/fixtures/test_upsert_vector.edn")).unwrap().without_spans();
|
||||
|
||||
let transactions = value.as_vector().unwrap();
|
||||
assert_transactions(&conn, &mut db, transactions);
|
||||
assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
/// Low-level functions for testing.
|
||||
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::{BTreeSet};
|
||||
use std::io::{Write};
|
||||
|
||||
|
@ -28,7 +29,7 @@ use entids;
|
|||
use mentat_core::TypedValue;
|
||||
use mentat_tx::entities::{Entid};
|
||||
use db::TypedSQLValue;
|
||||
use types::DB;
|
||||
use types::Schema;
|
||||
use errors::Result;
|
||||
|
||||
/// Represents a *datom* (assertion) in the store.
|
||||
|
@ -106,21 +107,21 @@ impl Transactions {
|
|||
}
|
||||
|
||||
/// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`.
|
||||
fn to_entid(db: &DB, entid: i64) -> Entid {
|
||||
db.schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone()))
|
||||
fn to_entid(schema: &Schema, entid: i64) -> Entid {
|
||||
schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone()))
|
||||
}
|
||||
|
||||
/// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of
|
||||
/// the form [... :db/txInstant ...].
|
||||
pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result<Datoms> {
|
||||
datoms_after(conn, db, bootstrap::TX0 - 1)
|
||||
pub fn datoms<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S) -> Result<Datoms> {
|
||||
datoms_after(conn, schema, bootstrap::TX0 - 1)
|
||||
}
|
||||
|
||||
/// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`,
|
||||
/// ordered by (e, a, v, tx).
|
||||
///
|
||||
/// The datom set returned does not include any datoms of the form [... :db/txInstant ...].
|
||||
pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Datoms> {
|
||||
pub fn datoms_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result<Datoms> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, v ASC, tx ASC")?;
|
||||
|
||||
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
|
||||
|
@ -139,9 +140,10 @@ pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Dat
|
|||
|
||||
let tx: i64 = row.get_checked(4)?;
|
||||
|
||||
let borrowed_schema = schema.borrow();
|
||||
Ok(Some(Datom {
|
||||
e: to_entid(db, e),
|
||||
a: to_entid(db, a),
|
||||
e: to_entid(borrowed_schema, e),
|
||||
a: to_entid(borrowed_schema, a),
|
||||
v: value,
|
||||
tx: tx,
|
||||
added: None,
|
||||
|
@ -155,7 +157,7 @@ pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Dat
|
|||
/// given `tx`, ordered by (tx, e, a, v).
|
||||
///
|
||||
/// Each transaction returned includes the [:db/tx :db/txInstant ...] datom.
|
||||
pub fn transactions_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result<Transactions> {
|
||||
pub fn transactions_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result<Transactions> {
|
||||
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, v ASC, added ASC")?;
|
||||
|
||||
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
|
||||
|
@ -171,9 +173,10 @@ pub fn transactions_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Resu
|
|||
let tx: i64 = row.get_checked(4)?;
|
||||
let added: bool = row.get_checked(5)?;
|
||||
|
||||
let borrowed_schema = schema.borrow();
|
||||
Ok(Datom {
|
||||
e: to_entid(db, e),
|
||||
a: to_entid(db, a),
|
||||
e: to_entid(borrowed_schema, e),
|
||||
a: to_entid(borrowed_schema, a),
|
||||
v: value,
|
||||
tx: tx,
|
||||
added: Some(added),
|
||||
|
|
|
@ -26,13 +26,13 @@ extern crate mentat_tx_parser;
|
|||
|
||||
use itertools::Itertools;
|
||||
use std::iter::repeat;
|
||||
use errors::{ErrorKind, Result};
|
||||
pub use errors::{Error, ErrorKind, ResultExt, Result};
|
||||
|
||||
pub mod db;
|
||||
mod bootstrap;
|
||||
mod debug;
|
||||
pub mod debug;
|
||||
mod entids;
|
||||
mod errors;
|
||||
pub mod errors;
|
||||
mod schema;
|
||||
mod types;
|
||||
mod internal_types;
|
||||
|
@ -40,7 +40,12 @@ mod upsert_resolution;
|
|||
mod values;
|
||||
mod tx;
|
||||
|
||||
pub use types::DB;
|
||||
pub use tx::transact;
|
||||
pub use types::{
|
||||
DB,
|
||||
PartitionMap,
|
||||
TxReport,
|
||||
};
|
||||
|
||||
use edn::symbols;
|
||||
|
||||
|
|
|
@ -10,6 +10,8 @@
|
|||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use db::TypedSQLValue;
|
||||
use edn;
|
||||
use entids;
|
||||
use errors::{ErrorKind, Result};
|
||||
use edn::symbols;
|
||||
|
@ -177,3 +179,34 @@ impl SchemaBuilding for Schema {
|
|||
Schema::from_ident_map_and_schema_map(ident_map.clone(), schema_map)
|
||||
}
|
||||
}
|
||||
|
||||
pub trait SchemaTypeChecking {
|
||||
/// Do schema-aware typechecking and coercion.
|
||||
///
|
||||
/// Either assert that the given value is in the attribute's value set, or (in limited cases)
|
||||
/// coerce the given value into the attribute's value set.
|
||||
fn to_typed_value(&self, value: &edn::Value, attribute: &Attribute) -> Result<TypedValue>;
|
||||
}
|
||||
|
||||
impl SchemaTypeChecking for Schema {
|
||||
fn to_typed_value(&self, value: &edn::Value, attribute: &Attribute) -> Result<TypedValue> {
|
||||
// TODO: encapsulate entid-ident-attribute for better error messages.
|
||||
match TypedValue::from_edn_value(value) {
|
||||
// We don't recognize this EDN at all. Get out!
|
||||
None => bail!(ErrorKind::BadEDNValuePair(value.clone(), attribute.value_type.clone())),
|
||||
Some(typed_value) => match (&attribute.value_type, typed_value) {
|
||||
// Most types don't coerce at all.
|
||||
(&ValueType::Boolean, tv @ TypedValue::Boolean(_)) => Ok(tv),
|
||||
(&ValueType::Long, tv @ TypedValue::Long(_)) => Ok(tv),
|
||||
(&ValueType::Double, tv @ TypedValue::Double(_)) => Ok(tv),
|
||||
(&ValueType::String, tv @ TypedValue::String(_)) => Ok(tv),
|
||||
(&ValueType::Keyword, tv @ TypedValue::Keyword(_)) => Ok(tv),
|
||||
// Ref coerces a little: we interpret some things depending on the schema as a Ref.
|
||||
(&ValueType::Ref, TypedValue::Long(x)) => Ok(TypedValue::Ref(x)),
|
||||
(&ValueType::Ref, TypedValue::Keyword(ref x)) => self.require_entid(&x).map(|entid| TypedValue::Ref(entid)),
|
||||
// Otherwise, we have a type mismatch.
|
||||
(value_type, _) => bail!(ErrorKind::BadEDNValuePair(value.clone(), value_type.clone())),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
151
db/src/tx.rs
151
db/src/tx.rs
|
@ -48,7 +48,12 @@
|
|||
use std;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use db::{ReducedEntity, SearchType};
|
||||
use ::{to_namespaced_keyword};
|
||||
use db;
|
||||
use db::{
|
||||
MentatStoring,
|
||||
PartitionMapping,
|
||||
};
|
||||
use entids;
|
||||
use errors::{ErrorKind, Result};
|
||||
use internal_types::{
|
||||
|
@ -60,17 +65,23 @@ use internal_types::{
|
|||
TermWithTempIds,
|
||||
TermWithoutTempIds,
|
||||
replace_lookup_ref};
|
||||
use mentat_core::intern_set;
|
||||
use mentat_core::{
|
||||
intern_set,
|
||||
Schema,
|
||||
};
|
||||
use mentat_tx::entities as entmod;
|
||||
use mentat_tx::entities::{Entity, OpType};
|
||||
use rusqlite;
|
||||
use schema::SchemaBuilding;
|
||||
use schema::{
|
||||
SchemaBuilding,
|
||||
SchemaTypeChecking,
|
||||
};
|
||||
use types::{
|
||||
Attribute,
|
||||
AVPair,
|
||||
AVMap,
|
||||
DB,
|
||||
Entid,
|
||||
PartitionMap,
|
||||
TypedValue,
|
||||
TxReport,
|
||||
ValueType,
|
||||
|
@ -79,28 +90,37 @@ use upsert_resolution::Generation;
|
|||
|
||||
/// A transaction on its way to being applied.
|
||||
#[derive(Debug)]
|
||||
pub struct Tx<'conn> {
|
||||
/// The metadata to use to interpret the transaction entities with.
|
||||
pub db: &'conn mut DB,
|
||||
pub struct Tx<'conn, 'a> {
|
||||
/// The storage to apply against. In the future, this will be a Mentat connection.
|
||||
store: &'conn rusqlite::Connection, // TODO: db::MentatStoring,
|
||||
|
||||
/// The SQLite connection to apply against. In the future, this will be a Mentat connection.
|
||||
pub conn: &'conn rusqlite::Connection,
|
||||
/// The partition map to allocate entids from.
|
||||
///
|
||||
/// The partition map is volatile in the sense that every succesful transaction updates
|
||||
/// allocates at least one tx ID, so we own and modify our own partition map.
|
||||
partition_map: PartitionMap,
|
||||
|
||||
/// The schema to use when interpreting the transaction entities.
|
||||
///
|
||||
/// The schema is infrequently updated, so we borrow a schema until we need to modify it.
|
||||
schema: Cow<'a, Schema>,
|
||||
|
||||
/// The transaction ID of the transaction.
|
||||
pub tx_id: Entid,
|
||||
tx_id: Entid,
|
||||
|
||||
/// The timestamp when the transaction began to be committed.
|
||||
///
|
||||
/// This is milliseconds after the Unix epoch according to the transactor's local clock.
|
||||
// TODO: :db.type/instant.
|
||||
pub tx_instant: i64,
|
||||
tx_instant: i64,
|
||||
}
|
||||
|
||||
impl<'conn> Tx<'conn> {
|
||||
pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> {
|
||||
impl<'conn, 'a> Tx<'conn, 'a> {
|
||||
pub fn new(store: &'conn rusqlite::Connection, partition_map: PartitionMap, schema: &'a Schema, tx_id: Entid, tx_instant: i64) -> Tx<'conn, 'a> {
|
||||
Tx {
|
||||
db: db,
|
||||
conn: conn,
|
||||
store: store,
|
||||
partition_map: partition_map,
|
||||
schema: Cow::Borrowed(schema),
|
||||
tx_id: tx_id,
|
||||
tx_instant: tx_instant,
|
||||
}
|
||||
|
@ -109,7 +129,7 @@ impl<'conn> Tx<'conn> {
|
|||
/// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly
|
||||
/// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted
|
||||
/// entid. The keys of the resulting map are exactly those tempids that upserted.
|
||||
pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result<TempIdMap> {
|
||||
pub fn resolve_temp_id_avs<'b>(&self, temp_id_avs: &'b [(TempId, AVPair)]) -> Result<TempIdMap> {
|
||||
if temp_id_avs.is_empty() {
|
||||
return Ok(TempIdMap::default());
|
||||
}
|
||||
|
@ -121,7 +141,7 @@ impl<'conn> Tx<'conn> {
|
|||
}
|
||||
|
||||
// Lookup in the store.
|
||||
let av_map: AVMap = self.db.resolve_avs(conn, &av_pairs[..])?;
|
||||
let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?;
|
||||
|
||||
// Map id->entid.
|
||||
let mut temp_id_map: TempIdMap = TempIdMap::default();
|
||||
|
@ -153,16 +173,16 @@ impl<'conn> Tx<'conn> {
|
|||
Entity::AddOrRetract { op, e, a, v } => {
|
||||
let a: i64 = match a {
|
||||
entmod::Entid::Entid(ref a) => *a,
|
||||
entmod::Entid::Ident(ref a) => self.db.schema.require_entid(&a)?,
|
||||
entmod::Entid::Ident(ref a) => self.schema.require_entid(&a)?,
|
||||
};
|
||||
|
||||
let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?;
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
|
||||
|
||||
let e = match e {
|
||||
entmod::EntidOrLookupRefOrTempId::Entid(e) => {
|
||||
let e: i64 = match e {
|
||||
entmod::Entid::Entid(ref e) => *e,
|
||||
entmod::Entid::Ident(ref e) => self.db.schema.require_entid(&e)?,
|
||||
entmod::Entid::Ident(ref e) => self.schema.require_entid(&e)?,
|
||||
};
|
||||
std::result::Result::Ok(e)
|
||||
},
|
||||
|
@ -186,7 +206,7 @@ impl<'conn> Tx<'conn> {
|
|||
// Here is where we do schema-aware typechecking: we either assert that
|
||||
// the given value is in the attribute's value set, or (in limited
|
||||
// cases) coerce the value into the attribute's value set.
|
||||
let typed_value: TypedValue = self.db.to_typed_value(&v, &attribute)?;
|
||||
let typed_value: TypedValue = self.schema.to_typed_value(&v, &attribute)?;
|
||||
|
||||
std::result::Result::Ok(typed_value)
|
||||
}
|
||||
|
@ -215,27 +235,13 @@ impl<'conn> Tx<'conn> {
|
|||
}).collect::<Result<Vec<_>>>()
|
||||
}
|
||||
|
||||
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
|
||||
/// `self.DB`.
|
||||
/// Transact the given `entities` against the store.
|
||||
///
|
||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||
// TODO: move this to the transactor layer.
|
||||
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport> where I: IntoIterator<Item=Entity> {
|
||||
// TODO: push these into an internal transaction report?
|
||||
|
||||
/// Assertions that are :db.cardinality/one and not :db.fulltext.
|
||||
let mut non_fts_one: Vec<ReducedEntity> = vec![];
|
||||
|
||||
/// Assertions that are :db.cardinality/many and not :db.fulltext.
|
||||
let mut non_fts_many: Vec<ReducedEntity> = vec![];
|
||||
|
||||
// Transact [:db/add :db/txInstant NOW :db/tx].
|
||||
// TODO: allow this to be present in the transaction data.
|
||||
non_fts_one.push((self.tx_id,
|
||||
entids::DB_TX_INSTANT,
|
||||
TypedValue::Long(self.tx_instant),
|
||||
true));
|
||||
|
||||
// We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable.
|
||||
let lookup_refs: intern_set::InternSet<AVPair> = intern_set::InternSet::new();
|
||||
|
||||
|
@ -245,18 +251,18 @@ impl<'conn> Tx<'conn> {
|
|||
|
||||
// Pipeline stage 2: resolve lookup refs -> terms with tempids.
|
||||
let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect();
|
||||
let lookup_ref_map: AVMap = self.db.resolve_avs(self.conn, &lookup_ref_avs[..])?;
|
||||
let lookup_ref_map: AVMap = self.store.resolve_avs(&lookup_ref_avs[..])?;
|
||||
|
||||
let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?;
|
||||
|
||||
// Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs.
|
||||
// Now we can collect upsert populations.
|
||||
let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.db.schema)?;
|
||||
let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.schema)?;
|
||||
|
||||
// And evolve them forward.
|
||||
while generation.can_evolve() {
|
||||
// Evolve further.
|
||||
let temp_id_map = self.resolve_temp_id_avs(self.conn, &generation.temp_id_avs()[..])?;
|
||||
let temp_id_map = self.resolve_temp_id_avs(&generation.temp_id_avs()[..])?;
|
||||
generation = generation.evolve_one_step(&temp_id_map);
|
||||
}
|
||||
|
||||
|
@ -264,11 +270,18 @@ impl<'conn> Tx<'conn> {
|
|||
let unresolved_temp_ids: BTreeSet<TempId> = generation.temp_ids_in_allocations();
|
||||
|
||||
// TODO: track partitions for temporary IDs.
|
||||
let entids = self.db.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
||||
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
||||
|
||||
let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect();
|
||||
|
||||
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
|
||||
{
|
||||
/// Assertions that are :db.cardinality/one and not :db.fulltext.
|
||||
let mut non_fts_one: Vec<db::ReducedEntity> = vec![];
|
||||
|
||||
/// Assertions that are :db.cardinality/many and not :db.fulltext.
|
||||
let mut non_fts_many: Vec<db::ReducedEntity> = vec![];
|
||||
|
||||
let final_terms: Vec<TermWithoutTempIds> = [final_populations.resolved,
|
||||
final_populations.allocated,
|
||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||
|
@ -279,36 +292,46 @@ impl<'conn> Tx<'conn> {
|
|||
for term in final_terms {
|
||||
match term {
|
||||
Term::AddOrRetract(op, e, a, v) => {
|
||||
let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?;
|
||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
|
||||
if attribute.fulltext {
|
||||
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult!
|
||||
}
|
||||
|
||||
let added = op == OpType::Add;
|
||||
if attribute.multival {
|
||||
non_fts_many.push((e, a, v, added));
|
||||
non_fts_many.push((e, a, attribute, v, added));
|
||||
} else {
|
||||
non_fts_one.push((e, a, v, added));
|
||||
non_fts_one.push((e, a, attribute, v, added));
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Transact [:db/add :db/txInstant NOW :db/tx].
|
||||
// TODO: allow this to be present in the transaction data.
|
||||
non_fts_one.push((self.tx_id,
|
||||
entids::DB_TX_INSTANT,
|
||||
// TODO: extract this to a constant.
|
||||
self.schema.require_attribute_for_entid(self.schema.require_entid(&to_namespaced_keyword(":db/txInstant").unwrap())?)?,
|
||||
TypedValue::Long(self.tx_instant),
|
||||
true));
|
||||
|
||||
if !non_fts_one.is_empty() {
|
||||
self.db.insert_non_fts_searches(self.conn, &non_fts_one[..], self.tx_id, SearchType::Inexact)?;
|
||||
self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?;
|
||||
}
|
||||
|
||||
if !non_fts_many.is_empty() {
|
||||
self.db.insert_non_fts_searches(self.conn, &non_fts_many[..], self.tx_id, SearchType::Exact)?;
|
||||
self.store.insert_non_fts_searches(&non_fts_many[..], db::SearchType::Exact)?;
|
||||
}
|
||||
|
||||
self.db.search(self.conn)?;
|
||||
self.store.commit_transaction(self.tx_id)?;
|
||||
}
|
||||
|
||||
self.db.insert_transaction(self.conn, self.tx_id)?;
|
||||
self.db.update_datoms(self.conn, self.tx_id)?;
|
||||
// let mut next_schema = self.schema.to_mut();
|
||||
// next_schema.ident_map.insert(NamespacedKeyword::new("db", "new"), 1000);
|
||||
|
||||
// TODO: update idents and schema materialized views.
|
||||
self.db.update_partition_map(self.conn)?;
|
||||
db::update_partition_map(self.store, &self.partition_map)?;
|
||||
|
||||
Ok(TxReport {
|
||||
tx_id: self.tx_id,
|
||||
|
@ -316,3 +339,33 @@ impl<'conn> Tx<'conn> {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
|
||||
/// `self.DB`.
|
||||
///
|
||||
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
|
||||
// TODO: move this to the transactor layer.
|
||||
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, partition_map: &'a PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)> where I: IntoIterator<Item=Entity> {
|
||||
// Eventually, this function will be responsible for managing a SQLite transaction. For
|
||||
// now, it's just about the tx details.
|
||||
|
||||
let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge.
|
||||
|
||||
let mut next_partition_map: PartitionMap = partition_map.clone();
|
||||
let tx_id = next_partition_map.allocate_entid(":db.part/tx");
|
||||
|
||||
conn.begin_transaction()?;
|
||||
|
||||
let mut tx = Tx::new(conn, next_partition_map, schema, tx_id, tx_instant);
|
||||
|
||||
let report = tx.transact_entities(entities)?;
|
||||
|
||||
// If the schema has moved on, return it.
|
||||
let next_schema = match tx.schema {
|
||||
Cow::Borrowed(_) => None,
|
||||
Cow::Owned(next_schema) => Some(next_schema),
|
||||
};
|
||||
Ok((report, tx.partition_map, next_schema))
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ pub mod parse {
|
|||
include!(concat!(env!("OUT_DIR"), "/edn.rs"));
|
||||
}
|
||||
|
||||
pub use ordered_float::OrderedFloat;
|
||||
pub use num::BigInt;
|
||||
pub use ordered_float::OrderedFloat;
|
||||
pub use parse::ParseError;
|
||||
pub use types::Value;
|
||||
pub use symbols::{Keyword, NamespacedKeyword, PlainSymbol, NamespacedSymbol};
|
||||
|
|
|
@ -5,7 +5,7 @@ authors = ["Victor Porof <vporof@mozilla.com>", "Richard Newman <rnewman@mozilla
|
|||
workspace = ".."
|
||||
|
||||
[dependencies]
|
||||
combine = "2.1.1"
|
||||
combine = "2.2.2"
|
||||
|
||||
[dependencies.edn]
|
||||
path = "../edn"
|
||||
|
|
|
@ -105,3 +105,85 @@ macro_rules! def_value_satisfy_parser_fn {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// A `ValueParseError` is a `combine::primitives::ParseError`-alike that implements the `Debug`,
|
||||
/// `Display`, and `std::error::Error` traits. In addition, it doesn't capture references, making
|
||||
/// it possible to store `ValueParseError` instances in local links with the `error-chain` crate.
|
||||
///
|
||||
/// This is achieved by wrapping slices of type `&'a [edn::Value]` in an owning type that implements
|
||||
/// `Display`; rather than introducing a newtype like `DisplayVec`, we re-use `edn::Value::Vector`.
|
||||
#[derive(PartialEq)]
|
||||
pub struct ValueParseError {
|
||||
pub position: usize,
|
||||
// Think of this as `Vec<Error<edn::Value, DisplayVec<edn::Value>>>`; see above.
|
||||
pub errors: Vec<combine::primitives::Error<edn::Value, edn::Value>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValueParseError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f,
|
||||
"ParseError {{ position: {:?}, errors: {:?} }}",
|
||||
self.position,
|
||||
self.errors)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ValueParseError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
try!(writeln!(f, "Parse error at {}", self.position));
|
||||
combine::primitives::Error::fmt_errors(&self.errors, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ValueParseError {
|
||||
fn description(&self) -> &str {
|
||||
"parse error parsing EDN values"
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<combine::primitives::ParseError<&'a [edn::Value]>> for ValueParseError {
|
||||
fn from(e: combine::primitives::ParseError<&'a [edn::Value]>) -> ValueParseError {
|
||||
ValueParseError {
|
||||
position: e.position,
|
||||
errors: e.errors.into_iter().map(|e| e.map_range(|r| {
|
||||
let mut v = Vec::new();
|
||||
v.extend_from_slice(r);
|
||||
edn::Value::Vector(v)
|
||||
})).collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Allow to map the range types of combine::primitives::{Info, Error}.
|
||||
trait MapRange<R, S> {
|
||||
type Output;
|
||||
fn map_range<F>(self, f: F) -> Self::Output where F: FnOnce(R) -> S;
|
||||
}
|
||||
|
||||
impl<T, R, S> MapRange<R, S> for combine::primitives::Info<T, R> {
|
||||
type Output = combine::primitives::Info<T, S>;
|
||||
|
||||
fn map_range<F>(self, f: F) -> combine::primitives::Info<T, S> where F: FnOnce(R) -> S {
|
||||
use combine::primitives::Info::*;
|
||||
match self {
|
||||
Token(t) => Token(t),
|
||||
Range(r) => Range(f(r)),
|
||||
Owned(s) => Owned(s),
|
||||
Borrowed(x) => Borrowed(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, R, S> MapRange<R, S> for combine::primitives::Error<T, R> {
|
||||
type Output = combine::primitives::Error<T, S>;
|
||||
|
||||
fn map_range<F>(self, f: F) -> combine::primitives::Error<T, S> where F: FnOnce(R) -> S {
|
||||
use combine::primitives::Error::*;
|
||||
match self {
|
||||
Unexpected(x) => Unexpected(x.map_range(f)),
|
||||
Expected(x) => Expected(x.map_range(f)),
|
||||
Message(x) => Message(x.map_range(f)),
|
||||
Other(x) => Other(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,8 @@ version = "0.0.1"
|
|||
workspace = ".."
|
||||
|
||||
[dependencies]
|
||||
combine = "2.1.1"
|
||||
combine = "2.2.2"
|
||||
error-chain = "0.9.0"
|
||||
matches = "0.1"
|
||||
|
||||
[dependencies.edn]
|
||||
|
|
35
query-parser/src/errors.rs
Normal file
35
query-parser/src/errors.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
// Copyright 2016 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use edn;
|
||||
use mentat_parser_utils::ValueParseError;
|
||||
|
||||
error_chain! {
|
||||
types {
|
||||
Error, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
EdnParseError(edn::ParseError);
|
||||
}
|
||||
|
||||
links {
|
||||
}
|
||||
|
||||
errors {
|
||||
NotAVariableError(value: edn::Value) {}
|
||||
InvalidInput(value: edn::Value) {}
|
||||
FindParseError(value_parse_error: ValueParseError) {}
|
||||
WhereParseError(value_parse_error: ValueParseError) {}
|
||||
MissingField(field: edn::Keyword) {}
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@
|
|||
/// ! parts of the map.
|
||||
|
||||
extern crate edn;
|
||||
extern crate mentat_parser_utils;
|
||||
extern crate mentat_query;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
@ -44,10 +45,15 @@ use self::mentat_query::{
|
|||
SrcVar,
|
||||
Variable,
|
||||
};
|
||||
use self::mentat_parser_utils::ValueParseError;
|
||||
|
||||
use super::errors::{
|
||||
Error,
|
||||
ErrorKind,
|
||||
Result,
|
||||
};
|
||||
|
||||
use super::parse::{
|
||||
NotAVariableError,
|
||||
QueryParseError,
|
||||
QueryParseResult,
|
||||
clause_seq_to_patterns,
|
||||
};
|
||||
|
@ -57,14 +63,14 @@ use super::util::vec_to_keyword_map;
|
|||
/// If the provided slice of EDN values are all variables as
|
||||
/// defined by `value_to_variable`, return a `Vec` of `Variable`s.
|
||||
/// Otherwise, return the unrecognized Value in a `NotAVariableError`.
|
||||
fn values_to_variables(vals: &[edn::Value]) -> Result<Vec<Variable>, NotAVariableError> {
|
||||
fn values_to_variables(vals: &[edn::Value]) -> Result<Vec<Variable>> {
|
||||
let mut out: Vec<Variable> = Vec::with_capacity(vals.len());
|
||||
for v in vals {
|
||||
if let Some(var) = Variable::from_value(v) {
|
||||
out.push(var);
|
||||
continue;
|
||||
}
|
||||
return Err(NotAVariableError(v.clone()));
|
||||
bail!(ErrorKind::NotAVariableError(v.clone()));
|
||||
}
|
||||
return Ok(out);
|
||||
}
|
||||
|
@ -109,7 +115,6 @@ fn parse_find_parts(find: &[edn::Value],
|
|||
where_clauses: where_clauses,
|
||||
}
|
||||
})
|
||||
.map_err(QueryParseError::FindParseError)
|
||||
}
|
||||
|
||||
fn parse_find_map(map: BTreeMap<edn::Keyword, Vec<edn::Value>>) -> QueryParseResult {
|
||||
|
@ -127,10 +132,10 @@ fn parse_find_map(map: BTreeMap<edn::Keyword, Vec<edn::Value>>) -> QueryParseRes
|
|||
map.get(&kw_with).map(|x| x.as_slice()),
|
||||
wheres);
|
||||
} else {
|
||||
return Err(QueryParseError::MissingField(kw_where));
|
||||
bail!(ErrorKind::MissingField(kw_where));
|
||||
}
|
||||
} else {
|
||||
return Err(QueryParseError::MissingField(kw_find));
|
||||
bail!(ErrorKind::MissingField(kw_find));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -148,22 +153,16 @@ fn parse_find_edn_map(map: BTreeMap<edn::Value, edn::Value>) -> QueryParseResult
|
|||
m.insert(kw, vec);
|
||||
continue;
|
||||
} else {
|
||||
return Err(QueryParseError::InvalidInput(v));
|
||||
bail!(ErrorKind::InvalidInput(v));
|
||||
}
|
||||
} else {
|
||||
return Err(QueryParseError::InvalidInput(k));
|
||||
bail!(ErrorKind::InvalidInput(k));
|
||||
}
|
||||
}
|
||||
|
||||
parse_find_map(m)
|
||||
}
|
||||
|
||||
impl From<edn::parse::ParseError> for QueryParseError {
|
||||
fn from(err: edn::parse::ParseError) -> QueryParseError {
|
||||
QueryParseError::EdnParseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_find_string(string: &str) -> QueryParseResult {
|
||||
let expr = edn::parse::value(string)?;
|
||||
parse_find(expr.without_spans())
|
||||
|
@ -179,7 +178,7 @@ pub fn parse_find(expr: edn::Value) -> QueryParseResult {
|
|||
return parse_find_map(m);
|
||||
}
|
||||
}
|
||||
return Err(QueryParseError::InvalidInput(expr));
|
||||
bail!(ErrorKind::InvalidInput(expr));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -10,16 +10,27 @@
|
|||
|
||||
#![allow(unused_imports)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
#[macro_use]
|
||||
extern crate matches;
|
||||
|
||||
extern crate edn;
|
||||
#[macro_use]
|
||||
extern crate mentat_parser_utils;
|
||||
|
||||
mod util;
|
||||
mod parse;
|
||||
pub mod errors;
|
||||
pub mod find;
|
||||
|
||||
pub use errors::{
|
||||
Error,
|
||||
ErrorKind,
|
||||
ResultExt,
|
||||
Result,
|
||||
};
|
||||
|
||||
pub use find::{
|
||||
parse_find,
|
||||
parse_find_string,
|
||||
|
@ -27,8 +38,4 @@ pub use find::{
|
|||
|
||||
pub use parse::{
|
||||
QueryParseResult,
|
||||
QueryParseError,
|
||||
FindParseError,
|
||||
WhereParseError,
|
||||
NotAVariableError,
|
||||
};
|
||||
|
|
|
@ -13,7 +13,7 @@ extern crate edn;
|
|||
extern crate mentat_parser_utils;
|
||||
extern crate mentat_query;
|
||||
|
||||
use self::mentat_parser_utils::ResultParser;
|
||||
use self::mentat_parser_utils::{ResultParser, ValueParseError};
|
||||
use self::combine::{eof, many1, optional, parser, satisfy_map, Parser, ParseResult, Stream};
|
||||
use self::combine::combinator::{choice, try};
|
||||
use self::mentat_query::{
|
||||
|
@ -29,47 +29,10 @@ use self::mentat_query::{
|
|||
WhereClause,
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct NotAVariableError(pub edn::Value);
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum FindParseError {
|
||||
Err,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum WhereParseError {
|
||||
Err,
|
||||
}
|
||||
|
||||
#[derive(Clone,Debug,Eq,PartialEq)]
|
||||
pub enum QueryParseError {
|
||||
InvalidInput(edn::Value),
|
||||
EdnParseError(edn::parse::ParseError),
|
||||
MissingField(edn::Keyword),
|
||||
FindParseError(FindParseError),
|
||||
WhereParseError(WhereParseError),
|
||||
WithParseError(NotAVariableError),
|
||||
}
|
||||
|
||||
impl From<WhereParseError> for QueryParseError {
|
||||
fn from(err: WhereParseError) -> QueryParseError {
|
||||
QueryParseError::WhereParseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NotAVariableError> for QueryParseError {
|
||||
fn from(err: NotAVariableError) -> QueryParseError {
|
||||
QueryParseError::WithParseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub type WhereParseResult = Result<Vec<WhereClause>, WhereParseError>;
|
||||
pub type FindParseResult = Result<FindSpec, FindParseError>;
|
||||
pub type QueryParseResult = Result<FindQuery, QueryParseError>;
|
||||
|
||||
use errors::{Error, ErrorKind, ResultExt, Result};
|
||||
pub type WhereParseResult = Result<Vec<WhereClause>>;
|
||||
pub type FindParseResult = Result<FindSpec>;
|
||||
pub type QueryParseResult = Result<FindQuery>;
|
||||
|
||||
pub struct Query<I>(::std::marker::PhantomData<fn(I) -> I>);
|
||||
|
||||
|
@ -222,7 +185,8 @@ pub fn find_seq_to_find_spec(find: &[edn::Value]) -> FindParseResult {
|
|||
Find::find()
|
||||
.parse(find)
|
||||
.map(|x| x.0)
|
||||
.map_err(|_| FindParseError::Err)
|
||||
.map_err::<ValueParseError, _>(|e| e.translate_position(find).into())
|
||||
.map_err(|e| Error::from_kind(ErrorKind::FindParseError(e)))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
|
@ -230,7 +194,8 @@ pub fn clause_seq_to_patterns(clauses: &[edn::Value]) -> WhereParseResult {
|
|||
Where::clauses()
|
||||
.parse(clauses)
|
||||
.map(|x| x.0)
|
||||
.map_err(|_| WhereParseError::Err)
|
||||
.map_err::<ValueParseError, _>(|e| e.translate_position(clauses).into())
|
||||
.map_err(|e| Error::from_kind(ErrorKind::WhereParseError(e)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -401,15 +366,15 @@ mod test {
|
|||
edn::Value::PlainSymbol(ellipsis.clone())])];
|
||||
let rel = [edn::Value::PlainSymbol(vx.clone()), edn::Value::PlainSymbol(vy.clone())];
|
||||
|
||||
assert_eq!(Ok(FindSpec::FindScalar(Element::Variable(Variable(vx.clone())))),
|
||||
find_seq_to_find_spec(&scalar));
|
||||
assert_eq!(Ok(FindSpec::FindTuple(vec![Element::Variable(Variable(vx.clone())),
|
||||
Element::Variable(Variable(vy.clone()))])),
|
||||
find_seq_to_find_spec(&tuple));
|
||||
assert_eq!(Ok(FindSpec::FindColl(Element::Variable(Variable(vx.clone())))),
|
||||
find_seq_to_find_spec(&coll));
|
||||
assert_eq!(Ok(FindSpec::FindRel(vec![Element::Variable(Variable(vx.clone())),
|
||||
Element::Variable(Variable(vy.clone()))])),
|
||||
find_seq_to_find_spec(&rel));
|
||||
assert_eq!(FindSpec::FindScalar(Element::Variable(Variable(vx.clone()))),
|
||||
find_seq_to_find_spec(&scalar).unwrap());
|
||||
assert_eq!(FindSpec::FindTuple(vec![Element::Variable(Variable(vx.clone())),
|
||||
Element::Variable(Variable(vy.clone()))]),
|
||||
find_seq_to_find_spec(&tuple).unwrap());
|
||||
assert_eq!(FindSpec::FindColl(Element::Variable(Variable(vx.clone()))),
|
||||
find_seq_to_find_spec(&coll).unwrap());
|
||||
assert_eq!(FindSpec::FindRel(vec![Element::Variable(Variable(vx.clone())),
|
||||
Element::Variable(Variable(vy.clone()))]),
|
||||
find_seq_to_find_spec(&rel).unwrap());
|
||||
}
|
||||
}
|
||||
|
|
35
src/errors.rs
Normal file
35
src/errors.rs
Normal file
|
@ -0,0 +1,35 @@
|
|||
// Copyright 2016 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use rusqlite;
|
||||
|
||||
use edn;
|
||||
use mentat_db;
|
||||
use mentat_query_parser;
|
||||
use mentat_tx_parser;
|
||||
|
||||
error_chain! {
|
||||
types {
|
||||
Error, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
foreign_links {
|
||||
EdnParseError(edn::ParseError);
|
||||
Rusqlite(rusqlite::Error);
|
||||
}
|
||||
|
||||
links {
|
||||
DbError(mentat_db::Error, mentat_db::ErrorKind);
|
||||
QueryParseError(mentat_query_parser::Error, mentat_query_parser::ErrorKind);
|
||||
TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind);
|
||||
}
|
||||
}
|
|
@ -8,6 +8,8 @@
|
|||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
#[macro_use]
|
||||
extern crate slog;
|
||||
#[macro_use]
|
||||
|
@ -21,9 +23,11 @@ extern crate mentat_db;
|
|||
extern crate mentat_query;
|
||||
extern crate mentat_query_parser;
|
||||
extern crate mentat_query_algebrizer;
|
||||
extern crate mentat_tx_parser;
|
||||
|
||||
use rusqlite::Connection;
|
||||
|
||||
pub mod errors;
|
||||
pub mod ident;
|
||||
pub mod query;
|
||||
|
||||
|
|
20
src/query.rs
20
src/query.rs
|
@ -18,7 +18,10 @@ use mentat_db::DB;
|
|||
|
||||
use mentat_query_parser::{
|
||||
parse_find_string,
|
||||
QueryParseError,
|
||||
};
|
||||
|
||||
use errors::{
|
||||
Result,
|
||||
};
|
||||
|
||||
// TODO
|
||||
|
@ -31,19 +34,6 @@ pub enum QueryResults {
|
|||
Rel(Vec<Vec<TypedValue>>),
|
||||
}
|
||||
|
||||
pub enum QueryExecutionError {
|
||||
ParseError(QueryParseError),
|
||||
InvalidArgumentName(String),
|
||||
}
|
||||
|
||||
impl From<QueryParseError> for QueryExecutionError {
|
||||
fn from(err: QueryParseError) -> QueryExecutionError {
|
||||
QueryExecutionError::ParseError(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub type QueryExecutionResult = Result<QueryResults, QueryExecutionError>;
|
||||
|
||||
/// Take an EDN query string, a reference to a open SQLite connection, a Mentat DB, and an optional
|
||||
/// collection of input bindings (which should be keyed by `"?varname"`), and execute the query
|
||||
/// immediately, blocking the current thread.
|
||||
|
@ -53,7 +43,7 @@ pub type QueryExecutionResult = Result<QueryResults, QueryExecutionError>;
|
|||
pub fn q_once(sqlite: SQLiteConnection,
|
||||
db: DB,
|
||||
query: &str,
|
||||
inputs: Option<HashMap<String, TypedValue>>) -> QueryExecutionResult {
|
||||
inputs: Option<HashMap<String, TypedValue>>) -> Result<QueryResults> {
|
||||
// TODO: validate inputs.
|
||||
let parsed = parse_find_string(query)?;
|
||||
Ok(QueryResults::Scalar(Some(TypedValue::Boolean(true))))
|
||||
|
|
|
@ -4,13 +4,14 @@ version = "0.0.1"
|
|||
workspace = ".."
|
||||
|
||||
[dependencies]
|
||||
combine = "2.1.1"
|
||||
combine = "2.2.2"
|
||||
error-chain = "0.9.0"
|
||||
|
||||
[dependencies.edn]
|
||||
path = "../edn"
|
||||
path = "../edn"
|
||||
|
||||
[dependencies.mentat_tx]
|
||||
path = "../tx"
|
||||
path = "../tx"
|
||||
|
||||
[dependencies.mentat_parser_utils]
|
||||
path = "../parser-utils"
|
||||
path = "../parser-utils"
|
||||
|
|
26
tx-parser/src/errors.rs
Normal file
26
tx-parser/src/errors.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
// Copyright 2016 Mozilla
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
// this file except in compliance with the License. You may obtain a copy of the
|
||||
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software distributed
|
||||
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations under the License.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use mentat_parser_utils::ValueParseError;
|
||||
|
||||
error_chain! {
|
||||
types {
|
||||
Error, ErrorKind, ResultExt, Result;
|
||||
}
|
||||
|
||||
errors {
|
||||
ParseError(value_parse_error: ValueParseError) {
|
||||
description("error parsing edn values")
|
||||
display("error parsing edn values:\n{}", value_parse_error)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,8 +10,11 @@
|
|||
|
||||
#![allow(dead_code)]
|
||||
|
||||
extern crate edn;
|
||||
extern crate combine;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
|
||||
extern crate edn;
|
||||
extern crate mentat_tx;
|
||||
|
||||
#[macro_use]
|
||||
|
@ -22,7 +25,10 @@ use combine::combinator::{Expected, FnParser};
|
|||
use edn::symbols::NamespacedKeyword;
|
||||
use edn::types::Value;
|
||||
use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType};
|
||||
use mentat_parser_utils::ResultParser;
|
||||
use mentat_parser_utils::{ResultParser, ValueParseError};
|
||||
|
||||
pub mod errors;
|
||||
pub use errors::*;
|
||||
|
||||
pub struct Tx<I>(::std::marker::PhantomData<fn(I) -> I>);
|
||||
|
||||
|
@ -158,14 +164,14 @@ def_parser_fn!(Tx, entities, Value, Vec<Entity>, input, {
|
|||
.parse_stream(input)
|
||||
});
|
||||
|
||||
impl<I> Tx<I>
|
||||
where I: Stream<Item = Value>
|
||||
{
|
||||
pub fn parse(input: I) -> Result<Vec<Entity>, combine::ParseError<I>> {
|
||||
(Tx::<I>::entities(), eof())
|
||||
impl<'a> Tx<&'a [edn::Value]> {
|
||||
pub fn parse(input: &'a [edn::Value]) -> std::result::Result<Vec<Entity>, errors::Error> {
|
||||
(Tx::<_>::entities(), eof())
|
||||
.map(|(es, _)| es)
|
||||
.parse(input)
|
||||
.map(|x| x.0)
|
||||
.map_err::<ValueParseError, _>(|e| e.translate_position(input).into())
|
||||
.map_err(|e| Error::from_kind(ErrorKind::ParseError(e)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,8 +30,8 @@ fn test_entities() {
|
|||
let input = [edn];
|
||||
|
||||
let result = Tx::parse(&input[..]);
|
||||
assert_eq!(result,
|
||||
Ok(vec![
|
||||
assert_eq!(result.unwrap(),
|
||||
vec![
|
||||
Entity::AddOrRetract {
|
||||
op: OpType::Add,
|
||||
e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)),
|
||||
|
@ -50,7 +50,7 @@ fn test_entities() {
|
|||
a: Entid::Ident(NamespacedKeyword::new("test", "b")),
|
||||
v: Value::Text("w".into()),
|
||||
},
|
||||
]));
|
||||
]);
|
||||
}
|
||||
|
||||
// TODO: test error handling in select cases.
|
||||
|
|
|
@ -74,7 +74,7 @@
|
|||
;; This ref doesn't exist, so the assertion will be ignored.
|
||||
[:db/retract "t1" :db.schema/attribute 103]]
|
||||
:test/expected-transaction
|
||||
#{[?tx6 :db/txInstant ?ms6 ?tx6 true]}
|
||||
#{[?tx5 :db/txInstant ?ms5 ?tx5 true]}
|
||||
:test/expected-error-message
|
||||
""
|
||||
:test/expected-tempids
|
||||
|
@ -94,9 +94,9 @@
|
|||
[[:db/add "t1" :db/ident :name/Josef]
|
||||
[:db/add "t2" :db.schema/attribute "t1"]]
|
||||
:test/expected-transaction
|
||||
#{[65538 :db/ident :name/Josef ?tx8 true]
|
||||
[65539 :db.schema/attribute 65538 ?tx8 true]
|
||||
[?tx8 :db/txInstant ?ms8 ?tx8 true]}
|
||||
#{[65538 :db/ident :name/Josef ?tx6 true]
|
||||
[65539 :db.schema/attribute 65538 ?tx6 true]
|
||||
[?tx6 :db/txInstant ?ms6 ?tx6 true]}
|
||||
:test/expected-error-message
|
||||
""
|
||||
:test/expected-tempids
|
||||
|
|
Loading…
Reference in a new issue