diff --git a/db/Cargo.toml b/db/Cargo.toml index 77c2bc70..b791c118 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -8,6 +8,7 @@ error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewma indexmap = "1" itertools = "0.7" lazy_static = "0.2" +log = "0.4" num = "0.1" ordered-float = "0.5" time = "0.1" @@ -31,3 +32,6 @@ path = "../tx-parser" # Should be dev-dependencies. [dependencies.tabwriter] version = "1.0.3" + +[dev-dependencies] +env_logger = "0.5" diff --git a/db/src/db.rs b/db/src/db.rs index 451caef0..0be561d0 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1567,8 +1567,16 @@ mod tests { // Conflicting upserts fail. assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan] - [:db/add \"t1\" :db/ident :name/Petr]]", - Err("not yet implemented: Conflicting upsert: tempid \'t1\' resolves to more than one entid: 100, 101")); + [:db/add \"t1\" :db/ident :name/Petr]]", + Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n")); + + // The error message for conflicting upserts gives information about all failing upserts (in a particular generation). + assert_transact!(conn, "[[:db/add \"t2\" :db/ident :name/Grigory] + [:db/add \"t2\" :db/ident :name/Petr] + [:db/add \"t2\" :db/ident :name/Ivan] + [:db/add \"t1\" :db/ident :name/Ivan] + [:db/add \"t1\" :db/ident :name/Petr]]", + Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n tempid External(\"t2\") upserts to {KnownEntid(100), KnownEntid(101)}\n")); // tempids in :db/retract that don't upsert fail. 
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]", @@ -2448,4 +2456,56 @@ mod tests { ]"#, Err("Could not insert non-fts one statements into temporary search table!")); } + + #[test] + fn test_conflicting_upserts() { + let mut conn = TestConn::default(); + + assert_transact!(conn, r#"[ + {:db/ident :page/id :db/valueType :db.type/string :db/index true :db/unique :db.unique/identity} + {:db/ident :page/ref :db/valueType :db.type/ref :db/index true :db/unique :db.unique/identity} + {:db/ident :page/title :db/valueType :db.type/string :db/cardinality :db.cardinality/many} + ]"#); + + // Let's test some conflicting upserts. First, valid data to work with -- note self references. + assert_transact!(conn, r#"[ + [:db/add 111 :page/id "1"] + [:db/add 111 :page/ref 111] + [:db/add 222 :page/id "2"] + [:db/add 222 :page/ref 222] + ]"#); + + // Now valid upserts. Note the references are valid. + let report = assert_transact!(conn, r#"[ + [:db/add "a" :page/id "1"] + [:db/add "a" :page/ref "a"] + [:db/add "b" :page/id "2"] + [:db/add "b" :page/ref "b"] + ]"#); + assert_matches!(tempids(&report), + "{\"a\" 111 + \"b\" 222}"); + + // Now conflicting upserts. Note the references are reversed. This example is interesting + // because the first round `UpsertE` instances upsert, and this resolves all of the tempids + // in the `UpsertEV` instances. However, those `UpsertEV` instances lead to conflicting + // upserts! This tests that we don't resolve too far, giving a chance for those upserts to + // fail. This error message is crossing generations, although it's not reflected in the + // error data structure. 
+ assert_transact!(conn, r#"[ + [:db/add "a" :page/id "1"] + [:db/add "a" :page/ref "b"] + [:db/add "b" :page/id "2"] + [:db/add "b" :page/ref "a"] + ]"#, + Err("schema constraint violation: conflicting upserts:\n tempid External(\"a\") upserts to {KnownEntid(111), KnownEntid(222)}\n tempid External(\"b\") upserts to {KnownEntid(111), KnownEntid(222)}\n")); + + // Here's a case where the upsert is not resolved, just allocated, but leads to conflicting + // cardinality one datoms. + assert_transact!(conn, r#"[ + [:db/add "x" :page/ref 333] + [:db/add "x" :page/ref 444] + ]"#, + Err("Could not insert non-fts one statements into temporary search table!")); + } } diff --git a/db/src/errors.rs b/db/src/errors.rs index 6dbf97f7..a90982d0 100644 --- a/db/src/errors.rs +++ b/db/src/errors.rs @@ -10,11 +10,55 @@ #![allow(dead_code)] +use std::collections::{ + BTreeMap, + BTreeSet, +}; + use edn; use rusqlite; +use mentat_tx::entities::{ + TempId, +}; use mentat_tx_parser; -use types::{Entid, ValueType}; +use mentat_core::{ + KnownEntid, +}; +use types::{ + Entid, + ValueType, +}; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum SchemaConstraintViolation { + /// A transaction tried to assert datoms where one tempid upserts to two (or more) distinct + /// entids. + ConflictingUpserts { + /// A map from tempid to the entids it would upsert to. + /// + /// In the future, we might even be able to attribute the upserts to particular (reduced) + /// datoms, i.e., to particular `[e a v]` triples that caused the constraint violation. + /// Attributing constraint violations to input data is more difficult due to the multiple + /// rewriting passes the input undergoes. 
+ conflicting_upserts: BTreeMap>, + }, +} + +impl ::std::fmt::Display for SchemaConstraintViolation { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + use self::SchemaConstraintViolation::*; + match self { + &ConflictingUpserts { ref conflicting_upserts } => { + write!(f, "conflicting upserts:\n")?; + for (tempid, entids) in conflicting_upserts { + write!(f, " tempid {:?} upserts to {:?}\n", tempid, entids)?; + } + Ok(()) + }, + } + } +} error_chain! { types { @@ -102,5 +146,11 @@ error_chain! { description("schema alteration failed") display("schema alteration failed: {}", t) } + + /// A transaction tried to violate a constraint of the schema of the Mentat store. + SchemaConstraintViolation(violation: SchemaConstraintViolation) { + description("schema constraint violation") + display("schema constraint violation: {}", violation) + } } } diff --git a/db/src/lib.rs b/db/src/lib.rs index bb4d257c..e0994e80 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -11,21 +11,18 @@ // Oh, error_chain. 
#![recursion_limit="128"] -#[macro_use] -extern crate error_chain; +#[macro_use] extern crate error_chain; extern crate indexmap; extern crate itertools; - -#[macro_use] -extern crate lazy_static; +#[macro_use] extern crate lazy_static; +#[macro_use] extern crate log; extern crate num; extern crate rusqlite; extern crate tabwriter; extern crate time; -#[macro_use] -extern crate edn; +#[macro_use] extern crate edn; extern crate mentat_core; extern crate mentat_tx; extern crate mentat_tx_parser; diff --git a/db/src/tx.rs b/db/src/tx.rs index e8a30e1c..6e03c400 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -51,6 +51,9 @@ use std::collections::{ BTreeSet, VecDeque, }; +use std::iter::{ + once, +}; use std::rc::{ Rc, }; @@ -64,7 +67,11 @@ use edn::{ NamespacedKeyword, }; use entids; -use errors::{ErrorKind, Result}; +use errors; +use errors::{ + ErrorKind, + Result, +}; use internal_types::{ KnownEntidOr, LookupRef, @@ -190,21 +197,30 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // Lookup in the store. let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?; + debug!("looked up avs {:?}", av_map); + // Map id->entid. - let mut temp_id_map: TempIdMap = TempIdMap::default(); - for &(ref temp_id, ref av_pair) in temp_id_avs { - if let Some(n) = av_map.get(&av_pair) { - if let Some(&KnownEntid(previous_n)) = temp_id_map.get(&*temp_id) { - if *n != previous_n { - // Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction. - bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX + let mut tempids: TempIdMap = TempIdMap::default(); + + // Errors. BTree* since we want deterministic results. 
+ let mut conflicting_upserts: BTreeMap> = BTreeMap::default(); + + for &(ref tempid, ref av_pair) in temp_id_avs { + trace!("tempid {:?} av_pair {:?} -> {:?}", tempid, av_pair, av_map.get(&av_pair)); + if let Some(entid) = av_map.get(&av_pair).cloned().map(KnownEntid) { + tempids.insert(tempid.clone(), entid).map(|previous| { + if entid != previous { + conflicting_upserts.entry((**tempid).clone()).or_insert_with(|| once(previous).collect::>()).insert(entid); } - } - temp_id_map.insert(temp_id.clone(), KnownEntid(*n)); + }); } } - Ok(temp_id_map) + if !conflicting_upserts.is_empty() { + bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts })); + } + + Ok(tempids) } /// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term @@ -581,30 +597,46 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { // And evolve them forward. while generation.can_evolve() { + debug!("generation {:?}", generation); + + let tempid_avs = generation.temp_id_avs(); + debug!("trying to resolve avs {:?}", tempid_avs); + // Evolve further. - let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&generation.temp_id_avs()[..])?; + let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&tempid_avs[..])?; + + debug!("resolved avs for tempids {:?}", temp_id_map); + generation = generation.evolve_one_step(&temp_id_map); + // Errors. BTree* since we want deterministic results. + let mut conflicting_upserts: BTreeMap> = BTreeMap::default(); + // Report each tempid that resolves via upsert. for (tempid, entid) in temp_id_map { - // Every tempid should be resolved at most once. Prima facie, we might expect a - // tempid to be resolved in two different generations. 
However, that is not so: the - // keys of temp_id_map are unique between generations.Suppose that id->e and id->e* - // are two such mappings, resolved on subsequent evolutionary steps, and that `id` - // is a key in the intersection of the two key sets. This can't happen: if `id` maps - // to `e` via id->e, all instances of `id` have been evolved forward (replaced with - // `e`) before we try to resolve the next set of `UpsertsE`. That is, we'll never - // successfully upsert the same tempid in more than one generation step. (We might - // upsert the same tempid to multiple entids via distinct `[a v]` pairs in a single - // generation step; in this case, the transaction will fail.) - let previous = tempids.insert((*tempid).clone(), entid); - assert!(previous.is_none()); + // Since `UpsertEV` instances always transition to `UpsertE` instances, it might be + // that a tempid resolves in two generations, and those resolutions might conflict. + tempids.insert((*tempid).clone(), entid).map(|previous| { + if entid != previous { + conflicting_upserts.entry((*tempid).clone()).or_insert_with(|| once(previous).collect::>()).insert(entid); + } + }); } + + if !conflicting_upserts.is_empty() { + bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts })); + } + + debug!("tempids {:?}", tempids); } + debug!("final generation {:?}", generation); + // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); + debug!("unresolved tempids {:?}", unresolved_temp_ids); + // TODO: track partitions for temporary IDs. 
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len()); @@ -612,6 +644,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { .zip(entids.map(|e| KnownEntid(e))) .collect(); + debug!("tempid allocations {:?}", temp_id_allocations); + let final_populations = generation.into_final_populations(&temp_id_allocations)?; // Report each tempid that is allocated. diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs index da44edc9..6aeea940 100644 --- a/db/src/upsert_resolution.rs +++ b/db/src/upsert_resolution.rs @@ -155,7 +155,12 @@ impl Generation { for UpsertEV(t1, a, t2) in self.upserts_ev { match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { - (Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2.0))), + (Some(_), Some(&n2)) => { + // Even though we can resolve entirely, it's possible that the remaining upsert + // could conflict. Moving straight to resolved doesn't give us a chance to + // search the store for the conflict. 
+ next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))) + }, (None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))), (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))), (None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))), diff --git a/src/conn.rs b/src/conn.rs index 3ef0df5d..6549ea45 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1175,8 +1175,8 @@ mod tests { let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword] [:db/add \"u\" :db/ident :b/keyword]]"); match report.unwrap_err() { - Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { }, - x => panic!("expected EDN parse error, got {:?}", x), + Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::SchemaConstraintViolation(_)), _) => { }, + x => panic!("expected schema constraint violation, got {:?}", x), } } diff --git a/tools/cli/Cargo.toml b/tools/cli/Cargo.toml index 93c3488f..e06753b3 100644 --- a/tools/cli/Cargo.toml +++ b/tools/cli/Cargo.toml @@ -13,11 +13,11 @@ test = false [dependencies] combine = "2.2.2" -env_logger = "0.3" +env_logger = "0.5" getopts = "0.2" lazy_static = "0.2" linefeed = "0.4" -log = "0.3" +log = "0.4" tabwriter = "1" tempfile = "1.1" termion = "1"