[tx] Fail transactions where complex upserts resolve to multiple entids. (#670)
This innocuous looking change (upserts_ev -> upserts_e -> resolved in all situations, rather than upserts_ev -> resolved in some situations) is a significant change in semantics and assumptions in the transactor. Witness the large comment being removed about the same tempid resolving in different generations! To support this change, we provide more holistic errors for conflicting upserts, which entails collecting some (relatively expensive) diagnostic data. I left in some debug logging, simply since it shouldn't hurt in general, and will likely be useful for the next bug we see in the transactor.
This commit is contained in:
parent
7960b4ccd2
commit
2b82ffb2e5
6 changed files with 183 additions and 37 deletions
64
db/src/db.rs
64
db/src/db.rs
|
@ -1567,8 +1567,16 @@ mod tests {
|
||||||
|
|
||||||
// Conflicting upserts fail.
|
// Conflicting upserts fail.
|
||||||
assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
|
assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
|
||||||
[:db/add \"t1\" :db/ident :name/Petr]]",
|
[:db/add \"t1\" :db/ident :name/Petr]]",
|
||||||
Err("not yet implemented: Conflicting upsert: tempid \'t1\' resolves to more than one entid: 100, 101"));
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n"));
|
||||||
|
|
||||||
|
// The error messages of conflicting upserts gives information about all failing upserts (in a particular generation).
|
||||||
|
assert_transact!(conn, "[[:db/add \"t2\" :db/ident :name/Grigory]
|
||||||
|
[:db/add \"t2\" :db/ident :name/Petr]
|
||||||
|
[:db/add \"t2\" :db/ident :name/Ivan]
|
||||||
|
[:db/add \"t1\" :db/ident :name/Ivan]
|
||||||
|
[:db/add \"t1\" :db/ident :name/Petr]]",
|
||||||
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n tempid External(\"t2\") upserts to {KnownEntid(100), KnownEntid(101)}\n"));
|
||||||
|
|
||||||
// tempids in :db/retract that don't upsert fail.
|
// tempids in :db/retract that don't upsert fail.
|
||||||
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]",
|
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]",
|
||||||
|
@ -2448,4 +2456,56 @@ mod tests {
|
||||||
]"#,
|
]"#,
|
||||||
Err("Could not insert non-fts one statements into temporary search table!"));
|
Err("Could not insert non-fts one statements into temporary search table!"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_conflicting_upserts() {
|
||||||
|
let mut conn = TestConn::default();
|
||||||
|
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
{:db/ident :page/id :db/valueType :db.type/string :db/index true :db/unique :db.unique/identity}
|
||||||
|
{:db/ident :page/ref :db/valueType :db.type/ref :db/index true :db/unique :db.unique/identity}
|
||||||
|
{:db/ident :page/title :db/valueType :db.type/string :db/cardinality :db.cardinality/many}
|
||||||
|
]"#);
|
||||||
|
|
||||||
|
// Let's test some conflicting upserts. First, valid data to work with -- note self references.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add 111 :page/id "1"]
|
||||||
|
[:db/add 111 :page/ref 111]
|
||||||
|
[:db/add 222 :page/id "2"]
|
||||||
|
[:db/add 222 :page/ref 222]
|
||||||
|
]"#);
|
||||||
|
|
||||||
|
// Now valid upserts. Note the references are valid.
|
||||||
|
let report = assert_transact!(conn, r#"[
|
||||||
|
[:db/add "a" :page/id "1"]
|
||||||
|
[:db/add "a" :page/ref "a"]
|
||||||
|
[:db/add "b" :page/id "2"]
|
||||||
|
[:db/add "b" :page/ref "b"]
|
||||||
|
]"#);
|
||||||
|
assert_matches!(tempids(&report),
|
||||||
|
"{\"a\" 111
|
||||||
|
\"b\" 222}");
|
||||||
|
|
||||||
|
// Now conflicting upserts. Note the references are reversed. This example is interesting
|
||||||
|
// because the first round `UpsertE` instances upsert, and this resolves all of the tempids
|
||||||
|
// in the `UpsertEV` instances. However, those `UpsertEV` instances lead to conflicting
|
||||||
|
// upserts! This tests that we don't resolve too far, giving a chance for those upserts to
|
||||||
|
// fail. This error message is crossing generations, although it's not reflected in the
|
||||||
|
// error data structure.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add "a" :page/id "1"]
|
||||||
|
[:db/add "a" :page/ref "b"]
|
||||||
|
[:db/add "b" :page/id "2"]
|
||||||
|
[:db/add "b" :page/ref "a"]
|
||||||
|
]"#,
|
||||||
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"a\") upserts to {KnownEntid(111), KnownEntid(222)}\n tempid External(\"b\") upserts to {KnownEntid(111), KnownEntid(222)}\n"));
|
||||||
|
|
||||||
|
// Here's a case where the upsert is not resolved, just allocated, but leads to conflicting
|
||||||
|
// cardinality one datoms.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add "x" :page/ref 333]
|
||||||
|
[:db/add "x" :page/ref 444]
|
||||||
|
]"#,
|
||||||
|
Err("Could not insert non-fts one statements into temporary search table!"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,11 +10,55 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
|
||||||
use edn;
|
use edn;
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
|
|
||||||
|
use mentat_tx::entities::{
|
||||||
|
TempId,
|
||||||
|
};
|
||||||
use mentat_tx_parser;
|
use mentat_tx_parser;
|
||||||
use types::{Entid, ValueType};
|
use mentat_core::{
|
||||||
|
KnownEntid,
|
||||||
|
};
|
||||||
|
use types::{
|
||||||
|
Entid,
|
||||||
|
ValueType,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||||
|
pub enum SchemaConstraintViolation {
|
||||||
|
/// A transaction tried to assert datoms where one tempid upserts to two (or more) distinct
|
||||||
|
/// entids.
|
||||||
|
ConflictingUpserts {
|
||||||
|
/// A map from tempid to the entids it would upsert to.
|
||||||
|
///
|
||||||
|
/// In the future, we might even be able to attribute the upserts to particular (reduced)
|
||||||
|
/// datoms, i.e., to particular `[e a v]` triples that caused the constraint violation.
|
||||||
|
/// Attributing constraint violations to input data is more difficult to the multiple
|
||||||
|
/// rewriting passes the input undergoes.
|
||||||
|
conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ::std::fmt::Display for SchemaConstraintViolation {
|
||||||
|
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||||
|
use self::SchemaConstraintViolation::*;
|
||||||
|
match self {
|
||||||
|
&ConflictingUpserts { ref conflicting_upserts } => {
|
||||||
|
write!(f, "conflicting upserts:\n")?;
|
||||||
|
for (tempid, entids) in conflicting_upserts {
|
||||||
|
write!(f, " tempid {:?} upserts to {:?}\n", tempid, entids)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
error_chain! {
|
error_chain! {
|
||||||
types {
|
types {
|
||||||
|
@ -102,5 +146,11 @@ error_chain! {
|
||||||
description("schema alteration failed")
|
description("schema alteration failed")
|
||||||
display("schema alteration failed: {}", t)
|
display("schema alteration failed: {}", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A transaction tried to violate a constraint of the schema of the Mentat store.
|
||||||
|
SchemaConstraintViolation(violation: SchemaConstraintViolation) {
|
||||||
|
description("schema constraint violation")
|
||||||
|
display("schema constraint violation: {}", violation)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,21 +11,18 @@
|
||||||
// Oh, error_chain.
|
// Oh, error_chain.
|
||||||
#![recursion_limit="128"]
|
#![recursion_limit="128"]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use] extern crate error_chain;
|
||||||
extern crate error_chain;
|
|
||||||
extern crate indexmap;
|
extern crate indexmap;
|
||||||
extern crate itertools;
|
extern crate itertools;
|
||||||
|
#[macro_use] extern crate lazy_static;
|
||||||
#[macro_use]
|
#[macro_use] extern crate log;
|
||||||
extern crate lazy_static;
|
|
||||||
|
|
||||||
extern crate num;
|
extern crate num;
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
extern crate tabwriter;
|
extern crate tabwriter;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use] extern crate edn;
|
||||||
extern crate edn;
|
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
extern crate mentat_tx;
|
extern crate mentat_tx;
|
||||||
extern crate mentat_tx_parser;
|
extern crate mentat_tx_parser;
|
||||||
|
|
82
db/src/tx.rs
82
db/src/tx.rs
|
@ -51,6 +51,9 @@ use std::collections::{
|
||||||
BTreeSet,
|
BTreeSet,
|
||||||
VecDeque,
|
VecDeque,
|
||||||
};
|
};
|
||||||
|
use std::iter::{
|
||||||
|
once,
|
||||||
|
};
|
||||||
use std::rc::{
|
use std::rc::{
|
||||||
Rc,
|
Rc,
|
||||||
};
|
};
|
||||||
|
@ -64,7 +67,11 @@ use edn::{
|
||||||
NamespacedKeyword,
|
NamespacedKeyword,
|
||||||
};
|
};
|
||||||
use entids;
|
use entids;
|
||||||
use errors::{ErrorKind, Result};
|
use errors;
|
||||||
|
use errors::{
|
||||||
|
ErrorKind,
|
||||||
|
Result,
|
||||||
|
};
|
||||||
use internal_types::{
|
use internal_types::{
|
||||||
KnownEntidOr,
|
KnownEntidOr,
|
||||||
LookupRef,
|
LookupRef,
|
||||||
|
@ -190,21 +197,30 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
// Lookup in the store.
|
// Lookup in the store.
|
||||||
let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?;
|
let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?;
|
||||||
|
|
||||||
|
debug!("looked up avs {:?}", av_map);
|
||||||
|
|
||||||
// Map id->entid.
|
// Map id->entid.
|
||||||
let mut temp_id_map: TempIdMap = TempIdMap::default();
|
let mut tempids: TempIdMap = TempIdMap::default();
|
||||||
for &(ref temp_id, ref av_pair) in temp_id_avs {
|
|
||||||
if let Some(n) = av_map.get(&av_pair) {
|
// Errors. BTree* since we want deterministic results.
|
||||||
if let Some(&KnownEntid(previous_n)) = temp_id_map.get(&*temp_id) {
|
let mut conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>> = BTreeMap::default();
|
||||||
if *n != previous_n {
|
|
||||||
// Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction.
|
for &(ref tempid, ref av_pair) in temp_id_avs {
|
||||||
bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX
|
trace!("tempid {:?} av_pair {:?} -> {:?}", tempid, av_pair, av_map.get(&av_pair));
|
||||||
|
if let Some(entid) = av_map.get(&av_pair).cloned().map(KnownEntid) {
|
||||||
|
tempids.insert(tempid.clone(), entid).map(|previous| {
|
||||||
|
if entid != previous {
|
||||||
|
conflicting_upserts.entry((**tempid).clone()).or_insert_with(|| once(previous).collect::<BTreeSet<_>>()).insert(entid);
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
temp_id_map.insert(temp_id.clone(), KnownEntid(*n));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(temp_id_map)
|
if !conflicting_upserts.is_empty() {
|
||||||
|
bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts }));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(tempids)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term
|
/// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term
|
||||||
|
@ -581,30 +597,46 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
|
|
||||||
// And evolve them forward.
|
// And evolve them forward.
|
||||||
while generation.can_evolve() {
|
while generation.can_evolve() {
|
||||||
|
debug!("generation {:?}", generation);
|
||||||
|
|
||||||
|
let tempid_avs = generation.temp_id_avs();
|
||||||
|
debug!("trying to resolve avs {:?}", tempid_avs);
|
||||||
|
|
||||||
// Evolve further.
|
// Evolve further.
|
||||||
let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&generation.temp_id_avs()[..])?;
|
let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&tempid_avs[..])?;
|
||||||
|
|
||||||
|
debug!("resolved avs for tempids {:?}", temp_id_map);
|
||||||
|
|
||||||
generation = generation.evolve_one_step(&temp_id_map);
|
generation = generation.evolve_one_step(&temp_id_map);
|
||||||
|
|
||||||
|
// Errors. BTree* since we want deterministic results.
|
||||||
|
let mut conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>> = BTreeMap::default();
|
||||||
|
|
||||||
// Report each tempid that resolves via upsert.
|
// Report each tempid that resolves via upsert.
|
||||||
for (tempid, entid) in temp_id_map {
|
for (tempid, entid) in temp_id_map {
|
||||||
// Every tempid should be resolved at most once. Prima facie, we might expect a
|
// Since `UpsertEV` instances always transition to `UpsertE` instances, it might be
|
||||||
// tempid to be resolved in two different generations. However, that is not so: the
|
// that a tempid resolves in two generations, and those resolutions might conflict.
|
||||||
// keys of temp_id_map are unique between generations.Suppose that id->e and id->e*
|
tempids.insert((*tempid).clone(), entid).map(|previous| {
|
||||||
// are two such mappings, resolved on subsequent evolutionary steps, and that `id`
|
if entid != previous {
|
||||||
// is a key in the intersection of the two key sets. This can't happen: if `id` maps
|
conflicting_upserts.entry((*tempid).clone()).or_insert_with(|| once(previous).collect::<BTreeSet<_>>()).insert(entid);
|
||||||
// to `e` via id->e, all instances of `id` have been evolved forward (replaced with
|
}
|
||||||
// `e`) before we try to resolve the next set of `UpsertsE`. That is, we'll never
|
});
|
||||||
// successfully upsert the same tempid in more than one generation step. (We might
|
|
||||||
// upsert the same tempid to multiple entids via distinct `[a v]` pairs in a single
|
|
||||||
// generation step; in this case, the transaction will fail.)
|
|
||||||
let previous = tempids.insert((*tempid).clone(), entid);
|
|
||||||
assert!(previous.is_none());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !conflicting_upserts.is_empty() {
|
||||||
|
bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts }));
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("tempids {:?}", tempids);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!("final generation {:?}", generation);
|
||||||
|
|
||||||
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
|
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
|
||||||
let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
|
let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
|
||||||
|
|
||||||
|
debug!("unresolved tempids {:?}", unresolved_temp_ids);
|
||||||
|
|
||||||
// TODO: track partitions for temporary IDs.
|
// TODO: track partitions for temporary IDs.
|
||||||
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
||||||
|
|
||||||
|
@ -612,6 +644,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
.zip(entids.map(|e| KnownEntid(e)))
|
.zip(entids.map(|e| KnownEntid(e)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
debug!("tempid allocations {:?}", temp_id_allocations);
|
||||||
|
|
||||||
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
|
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
|
||||||
|
|
||||||
// Report each tempid that is allocated.
|
// Report each tempid that is allocated.
|
||||||
|
|
|
@ -155,7 +155,12 @@ impl Generation {
|
||||||
|
|
||||||
for UpsertEV(t1, a, t2) in self.upserts_ev {
|
for UpsertEV(t1, a, t2) in self.upserts_ev {
|
||||||
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
|
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
|
||||||
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2.0))),
|
(Some(_), Some(&n2)) => {
|
||||||
|
// Even though we can resolve entirely, it's possible that the remaining upsert
|
||||||
|
// could conflict. Moving straight to resolved doesn't give us a chance to
|
||||||
|
// search the store for the conflict.
|
||||||
|
next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0)))
|
||||||
|
},
|
||||||
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))),
|
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))),
|
||||||
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))),
|
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))),
|
||||||
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))),
|
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))),
|
||||||
|
|
|
@ -1175,8 +1175,8 @@ mod tests {
|
||||||
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
|
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
|
||||||
[:db/add \"u\" :db/ident :b/keyword]]");
|
[:db/add \"u\" :db/ident :b/keyword]]");
|
||||||
match report.unwrap_err() {
|
match report.unwrap_err() {
|
||||||
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { },
|
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::SchemaConstraintViolation(_)), _) => { },
|
||||||
x => panic!("expected EDN parse error, got {:?}", x),
|
x => panic!("expected schema constraint violation, got {:?}", x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue