Compare commits
4 commits
master
...
rnewman/tx
Author | SHA1 | Date | |
---|---|---|---|
|
e64ffc555e | ||
|
149e6c8a46 | ||
|
3ab4b2ca95 | ||
|
a50b7aec3a |
4 changed files with 118 additions and 4 deletions
76
db/src/db.rs
76
db/src/db.rs
|
@ -743,6 +743,10 @@ impl MentatStoring for rusqlite::Connection {
|
|||
value_type_tag0 SMALLINT NOT NULL,
|
||||
added0 TINYINT NOT NULL,
|
||||
flags0 TINYINT NOT NULL)"#,
|
||||
|
||||
// We create this unique index so that it's impossible to violate a cardinality constraint
|
||||
// within a transaction.
|
||||
r#"CREATE UNIQUE INDEX IF NOT EXISTS temp.inexact_searches_unique ON inexact_searches (e0, a0) WHERE added0 = 1"#,
|
||||
r#"DROP TABLE IF EXISTS temp.search_results"#,
|
||||
// TODO: don't encode search_type as a STRING. This is explicit and much easier to read
|
||||
// than another flag, so we'll do it to start, and optimize later.
|
||||
|
@ -824,7 +828,7 @@ impl MentatStoring for rusqlite::Connection {
|
|||
let s: String = if search_type == SearchType::Exact {
|
||||
format!("INSERT INTO temp.exact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values)
|
||||
} else {
|
||||
format!("INSERT INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values)
|
||||
format!("INSERT OR REPLACE INTO temp.inexact_searches (e0, a0, v0, value_type_tag0, added0, flags0) VALUES {}", values)
|
||||
};
|
||||
|
||||
// TODO: consider ensuring we inserted the expected number of rows.
|
||||
|
@ -2269,4 +2273,74 @@ mod tests {
|
|||
"[{:test/_dangling 1.23}]",
|
||||
Err("EDN value \'1.23\' is not the expected Mentat value type Ref"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cardinality_one_violation_existing_entity() {
|
||||
let mut conn = TestConn::default();
|
||||
|
||||
// Start by installing a few attributes.
|
||||
assert_transact!(conn, r#"[
|
||||
[:db/add 111 :db/ident :test/one]
|
||||
[:db/add 111 :db/valueType :db.type/long]
|
||||
[:db/add 111 :db/cardinality :db.cardinality/one]
|
||||
[:db/add 112 :db/ident :test/unique]
|
||||
[:db/add 112 :db/index true]
|
||||
[:db/add 112 :db/valueType :db.type/string]
|
||||
[:db/add 112 :db/cardinality :db.cardinality/one]
|
||||
[:db/add 112 :db/unique :db.unique/identity]
|
||||
]"#);
|
||||
|
||||
assert_transact!(conn, r#"[
|
||||
[:db/add "foo" :test/unique "x"]
|
||||
]"#);
|
||||
|
||||
// You can try to assert two values for the same entity and attribute,
|
||||
// but only one will be transacted.
|
||||
let report = assert_transact!(conn, r#"[
|
||||
[:db/add "foo" :test/unique "x"]
|
||||
[:db/add "foo" :test/one 123]
|
||||
[:db/add "bar" :test/unique "x"]
|
||||
[:db/add "bar" :test/one 124]
|
||||
]"#);
|
||||
assert_eq!(report.tempids.get("foo"), report.tempids.get("bar"));
|
||||
|
||||
assert_matches!(conn.last_transaction(),
|
||||
r#"[[65536 :test/one 124 ?tx true]
|
||||
[?tx :db/txInstant ?ms ?tx true]]"#);
|
||||
}
|
||||
|
||||
// TODO
|
||||
#[test]
|
||||
fn test_cardinality_one_violation_new_entity() {
|
||||
let mut conn = TestConn::default();
|
||||
|
||||
// Start by installing a few attributes.
|
||||
assert_transact!(conn, r#"[
|
||||
[:db/add 111 :db/ident :test/one]
|
||||
[:db/add 111 :db/valueType :db.type/long]
|
||||
[:db/add 111 :db/cardinality :db.cardinality/one]
|
||||
[:db/add 112 :db/ident :test/unique]
|
||||
[:db/add 112 :db/index true]
|
||||
[:db/add 112 :db/valueType :db.type/string]
|
||||
[:db/add 112 :db/cardinality :db.cardinality/one]
|
||||
[:db/add 112 :db/unique :db.unique/identity]
|
||||
]"#);
|
||||
|
||||
// You can try to assert two values for the same entity and attribute,
|
||||
// but only one will be transacted.
|
||||
// This variant (see also `test_cardinality_one_violation_existing_entity`)
|
||||
// is particularly tricky because the unique attribute is a new entity,
|
||||
// and so the search will fail, two entids will be allocated, and then
|
||||
// insert will fail.
|
||||
let report = assert_transact!(conn, r#"[
|
||||
[:db/add "foo" :test/unique "x"]
|
||||
[:db/add "foo" :test/one 123]
|
||||
[:db/add "bar" :test/unique "x"]
|
||||
[:db/add "bar" :test/one 124]
|
||||
]"#);
|
||||
assert_matches!(conn.last_transaction(),
|
||||
r#"[[998 :test/unique "x" ?tx true]
|
||||
[998 :test/one 124 ?tx true]
|
||||
[?tx :db/txInstant ?ms ?tx true]]"#);
|
||||
}
|
||||
}
|
||||
|
|
43
db/src/tx.rs
43
db/src/tx.rs
|
@ -164,6 +164,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
/// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted
|
||||
/// entid. The keys of the resulting map are exactly those tempids that upserted.
|
||||
pub fn resolve_temp_id_avs<'b>(&self, temp_id_avs: &'b [(TempIdHandle, AVPair)]) -> Result<TempIdMap> {
|
||||
println!("resolve_temp_id_avs: {:?}", temp_id_avs);
|
||||
if temp_id_avs.is_empty() {
|
||||
return Ok(TempIdMap::default());
|
||||
}
|
||||
|
@ -191,6 +192,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
}
|
||||
}
|
||||
|
||||
println!("=> {:?}", temp_id_map);
|
||||
Ok((temp_id_map))
|
||||
}
|
||||
|
||||
|
@ -485,6 +487,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
},
|
||||
}
|
||||
};
|
||||
println!("TERMS: {:?}", terms);
|
||||
Ok((terms, in_process.temp_ids, in_process.lookup_refs))
|
||||
}
|
||||
|
||||
|
@ -493,15 +496,46 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
///
|
||||
/// The `Term` instances produced share interned TempId handles and have no LookupRef references.
|
||||
fn resolve_lookup_refs<I>(&self, lookup_ref_map: &AVMap, terms: I) -> Result<Vec<TermWithTempIds>> where I: IntoIterator<Item=TermWithTempIdsAndLookupRefs> {
|
||||
terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result<TermWithTempIds> {
|
||||
println!("\nLookup ref map: {:?}", lookup_ref_map);
|
||||
|
||||
let mut equivalences: ::std::collections::HashMap<(Entid, TypedValue), BTreeSet<TempIdHandle>> = Default::default();
|
||||
|
||||
let res = terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result<TermWithTempIds> {
|
||||
println!("\nResolving lookup refs: {:?}", term);
|
||||
match term {
|
||||
Term::AddOrRetract(op, e, a, v) => {
|
||||
let e = replace_lookup_ref(&lookup_ref_map, e, |x| KnownEntid(x))?;
|
||||
let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?;
|
||||
|
||||
// It's possible here that we'll encounter two lookup refs that are the same,
|
||||
// but have different tempids. We want these to collide to avoid problems in
|
||||
// the storage layer.
|
||||
if let (OpType::Add, &Either::Right(ref tempid), &Either::Left(ref vv)) = (op, &e, &v) {
|
||||
let pair: AVPair = (a, vv.clone());
|
||||
|
||||
println!("TempID {:?} is identified by {:?}", tempid, pair);
|
||||
|
||||
equivalences.entry(pair)
|
||||
.or_insert(Default::default())
|
||||
.insert(tempid.clone());
|
||||
}
|
||||
Ok(Term::AddOrRetract(op, e, a, v))
|
||||
},
|
||||
}
|
||||
}).collect::<Result<Vec<_>>>()
|
||||
}).collect::<Result<Vec<_>>>();
|
||||
|
||||
// Now invert the equivalences index: we wish to go from the entire set of tempids to
|
||||
// a subset of canonical tempids.
|
||||
let mut tempids: ::std::collections::HashMap<TempIdHandle, TempIdHandle> = Default::default();
|
||||
for (_, set) in equivalences.into_iter().filter(|&(_, ref v)| v.len() > 1) {
|
||||
let mut all = set.into_iter();
|
||||
let winner = all.next().unwrap(); // Length check above.
|
||||
for t in all {
|
||||
tempids.insert(t, winner.clone());
|
||||
}
|
||||
}
|
||||
println!("Found equivalences: {:?}", &tempids);
|
||||
res
|
||||
}
|
||||
|
||||
/// Transact the given `entities` against the store.
|
||||
|
@ -521,6 +555,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
|
||||
let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?;
|
||||
|
||||
println!("Terms with temp IDs: {:?}", terms_with_temp_ids);
|
||||
// Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs.
|
||||
// Now we can collect upsert populations.
|
||||
let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.schema)?;
|
||||
|
@ -551,6 +586,8 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
|
||||
let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
|
||||
|
||||
println!("\nUNRESOLVED: \n>> {:?}\n>> {:?}\n", unresolved_temp_ids, &generation.temp_id_avs()[..]);
|
||||
|
||||
// TODO: track partitions for temporary IDs.
|
||||
let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
|
||||
|
||||
|
@ -587,6 +624,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
let final_terms: Vec<TermWithoutTempIds> = [final_populations.resolved,
|
||||
final_populations.allocated,
|
||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||
println!("Final terms: {:?}", final_terms);
|
||||
|
||||
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
||||
|
||||
|
@ -636,6 +674,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
|||
TypedValue::Instant(self.tx_instant),
|
||||
true));
|
||||
|
||||
println!("non_fts_one: {:?}", non_fts_one);
|
||||
if !non_fts_one.is_empty() {
|
||||
self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?;
|
||||
}
|
||||
|
|
|
@ -144,6 +144,7 @@ impl Generation {
|
|||
///
|
||||
/// TODO: Considering doing this in place; the function already consumes `self`.
|
||||
pub fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation {
|
||||
println!("EVOLVE ONE STEP: {:?}\n", temp_id_map);
|
||||
let mut next = Generation::default();
|
||||
|
||||
for UpsertE(t, a, v) in self.upserts_e {
|
||||
|
|
|
@ -104,7 +104,7 @@ pub enum EntidOrLookupRefOrTempId {
|
|||
TempId(TempId),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
|
||||
pub enum OpType {
|
||||
Add,
|
||||
Retract,
|
||||
|
|
Loading…
Reference in a new issue