diff --git a/db/src/db.rs b/db/src/db.rs index 8b7fb887..cc2469fc 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1320,6 +1320,30 @@ mod tests { [200 :db.schema/attribute 101]]"); } + #[test] + fn test_tx_assertions() { + let mut conn = TestConn::default(); + + // Test that txInstant can be asserted. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"] + [:db/add 100 :db.schema/version 1] + [:db/add 101 :db.schema/version 2]]"); + assert_matches!(conn.last_transaction(), + "[[100 :db.schema/version 1 ?tx true] + [101 :db.schema/version 2 ?tx true] + [?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]"); + + // Test other tx assertion. + assert_transact!(conn, "[[:db/add :db/tx :db.schema/version 7] + [:db/add 200 :db.schema/version 2] + [:db/add 201 :db.schema/version 3]]"); + assert_matches!(conn.last_transaction(), + "[[200 :db.schema/version 2 ?tx true] + [201 :db.schema/version 3 ?tx true] + [?tx :db/txInstant ?ms ?tx true] + [?tx :db.schema/version 7 ?tx true]]"); + } + #[test] fn test_retract() { let mut conn = TestConn::default(); diff --git a/db/src/tx.rs b/db/src/tx.rs index 190a9fab..afce74ee 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -204,16 +204,18 @@ impl<'conn, 'a> Tx<'conn, 'a> { partition_map: &'a PartitionMap, schema: &'a Schema, mentat_id_count: i64, + tx_id: Entid, temp_ids: intern_set::InternSet, lookup_refs: intern_set::InternSet, } impl<'a> InProcess<'a> { - fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> { + fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: Entid) -> InProcess<'a> { InProcess { partition_map, schema, mentat_id_count: 0, + tx_id, temp_ids: intern_set::InternSet::new(), lookup_refs: intern_set::InternSet::new(), } @@ -268,6 +270,11 @@ impl<'conn, 'a> Tx<'conn, 'a> { Ok(Either::Left(e)) }, + // Special case: current tx ID. + entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => { + Ok(Either::Left(KnownEntid(self.tx_id))) + }, + entmod::EntidOrLookupRefOrTempId::TempId(e) => { Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e)))) }, @@ -333,7 +340,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map); + let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, self.tx_id); // We want to handle entities in the order they're given to us, while also "exploding" some // entities into many. We therefore push the initial entities onto the back of the deque, @@ -608,6 +615,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { // Pipeline stage 4: final terms (after rewriting) -> DB insertions. // Collect into non_fts_*. // TODO: use something like Clojure's group_by to do this. + let mut tx_instant_set = false; for term in final_terms { match term { Term::AddOrRetract(op, e, a, v) => { @@ -616,6 +624,17 @@ impl<'conn, 'a> Tx<'conn, 'a> { tx_might_update_metadata = true; } + if e == KnownEntid(self.tx_id) && a == entids::DB_TX_INSTANT { + tx_instant_set = true; + if let TypedValue::Instant(instant) = v { + self.tx_instant = instant; + } else { + panic!("This type error should have been caught earlier."); + } + // TODO: INSTANT to be strictly after the last transaction + // timestamp and strictly before the current transactor timestamp. + } + let added = op == OpType::Add; let reduced = (e.0, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { @@ -628,13 +647,14 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. - non_fts_one.push((self.tx_id, - entids::DB_TX_INSTANT, - self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), - TypedValue::Instant(self.tx_instant), - true)); + // Transact [:db/add :db/txInstant NOW :db/tx] if it doesn't exist. + if !tx_instant_set { + non_fts_one.push((self.tx_id, + entids::DB_TX_INSTANT, + self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), + TypedValue::Instant(self.tx_instant), + true)); + } if !non_fts_one.is_empty() { self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?; diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 875303e3..111b5e65 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, { }); def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, { - Tx::entid().map(EntidOrLookupRefOrTempId::Entid) + Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId) + .or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid)) .or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef)) .or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId)) }); +def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx"); + +def_parser!(Tx, db_tx, TempId, { + Tx::literal_db_tx().map(|_| TempId::Tx) +}); + def_parser!(Tx, temp_id, TempId, { satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External)) }); diff --git a/tx/src/entities.rs b/tx/src/entities.rs index 6f741ed0..2d49f9cd 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -23,20 +23,21 @@ use self::edn::symbols::NamespacedKeyword; pub enum TempId { External(String), Internal(i64), + Tx, // Special identifier used to refer to the current transaction. } impl TempId { pub fn into_external(self) -> Option { match self { TempId::External(s) => Some(s), - TempId::Internal(_) => None, + _ => None, } } pub fn into_internal(self) -> Option { match self { TempId::Internal(x) => Some(x), - TempId::External(_) => None, + _ => None, } } } @@ -46,6 +47,7 @@ impl fmt::Display for TempId { match self { &TempId::External(ref s) => write!(f, "{}", s), &TempId::Internal(x) => write!(f, "", x), + &TempId::Tx => write!(f, ""), } } }