diff --git a/db/Cargo.toml b/db/Cargo.toml index 438f558a..76b2d5c1 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -12,6 +12,7 @@ log = "0.4" num = "0.1" ordered-float = "0.5" time = "0.1" +petgraph = "0.4.12" [dependencies.rusqlite] version = "0.13" diff --git a/db/src/db.rs b/db/src/db.rs index 5c6c33aa..8c20301b 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -513,6 +513,12 @@ fn search(conn: &rusqlite::Connection) -> Result<()> { /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. fn insert_transaction(conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + // Mentat follows Datomic and treats its input as a set. That means it is okay to transact the + // same [e a v] twice in one transaction. However, we don't want to represent the transacted + // datom twice. Therefore, the transactor unifies repeated datoms, and in addition we add + // indices to the search inputs and search results to ensure that we don't see repeated datoms + // at this point. + let s = r#" INSERT INTO transactions (e, a, v, tx, added, value_type_tag) SELECT e0, a0, v0, ?, 1, value_type_tag0 @@ -561,8 +567,12 @@ fn update_datoms(conn: &rusqlite::Connection, tx: Entid) -> Result<()> { .map(|_c| ()) .chain_err(|| "Could not update datoms: failed to retract datoms already present")?; - // Insert datoms that were added and not already present. We also must - // expand our bitfield into flags. + // Insert datoms that were added and not already present. We also must expand our bitfield into + // flags. Since Mentat follows Datomic and treats its input as a set, it is okay to transact + // the same [e a v] twice in one transaction, but we don't want to represent the transacted + // datom twice in datoms. The transactor unifies repeated datoms, and in addition we add + // indices to the search inputs and search results to ensure that we don't see repeated datoms + // at this point. 
let s = format!(r#" INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value) SELECT e0, a0, v0, ?, value_type_tag0, @@ -679,8 +689,10 @@ impl MentatStoring for rusqlite::Connection { added0 TINYINT NOT NULL, flags0 TINYINT NOT NULL)"#, - // We create this unique index so that it's impossible to violate a cardinality constraint - // within a transaction. + // It is fine to transact the same [e a v] twice in one transaction, but the transaction + // processor should unify such repeated datoms. This index will cause insertion to fail + // if the transaction processor incorrectly tries to assert the same (cardinality one) + // datom twice. (Sadly, the failure is opaque.) r#"CREATE UNIQUE INDEX IF NOT EXISTS temp.inexact_searches_unique ON inexact_searches (e0, a0) WHERE added0 = 1"#, r#"DROP TABLE IF EXISTS temp.search_results"#, // TODO: don't encode search_type as a STRING. This is explicit and much easier to read @@ -695,9 +707,10 @@ impl MentatStoring for rusqlite::Connection { search_type STRING NOT NULL, rid INTEGER, v BLOB)"#, - // It is an error to transact the same [e a v] twice in one transaction. This index will - // cause insertion to fail if a transaction tries to do that. (Sadly, the failure is - // opaque.) + // It is fine to transact the same [e a v] twice in one transaction, but the transaction + // processor should identify those datoms. This index will cause insertion to fail if + // the internals of the database searching code incorrectly find the same datom twice. + // (Sadly, the failure is opaque.) // // N.b.: temp goes on index name, not table name. See http://stackoverflow.com/a/22308016. 
r#"CREATE UNIQUE INDEX IF NOT EXISTS temp.search_results_unique ON search_results (e0, a0, v0, value_type_tag0)"#, @@ -1066,21 +1079,39 @@ impl PartitionMapping for PartitionMap { #[cfg(test)] mod tests { + extern crate env_logger; + use super::*; use bootstrap; use debug; + use errors; use edn; use mentat_core::{ HasSchema, Keyword, - Schema, + KnownEntid, attribute, }; + use mentat_core::intern_set::{ + InternSet, + }; + use mentat_core::util::Either::*; + use mentat_tx::entities::{ + OpType, + TempId, + }; use rusqlite; use std::collections::{ BTreeMap, }; + use internal_types::{ + Term, + TermWithTempIds, + }; use types::TxReport; + use tx::{ + transact_terms, + }; // Macro to parse a `Borrow` to an `edn::Value` and assert the given `edn::Value` `matches` // against it. @@ -1104,10 +1135,12 @@ mod tests { // This unwraps safely and makes asserting errors pleasant. macro_rules! assert_transact { ( $conn: expr, $input: expr, $expected: expr ) => {{ + trace!("assert_transact: {}", $input); let result = $conn.transact($input).map_err(|e| e.to_string()); assert_eq!(result, $expected.map_err(|e| e.to_string())); }}; ( $conn: expr, $input: expr ) => {{ + trace!("assert_transact: {}", $input); let result = $conn.transact($input); assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err()); result.unwrap() @@ -1157,6 +1190,29 @@ mod tests { Ok(report) } + fn transact_simple_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { + let details = { + // The block scopes the borrow of self.sqlite. + // We're about to write, so go straight ahead and get an IMMEDIATE transaction. + let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; + // Applying the transaction can fail, so we don't unwrap. 
+ let details = transact_terms(&tx, self.partition_map.clone(), &self.schema, &self.schema, NullWatcher(), terms, tempid_set)?; + tx.commit()?; + details + }; + + let (report, next_partition_map, next_schema, _watcher) = details; + self.partition_map = next_partition_map; + if let Some(next_schema) = next_schema { + self.schema = next_schema; + } + + // Verify that we've updated the materialized views during transacting. + self.assert_materialized_views(); + + Ok(report) + } + fn last_tx_id(&self) -> Entid { self.partition_map.get(&":db.part/tx".to_string()).unwrap().index - 1 } @@ -1303,7 +1359,7 @@ mod tests { assert_transact!(conn, "[[:db/add (transaction-tx) :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] [:db/add (transaction-tx) :db/txInstant #inst \"2017-06-16T00:59:11.752Z\"] [:db/add 102 :db/ident :name/Vlad]]", - Err("conflicting datoms in tx")); + Err("schema constraint violation: cardinality conflicts:\n CardinalityOneAddConflict { e: 268435458, a: 3, vs: {Instant(2017-06-16T00:59:11.257Z), Instant(2017-06-16T00:59:11.752Z)} }\n")); // Test multiple txInstants with the same value. assert_transact!(conn, "[[:db/add (transaction-tx) :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] @@ -1529,6 +1585,53 @@ mod tests { // [?tx :db/txInstant ?ms ?tx true]]"); } + #[test] + fn test_resolved_upserts() { + let mut conn = TestConn::default(); + assert_transact!(conn, "[ + {:db/ident :test/id + :db/valueType :db.type/string + :db/unique :db.unique/identity + :db/index true + :db/cardinality :db.cardinality/one} + {:db/ident :test/ref + :db/valueType :db.type/ref + :db/unique :db.unique/identity + :db/index true + :db/cardinality :db.cardinality/one} + ]"); + + // Partial data for :test/id, links via :test/ref. + assert_transact!(conn, r#"[ + [:db/add 100 :test/id "0"] + [:db/add 101 :test/ref 100] + [:db/add 102 :test/ref 101] + [:db/add 103 :test/ref 102] + ]"#); + + // Fill in the rest of the data for :test/id, using the links of :test/ref. 
+ let report = assert_transact!(conn, r#"[ + {:db/id "a" :test/id "0"} + {:db/id "b" :test/id "1" :test/ref "a"} + {:db/id "c" :test/id "2" :test/ref "b"} + {:db/id "d" :test/id "3" :test/ref "c"} + ]"#); + + assert_matches!(tempids(&report), r#"{ + "a" 100 + "b" 101 + "c" 102 + "d" 103 + }"#); + + assert_matches!(conn.last_transaction(), r#"[ + [101 :test/id "1" ?tx true] + [102 :test/id "2" ?tx true] + [103 :test/id "3" ?tx true] + [?tx :db/txInstant ?ms ?tx true] + ]"#); + } + #[test] fn test_sqlite_limit() { let conn = new_connection("").expect("Couldn't open in-memory db"); @@ -2362,14 +2465,16 @@ mod tests { [:db/add "bar" :test/unique "x"] [:db/add "bar" :test/one 124] ]"#, - Err("Could not insert non-fts one statements into temporary search table!")); + // This is implementation specific (due to the allocated entid), but it should be deterministic. + Err("schema constraint violation: cardinality conflicts:\n CardinalityOneAddConflict { e: 65536, a: 111, vs: {Long(123), Long(124)} }\n")); // It also fails for map notation. assert_transact!(conn, r#"[ {:test/unique "x", :test/one 123} {:test/unique "x", :test/one 124} ]"#, - Err("Could not insert non-fts one statements into temporary search table!")); + // This is implementation specific (due to the allocated entid), but it should be deterministic. 
+ Err("schema constraint violation: cardinality conflicts:\n CardinalityOneAddConflict { e: 65536, a: 111, vs: {Long(123), Long(124)} }\n")); } #[test] @@ -2421,6 +2526,162 @@ mod tests { [:db/add "x" :page/ref 333] [:db/add "x" :page/ref 444] ]"#, - Err("Could not insert non-fts one statements into temporary search table!")); + Err("schema constraint violation: cardinality conflicts:\n CardinalityOneAddConflict { e: 65539, a: 65537, vs: {Ref(333), Ref(444)} }\n")); + } + + #[test] + fn test_upsert_issue_532() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/ident :page/id :db/valueType :db.type/string :db/index true :db/unique :db.unique/identity} + {:db/ident :page/ref :db/valueType :db.type/ref :db/index true :db/unique :db.unique/identity} + {:db/ident :page/title :db/valueType :db.type/string :db/cardinality :db.cardinality/many} + ]"#); + + // Observe that "foo" and "zot" upsert to the same entid, and that doesn't cause a + // cardinality conflict, because we treat the input with set semantics and accept + // duplicate datoms. + let report = assert_transact!(conn, r#"[ + [:db/add "bar" :page/id "z"] + [:db/add "foo" :page/ref "bar"] + [:db/add "foo" :page/title "x"] + [:db/add "zot" :page/ref "bar"] + [:db/add "zot" :db/ident :other/ident] + ]"#); + assert_matches!(tempids(&report), + "{\"bar\" ?b + \"foo\" ?f + \"zot\" ?f}"); + assert_matches!(conn.last_transaction(), + "[[?b :page/id \"z\" ?tx true] + [?f :db/ident :other/ident ?tx true] + [?f :page/ref ?b ?tx true] + [?f :page/title \"x\" ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + let report = assert_transact!(conn, r#"[ + [:db/add "foo" :page/id "x"] + [:db/add "foo" :page/title "x"] + [:db/add "bar" :page/id "x"] + [:db/add "bar" :page/title "y"] + ]"#); + assert_matches!(tempids(&report), + "{\"foo\" ?e + \"bar\" ?e}"); + + // One entity, two page titles. 
+ assert_matches!(conn.last_transaction(), + "[[?e :page/id \"x\" ?tx true] + [?e :page/title \"x\" ?tx true] + [?e :page/title \"y\" ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // Here, "foo", "bar", and "baz", all refer to the same reference, but none of them actually + // upsert to existing entities. + let report = assert_transact!(conn, r#"[ + [:db/add "foo" :page/id "id"] + [:db/add "bar" :db/ident :bar/bar] + {:db/id "baz" :page/id "id" :db/ident :bar/bar} + ]"#); + assert_matches!(tempids(&report), + "{\"foo\" ?e + \"bar\" ?e + \"baz\" ?e}"); + + assert_matches!(conn.last_transaction(), + "[[?e :db/ident :bar/bar ?tx true] + [?e :page/id \"id\" ?tx true] + [?tx :db/txInstant ?ms ?tx true]]"); + + // If we do it again, everything resolves to the same IDs. + let report = assert_transact!(conn, r#"[ + [:db/add "foo" :page/id "id"] + [:db/add "bar" :db/ident :bar/bar] + {:db/id "baz" :page/id "id" :db/ident :bar/bar} + ]"#); + assert_matches!(tempids(&report), + "{\"foo\" ?e + \"bar\" ?e + \"baz\" ?e}"); + + assert_matches!(conn.last_transaction(), + "[[?tx :db/txInstant ?ms ?tx true]]"); + } + + #[test] + fn test_term_typechecking_issue_663() { + // The builder interfaces provide untrusted `Term` instances to the transactor, bypassing + // the typechecking layers invoked in the schema-aware coercion from `edn::Value` into + // `TypedValue`. Typechecking now happens lower in the stack (as well as higher in the + // stack) so we shouldn't be able to insert bad data into the store. 
+ + let mut conn = TestConn::default(); + + let mut terms = vec![]; + + terms.push(Term::AddOrRetract(OpType::Add, Left(KnownEntid(200)), entids::DB_IDENT, Left(TypedValue::typed_string("test")))); + terms.push(Term::AddOrRetract(OpType::Retract, Left(KnownEntid(100)), entids::DB_TX_INSTANT, Left(TypedValue::Long(-1)))); + + let report = conn.transact_simple_terms(terms, InternSet::new()); + + match report.unwrap_err() { + errors::Error(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::TypeDisagreements { conflicting_datoms }), _) => { + let mut map = BTreeMap::default(); + map.insert((100, entids::DB_TX_INSTANT, TypedValue::Long(-1)), ValueType::Instant); + map.insert((200, entids::DB_IDENT, TypedValue::typed_string("test")), ValueType::Keyword); + + assert_eq!(conflicting_datoms, map); + }, + x => panic!("expected schema constraint violation, got {:?}", x), + } + } + + #[test] + fn test_cardinality_constraints() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/id 200 :db/ident :test/one :db/valueType :db.type/long :db/cardinality :db.cardinality/one} + {:db/id 201 :db/ident :test/many :db/valueType :db.type/long :db/cardinality :db.cardinality/many} + ]"#); + + // Can add the same datom multiple times for an attribute, regardless of cardinality. + assert_transact!(conn, r#"[ + [:db/add 100 :test/one 1] + [:db/add 100 :test/one 1] + [:db/add 100 :test/many 2] + [:db/add 100 :test/many 2] + ]"#); + + // Can retract the same datom multiple times for an attribute, regardless of cardinality. + assert_transact!(conn, r#"[ + [:db/retract 100 :test/one 1] + [:db/retract 100 :test/one 1] + [:db/retract 100 :test/many 2] + [:db/retract 100 :test/many 2] + ]"#); + + // Can't transact multiple datoms for a cardinality one attribute. 
+ assert_transact!(conn, r#"[ + [:db/add 100 :test/one 3] + [:db/add 100 :test/one 4] + ]"#, + Err("schema constraint violation: cardinality conflicts:\n CardinalityOneAddConflict { e: 100, a: 200, vs: {Long(3), Long(4)} }\n")); + + // Can transact multiple datoms for a cardinality many attribute. + assert_transact!(conn, r#"[ + [:db/add 100 :test/many 5] + [:db/add 100 :test/many 6] + ]"#); + + // Can't add and retract the same datom for an attribute, regardless of cardinality. + assert_transact!(conn, r#"[ + [:db/add 100 :test/one 7] + [:db/retract 100 :test/one 7] + [:db/add 100 :test/many 8] + [:db/retract 100 :test/many 8] + ]"#, + Err("schema constraint violation: cardinality conflicts:\n AddRetractConflict { e: 100, a: 200, vs: {Long(7)} }\n AddRetractConflict { e: 100, a: 201, vs: {Long(8)} }\n")); } } diff --git a/db/src/errors.rs b/db/src/errors.rs index 4d7b20bf..e2b31166 100644 --- a/db/src/errors.rs +++ b/db/src/errors.rs @@ -26,9 +26,27 @@ use mentat_core::{ }; use types::{ Entid, + TypedValue, ValueType, }; +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum CardinalityConflict { + /// A cardinality one attribute has multiple assertions `[e a v1], [e a v2], ...`. + CardinalityOneAddConflict { + e: Entid, + a: Entid, + vs: BTreeSet, + }, + + /// A datom has been both asserted and retracted, like `[:db/add e a v]` and `[:db/retract e a v]`. + AddRetractConflict { + e: Entid, + a: Entid, + vs: BTreeSet, + }, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum SchemaConstraintViolation { /// A transaction tried to assert datoms where one tempid upserts to two (or more) distinct @@ -42,6 +60,17 @@ pub enum SchemaConstraintViolation { /// rewriting passes the input undergoes. conflicting_upserts: BTreeMap>, }, + + /// A transaction tried to assert a datom or datoms with the wrong value `v` type(s). + TypeDisagreements { + /// The key (`[e a v]`) has an invalid value `v`: it is not of the expected value type. 
+ conflicting_datoms: BTreeMap<(Entid, Entid, TypedValue), ValueType> + }, + + /// A transaction tried to assert datoms that don't observe the schema's cardinality constraints. + CardinalityConflicts { + conflicts: Vec, + }, } impl ::std::fmt::Display for SchemaConstraintViolation { @@ -49,9 +78,23 @@ impl ::std::fmt::Display for SchemaConstraintViolation { use self::SchemaConstraintViolation::*; match self { &ConflictingUpserts { ref conflicting_upserts } => { - write!(f, "conflicting upserts:\n")?; + writeln!(f, "conflicting upserts:")?; for (tempid, entids) in conflicting_upserts { - write!(f, " tempid {:?} upserts to {:?}\n", tempid, entids)?; + writeln!(f, " tempid {:?} upserts to {:?}", tempid, entids)?; + } + Ok(()) + }, + &TypeDisagreements { ref conflicting_datoms } => { + writeln!(f, "type disagreements:")?; + for (ref datom, expected_type) in conflicting_datoms { + writeln!(f, " expected value of type {} but got datom [{} {} {:?}]", expected_type, datom.0, datom.1, datom.2)?; + } + Ok(()) + }, + &CardinalityConflicts { ref conflicts } => { + writeln!(f, "cardinality conflicts:")?; + for ref conflict in conflicts { + writeln!(f, " {:?}", conflict)?; } Ok(()) }, @@ -146,11 +189,6 @@ error_chain! { display("unrecognized or no ident found for entid: {}", entid) } - ConflictingDatoms { - description("conflicting datoms in tx") - display("conflicting datoms in tx") - } - UnknownAttribute(attr: Entid) { description("unknown attribute") display("unknown attribute for entid: {}", attr) diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs index a65c06a4..a402810f 100644 --- a/db/src/internal_types.rs +++ b/db/src/internal_types.rs @@ -12,7 +12,11 @@ //! Types used only within the transactor. These should not be exposed outside of this crate. 
-use std::collections::HashMap; +use std::collections::{ + BTreeMap, + BTreeSet, + HashMap, +}; use std::rc::Rc; use mentat_core::KnownEntid; @@ -34,6 +38,7 @@ use schema::{ SchemaTypeChecking, }; use types::{ + Attribute, AVMap, AVPair, Entid, @@ -184,3 +189,13 @@ pub fn replace_lookup_ref(lookup_map: &AVMap, desired_or: Either, + pub(crate) retract: BTreeSet, +} + +// A trie-like structure mapping a -> e -> v that prefix compresses and makes uniqueness constraint +// checking more efficient. BTree* for deterministic errors. +pub(crate) type AEVTrie<'schema> = BTreeMap<(Entid, &'schema Attribute), BTreeMap>; diff --git a/db/src/lib.rs b/db/src/lib.rs index d5cf0476..5e6af503 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -18,6 +18,7 @@ extern crate itertools; #[macro_use] extern crate log; extern crate num; +extern crate petgraph; extern crate rusqlite; extern crate tabwriter; extern crate time; @@ -46,6 +47,7 @@ mod schema; pub mod tx_observer; mod watcher; mod tx; +mod tx_checking; pub mod types; mod upsert_resolution; diff --git a/db/src/tx.rs b/db/src/tx.rs index fdb51184..147b78c1 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -45,7 +45,9 @@ //! names -- `TermWithTempIdsAndLookupRefs`, anyone? -- and strongly typed stage functions will help //! keep everything straight. -use std::borrow::Cow; +use std::borrow::{ + Cow, +}; use std::collections::{ BTreeMap, BTreeSet, @@ -73,6 +75,8 @@ use errors::{ Result, }; use internal_types::{ + AddAndRetract, + AEVTrie, KnownEntidOr, LookupRef, LookupRefOrTempId, @@ -112,6 +116,7 @@ use schema::{ SchemaBuilding, SchemaTypeChecking, }; +use tx_checking; use types::{ Attribute, AVPair, @@ -122,13 +127,14 @@ use types::{ TxReport, ValueType, }; - +use upsert_resolution::{ + FinalPopulations, + Generation, +}; use watcher::{ TransactWatcher, }; -use upsert_resolution::Generation; - /// A transaction on its way to being applied. 
#[derive(Debug)] pub struct Tx<'conn, 'a, W> where W: TransactWatcher { @@ -156,9 +162,6 @@ pub struct Tx<'conn, 'a, W> where W: TransactWatcher { /// The transaction ID of the transaction. tx_id: Entid, - - /// The timestamp when the transaction began to be committed. - tx_instant: Option>, } /// Remove any :db/id value from the given map notation, converting the returned value into @@ -199,7 +202,6 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { schema: schema, watcher: watcher, tx_id: tx_id, - tx_instant: None, } } @@ -656,19 +658,22 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { debug!("tempids {:?}", tempids); } + generation.allocate_unresolved_upserts()?; + debug!("final generation {:?}", generation); - // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. - let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); + // Allocate entids for tempids that didn't upsert. BTreeMap so this is deterministic. + let unresolved_temp_ids: BTreeMap = generation.temp_ids_in_allocations(&self.schema)?; debug!("unresolved tempids {:?}", unresolved_temp_ids); // TODO: track partitions for temporary IDs. let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len()); - let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter() - .zip(entids.map(|e| KnownEntid(e))) - .collect(); + let temp_id_allocations = unresolved_temp_ids + .into_iter() + .map(|(tempid, index)| (tempid, KnownEntid(entids.start + (index as i64)))) + .collect(); debug!("tempid allocations {:?}", temp_id_allocations); @@ -698,10 +703,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // store. 
let mut tx_might_update_metadata = false; - let final_terms: Vec = [final_populations.resolved, - final_populations.allocated, - inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); - + // Mutable so that we can add the transaction :db/txInstant. + let mut aev_trie = into_aev_trie(&self.schema, final_populations, inert_terms)?; let tx_instant; { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. @@ -721,62 +724,44 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // We need to ensure that callers can't blindly transact entities that haven't been // allocated by this store. - // Pipeline stage 4: final terms (after rewriting) -> DB insertions. - // Collect into non_fts_*. - // TODO: use something like Clojure's group_by to do this. - for term in final_terms { - match term { - Term::AddOrRetract(op, KnownEntid(e), a, v) => { - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - if entids::might_update_metadata(a) { - tx_might_update_metadata = true; - } - - let added = op == OpType::Add; - - // We take the last encountered :db/txInstant value. - // If more than one is provided, the transactor will fail. - if added && - e == self.tx_id && - a == entids::DB_TX_INSTANT { - if let TypedValue::Instant(instant) = v { - if let Some(ts) = self.tx_instant { - if ts == instant { - // Dupes are fine. - } else { - bail!(ErrorKind::ConflictingDatoms); - } - } else { - self.tx_instant = Some(instant); - } - continue; - } else { - // The type error has been caught earlier. 
- unreachable!() - } - } - - self.watcher.datom(op, e, a, &v); - - let reduced = (e, a, attribute, v, added); - match (attribute.fulltext, attribute.multival) { - (false, true) => non_fts_many.push(reduced), - (false, false) => non_fts_one.push(reduced), - (true, false) => fts_one.push(reduced), - (true, true) => fts_many.push(reduced), - } - }, - } + let errors = tx_checking::type_disagreements(&aev_trie); + if !errors.is_empty() { + bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::TypeDisagreements { conflicting_datoms: errors })); } - tx_instant = self.tx_instant.unwrap_or_else(now); + let errors = tx_checking::cardinality_conflicts(&aev_trie); + if !errors.is_empty() { + bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::CardinalityConflicts { conflicts: errors })); + } - // Transact [:db/add :db/txInstant tx_instant (transaction-tx)]. - non_fts_one.push((self.tx_id, - entids::DB_TX_INSTANT, - self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), - tx_instant.into(), - true)); + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. + // Collect into non_fts_*. 
+ + tx_instant = get_or_insert_tx_instant(&mut aev_trie, &self.schema, self.tx_id)?; + + for ((a, attribute), evs) in aev_trie { + if entids::might_update_metadata(a) { + tx_might_update_metadata = true; + } + + let mut queue = match (attribute.fulltext, attribute.multival) { + (false, true) => &mut non_fts_many, + (false, false) => &mut non_fts_one, + (true, false) => &mut fts_one, + (true, true) => &mut fts_many, + }; + + for (e, ars) in evs { + for (added, v) in ars.add.into_iter().map(|v| (true, v)).chain(ars.retract.into_iter().map(|v| (false, v))) { + let op = match added { + true => OpType::Add, + false => OpType::Retract, + }; + self.watcher.datom(op, e, a, &v); + queue.push((e, a, attribute, v, added)); + } + } + } if !non_fts_one.is_empty() { self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?; @@ -885,3 +870,59 @@ pub fn transact_terms<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection, let report = tx.transact_simple_terms(terms, tempid_set)?; conclude_tx(tx, report) } + +fn extend_aev_trie<'schema, I>(schema: &'schema Schema, terms: I, trie: &mut AEVTrie<'schema>) -> Result<()> +where I: IntoIterator +{ + for Term::AddOrRetract(op, KnownEntid(e), a, v) in terms.into_iter() { + let attribute: &Attribute = schema.require_attribute_for_entid(a)?; + + let a_and_r = trie + .entry((a, attribute)).or_insert(BTreeMap::default()) + .entry(e).or_insert(AddAndRetract::default()); + + match op { + OpType::Add => a_and_r.add.insert(v), + OpType::Retract => a_and_r.retract.insert(v), + }; + } + + Ok(()) +} + +pub(crate) fn into_aev_trie<'schema>(schema: &'schema Schema, final_populations: FinalPopulations, inert_terms: Vec) -> Result> { + let mut trie = AEVTrie::default(); + extend_aev_trie(schema, final_populations.resolved, &mut trie)?; + extend_aev_trie(schema, final_populations.allocated, &mut trie)?; + // Inert terms need to be unwrapped. It is a coding error if a term can't be unwrapped. 
+ extend_aev_trie(schema, inert_terms.into_iter().map(|term| term.unwrap()), &mut trie)?; + + Ok(trie) +} + +/// Transact [:db/add :db/txInstant tx_instant (transaction-tx)] if the trie doesn't contain it +/// already. Return the instant from the input or the instant inserted. +fn get_or_insert_tx_instant<'schema>(aev_trie: &mut AEVTrie<'schema>, schema: &'schema Schema, tx_id: Entid) -> Result> { + let ars = aev_trie + .entry((entids::DB_TX_INSTANT, schema.require_attribute_for_entid(entids::DB_TX_INSTANT)?)) + .or_insert(BTreeMap::default()) + .entry(tx_id) + .or_insert(AddAndRetract::default()); + if !ars.retract.is_empty() { + // Cannot retract :db/txInstant! + } + + // Otherwise we have a coding error -- we should have cardinality checked this already. + assert!(ars.add.len() <= 1); + + let first = ars.add.iter().next().cloned(); + match first { + Some(TypedValue::Instant(instant)) => Ok(instant), + Some(_) => unreachable!(), // This is a coding error -- we should have typechecked this already. + None => { + let instant = now(); + ars.add.insert(instant.into()); + Ok(instant) + }, + } +} diff --git a/db/src/tx_checking.rs b/db/src/tx_checking.rs new file mode 100644 index 00000000..64b1eb78 --- /dev/null +++ b/db/src/tx_checking.rs @@ -0,0 +1,84 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +use std::collections::{ + BTreeSet, + BTreeMap, +}; + +use mentat_core::{ + Entid, + TypedValue, + ValueType, +}; + +use errors::{ + CardinalityConflict, +}; + +use internal_types::{ + AEVTrie, +}; + +/// Map from found [e a v] to expected type. +pub(crate) type TypeDisagreements = BTreeMap<(Entid, Entid, TypedValue), ValueType>; + +/// Ensure that the given terms type check. +/// +/// We try to be maximally helpful by yielding every malformed datom, rather than only the first. +/// In the future, we might change this choice, or allow the consumer to specify the robustness of +/// the type checking desired, since there is a cost to providing helpful diagnostics. +pub(crate) fn type_disagreements<'schema>(aev_trie: &AEVTrie<'schema>) -> TypeDisagreements { + let mut errors: TypeDisagreements = TypeDisagreements::default(); + + for (&(a, attribute), evs) in aev_trie { + for (&e, ref ars) in evs { + for v in ars.add.iter().chain(ars.retract.iter()) { + if attribute.value_type != v.value_type() { + errors.insert((e, a, v.clone()), attribute.value_type); + } + } + } + } + + errors +} + +/// Ensure that the given terms obey the cardinality restrictions of the given schema. +/// +/// That is, ensure that any cardinality one attribute is added with at most one distinct value for +/// any specific entity (although that one value may be repeated for the given entity). +/// It is an error to: +/// +/// - add two distinct values for the same cardinality one attribute and entity in a single transaction +/// - add and remove the same values for the same attribute and entity in a single transaction +/// +/// We try to be maximally helpful by yielding every malformed set of datoms, rather than just the +/// first set, or even the first conflict. In the future, we might change this choice, or allow the +/// consumer to specify the robustness of the cardinality checking desired. 
+pub(crate) fn cardinality_conflicts<'schema>(aev_trie: &AEVTrie<'schema>) -> Vec { + let mut errors = vec![]; + + for (&(a, attribute), evs) in aev_trie { + for (&e, ref ars) in evs { + if !attribute.multival && ars.add.len() > 1 { + let vs = ars.add.clone(); + errors.push(CardinalityConflict::CardinalityOneAddConflict { e, a, vs }); + } + + let vs: BTreeSet<_> = ars.retract.intersection(&ars.add).cloned().collect(); + if !vs.is_empty() { + errors.push(CardinalityConflict::AddRetractConflict { e, a, vs }) + } + } + } + + errors +} diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs index 6aeea940..7e748757 100644 --- a/db/src/upsert_resolution.rs +++ b/db/src/upsert_resolution.rs @@ -13,7 +13,13 @@ //! This module implements the upsert resolution algorithm described at //! https://github.com/mozilla/mentat/wiki/Transacting:-upsert-resolution-algorithm. -use std::collections::BTreeSet; +use std::collections::{ + BTreeMap, + BTreeSet, +}; + +use indexmap; +use petgraph::unionfind; use errors; use errors::ErrorKind; @@ -27,6 +33,7 @@ use internal_types::{ Term, TermWithoutTempIds, TermWithTempIds, + TypedValueOr, }; use mentat_core::util::Either::*; @@ -64,8 +71,8 @@ pub(crate) struct Generation { upserts_ev: Vec, /// Entities that look like: - /// - [:db/add TEMPID b OTHERID], where b is not :db.unique/identity; - /// - [:db/add TEMPID b v], where b is not :db.unique/identity. + /// - [:db/add TEMPID b OTHERID]. b may be :db.unique/identity if it has failed to upsert. + /// - [:db/add TEMPID b v]. b may be :db.unique/identity if it has failed to upsert. /// - [:db/add e b OTHERID]. allocations: Vec, @@ -132,11 +139,10 @@ impl Generation { /// Return true if it's possible to evolve this generation further. /// - /// There can be complex upserts but no simple upserts to help resolve them. 
We accept the - /// overhead of having the database try to resolve an empty set of simple upserts, to avoid - /// having to special case complex upserts at entid allocation time. + /// Note that there can be complex upserts but no simple upserts to help resolve them, and in + /// this case, we cannot evolve further. pub(crate) fn can_evolve(&self) -> bool { - !self.upserts_e.is_empty() || !self.upserts_ev.is_empty() + !self.upserts_e.is_empty() } /// Evolve this generation one step further by rewriting the existing :db/add entities using the @@ -146,6 +152,10 @@ impl Generation { pub(crate) fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation { let mut next = Generation::default(); + // We'll iterate our own allocations to resolve more things, but terms that have already + // resolved stay resolved. + next.resolved = self.resolved; + for UpsertE(t, a, v) in self.upserts_e { match temp_id_map.get(&*t) { Some(&n) => next.upserted.push(Term::AddOrRetract(OpType::Add, n, a, v)), @@ -163,7 +173,7 @@ impl Generation { }, (None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))), (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))), - (None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))), + (None, None) => next.upserts_ev.push(UpsertEV(t1, a, t2)) } } @@ -212,23 +222,43 @@ impl Generation { temp_id_avs } - /// After evolution is complete, yield the set of tempids that require entid allocation. These - /// are the tempids that appeared in [:db/add ...] entities, but that didn't upsert to existing - /// entids. - pub(crate) fn temp_ids_in_allocations(&self) -> BTreeSet { + /// Evolve potential upserts that haven't resolved into allocations. 
+    pub(crate) fn allocate_unresolved_upserts(&mut self) -> errors::Result<()> {
+        let mut upserts_ev = vec![];
+        ::std::mem::swap(&mut self.upserts_ev, &mut upserts_ev);
+
+        self.allocations.extend(upserts_ev.into_iter().map(|UpsertEV(t1, a, t2)| Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))));
+
+        Ok(())
+    }
+
+    /// After evolution is complete, yield the set of tempids that require entid allocation.
+    ///
+    /// Some of the tempids may be identified, so we also provide a map from tempid to a dense set
+    /// of contiguous integer labels.
+    pub(crate) fn temp_ids_in_allocations(&self, schema: &Schema) -> errors::Result<BTreeMap<TempId, usize>> {
         assert!(self.upserts_e.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!");
         assert!(self.upserts_ev.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!");
 
         let mut temp_ids: BTreeSet<TempId> = BTreeSet::default();
+        let mut tempid_avs: BTreeMap<(Entid, TypedValueOr<TempId>), Vec<TempId>> = BTreeMap::default();
 
         for term in self.allocations.iter() {
             match term {
-                &Term::AddOrRetract(OpType::Add, Right(ref t1), _, Right(ref t2)) => {
+                &Term::AddOrRetract(OpType::Add, Right(ref t1), a, Right(ref t2)) => {
                     temp_ids.insert(t1.clone());
                     temp_ids.insert(t2.clone());
+                    let attribute: &Attribute = schema.require_attribute_for_entid(a)?;
+                    if attribute.unique == Some(attribute::Unique::Identity) {
+                        tempid_avs.entry((a, Right(t2.clone()))).or_insert(vec![]).push(t1.clone());
+                    }
                 },
-                &Term::AddOrRetract(OpType::Add, Right(ref t), _, Left(_)) => {
+                &Term::AddOrRetract(OpType::Add, Right(ref t), a, ref x @ Left(_)) => {
                     temp_ids.insert(t.clone());
+                    let attribute: &Attribute = schema.require_attribute_for_entid(a)?;
+                    if attribute.unique == Some(attribute::Unique::Identity) {
+                        tempid_avs.entry((a, x.clone())).or_insert(vec![]).push(t.clone());
+                    }
                 },
                 &Term::AddOrRetract(OpType::Add, Left(_), _, Right(ref t)) => {
                     temp_ids.insert(t.clone());
@@ -241,7 +271,44 @@ impl Generation 
{
             }
         }
 
-        temp_ids
+        // Now we union-find all the known tempids. Two tempids are unioned if they both appear as
+        // the entity of an `[a v]` upsert, including when the value column `v` is itself a tempid.
+        let mut uf = unionfind::UnionFind::new(temp_ids.len());
+
+        // The union-find implementation from petgraph operates on contiguous indices, so we need to
+        // maintain the map from our tempids to indices ourselves.
+        let temp_ids: BTreeMap<TempId, usize> = temp_ids.into_iter().enumerate().map(|(i, tempid)| (tempid, i)).collect();
+
+        debug!("need to label tempids aggregated using tempid_avs {:?}", tempid_avs);
+
+        for vs in tempid_avs.values() {
+            vs.first().and_then(|first| temp_ids.get(first)).map(|&first_index| {
+                for tempid in vs {
+                    temp_ids.get(tempid).map(|&i| uf.union(first_index, i));
+                }
+            });
+        }
+
+        debug!("union-find aggregation {:?}", uf.clone().into_labeling());
+
+        // Now that we have aggregated tempids, we need to label them using the smallest number of
+        // contiguous labels possible.
+        let mut tempid_map: BTreeMap<TempId, usize> = BTreeMap::default();
+
+        let mut dense_labels: indexmap::IndexSet<usize> = indexmap::IndexSet::default();
+
+        // We want to produce results that are as deterministic as possible, so we allocate labels
+        // for tempids in sorted order. This has the effect of making "a" allocate before "b",
+        // which is pleasant for testing.
+ for (tempid, tempid_index) in temp_ids { + let rep = uf.find_mut(tempid_index); + dense_labels.insert(rep); + dense_labels.get_full(&rep).map(|(dense_index, _)| tempid_map.insert(tempid.clone(), dense_index)); + } + + debug!("labeled tempids using {} labels: {:?}", dense_labels.len(), tempid_map); + + Ok(tempid_map) } /// After evolution is complete, use the provided allocated entids to segment `self` into diff --git a/src/conn.rs b/src/conn.rs index 585c9943..c694aa9a 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1697,11 +1697,13 @@ mod tests { let mut tx_ids = Vec::new(); let mut changesets = Vec::new(); + let db_tx_instant_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:db/txInstant)).expect("entid to exist for :db/txInstant").into(); let uuid_entid: Entid = conn.conn().current_schema().get_entid(&kw!(:todo/uuid)).expect("entid to exist for name").into(); { let mut in_progress = conn.begin_transaction().expect("expected transaction"); for i in 0..3 { let mut changeset = BTreeSet::new(); + changeset.insert(db_tx_instant_entid.clone()); let name = format!("todo{}", i); let uuid = Uuid::new_v4(); let mut builder = in_progress.builder().describe_tempid(&name); diff --git a/tests/tolstoy.rs b/tests/tolstoy.rs index 4c905e00..788dc442 100644 --- a/tests/tolstoy.rs +++ b/tests/tolstoy.rs @@ -147,9 +147,8 @@ fn test_reader() { // Inspect the transaction part. let tx_id = receiver.txes.keys().nth(1).expect("tx"); let datoms = receiver.txes.get(tx_id).expect("datoms"); - let part = &datoms[0]; + let part = datoms.iter().find(|&part| &part.e == asserted_e).expect("to find asserted datom"); - assert_eq!(asserted_e, &part.e); assert_eq!(numba_entity_id, &part.a); assert!(part.v.matches_type(ValueType::Long)); assert_eq!(TypedValue::Long(123), part.v);