[tx] Fail transactions where complex upserts resolve to multiple entids. (#670) r=rnewman
This commit is contained in:
commit
9513012aa5
8 changed files with 189 additions and 39 deletions
|
@ -8,6 +8,7 @@ error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewma
|
||||||
indexmap = "1"
|
indexmap = "1"
|
||||||
itertools = "0.7"
|
itertools = "0.7"
|
||||||
lazy_static = "0.2"
|
lazy_static = "0.2"
|
||||||
|
log = "0.4"
|
||||||
num = "0.1"
|
num = "0.1"
|
||||||
ordered-float = "0.5"
|
ordered-float = "0.5"
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
|
@ -31,3 +32,6 @@ path = "../tx-parser"
|
||||||
# Should be dev-dependencies.
|
# Should be dev-dependencies.
|
||||||
[dependencies.tabwriter]
|
[dependencies.tabwriter]
|
||||||
version = "1.0.3"
|
version = "1.0.3"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
env_logger = "0.5"
|
||||||
|
|
64
db/src/db.rs
64
db/src/db.rs
|
@ -1567,8 +1567,16 @@ mod tests {
|
||||||
|
|
||||||
// Conflicting upserts fail.
|
// Conflicting upserts fail.
|
||||||
assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
|
assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
|
||||||
[:db/add \"t1\" :db/ident :name/Petr]]",
|
[:db/add \"t1\" :db/ident :name/Petr]]",
|
||||||
Err("not yet implemented: Conflicting upsert: tempid \'t1\' resolves to more than one entid: 100, 101"));
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n"));
|
||||||
|
|
||||||
|
// The error messages of conflicting upserts gives information about all failing upserts (in a particular generation).
|
||||||
|
assert_transact!(conn, "[[:db/add \"t2\" :db/ident :name/Grigory]
|
||||||
|
[:db/add \"t2\" :db/ident :name/Petr]
|
||||||
|
[:db/add \"t2\" :db/ident :name/Ivan]
|
||||||
|
[:db/add \"t1\" :db/ident :name/Ivan]
|
||||||
|
[:db/add \"t1\" :db/ident :name/Petr]]",
|
||||||
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"t1\") upserts to {KnownEntid(100), KnownEntid(101)}\n tempid External(\"t2\") upserts to {KnownEntid(100), KnownEntid(101)}\n"));
|
||||||
|
|
||||||
// tempids in :db/retract that don't upsert fail.
|
// tempids in :db/retract that don't upsert fail.
|
||||||
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]",
|
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]",
|
||||||
|
@ -2448,4 +2456,56 @@ mod tests {
|
||||||
]"#,
|
]"#,
|
||||||
Err("Could not insert non-fts one statements into temporary search table!"));
|
Err("Could not insert non-fts one statements into temporary search table!"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_conflicting_upserts() {
|
||||||
|
let mut conn = TestConn::default();
|
||||||
|
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
{:db/ident :page/id :db/valueType :db.type/string :db/index true :db/unique :db.unique/identity}
|
||||||
|
{:db/ident :page/ref :db/valueType :db.type/ref :db/index true :db/unique :db.unique/identity}
|
||||||
|
{:db/ident :page/title :db/valueType :db.type/string :db/cardinality :db.cardinality/many}
|
||||||
|
]"#);
|
||||||
|
|
||||||
|
// Let's test some conflicting upserts. First, valid data to work with -- note self references.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add 111 :page/id "1"]
|
||||||
|
[:db/add 111 :page/ref 111]
|
||||||
|
[:db/add 222 :page/id "2"]
|
||||||
|
[:db/add 222 :page/ref 222]
|
||||||
|
]"#);
|
||||||
|
|
||||||
|
// Now valid upserts. Note the references are valid.
|
||||||
|
let report = assert_transact!(conn, r#"[
|
||||||
|
[:db/add "a" :page/id "1"]
|
||||||
|
[:db/add "a" :page/ref "a"]
|
||||||
|
[:db/add "b" :page/id "2"]
|
||||||
|
[:db/add "b" :page/ref "b"]
|
||||||
|
]"#);
|
||||||
|
assert_matches!(tempids(&report),
|
||||||
|
"{\"a\" 111
|
||||||
|
\"b\" 222}");
|
||||||
|
|
||||||
|
// Now conflicting upserts. Note the references are reversed. This example is interesting
|
||||||
|
// because the first round `UpsertE` instances upsert, and this resolves all of the tempids
|
||||||
|
// in the `UpsertEV` instances. However, those `UpsertEV` instances lead to conflicting
|
||||||
|
// upserts! This tests that we don't resolve too far, giving a chance for those upserts to
|
||||||
|
// fail. This error message is crossing generations, although it's not reflected in the
|
||||||
|
// error data structure.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add "a" :page/id "1"]
|
||||||
|
[:db/add "a" :page/ref "b"]
|
||||||
|
[:db/add "b" :page/id "2"]
|
||||||
|
[:db/add "b" :page/ref "a"]
|
||||||
|
]"#,
|
||||||
|
Err("schema constraint violation: conflicting upserts:\n tempid External(\"a\") upserts to {KnownEntid(111), KnownEntid(222)}\n tempid External(\"b\") upserts to {KnownEntid(111), KnownEntid(222)}\n"));
|
||||||
|
|
||||||
|
// Here's a case where the upsert is not resolved, just allocated, but leads to conflicting
|
||||||
|
// cardinality one datoms.
|
||||||
|
assert_transact!(conn, r#"[
|
||||||
|
[:db/add "x" :page/ref 333]
|
||||||
|
[:db/add "x" :page/ref 444]
|
||||||
|
]"#,
|
||||||
|
Err("Could not insert non-fts one statements into temporary search table!"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,11 +10,55 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::collections::{
|
||||||
|
BTreeMap,
|
||||||
|
BTreeSet,
|
||||||
|
};
|
||||||
|
|
||||||
use edn;
|
use edn;
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
|
|
||||||
|
use mentat_tx::entities::{
|
||||||
|
TempId,
|
||||||
|
};
|
||||||
use mentat_tx_parser;
|
use mentat_tx_parser;
|
||||||
use types::{Entid, ValueType};
|
use mentat_core::{
|
||||||
|
KnownEntid,
|
||||||
|
};
|
||||||
|
use types::{
|
||||||
|
Entid,
|
||||||
|
ValueType,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||||
|
pub enum SchemaConstraintViolation {
|
||||||
|
/// A transaction tried to assert datoms where one tempid upserts to two (or more) distinct
|
||||||
|
/// entids.
|
||||||
|
ConflictingUpserts {
|
||||||
|
/// A map from tempid to the entids it would upsert to.
|
||||||
|
///
|
||||||
|
/// In the future, we might even be able to attribute the upserts to particular (reduced)
|
||||||
|
/// datoms, i.e., to particular `[e a v]` triples that caused the constraint violation.
|
||||||
|
/// Attributing constraint violations to input data is more difficult to the multiple
|
||||||
|
/// rewriting passes the input undergoes.
|
||||||
|
conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ::std::fmt::Display for SchemaConstraintViolation {
|
||||||
|
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||||||
|
use self::SchemaConstraintViolation::*;
|
||||||
|
match self {
|
||||||
|
&ConflictingUpserts { ref conflicting_upserts } => {
|
||||||
|
write!(f, "conflicting upserts:\n")?;
|
||||||
|
for (tempid, entids) in conflicting_upserts {
|
||||||
|
write!(f, " tempid {:?} upserts to {:?}\n", tempid, entids)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
error_chain! {
|
error_chain! {
|
||||||
types {
|
types {
|
||||||
|
@ -102,5 +146,11 @@ error_chain! {
|
||||||
description("schema alteration failed")
|
description("schema alteration failed")
|
||||||
display("schema alteration failed: {}", t)
|
display("schema alteration failed: {}", t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A transaction tried to violate a constraint of the schema of the Mentat store.
|
||||||
|
SchemaConstraintViolation(violation: SchemaConstraintViolation) {
|
||||||
|
description("schema constraint violation")
|
||||||
|
display("schema constraint violation: {}", violation)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,21 +11,18 @@
|
||||||
// Oh, error_chain.
|
// Oh, error_chain.
|
||||||
#![recursion_limit="128"]
|
#![recursion_limit="128"]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use] extern crate error_chain;
|
||||||
extern crate error_chain;
|
|
||||||
extern crate indexmap;
|
extern crate indexmap;
|
||||||
extern crate itertools;
|
extern crate itertools;
|
||||||
|
#[macro_use] extern crate lazy_static;
|
||||||
#[macro_use]
|
#[macro_use] extern crate log;
|
||||||
extern crate lazy_static;
|
|
||||||
|
|
||||||
extern crate num;
|
extern crate num;
|
||||||
extern crate rusqlite;
|
extern crate rusqlite;
|
||||||
extern crate tabwriter;
|
extern crate tabwriter;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use] extern crate edn;
|
||||||
extern crate edn;
|
|
||||||
extern crate mentat_core;
|
extern crate mentat_core;
|
||||||
extern crate mentat_tx;
|
extern crate mentat_tx;
|
||||||
extern crate mentat_tx_parser;
|
extern crate mentat_tx_parser;
|
||||||
|
|
82
db/src/tx.rs
82
db/src/tx.rs
|
@ -51,6 +51,9 @@ use std::collections::{
|
||||||
BTreeSet,
|
BTreeSet,
|
||||||
VecDeque,
|
VecDeque,
|
||||||
};
|
};
|
||||||
|
use std::iter::{
|
||||||
|
once,
|
||||||
|
};
|
||||||
use std::rc::{
|
use std::rc::{
|
||||||
Rc,
|
Rc,
|
||||||
};
|
};
|
||||||
|
@ -64,7 +67,11 @@ use edn::{
|
||||||
NamespacedKeyword,
|
NamespacedKeyword,
|
||||||
};
|
};
|
||||||
use entids;
|
use entids;
|
||||||
use errors::{ErrorKind, Result};
|
use errors;
|
||||||
|
use errors::{
|
||||||
|
ErrorKind,
|
||||||
|
Result,
|
||||||
|
};
|
||||||
use internal_types::{
|
use internal_types::{
|
||||||
KnownEntidOr,
|
KnownEntidOr,
|
||||||
LookupRef,
|
LookupRef,
|
||||||
|
@ -190,21 +197,30 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
// Lookup in the store.
|
// Lookup in the store.
|
||||||
let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?;
|
let av_map: AVMap = self.store.resolve_avs(&av_pairs[..])?;
|
||||||
|
|
||||||
|
debug!("looked up avs {:?}", av_map);
|
||||||
|
|
||||||
// Map id->entid.
|
// Map id->entid.
|
||||||
let mut temp_id_map: TempIdMap = TempIdMap::default();
|
let mut tempids: TempIdMap = TempIdMap::default();
|
||||||
for &(ref temp_id, ref av_pair) in temp_id_avs {
|
|
||||||
if let Some(n) = av_map.get(&av_pair) {
|
// Errors. BTree* since we want deterministic results.
|
||||||
if let Some(&KnownEntid(previous_n)) = temp_id_map.get(&*temp_id) {
|
let mut conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>> = BTreeMap::default();
|
||||||
if *n != previous_n {
|
|
||||||
// Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction.
|
for &(ref tempid, ref av_pair) in temp_id_avs {
|
||||||
bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX
|
trace!("tempid {:?} av_pair {:?} -> {:?}", tempid, av_pair, av_map.get(&av_pair));
|
||||||
|
if let Some(entid) = av_map.get(&av_pair).cloned().map(KnownEntid) {
|
||||||
|
tempids.insert(tempid.clone(), entid).map(|previous| {
|
||||||
|
if entid != previous {
|
||||||
|
conflicting_upserts.entry((**tempid).clone()).or_insert_with(|| once(previous).collect::<BTreeSet<_>>()).insert(entid);
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
temp_id_map.insert(temp_id.clone(), KnownEntid(*n));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(temp_id_map)
|
if !conflicting_upserts.is_empty() {
|
||||||
|
bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts }));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(tempids)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term
|
/// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term
|
||||||
|
@ -581,30 +597,46 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
|
|
||||||
// And evolve them forward.
|
// And evolve them forward.
|
||||||
while generation.can_evolve() {
|
while generation.can_evolve() {
|
||||||
|
debug!("generation {:?}", generation);
|
||||||
|
|
||||||
|
let tempid_avs = generation.temp_id_avs();
|
||||||
|
debug!("trying to resolve avs {:?}", tempid_avs);
|
||||||
|
|
||||||
// Evolve further.
|
// Evolve further.
|
||||||
let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&generation.temp_id_avs()[..])?;
|
let temp_id_map: TempIdMap = self.resolve_temp_id_avs(&tempid_avs[..])?;
|
||||||
|
|
||||||
|
debug!("resolved avs for tempids {:?}", temp_id_map);
|
||||||
|
|
||||||
generation = generation.evolve_one_step(&temp_id_map);
|
generation = generation.evolve_one_step(&temp_id_map);
|
||||||
|
|
||||||
|
// Errors. BTree* since we want deterministic results.
|
||||||
|
let mut conflicting_upserts: BTreeMap<TempId, BTreeSet<KnownEntid>> = BTreeMap::default();
|
||||||
|
|
||||||
// Report each tempid that resolves via upsert.
|
// Report each tempid that resolves via upsert.
|
||||||
for (tempid, entid) in temp_id_map {
|
for (tempid, entid) in temp_id_map {
|
||||||
// Every tempid should be resolved at most once. Prima facie, we might expect a
|
// Since `UpsertEV` instances always transition to `UpsertE` instances, it might be
|
||||||
// tempid to be resolved in two different generations. However, that is not so: the
|
// that a tempid resolves in two generations, and those resolutions might conflict.
|
||||||
// keys of temp_id_map are unique between generations.Suppose that id->e and id->e*
|
tempids.insert((*tempid).clone(), entid).map(|previous| {
|
||||||
// are two such mappings, resolved on subsequent evolutionary steps, and that `id`
|
if entid != previous {
|
||||||
// is a key in the intersection of the two key sets. This can't happen: if `id` maps
|
conflicting_upserts.entry((*tempid).clone()).or_insert_with(|| once(previous).collect::<BTreeSet<_>>()).insert(entid);
|
||||||
// to `e` via id->e, all instances of `id` have been evolved forward (replaced with
|
}
|
||||||
// `e`) before we try to resolve the next set of `UpsertsE`. That is, we'll never
|
});
|
||||||
// successfully upsert the same tempid in more than one generation step. (We might
|
|
||||||
// upsert the same tempid to multiple entids via distinct `[a v]` pairs in a single
|
|
||||||
// generation step; in this case, the transaction will fail.)
|
|
||||||
let previous = tempids.insert((*tempid).clone(), entid);
|
|
||||||
assert!(previous.is_none());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !conflicting_upserts.is_empty() {
|
||||||
|
bail!(ErrorKind::SchemaConstraintViolation(errors::SchemaConstraintViolation::ConflictingUpserts { conflicting_upserts }));
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("tempids {:?}", tempids);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug!("final generation {:?}", generation);
|
||||||
|
|
||||||
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
|
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
|
||||||
let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
|
let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
|
||||||
|
|
||||||
|
debug!("unresolved tempids {:?}", unresolved_temp_ids);
|
||||||
|
|
||||||
// TODO: track partitions for temporary IDs.
|
// TODO: track partitions for temporary IDs.
|
||||||
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
||||||
|
|
||||||
|
@ -612,6 +644,8 @@ impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
|
||||||
.zip(entids.map(|e| KnownEntid(e)))
|
.zip(entids.map(|e| KnownEntid(e)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
debug!("tempid allocations {:?}", temp_id_allocations);
|
||||||
|
|
||||||
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
|
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
|
||||||
|
|
||||||
// Report each tempid that is allocated.
|
// Report each tempid that is allocated.
|
||||||
|
|
|
@ -155,7 +155,12 @@ impl Generation {
|
||||||
|
|
||||||
for UpsertEV(t1, a, t2) in self.upserts_ev {
|
for UpsertEV(t1, a, t2) in self.upserts_ev {
|
||||||
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
|
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
|
||||||
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2.0))),
|
(Some(_), Some(&n2)) => {
|
||||||
|
// Even though we can resolve entirely, it's possible that the remaining upsert
|
||||||
|
// could conflict. Moving straight to resolved doesn't give us a chance to
|
||||||
|
// search the store for the conflict.
|
||||||
|
next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0)))
|
||||||
|
},
|
||||||
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))),
|
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2.0))),
|
||||||
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))),
|
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))),
|
||||||
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))),
|
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))),
|
||||||
|
|
|
@ -1175,8 +1175,8 @@ mod tests {
|
||||||
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
|
let report = conn.transact(&mut sqlite, "[[:db/add \"u\" :db/ident :a/keyword]
|
||||||
[:db/add \"u\" :db/ident :b/keyword]]");
|
[:db/add \"u\" :db/ident :b/keyword]]");
|
||||||
match report.unwrap_err() {
|
match report.unwrap_err() {
|
||||||
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::NotYetImplemented(_)), _) => { },
|
Error(ErrorKind::DbError(::mentat_db::errors::ErrorKind::SchemaConstraintViolation(_)), _) => { },
|
||||||
x => panic!("expected EDN parse error, got {:?}", x),
|
x => panic!("expected schema constraint violation, got {:?}", x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,11 +13,11 @@ test = false
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
combine = "2.2.2"
|
combine = "2.2.2"
|
||||||
env_logger = "0.3"
|
env_logger = "0.5"
|
||||||
getopts = "0.2"
|
getopts = "0.2"
|
||||||
lazy_static = "0.2"
|
lazy_static = "0.2"
|
||||||
linefeed = "0.4"
|
linefeed = "0.4"
|
||||||
log = "0.3"
|
log = "0.4"
|
||||||
tabwriter = "1"
|
tabwriter = "1"
|
||||||
tempfile = "1.1"
|
tempfile = "1.1"
|
||||||
termion = "1"
|
termion = "1"
|
||||||
|
|
Loading…
Reference in a new issue