diff --git a/Cargo.toml b/Cargo.toml index 16eec210..93da6d4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rustc_version = "0.1.7" [dependencies] clap = "2.19.3" +error-chain = "0.9.0" nickel = "0.9.0" slog = "1.4.0" slog-scope = "0.2.2" diff --git a/db/Cargo.toml b/db/Cargo.toml index 75b4b985..e0ccde58 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -4,7 +4,7 @@ version = "0.0.1" workspace = ".." [dependencies] -error-chain = "0.8.0" +error-chain = "0.9.0" itertools = "0.5.9" lazy_static = "0.2.2" ordered-float = "0.4.0" diff --git a/db/src/bootstrap.rs b/db/src/bootstrap.rs index 7bf8d1c9..f058ece5 100644 --- a/db/src/bootstrap.rs +++ b/db/src/bootstrap.rs @@ -96,8 +96,8 @@ lazy_static! { static ref V1_SYMBOLIC_SCHEMA: Value = { let s = r#" {:db/ident {:db/valueType :db.type/keyword - :db/cardinality :db.cardinality/one - :db/unique :db.unique/identity} + :db/cardinality :db.cardinality/one + :db/unique :db.unique/identity} :db.install/partition {:db/valueType :db.type/ref :db/cardinality :db.cardinality/many} :db.install/valueType {:db/valueType :db.type/ref diff --git a/db/src/db.rs b/db/src/db.rs index 66c3a039..1a6fd5ff 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -22,7 +22,7 @@ use itertools::Itertools; use rusqlite; use rusqlite::types::{ToSql, ToSqlOutput}; -use ::{now, repeat_values, to_namespaced_keyword}; +use ::{repeat_values, to_namespaced_keyword}; use bootstrap; use edn::types::Value; use edn::symbols; @@ -35,7 +35,6 @@ use mentat_core::{ TypedValue, ValueType, }; -use mentat_tx::entities::Entity; use errors::{ErrorKind, Result, ResultExt}; use schema::SchemaBuilding; use types::{ @@ -44,9 +43,8 @@ use types::{ DB, Partition, PartitionMap, - TxReport, }; -use tx::Tx; +use tx::transact; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { @@ -209,13 +207,20 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { } // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. - let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); - bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?; + let bootstrap_schema = bootstrap::bootstrap_schema(); + let (_report, next_partition_map, next_schema) = transact(&tx, &bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?; + if next_schema.is_some() { + // TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117 + bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema"))); + } set_user_version(&tx, CURRENT_VERSION)?; // TODO: use the drop semantics to do this automagically? tx.commit()?; + + // TODO: ensure that schema is not changed by bootstrap transaction. + let bootstrap_db = DB::new(next_partition_map, bootstrap_schema); Ok(bootstrap_db) } @@ -449,7 +454,7 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result { } /// Internal representation of an [e a v added] datom, ready to be transacted against the store. -pub type ReducedEntity = (i64, i64, TypedValue, bool); +pub type ReducedEntity<'a> = (Entid, Entid, &'a Attribute, TypedValue, bool); #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] pub enum SearchType { @@ -457,34 +462,13 @@ pub enum SearchType { Inexact, } -impl DB { - /// Do schema-aware typechecking and coercion. - /// - /// Either assert that the given value is in the attribute's value set, or (in limited cases) - /// coerce the given value into the attribute's value set. - pub fn to_typed_value(&self, value: &Value, attribute: &Attribute) -> Result { - // TODO: encapsulate entid-ident-attribute for better error messages. - match TypedValue::from_edn_value(value) { - // We don't recognize this EDN at all. Get out! - None => bail!(ErrorKind::BadEDNValuePair(value.clone(), attribute.value_type.clone())), - Some(typed_value) => match (&attribute.value_type, typed_value) { - // Most types don't coerce at all. - (&ValueType::Boolean, tv @ TypedValue::Boolean(_)) => Ok(tv), - (&ValueType::Long, tv @ TypedValue::Long(_)) => Ok(tv), - (&ValueType::Double, tv @ TypedValue::Double(_)) => Ok(tv), - (&ValueType::String, tv @ TypedValue::String(_)) => Ok(tv), - (&ValueType::Keyword, tv @ TypedValue::Keyword(_)) => Ok(tv), - // Ref coerces a little: we interpret some things depending on the schema as a Ref. - (&ValueType::Ref, TypedValue::Long(x)) => Ok(TypedValue::Ref(x)), - (&ValueType::Ref, TypedValue::Keyword(ref x)) => { - self.schema.require_entid(&x).map(|entid| TypedValue::Ref(entid)) - } - // Otherwise, we have a type mismatch. - (value_type, _) => bail!(ErrorKind::BadEDNValuePair(value.clone(), value_type.clone())), - } - } - } - +/// `MentatStoring` will be the trait that encapsulates the storage layer. It is consumed by the +/// transaction processing layer. +/// +/// Right now, the only implementation of `MentatStoring` is the SQLite-specific SQL schema. In the +/// future, we might consider other SQL engines (perhaps with different fulltext indexing), or +/// entirely different data stores, say ones shaped like key-value stores. +pub trait MentatStoring { /// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples. /// /// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one @@ -492,8 +476,135 @@ impl DB { /// chosen non-deterministically, if one exists.) /// /// Returns a map &(a, v) -> e, to avoid cloning potentially large values. The keys of the map - /// are exactly those (a, v) pairs that have an assertion [e a v] in the datom store. - pub fn resolve_avs<'a>(&self, conn: &rusqlite::Connection, avs: &'a [&'a AVPair]) -> Result> { + /// are exactly those (a, v) pairs that have an assertion [e a v] in the store. + fn resolve_avs<'a>(&self, avs: &'a [&'a AVPair]) -> Result>; + + /// Begin (or prepare) the underlying storage layer for a new Mentat transaction. + /// + /// Use this to create temporary tables, prepare indices, set pragmas, etc, before the initial + /// `insert_non_fts_searches` invocation. + fn begin_transaction(&self) -> Result<()>; + + // TODO: this is not a reasonable abstraction, but I don't want to really consider non-SQL storage just yet. + fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity], search_type: SearchType) -> Result<()>; + + /// Finalize the underlying storage layer after a Mentat transaction. + /// + /// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the + /// final `insert_non_fts_searches` invocation. + fn commit_transaction(&self, tx_id: Entid) -> Result<()>; +} + +/// Take search rows and complete `temp.search_results`. +/// +/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. +fn search(conn: &rusqlite::Connection) -> Result<()> { + // First is fast, only one table walk: lookup by exact eav. + // Second is slower, but still only one table walk: lookup old value by ea. + let s = r#" + INSERT INTO temp.search_results + SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.added0, t.flags0, ':db.cardinality/many', d.rowid, d.v + FROM temp.exact_searches AS t + LEFT JOIN datoms AS d + ON t.e0 = d.e AND + t.a0 = d.a AND + t.value_type_tag0 = d.value_type_tag AND + t.v0 = d.v + + UNION ALL + + SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.added0, t.flags0, ':db.cardinality/one', d.rowid, d.v + FROM temp.inexact_searches AS t + LEFT JOIN datoms AS d + ON t.e0 = d.e AND + t.a0 = d.a"#; + + let mut stmt = conn.prepare_cached(s)?; + stmt.execute(&[]) + .map(|_c| ()) + .chain_err(|| "Could not search!") +} + +/// Insert the new transaction into the `transactions` table. +/// +/// This turns the contents of `search_results` into a new transaction. +/// +/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. +fn insert_transaction(conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + let s = r#" + INSERT INTO transactions (e, a, v, tx, added, value_type_tag) + SELECT e0, a0, v0, ?, 1, value_type_tag0 + FROM temp.search_results + WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#; + + let mut stmt = conn.prepare_cached(s)?; + stmt.execute(&[&tx]) + .map(|_c| ()) + .chain_err(|| "Could not insert transaction: failed to add datoms not already present")?; + + let s = r#" + INSERT INTO transactions (e, a, v, tx, added, value_type_tag) + SELECT e0, a0, v, ?, 0, value_type_tag0 + FROM temp.search_results + WHERE rid IS NOT NULL AND + ((added0 IS 0) OR + (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))"#; + + let mut stmt = conn.prepare_cached(s)?; + stmt.execute(&[&tx]) + .map(|_c| ()) + .chain_err(|| "Could not insert transaction: failed to retract datoms already present")?; + + Ok(()) +} + +/// Update the contents of the `datoms` materialized view with the new transaction. +/// +/// This applies the contents of `search_results` to the `datoms` table (in place). +/// +/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. +fn update_datoms(conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + // Delete datoms that were retracted, or those that were :db.cardinality/one and will be + // replaced. + let s = r#" + WITH ids AS (SELECT rid + FROM temp.search_results + WHERE rid IS NOT NULL AND + ((added0 IS 0) OR + (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))) + DELETE FROM datoms WHERE rowid IN ids"#; + + let mut stmt = conn.prepare_cached(s)?; + stmt.execute(&[]) + .map(|_c| ()) + .chain_err(|| "Could not update datoms: failed to retract datoms already present")?; + + // Insert datoms that were added and not already present. We also must + // expand our bitfield into flags. + let s = format!(r#" + INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value) + SELECT e0, a0, v0, ?, value_type_tag0, + flags0 & {} IS NOT 0, + flags0 & {} IS NOT 0, + flags0 & {} IS NOT 0, + flags0 & {} IS NOT 0 + FROM temp.search_results + WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#, + AttributeBitFlags::IndexAVET as u8, + AttributeBitFlags::IndexVAET as u8, + AttributeBitFlags::IndexFulltext as u8, + AttributeBitFlags::UniqueValue as u8); + + let mut stmt = conn.prepare_cached(&s)?; + stmt.execute(&[&tx]) + .map(|_c| ()) + .chain_err(|| "Could not update datoms: failed to add datoms not already present")?; + + Ok(()) +} + +impl MentatStoring for rusqlite::Connection { + fn resolve_avs<'a>(&self, avs: &'a [&'a AVPair]) -> Result> { // Start search_id's at some identifiable number. let initial_search_id = 2000; let bindings_per_statement = 4; @@ -537,7 +648,7 @@ impl DB { FROM t, all_datoms AS d \ WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v", values); - let mut stmt: rusqlite::Statement = conn.prepare(s.as_str())?; + let mut stmt: rusqlite::Statement = self.prepare(s.as_str())?; let m: Result> = stmt.query_and_then(¶ms, |row| -> Result<(i64, Entid)> { Ok((row.get_checked(0)?, row.get_checked(1)?)) @@ -557,20 +668,18 @@ impl DB { } /// Create empty temporary tables for search parameters and search results. - fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> { + fn begin_transaction(&self) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. let statements = [ r#"DROP TABLE IF EXISTS temp.exact_searches"#, // Note that `flags0` is a bitfield of several flags compressed via // `AttributeBitFlags.flags()` in the temporary search tables, later // expanded in the `datoms` insertion. - // TODO: drop tx0 entirely. r#"CREATE TABLE temp.exact_searches ( e0 INTEGER NOT NULL, a0 SMALLINT NOT NULL, v0 BLOB NOT NULL, value_type_tag0 SMALLINT NOT NULL, - tx0 INTEGER NOT NULL, added0 TINYINT NOT NULL, flags0 TINYINT NOT NULL)"#, // There's no real need to split exact and inexact searches, so long as we keep things @@ -582,7 +691,6 @@ impl DB { a0 SMALLINT NOT NULL, v0 BLOB NOT NULL, value_type_tag0 SMALLINT NOT NULL, - tx0 INTEGER NOT NULL, added0 TINYINT NOT NULL, flags0 TINYINT NOT NULL)"#, r#"DROP TABLE IF EXISTS temp.search_results"#, @@ -593,7 +701,6 @@ impl DB { a0 SMALLINT NOT NULL, v0 BLOB NOT NULL, value_type_tag0 SMALLINT NOT NULL, - tx0 INTEGER NOT NULL, added0 TINYINT NOT NULL, flags0 TINYINT NOT NULL, search_type STRING NOT NULL, @@ -608,7 +715,7 @@ impl DB { ]; for statement in &statements { - let mut stmt = conn.prepare_cached(statement)?; + let mut stmt = self.prepare_cached(statement)?; stmt.execute(&[]) .map(|_c| ()) .chain_err(|| "Failed to create temporary tables")?; @@ -621,8 +728,8 @@ impl DB { /// /// Eventually, the details of this approach will be captured in /// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - pub fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> { - let bindings_per_statement = 7; + fn insert_non_fts_searches<'a>(&self, entities: &'a [ReducedEntity<'a>], search_type: SearchType) -> Result<()> { + let bindings_per_statement = 6; let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement); @@ -633,21 +740,18 @@ impl DB { // We must keep these computed values somewhere to reference them later, so we can't // combine this map and the subsequent flat_map. // (e0, a0, v0, value_type_tag0, added0, flags0) - let block: Result /* value */, /* value_type_tag */ i32, - /* added0 */ bool, - /* flags0 */ u8)>> = chunk.map(|&(e, a, ref typed_value, added)| { + let block: Result /* value */, + i32 /* value_type_tag */, + bool, /* added0 */ + u8 /* flags0 */)>> = chunk.map(|&(e, a, ref attribute, ref typed_value, added)| { count += 1; - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; // Now we can represent the typed value as an SQL value. let (value, value_type_tag): (ToSqlOutput, i32) = typed_value.to_sql_value_pair(); - let flags = attribute.flags(); - - Ok((e, a, value, value_type_tag, - added, - flags)) + Ok((e, a, value, value_type_tag, added, attribute.flags())) }).collect(); let block = block?; @@ -659,21 +763,20 @@ impl DB { .chain(once(a as &ToSql) .chain(once(value as &ToSql) .chain(once(value_type_tag as &ToSql) - .chain(once(&tx as &ToSql) - .chain(once(to_bool_ref(added) as &ToSql) - .chain(once(flags as &ToSql))))))) + .chain(once(to_bool_ref(added) as &ToSql) + .chain(once(flags as &ToSql)))))) }).collect(); // TODO: cache this for selected values of count. let values: String = repeat_values(bindings_per_statement, count); let s: String = if search_type == SearchType::Exact { - format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, tx0, added0, flags0) VALUES {}", values) + format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values) } else { - format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, tx0, added0, flags0) VALUES {}", values) + format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values) }; // TODO: consider ensuring we inserted the expected number of rows. - let mut stmt = conn.prepare_cached(s.as_str())?; + let mut stmt = self.prepare_cached(s.as_str())?; stmt.execute(¶ms) .map(|_c| ()) .chain_err(|| "Could not insert non-fts one statements into temporary search table!") @@ -682,151 +785,55 @@ impl DB { results.map(|_| ()) } - /// Take search rows and complete `temp.search_results`. - /// - /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - pub fn search(&self, conn: &rusqlite::Connection) -> Result<()> { - // First is fast, only one table walk: lookup by exact eav. - // Second is slower, but still only one table walk: lookup old value by ea. - let s = r#" - INSERT INTO temp.search_results - SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.flags0, ':db.cardinality/many', d.rowid, d.v - FROM temp.exact_searches AS t - LEFT JOIN datoms AS d - ON t.e0 = d.e AND - t.a0 = d.a AND - t.value_type_tag0 = d.value_type_tag AND - t.v0 = d.v - - UNION ALL - - SELECT t.e0, t.a0, t.v0, t.value_type_tag0, t.tx0, t.added0, t.flags0, ':db.cardinality/one', d.rowid, d.v - FROM temp.inexact_searches AS t - LEFT JOIN datoms AS d - ON t.e0 = d.e AND - t.a0 = d.a"#; - - let mut stmt = conn.prepare_cached(s)?; - stmt.execute(&[]) - .map(|_c| ()) - .chain_err(|| "Could not search!") - } - - /// Insert the new transaction into the `transactions` table. - /// - /// This turns the contents of `search_results` into a new transaction. - /// - /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - // TODO: capture `conn` in a `TxInternal` structure. - pub fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { - let s = r#" - INSERT INTO transactions (e, a, v, tx, added, value_type_tag) - SELECT e0, a0, v0, ?, 1, value_type_tag0 - FROM temp.search_results - WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#; - - let mut stmt = conn.prepare_cached(s)?; - stmt.execute(&[&tx]) - .map(|_c| ()) - .chain_err(|| "Could not insert transaction: failed to add datoms not already present")?; - - let s = r#" - INSERT INTO transactions (e, a, v, tx, added, value_type_tag) - SELECT e0, a0, v, ?, 0, value_type_tag0 - FROM temp.search_results - WHERE rid IS NOT NULL AND - ((added0 IS 0) OR - (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))"#; - - let mut stmt = conn.prepare_cached(s)?; - stmt.execute(&[&tx]) - .map(|_c| ()) - .chain_err(|| "Could not insert transaction: failed to retract datoms already present")?; - + fn commit_transaction(&self, tx_id: Entid) -> Result<()> { + search(&self)?; + insert_transaction(&self, tx_id)?; + update_datoms(&self, tx_id)?; Ok(()) } +} - /// Update the contents of the `datoms` materialized view with the new transaction. - /// - /// This applies the contents of `search_results` to the `datoms` table (in place). - /// - /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - // TODO: capture `conn` in a `TxInternal` structure. - pub fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { - // Delete datoms that were retracted, or those that were :db.cardinality/one and will be - // replaced. - let s = r#" - WITH ids AS (SELECT rid - FROM temp.search_results - WHERE rid IS NOT NULL AND - ((added0 IS 0) OR - (added0 IS 1 AND search_type IS ':db.cardinality/one' AND v0 IS NOT v))) - DELETE FROM datoms WHERE rowid IN ids"#; - - let mut stmt = conn.prepare_cached(s)?; - stmt.execute(&[]) - .map(|_c| ()) - .chain_err(|| "Could not update datoms: failed to retract datoms already present")?; - - // Insert datoms that were added and not already present. We also must - // expand our bitfield into flags. - let s = format!(r#" - INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value) - SELECT e0, a0, v0, ?, value_type_tag0, - flags0 & {} IS NOT 0, - flags0 & {} IS NOT 0, - flags0 & {} IS NOT 0, - flags0 & {} IS NOT 0 - FROM temp.search_results - WHERE added0 IS 1 AND ((rid IS NULL) OR ((rid IS NOT NULL) AND (v0 IS NOT v)))"#, - AttributeBitFlags::IndexAVET as u8, - AttributeBitFlags::IndexVAET as u8, - AttributeBitFlags::IndexFulltext as u8, - AttributeBitFlags::UniqueValue as u8); - - let mut stmt = conn.prepare_cached(&s)?; - stmt.execute(&[&tx]) - .map(|_c| ()) - .chain_err(|| "Could not update datoms: failed to add datoms not already present")?; - - Ok(()) +/// Update the current partition map materialized view. +// TODO: only update changed partitions. +pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &PartitionMap) -> Result<()> { + let values_per_statement = 2; + let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement; + if partition_map.len() > max_partitions { + bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); } - /// Update the current partition map materialized view. - // TODO: only update changed partitions. - pub fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> { - let values_per_statement = 2; - let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement; - if self.partition_map.len() > max_partitions { - bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); - } + // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". + let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", + repeat("WHEN part = ? THEN ?").take(partition_map.len()).join(" ")); - // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". - let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", - repeat("WHEN part = ? THEN ?").take(self.partition_map.len()).join(" ")); + let params: Vec<&ToSql> = partition_map.iter().flat_map(|(name, partition)| { + once(name as &ToSql) + .chain(once(&partition.index as &ToSql)) + }).collect(); - let params: Vec<&ToSql> = self.partition_map.iter().flat_map(|(name, partition)| { - once(name as &ToSql) - .chain(once(&partition.index as &ToSql)) - }).collect(); + // TODO: only cache the latest of these statements. Changing the set of partitions isn't + // supported in the Clojure implementation at all, and might not be supported in Mentat soon, + // so this is very low priority. + let mut stmt = conn.prepare_cached(s.as_str())?; + stmt.execute(¶ms[..]) + .map(|_c| ()) + .chain_err(|| "Could not update partition map") +} - // TODO: only cache the latest of these statements. Changing the set of partitions isn't - // supported in the Clojure implementation at all, and might not be supported in Mentat soon, - // so this is very low priority. - let mut stmt = conn.prepare_cached(s.as_str())?; - stmt.execute(¶ms[..]) - .map(|_c| ()) - .chain_err(|| "Could not update partition map") - } +pub trait PartitionMapping { + fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow; + fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow; +} +impl PartitionMapping for PartitionMap { /// Allocate a single fresh entid in the given `partition`. - pub fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow { + fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow { self.allocate_entids(partition, 1).start } /// Allocate `n` fresh entids in the given `partition`. - pub fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow { - match self.partition_map.get_mut(partition) { + fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow { + match self.get_mut(partition) { Some(mut partition) => { let idx = partition.index; partition.index += n as i64; @@ -836,24 +843,6 @@ impl DB { None => panic!("Cannot allocate entid from unknown partition: {}", partition), } } - - /// Transact the given `entities` against the given SQLite `conn`, using the metadata in - /// `self.DB`. - /// - /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. - // TODO: move this to the transactor layer. - pub fn transact(&mut self, conn: &rusqlite::Connection, entities: I) -> Result where I: IntoIterator { - // Eventually, this function will be responsible for managing a SQLite transaction. For - // now, it's just about the tx details. - - let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. - let tx_id = self.allocate_entid(":db.part/tx"); - - self.create_temp_tables(conn)?; - - let mut tx = Tx::new(self, conn, tx_id, tx_instant); - tx.transact_entities(entities) - } } #[cfg(test)] @@ -865,6 +854,7 @@ mod tests { use edn::symbols; use mentat_tx_parser; use rusqlite; + use tx::transact; #[test] fn test_open_current_version() { @@ -881,11 +871,11 @@ mod tests { let db = read_db(&conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); } @@ -898,9 +888,11 @@ mod tests { /// There is some magic here about transaction numbering that I don't want to commit to or /// document just yet. The end state might be much more general pattern matching syntax, rather /// than the targeted transaction ID and timestamp replacement we have right now. - fn assert_transactions(conn: &rusqlite::Connection, db: &mut DB, transactions: &Vec) { - for (index, transaction) in transactions.into_iter().enumerate() { - let index = index as i64; + // TODO: accept a `Conn`. + fn assert_transactions<'a>(conn: &rusqlite::Connection, partition_map: &mut PartitionMap, schema: &mut Schema, transactions: &Vec) { + let mut index: i64 = bootstrap::TX0; + + for transaction in transactions { let transaction = transaction.as_map().unwrap(); let label: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "label"))).unwrap().clone(); let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone(); @@ -910,10 +902,12 @@ mod tests { let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - let maybe_report = db.transact(&conn, entities); + let maybe_report = transact(&conn, partition_map, schema, entities); if let Some(expected_transaction) = expected_transaction { - if expected_transaction.is_nil() { + if !expected_transaction.is_nil() { + index += 1; + } else { assert!(maybe_report.is_err()); if let Some(expected_error_message) = expected_error_message { @@ -925,10 +919,17 @@ mod tests { continue } - let report = maybe_report.unwrap(); - assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); + // TODO: test schema changes in the EDN format. + let (report, next_partition_map, next_schema) = maybe_report.unwrap(); + *partition_map = next_partition_map; + if let Some(next_schema) = next_schema { + *schema = next_schema; + } - let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap(); + assert_eq!(index, report.tx_id, + "\n{} - expected tx_id {} but got tx_id {}", label, index, report.tx_id); + + let transactions = debug::transactions_after(&conn, &schema, index - 1).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(*expected_transaction, transactions.0[0].into_edn(), @@ -936,7 +937,7 @@ mod tests { } if let Some(expected_datoms) = expected_datoms { - let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap(); + let datoms = debug::datoms_after(&conn, &schema, bootstrap::TX0).unwrap(); assert_eq!(*expected_datoms, datoms.into_edn(), "\n{} - expected datoms:\n{}\nbut got datoms:\n{}", label, *expected_datoms, datoms.into_edn()) @@ -955,11 +956,11 @@ mod tests { let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); @@ -967,7 +968,8 @@ mod tests { let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap().without_spans(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &mut db, transactions); + + assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions); } #[test] @@ -976,18 +978,18 @@ mod tests { let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap().without_spans(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &mut db, transactions); + assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions); } #[test] @@ -996,17 +998,17 @@ mod tests { let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); let value = edn::parse::value(include_str!("../../tx/fixtures/test_upsert_vector.edn")).unwrap().without_spans(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &mut db, transactions); + assert_transactions(&conn, &mut db.partition_map, &mut db.schema, transactions); } } diff --git a/db/src/debug.rs b/db/src/debug.rs index 10d6b203..afcdc9a0 100644 --- a/db/src/debug.rs +++ b/db/src/debug.rs @@ -12,6 +12,7 @@ /// Low-level functions for testing. +use std::borrow::Borrow; use std::collections::{BTreeSet}; use std::io::{Write}; @@ -28,7 +29,7 @@ use entids; use mentat_core::TypedValue; use mentat_tx::entities::{Entid}; use db::TypedSQLValue; -use types::DB; +use types::Schema; use errors::Result; /// Represents a *datom* (assertion) in the store. @@ -106,21 +107,21 @@ impl Transactions { } /// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`. -fn to_entid(db: &DB, entid: i64) -> Entid { - db.schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone())) +fn to_entid(schema: &Schema, entid: i64) -> Entid { + schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone())) } /// Return the set of datoms in the store, ordered by (e, a, v, tx), but not including any datoms of /// the form [... :db/txInstant ...]. -pub fn datoms(conn: &rusqlite::Connection, db: &DB) -> Result { - datoms_after(conn, db, bootstrap::TX0 - 1) +pub fn datoms>(conn: &rusqlite::Connection, schema: &S) -> Result { + datoms_after(conn, schema, bootstrap::TX0 - 1) } /// Return the set of datoms in the store with transaction ID strictly greater than the given `tx`, /// ordered by (e, a, v, tx). /// /// The datom set returned does not include any datoms of the form [... :db/txInstant ...]. -pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result { +pub fn datoms_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, v ASC, tx ASC")?; let r: Result> = stmt.query_and_then(&[&tx], |row| { @@ -139,9 +140,10 @@ pub fn datoms_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Result Result Result { +pub fn transactions_after>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result { let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, v ASC, added ASC")?; let r: Result> = stmt.query_and_then(&[&tx], |row| { @@ -171,9 +173,10 @@ pub fn transactions_after(conn: &rusqlite::Connection, db: &DB, tx: i64) -> Resu let tx: i64 = row.get_checked(4)?; let added: bool = row.get_checked(5)?; + let borrowed_schema = schema.borrow(); Ok(Datom { - e: to_entid(db, e), - a: to_entid(db, a), + e: to_entid(borrowed_schema, e), + a: to_entid(borrowed_schema, a), v: value, tx: tx, added: Some(added), diff --git a/db/src/lib.rs b/db/src/lib.rs index 3e7c0417..4fdf090f 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -26,13 +26,13 @@ extern crate mentat_tx_parser; use itertools::Itertools; use std::iter::repeat; -use errors::{ErrorKind, Result}; +pub use errors::{Error, ErrorKind, ResultExt, Result}; pub mod db; mod bootstrap; -mod debug; +pub mod debug; mod entids; -mod errors; +pub mod errors; mod schema; mod types; mod internal_types; @@ -40,7 +40,12 @@ mod upsert_resolution; mod values; mod tx; -pub use types::DB; +pub use tx::transact; +pub use types::{ + DB, + PartitionMap, + TxReport, +}; use edn::symbols; diff --git a/db/src/schema.rs b/db/src/schema.rs index 30ba81af..c942f283 100644 --- a/db/src/schema.rs +++ b/db/src/schema.rs @@ -10,6 +10,8 @@ #![allow(dead_code)] +use db::TypedSQLValue; +use edn; use entids; use errors::{ErrorKind, Result}; use edn::symbols; @@ -177,3 +179,34 @@ impl SchemaBuilding for Schema { Schema::from_ident_map_and_schema_map(ident_map.clone(), schema_map) } } + +pub trait SchemaTypeChecking { + /// Do schema-aware typechecking and coercion. + /// + /// Either assert that the given value is in the attribute's value set, or (in limited cases) + /// coerce the given value into the attribute's value set. + fn to_typed_value(&self, value: &edn::Value, attribute: &Attribute) -> Result; +} + +impl SchemaTypeChecking for Schema { + fn to_typed_value(&self, value: &edn::Value, attribute: &Attribute) -> Result { + // TODO: encapsulate entid-ident-attribute for better error messages. + match TypedValue::from_edn_value(value) { + // We don't recognize this EDN at all. Get out! + None => bail!(ErrorKind::BadEDNValuePair(value.clone(), attribute.value_type.clone())), + Some(typed_value) => match (&attribute.value_type, typed_value) { + // Most types don't coerce at all. + (&ValueType::Boolean, tv @ TypedValue::Boolean(_)) => Ok(tv), + (&ValueType::Long, tv @ TypedValue::Long(_)) => Ok(tv), + (&ValueType::Double, tv @ TypedValue::Double(_)) => Ok(tv), + (&ValueType::String, tv @ TypedValue::String(_)) => Ok(tv), + (&ValueType::Keyword, tv @ TypedValue::Keyword(_)) => Ok(tv), + // Ref coerces a little: we interpret some things depending on the schema as a Ref. + (&ValueType::Ref, TypedValue::Long(x)) => Ok(TypedValue::Ref(x)), + (&ValueType::Ref, TypedValue::Keyword(ref x)) => self.require_entid(&x).map(|entid| TypedValue::Ref(entid)), + // Otherwise, we have a type mismatch. + (value_type, _) => bail!(ErrorKind::BadEDNValuePair(value.clone(), value_type.clone())), + } + } + } +} diff --git a/db/src/tx.rs b/db/src/tx.rs index 938ceb51..ee9bf174 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -48,7 +48,12 @@ use std; use std::collections::BTreeSet; -use db::{ReducedEntity, SearchType}; +use ::{to_namespaced_keyword}; +use db; +use db::{ + MentatStoring, + PartitionMapping, +}; use entids; use errors::{ErrorKind, Result}; use internal_types::{ @@ -60,17 +65,23 @@ use internal_types::{ TermWithTempIds, TermWithoutTempIds, replace_lookup_ref}; -use mentat_core::intern_set; +use mentat_core::{ + intern_set, + Schema, +}; use mentat_tx::entities as entmod; use mentat_tx::entities::{Entity, OpType}; use rusqlite; -use schema::SchemaBuilding; +use schema::{ + SchemaBuilding, + SchemaTypeChecking, +}; use types::{ Attribute, AVPair, AVMap, - DB, Entid, + PartitionMap, TypedValue, TxReport, ValueType, @@ -79,28 +90,37 @@ use upsert_resolution::Generation; /// A transaction on its way to being applied. #[derive(Debug)] -pub struct Tx<'conn> { - /// The metadata to use to interpret the transaction entities with. - pub db: &'conn mut DB, +pub struct Tx<'conn, 'a> { + /// The storage to apply against. In the future, this will be a Mentat connection. + store: &'conn rusqlite::Connection, // TODO: db::MentatStoring, - /// The SQLite connection to apply against. In the future, this will be a Mentat connection. - pub conn: &'conn rusqlite::Connection, + /// The partition map to allocate entids from. + /// + /// The partition map is volatile in the sense that every succesful transaction updates + /// allocates at least one tx ID, so we own and modify our own partition map. + partition_map: PartitionMap, + + /// The schema to use when interpreting the transaction entities. + /// + /// The schema is infrequently updated, so we borrow a schema until we need to modify it. + schema: Cow<'a, Schema>, /// The transaction ID of the transaction. - pub tx_id: Entid, + tx_id: Entid, /// The timestamp when the transaction began to be committed. /// /// This is milliseconds after the Unix epoch according to the transactor's local clock. // TODO: :db.type/instant. - pub tx_instant: i64, + tx_instant: i64, } -impl<'conn> Tx<'conn> { - pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> { +impl<'conn, 'a> Tx<'conn, 'a> { + pub fn new(store: &'conn rusqlite::Connection, partition_map: PartitionMap, schema: &'a Schema, tx_id: Entid, tx_instant: i64) -> Tx<'conn, 'a> { Tx { - db: db, - conn: conn, + store: store, + partition_map: partition_map, + schema: Cow::Borrowed(schema), tx_id: tx_id, tx_instant: tx_instant, } @@ -109,7 +129,7 @@ impl<'conn> Tx<'conn> { /// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly /// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted /// entid. The keys of the resulting map are exactly those tempids that upserted. - pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result { + pub fn resolve_temp_id_avs<'b>(&self, temp_id_avs: &'b [(TempId, AVPair)]) -> Result { if temp_id_avs.is_empty() { return Ok(TempIdMap::default()); } @@ -121,7 +141,7 @@ impl<'conn> Tx<'conn> { } // Lookup in the store. - let av_map: AVMap = self.db.resolve_avs(conn, &av_pairs[..])?; + let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?; // Map id->entid. let mut temp_id_map: TempIdMap = TempIdMap::default(); @@ -153,16 +173,16 @@ impl<'conn> Tx<'conn> { Entity::AddOrRetract { op, e, a, v } => { let a: i64 = match a { entmod::Entid::Entid(ref a) => *a, - entmod::Entid::Ident(ref a) => self.db.schema.require_entid(&a)?, + entmod::Entid::Ident(ref a) => self.schema.require_entid(&a)?, }; - let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; let e = match e { entmod::EntidOrLookupRefOrTempId::Entid(e) => { let e: i64 = match e { entmod::Entid::Entid(ref e) => *e, - entmod::Entid::Ident(ref e) => self.db.schema.require_entid(&e)?, + entmod::Entid::Ident(ref e) => self.schema.require_entid(&e)?, }; std::result::Result::Ok(e) }, @@ -186,7 +206,7 @@ impl<'conn> Tx<'conn> { // Here is where we do schema-aware typechecking: we either assert that // the given value is in the attribute's value set, or (in limited // cases) coerce the value into the attribute's value set. - let typed_value: TypedValue = self.db.to_typed_value(&v, &attribute)?; + let typed_value: TypedValue = self.schema.to_typed_value(&v, &attribute)?; std::result::Result::Ok(typed_value) } @@ -215,27 +235,13 @@ impl<'conn> Tx<'conn> { }).collect::>>() } - /// Transact the given `entities` against the given SQLite `conn`, using the metadata in - /// `self.DB`. + /// Transact the given `entities` against the store. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator { // TODO: push these into an internal transaction report? - /// Assertions that are :db.cardinality/one and not :db.fulltext. - let mut non_fts_one: Vec = vec![]; - - /// Assertions that are :db.cardinality/many and not :db.fulltext. - let mut non_fts_many: Vec = vec![]; - - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. - non_fts_one.push((self.tx_id, - entids::DB_TX_INSTANT, - TypedValue::Long(self.tx_instant), - true)); - // We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable. let lookup_refs: intern_set::InternSet = intern_set::InternSet::new(); @@ -245,18 +251,18 @@ impl<'conn> Tx<'conn> { // Pipeline stage 2: resolve lookup refs -> terms with tempids. let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect(); - let lookup_ref_map: AVMap = self.db.resolve_avs(self.conn, &lookup_ref_avs[..])?; + let lookup_ref_map: AVMap = self.store.resolve_avs(&lookup_ref_avs[..])?; let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs. // Now we can collect upsert populations. - let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.db.schema)?; + let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.schema)?; // And evolve them forward. while generation.can_evolve() { // Evolve further. - let temp_id_map = self.resolve_temp_id_avs(self.conn, &generation.temp_id_avs()[..])?; + let temp_id_map = self.resolve_temp_id_avs(&generation.temp_id_avs()[..])?; generation = generation.evolve_one_step(&temp_id_map); } @@ -264,11 +270,18 @@ impl<'conn> Tx<'conn> { let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); // TODO: track partitions for temporary IDs. - let entids = self.db.allocate_entids(":db.part/user", unresolved_temp_ids.len()); + let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len()); let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); let final_populations = generation.into_final_populations(&temp_id_allocations)?; + { + /// Assertions that are :db.cardinality/one and not :db.fulltext. + let mut non_fts_one: Vec = vec![]; + + /// Assertions that are :db.cardinality/many and not :db.fulltext. + let mut non_fts_many: Vec = vec![]; + let final_terms: Vec = [final_populations.resolved, final_populations.allocated, inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); @@ -279,36 +292,46 @@ impl<'conn> Tx<'conn> { for term in final_terms { match term { Term::AddOrRetract(op, e, a, v) => { - let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; if attribute.fulltext { bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult! } let added = op == OpType::Add; if attribute.multival { - non_fts_many.push((e, a, v, added)); + non_fts_many.push((e, a, attribute, v, added)); } else { - non_fts_one.push((e, a, v, added)); + non_fts_one.push((e, a, attribute, v, added)); } }, } } + // Transact [:db/add :db/txInstant NOW :db/tx]. + // TODO: allow this to be present in the transaction data. + non_fts_one.push((self.tx_id, + entids::DB_TX_INSTANT, + // TODO: extract this to a constant. + self.schema.require_attribute_for_entid(self.schema.require_entid(&to_namespaced_keyword(":db/txInstant").unwrap())?)?, + TypedValue::Long(self.tx_instant), + true)); + if !non_fts_one.is_empty() { - self.db.insert_non_fts_searches(self.conn, &non_fts_one[..], self.tx_id, SearchType::Inexact)?; + self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?; } if !non_fts_many.is_empty() { - self.db.insert_non_fts_searches(self.conn, &non_fts_many[..], self.tx_id, SearchType::Exact)?; + self.store.insert_non_fts_searches(&non_fts_many[..], db::SearchType::Exact)?; } - self.db.search(self.conn)?; + self.store.commit_transaction(self.tx_id)?; + } - self.db.insert_transaction(self.conn, self.tx_id)?; - self.db.update_datoms(self.conn, self.tx_id)?; + // let mut next_schema = self.schema.to_mut(); + // next_schema.ident_map.insert(NamespacedKeyword::new("db", "new"), 1000); // TODO: update idents and schema materialized views. - self.db.update_partition_map(self.conn)?; + db::update_partition_map(self.store, &self.partition_map)?; Ok(TxReport { tx_id: self.tx_id, @@ -316,3 +339,33 @@ impl<'conn> Tx<'conn> { }) } } + +use std::borrow::Cow; + +/// Transact the given `entities` against the given SQLite `conn`, using the metadata in +/// `self.DB`. +/// +/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. +// TODO: move this to the transactor layer. +pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, partition_map: &'a PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option)> where I: IntoIterator { + // Eventually, this function will be responsible for managing a SQLite transaction. For + // now, it's just about the tx details. + + let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. + + let mut next_partition_map: PartitionMap = partition_map.clone(); + let tx_id = next_partition_map.allocate_entid(":db.part/tx"); + + conn.begin_transaction()?; + + let mut tx = Tx::new(conn, next_partition_map, schema, tx_id, tx_instant); + + let report = tx.transact_entities(entities)?; + + // If the schema has moved on, return it. + let next_schema = match tx.schema { + Cow::Borrowed(_) => None, + Cow::Owned(next_schema) => Some(next_schema), + }; + Ok((report, tx.partition_map, next_schema)) +} diff --git a/edn/src/lib.rs b/edn/src/lib.rs index 04d02b5c..6ff2ff68 100644 --- a/edn/src/lib.rs +++ b/edn/src/lib.rs @@ -23,7 +23,8 @@ pub mod parse { include!(concat!(env!("OUT_DIR"), "/edn.rs")); } -pub use ordered_float::OrderedFloat; pub use num::BigInt; +pub use ordered_float::OrderedFloat; +pub use parse::ParseError; pub use types::Value; pub use symbols::{Keyword, NamespacedKeyword, PlainSymbol, NamespacedSymbol}; diff --git a/parser-utils/Cargo.toml b/parser-utils/Cargo.toml index edff070a..c020fdbd 100644 --- a/parser-utils/Cargo.toml +++ b/parser-utils/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Victor Porof ", "Richard Newman >>`; see above. + pub errors: Vec>, +} + +impl std::fmt::Debug for ValueParseError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, + "ParseError {{ position: {:?}, errors: {:?} }}", + self.position, + self.errors) + } +} + +impl std::fmt::Display for ValueParseError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + try!(writeln!(f, "Parse error at {}", self.position)); + combine::primitives::Error::fmt_errors(&self.errors, f) + } +} + +impl std::error::Error for ValueParseError { + fn description(&self) -> &str { + "parse error parsing EDN values" + } +} + +impl<'a> From> for ValueParseError { + fn from(e: combine::primitives::ParseError<&'a [edn::Value]>) -> ValueParseError { + ValueParseError { + position: e.position, + errors: e.errors.into_iter().map(|e| e.map_range(|r| { + let mut v = Vec::new(); + v.extend_from_slice(r); + edn::Value::Vector(v) + })).collect(), + } + } +} + +/// Allow to map the range types of combine::primitives::{Info, Error}. +trait MapRange { + type Output; + fn map_range(self, f: F) -> Self::Output where F: FnOnce(R) -> S; +} + +impl MapRange for combine::primitives::Info { + type Output = combine::primitives::Info; + + fn map_range(self, f: F) -> combine::primitives::Info where F: FnOnce(R) -> S { + use combine::primitives::Info::*; + match self { + Token(t) => Token(t), + Range(r) => Range(f(r)), + Owned(s) => Owned(s), + Borrowed(x) => Borrowed(x), + } + } +} + +impl MapRange for combine::primitives::Error { + type Output = combine::primitives::Error; + + fn map_range(self, f: F) -> combine::primitives::Error where F: FnOnce(R) -> S { + use combine::primitives::Error::*; + match self { + Unexpected(x) => Unexpected(x.map_range(f)), + Expected(x) => Expected(x.map_range(f)), + Message(x) => Message(x.map_range(f)), + Other(x) => Other(x), + } + } +} diff --git a/query-parser/Cargo.toml b/query-parser/Cargo.toml index a3a5debf..7668db30 100644 --- a/query-parser/Cargo.toml +++ b/query-parser/Cargo.toml @@ -4,7 +4,8 @@ version = "0.0.1" workspace = ".." [dependencies] -combine = "2.1.1" +combine = "2.2.2" +error-chain = "0.9.0" matches = "0.1" [dependencies.edn] diff --git a/query-parser/src/errors.rs b/query-parser/src/errors.rs new file mode 100644 index 00000000..0ef8b53a --- /dev/null +++ b/query-parser/src/errors.rs @@ -0,0 +1,35 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use edn; +use mentat_parser_utils::ValueParseError; + +error_chain! { + types { + Error, ErrorKind, ResultExt, Result; + } + + foreign_links { + EdnParseError(edn::ParseError); + } + + links { + } + + errors { + NotAVariableError(value: edn::Value) {} + InvalidInput(value: edn::Value) {} + FindParseError(value_parse_error: ValueParseError) {} + WhereParseError(value_parse_error: ValueParseError) {} + MissingField(field: edn::Keyword) {} + } +} diff --git a/query-parser/src/find.rs b/query-parser/src/find.rs index 0a626a5d..2946919d 100644 --- a/query-parser/src/find.rs +++ b/query-parser/src/find.rs @@ -34,6 +34,7 @@ /// ! parts of the map. extern crate edn; +extern crate mentat_parser_utils; extern crate mentat_query; use std::collections::BTreeMap; @@ -44,10 +45,15 @@ use self::mentat_query::{ SrcVar, Variable, }; +use self::mentat_parser_utils::ValueParseError; + +use super::errors::{ + Error, + ErrorKind, + Result, +}; use super::parse::{ - NotAVariableError, - QueryParseError, QueryParseResult, clause_seq_to_patterns, }; @@ -57,14 +63,14 @@ use super::util::vec_to_keyword_map; /// If the provided slice of EDN values are all variables as /// defined by `value_to_variable`, return a `Vec` of `Variable`s. /// Otherwise, return the unrecognized Value in a `NotAVariableError`. -fn values_to_variables(vals: &[edn::Value]) -> Result, NotAVariableError> { +fn values_to_variables(vals: &[edn::Value]) -> Result> { let mut out: Vec = Vec::with_capacity(vals.len()); for v in vals { if let Some(var) = Variable::from_value(v) { out.push(var); continue; } - return Err(NotAVariableError(v.clone())); + bail!(ErrorKind::NotAVariableError(v.clone())); } return Ok(out); } @@ -109,7 +115,6 @@ fn parse_find_parts(find: &[edn::Value], where_clauses: where_clauses, } }) - .map_err(QueryParseError::FindParseError) } fn parse_find_map(map: BTreeMap>) -> QueryParseResult { @@ -127,10 +132,10 @@ fn parse_find_map(map: BTreeMap>) -> QueryParseRes map.get(&kw_with).map(|x| x.as_slice()), wheres); } else { - return Err(QueryParseError::MissingField(kw_where)); + bail!(ErrorKind::MissingField(kw_where)); } } else { - return Err(QueryParseError::MissingField(kw_find)); + bail!(ErrorKind::MissingField(kw_find)); } } @@ -148,22 +153,16 @@ fn parse_find_edn_map(map: BTreeMap) -> QueryParseResult m.insert(kw, vec); continue; } else { - return Err(QueryParseError::InvalidInput(v)); + bail!(ErrorKind::InvalidInput(v)); } } else { - return Err(QueryParseError::InvalidInput(k)); + bail!(ErrorKind::InvalidInput(k)); } } parse_find_map(m) } -impl From for QueryParseError { - fn from(err: edn::parse::ParseError) -> QueryParseError { - QueryParseError::EdnParseError(err) - } -} - pub fn parse_find_string(string: &str) -> QueryParseResult { let expr = edn::parse::value(string)?; parse_find(expr.without_spans()) @@ -179,7 +178,7 @@ pub fn parse_find(expr: edn::Value) -> QueryParseResult { return parse_find_map(m); } } - return Err(QueryParseError::InvalidInput(expr)); + bail!(ErrorKind::InvalidInput(expr)); } #[cfg(test)] diff --git a/query-parser/src/lib.rs b/query-parser/src/lib.rs index 5805592d..a44f6973 100644 --- a/query-parser/src/lib.rs +++ b/query-parser/src/lib.rs @@ -10,16 +10,27 @@ #![allow(unused_imports)] +#[macro_use] +extern crate error_chain; #[macro_use] extern crate matches; +extern crate edn; #[macro_use] extern crate mentat_parser_utils; mod util; mod parse; +pub mod errors; pub mod find; +pub use errors::{ + Error, + ErrorKind, + ResultExt, + Result, +}; + pub use find::{ parse_find, parse_find_string, @@ -27,8 +38,4 @@ pub use find::{ pub use parse::{ QueryParseResult, - QueryParseError, - FindParseError, - WhereParseError, - NotAVariableError, }; diff --git a/query-parser/src/parse.rs b/query-parser/src/parse.rs index 7312b306..c0258198 100644 --- a/query-parser/src/parse.rs +++ b/query-parser/src/parse.rs @@ -13,7 +13,7 @@ extern crate edn; extern crate mentat_parser_utils; extern crate mentat_query; -use self::mentat_parser_utils::ResultParser; +use self::mentat_parser_utils::{ResultParser, ValueParseError}; use self::combine::{eof, many1, optional, parser, satisfy_map, Parser, ParseResult, Stream}; use self::combine::combinator::{choice, try}; use self::mentat_query::{ @@ -29,47 +29,10 @@ use self::mentat_query::{ WhereClause, }; -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct NotAVariableError(pub edn::Value); - -#[allow(dead_code)] -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum FindParseError { - Err, -} - -#[allow(dead_code)] -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum WhereParseError { - Err, -} - -#[derive(Clone,Debug,Eq,PartialEq)] -pub enum QueryParseError { - InvalidInput(edn::Value), - EdnParseError(edn::parse::ParseError), - MissingField(edn::Keyword), - FindParseError(FindParseError), - WhereParseError(WhereParseError), - WithParseError(NotAVariableError), -} - -impl From for QueryParseError { - fn from(err: WhereParseError) -> QueryParseError { - QueryParseError::WhereParseError(err) - } -} - -impl From for QueryParseError { - fn from(err: NotAVariableError) -> QueryParseError { - QueryParseError::WithParseError(err) - } -} - -pub type WhereParseResult = Result, WhereParseError>; -pub type FindParseResult = Result; -pub type QueryParseResult = Result; - +use errors::{Error, ErrorKind, ResultExt, Result}; +pub type WhereParseResult = Result>; +pub type FindParseResult = Result; +pub type QueryParseResult = Result; pub struct Query(::std::marker::PhantomData I>); @@ -222,7 +185,8 @@ pub fn find_seq_to_find_spec(find: &[edn::Value]) -> FindParseResult { Find::find() .parse(find) .map(|x| x.0) - .map_err(|_| FindParseError::Err) + .map_err::(|e| e.translate_position(find).into()) + .map_err(|e| Error::from_kind(ErrorKind::FindParseError(e))) } #[allow(dead_code)] @@ -230,7 +194,8 @@ pub fn clause_seq_to_patterns(clauses: &[edn::Value]) -> WhereParseResult { Where::clauses() .parse(clauses) .map(|x| x.0) - .map_err(|_| WhereParseError::Err) + .map_err::(|e| e.translate_position(clauses).into()) + .map_err(|e| Error::from_kind(ErrorKind::WhereParseError(e))) } #[cfg(test)] @@ -401,15 +366,15 @@ mod test { edn::Value::PlainSymbol(ellipsis.clone())])]; let rel = [edn::Value::PlainSymbol(vx.clone()), edn::Value::PlainSymbol(vy.clone())]; - assert_eq!(Ok(FindSpec::FindScalar(Element::Variable(Variable(vx.clone())))), - find_seq_to_find_spec(&scalar)); - assert_eq!(Ok(FindSpec::FindTuple(vec![Element::Variable(Variable(vx.clone())), - Element::Variable(Variable(vy.clone()))])), - find_seq_to_find_spec(&tuple)); - assert_eq!(Ok(FindSpec::FindColl(Element::Variable(Variable(vx.clone())))), - find_seq_to_find_spec(&coll)); - assert_eq!(Ok(FindSpec::FindRel(vec![Element::Variable(Variable(vx.clone())), - Element::Variable(Variable(vy.clone()))])), - find_seq_to_find_spec(&rel)); + assert_eq!(FindSpec::FindScalar(Element::Variable(Variable(vx.clone()))), + find_seq_to_find_spec(&scalar).unwrap()); + assert_eq!(FindSpec::FindTuple(vec![Element::Variable(Variable(vx.clone())), + Element::Variable(Variable(vy.clone()))]), + find_seq_to_find_spec(&tuple).unwrap()); + assert_eq!(FindSpec::FindColl(Element::Variable(Variable(vx.clone()))), + find_seq_to_find_spec(&coll).unwrap()); + assert_eq!(FindSpec::FindRel(vec![Element::Variable(Variable(vx.clone())), + Element::Variable(Variable(vy.clone()))]), + find_seq_to_find_spec(&rel).unwrap()); } } diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 00000000..8594172b --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,35 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use rusqlite; + +use edn; +use mentat_db; +use mentat_query_parser; +use mentat_tx_parser; + +error_chain! { + types { + Error, ErrorKind, ResultExt, Result; + } + + foreign_links { + EdnParseError(edn::ParseError); + Rusqlite(rusqlite::Error); + } + + links { + DbError(mentat_db::Error, mentat_db::ErrorKind); + QueryParseError(mentat_query_parser::Error, mentat_query_parser::ErrorKind); + TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind); + } +} diff --git a/src/lib.rs b/src/lib.rs index 681aabaa..d5c362cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. +#[macro_use] +extern crate error_chain; #[macro_use] extern crate slog; #[macro_use] @@ -21,9 +23,11 @@ extern crate mentat_db; extern crate mentat_query; extern crate mentat_query_parser; extern crate mentat_query_algebrizer; +extern crate mentat_tx_parser; use rusqlite::Connection; +pub mod errors; pub mod ident; pub mod query; diff --git a/src/query.rs b/src/query.rs index bccc3134..d8bd4064 100644 --- a/src/query.rs +++ b/src/query.rs @@ -18,7 +18,10 @@ use mentat_db::DB; use mentat_query_parser::{ parse_find_string, - QueryParseError, +}; + +use errors::{ + Result, }; // TODO @@ -31,19 +34,6 @@ pub enum QueryResults { Rel(Vec>), } -pub enum QueryExecutionError { - ParseError(QueryParseError), - InvalidArgumentName(String), -} - -impl From for QueryExecutionError { - fn from(err: QueryParseError) -> QueryExecutionError { - QueryExecutionError::ParseError(err) - } -} - -pub type QueryExecutionResult = Result; - /// Take an EDN query string, a reference to a open SQLite connection, a Mentat DB, and an optional /// collection of input bindings (which should be keyed by `"?varname"`), and execute the query /// immediately, blocking the current thread. @@ -53,7 +43,7 @@ pub type QueryExecutionResult = Result; pub fn q_once(sqlite: SQLiteConnection, db: DB, query: &str, - inputs: Option>) -> QueryExecutionResult { + inputs: Option>) -> Result { // TODO: validate inputs. let parsed = parse_find_string(query)?; Ok(QueryResults::Scalar(Some(TypedValue::Boolean(true)))) diff --git a/tx-parser/Cargo.toml b/tx-parser/Cargo.toml index b4311551..cb0c051d 100644 --- a/tx-parser/Cargo.toml +++ b/tx-parser/Cargo.toml @@ -4,13 +4,14 @@ version = "0.0.1" workspace = ".." [dependencies] -combine = "2.1.1" +combine = "2.2.2" +error-chain = "0.9.0" [dependencies.edn] - path = "../edn" +path = "../edn" [dependencies.mentat_tx] - path = "../tx" +path = "../tx" [dependencies.mentat_parser_utils] - path = "../parser-utils" +path = "../parser-utils" diff --git a/tx-parser/src/errors.rs b/tx-parser/src/errors.rs new file mode 100644 index 00000000..72bfb4a2 --- /dev/null +++ b/tx-parser/src/errors.rs @@ -0,0 +1,26 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use mentat_parser_utils::ValueParseError; + +error_chain! { + types { + Error, ErrorKind, ResultExt, Result; + } + + errors { + ParseError(value_parse_error: ValueParseError) { + description("error parsing edn values") + display("error parsing edn values:\n{}", value_parse_error) + } + } +} diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 92e29543..c6c9c411 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -10,8 +10,11 @@ #![allow(dead_code)] -extern crate edn; extern crate combine; +#[macro_use] +extern crate error_chain; + +extern crate edn; extern crate mentat_tx; #[macro_use] @@ -22,7 +25,10 @@ use combine::combinator::{Expected, FnParser}; use edn::symbols::NamespacedKeyword; use edn::types::Value; use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType}; -use mentat_parser_utils::ResultParser; +use mentat_parser_utils::{ResultParser, ValueParseError}; + +pub mod errors; +pub use errors::*; pub struct Tx(::std::marker::PhantomData I>); @@ -158,14 +164,14 @@ def_parser_fn!(Tx, entities, Value, Vec, input, { .parse_stream(input) }); -impl Tx - where I: Stream -{ - pub fn parse(input: I) -> Result, combine::ParseError> { - (Tx::::entities(), eof()) +impl<'a> Tx<&'a [edn::Value]> { + pub fn parse(input: &'a [edn::Value]) -> std::result::Result, errors::Error> { + (Tx::<_>::entities(), eof()) .map(|(es, _)| es) .parse(input) .map(|x| x.0) + .map_err::(|e| e.translate_position(input).into()) + .map_err(|e| Error::from_kind(ErrorKind::ParseError(e))) } } diff --git a/tx-parser/tests/parser.rs b/tx-parser/tests/parser.rs index 255243fc..443709d1 100644 --- a/tx-parser/tests/parser.rs +++ b/tx-parser/tests/parser.rs @@ -30,8 +30,8 @@ fn test_entities() { let input = [edn]; let result = Tx::parse(&input[..]); - assert_eq!(result, - Ok(vec![ + assert_eq!(result.unwrap(), + vec![ Entity::AddOrRetract { op: OpType::Add, e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)), @@ -50,7 +50,7 @@ fn test_entities() { a: Entid::Ident(NamespacedKeyword::new("test", "b")), v: Value::Text("w".into()), }, - ])); + ]); } // TODO: test error handling in select cases. diff --git a/tx/fixtures/test_upsert_vector.edn b/tx/fixtures/test_upsert_vector.edn index 41a55c73..1b4ee4db 100644 --- a/tx/fixtures/test_upsert_vector.edn +++ b/tx/fixtures/test_upsert_vector.edn @@ -74,7 +74,7 @@ ;; This ref doesn't exist, so the assertion will be ignored. [:db/retract "t1" :db.schema/attribute 103]] :test/expected-transaction - #{[?tx6 :db/txInstant ?ms6 ?tx6 true]} + #{[?tx5 :db/txInstant ?ms5 ?tx5 true]} :test/expected-error-message "" :test/expected-tempids @@ -94,9 +94,9 @@ [[:db/add "t1" :db/ident :name/Josef] [:db/add "t2" :db.schema/attribute "t1"]] :test/expected-transaction - #{[65538 :db/ident :name/Josef ?tx8 true] - [65539 :db.schema/attribute 65538 ?tx8 true] - [?tx8 :db/txInstant ?ms8 ?tx8 true]} + #{[65538 :db/ident :name/Josef ?tx6 true] + [65539 :db.schema/attribute 65538 ?tx6 true] + [?tx6 :db/txInstant ?ms6 ?tx6 true]} :test/expected-error-message "" :test/expected-tempids