diff --git a/db/src/db.rs b/db/src/db.rs
index f43ef6a8..170e4e3b 100644
--- a/db/src/db.rs
+++ b/db/src/db.rs
@@ -2310,7 +2310,6 @@ mod tests {
     }
 
     // TODO
-    /*
     #[test]
     fn test_cardinality_one_violation_new_entity() {
         let mut conn = TestConn::default();
@@ -2344,5 +2343,4 @@ mod tests {
                          [998 :test/one 124 ?tx true]
                          [?tx :db/txInstant ?ms ?tx true]]"#);
     }
-    */
 }
diff --git a/db/src/tx.rs b/db/src/tx.rs
index 190a9fab..be880e0d 100644
--- a/db/src/tx.rs
+++ b/db/src/tx.rs
@@ -164,6 +164,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
     /// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted
     /// entid. The keys of the resulting map are exactly those tempids that upserted.
     pub fn resolve_temp_id_avs<'b>(&self, temp_id_avs: &'b [(TempIdHandle, AVPair)]) -> Result<TempIdMap> {
+        println!("resolve_temp_id_avs: {:?}", temp_id_avs);
         if temp_id_avs.is_empty() {
             return Ok(TempIdMap::default());
         }
@@ -191,6 +192,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
             }
         }
 
+        println!("=> {:?}", temp_id_map);
         Ok((temp_id_map))
     }
 
@@ -485,6 +487,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
                 },
             }
         };
+        println!("TERMS: {:?}", terms);
         Ok((terms, in_process.temp_ids, in_process.lookup_refs))
     }
 
@@ -493,15 +496,46 @@ impl<'conn, 'a> Tx<'conn, 'a> {
     ///
     /// The `Term` instances produced share interned TempId handles and have no LookupRef references.
     fn resolve_lookup_refs<I>(&self, lookup_ref_map: &AVMap, terms: I) -> Result<Vec<TermWithTempIds>> where I: IntoIterator<Item=TermWithTempIdsAndLookupRefs> {
-        terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result<TermWithTempIds> {
+        println!("\nLookup ref map: {:?}", lookup_ref_map);
+
+        let mut equivalences: ::std::collections::HashMap<(Entid, TypedValue), BTreeSet<TempIdHandle>> = Default::default();
+
+        let res = terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result<TermWithTempIds> {
+            println!("\nResolving lookup refs: {:?}", term);
             match term {
                 Term::AddOrRetract(op, e, a, v) => {
                     let e = replace_lookup_ref(&lookup_ref_map, e, |x| KnownEntid(x))?;
                     let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?;
+
+                    // It's possible here that we'll encounter two lookup refs that are the same,
+                    // but have different tempids. We want these to collide to avoid problems in
+                    // the storage layer.
+                    if let (OpType::Add, &Either::Right(ref tempid), &Either::Left(ref vv)) = (op, &e, &v) {
+                        let pair: AVPair = (a, vv.clone());
+
+                        println!("TempID {:?} is identified by {:?}", tempid, pair);
+
+                        equivalences.entry(pair)
+                                    .or_insert(Default::default())
+                                    .insert(tempid.clone());
+                    }
                     Ok(Term::AddOrRetract(op, e, a, v))
                 },
             }
-        }).collect::<Result<Vec<_>>>()
+        }).collect::<Result<Vec<_>>>();
+
+        // Now invert the equivalences index: we wish to go from the entire set of tempids to
+        // a subset of canonical tempids.
+        let mut tempids: ::std::collections::HashMap<TempIdHandle, TempIdHandle> = Default::default();
+        for (_, set) in equivalences.into_iter().filter(|&(_, ref v)| v.len() > 1) {
+            let mut all = set.into_iter();
+            let winner = all.next().unwrap(); // Length check above.
+            for t in all {
+                tempids.insert(t, winner.clone());
+            }
+        }
+        println!("Found equivalences: {:?}", &tempids);
+        res
     }
 
     /// Transact the given `entities` against the store.
@@ -521,6 +555,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
 
         let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?;
 
+println!("Terms with temp IDs: {:?}", terms_with_temp_ids);
         // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs.
         // Now we can collect upsert populations.
         let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.schema)?;
@@ -551,6 +586,8 @@ impl<'conn, 'a> Tx<'conn, 'a> {
 
         // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
         let unresolved_temp_ids: BTreeSet<TempIdHandle> = generation.temp_ids_in_allocations();
 
+println!("\nUNRESOLVED: \n>> {:?}\n>> {:?}\n", unresolved_temp_ids, &generation.temp_id_avs()[..]);
+
         // TODO: track partitions for temporary IDs.
         let entids = self.partition_map.allocate_entids(":db.part/user", unresolved_temp_ids.len());
@@ -587,6 +624,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
         let final_terms: Vec<TermWithoutTempIds> = [final_populations.resolved,
                                                     final_populations.allocated,
                                                     inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
 
+println!("Final terms: {:?}", final_terms);
         {
             // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
@@ -636,6 +674,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
                               TypedValue::Instant(self.tx_instant),
                               true));
 
+println!("non_fts_one: {:?}", non_fts_one);
             if !non_fts_one.is_empty() {
                 self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?;
             }
diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs
index 3c74562f..aae27540 100644
--- a/db/src/upsert_resolution.rs
+++ b/db/src/upsert_resolution.rs
@@ -144,6 +144,7 @@ impl Generation {
     ///
     /// TODO: Considering doing this in place; the function already consumes `self`.
    pub fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation {
+        println!("EVOLVE ONE STEP: {:?}\n", temp_id_map);
        let mut next = Generation::default();
 
        for UpsertE(t, a, v) in self.upserts_e {
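
Aside, not part of the patch above: the equivalence-inversion step added to resolve_lookup_refs can be exercised in isolation. The sketch below is a minimal, self-contained approximation; it uses String tempids and a plain (i64, i64) tuple as hypothetical stand-ins for Mentat's TempIdHandle and AVPair, and shows how every group of tempids identified by the same [a v] pair collapses onto one canonical tempid. Using a BTreeSet means iteration order is sorted, so the chosen "winner" is deterministic, which matches the determinism concern elsewhere in the patch.

use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-ins for Mentat's TempIdHandle and AVPair.
type TempId = String;
type AVPair = (i64, i64); // (attribute entid, value entid)

/// Collapse each set of tempids identified by the same [a v] pair onto a single
/// canonical tempid, mirroring the inversion loop sketched in resolve_lookup_refs.
fn canonicalize(equivalences: HashMap<AVPair, BTreeSet<TempId>>) -> HashMap<TempId, TempId> {
    let mut tempids: HashMap<TempId, TempId> = HashMap::new();
    for (_, set) in equivalences.into_iter().filter(|&(_, ref v)| v.len() > 1) {
        let mut all = set.into_iter();
        let winner = all.next().unwrap(); // Non-empty: length checked by the filter above.
        for t in all {
            tempids.insert(t, winner.clone());
        }
    }
    tempids
}

fn main() {
    let mut equivalences: HashMap<AVPair, BTreeSet<TempId>> = HashMap::new();
    // Two tempids end up identified by the same resolved [a v] pair...
    equivalences.entry((10, 20)).or_insert_with(Default::default).insert("a".to_string());
    equivalences.entry((10, 20)).or_insert_with(Default::default).insert("b".to_string());
    // ...while a third is unique and is left untouched.
    equivalences.entry((10, 21)).or_insert_with(Default::default).insert("c".to_string());

    // Prints {"b": "a"}: "b" is rewritten to the canonical tempid "a".
    println!("{:?}", canonicalize(equivalences));
}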