From 55d196094aad94c79627221d88ae11d72b73fabd Mon Sep 17 00:00:00 2001 From: Edouard Oger Date: Mon, 22 Jan 2018 13:17:12 -0500 Subject: [PATCH] Allow customers to assert facts about the current transaction. Fixes #225 --- db/src/db.rs | 37 +++++++++++++++++++++++++++ db/src/tx.rs | 59 ++++++++++++++++++++++++++++++-------------- tx-parser/src/lib.rs | 9 ++++++- tx/src/entities.rs | 4 +++ 4 files changed, 90 insertions(+), 19 deletions(-) diff --git a/db/src/db.rs b/db/src/db.rs index c9cf8336..33289878 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1327,6 +1327,43 @@ mod tests { [200 :db.schema/attribute 101]]"); } + #[test] + fn test_tx_assertions() { + let mut conn = TestConn::default(); + + // Test that txInstant can be asserted. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"] + [:db/add 100 :db/ident :name/Ivan] + [:db/add 101 :db/ident :name/Petr]]"); + assert_matches!(conn.last_transaction(), + "[[100 :db/ident :name/Ivan ?tx true] + [101 :db/ident :name/Petr ?tx true] + [?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]"); + + // Test multiple txInstant with different values should fail. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + [:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.752Z\"] + [:db/add 102 :db/ident :name/Vlad]]", + Err("Could not insert non-fts one statements into temporary search table!")); + + // Test multiple txInstants with the same value. + // Test disabled: depends on #535. + // assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + // [:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + // [:db/add 103 :db/ident :name/Dimitri] + // [:db/add 104 :db/ident :name/Anton]]"); + // assert_matches!(conn.last_transaction(), + // "[[103 :db/ident :name/Dimitri ?tx true] + // [104 :db/ident :name/Anton ?tx true] + // [?tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\" ?tx true]]"); + + // Test txInstant retraction + // Test disabled: retracting a datom that doesn't exist should fail. + // assert_transact!(conn, "[[:db/retract :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + // [:db/add 105 :db/ident :name/Vadim]]", + // Err("Should fail!")); + } + #[test] fn test_retract() { let mut conn = TestConn::default(); diff --git a/db/src/tx.rs b/db/src/tx.rs index 48280411..03b2f0f3 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -141,7 +141,7 @@ pub struct Tx<'conn, 'a> { tx_id: Entid, /// The timestamp when the transaction began to be committed. - tx_instant: DateTime, + tx_instant: Option>, } impl<'conn, 'a> Tx<'conn, 'a> { @@ -150,15 +150,14 @@ impl<'conn, 'a> Tx<'conn, 'a> { partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema, - tx_id: Entid, - tx_instant: DateTime) -> Tx<'conn, 'a> { + tx_id: Entid) -> Tx<'conn, 'a> { Tx { store: store, partition_map: partition_map, schema_for_mutation: Cow::Borrowed(schema_for_mutation), schema: schema, tx_id: tx_id, - tx_instant: tx_instant, + tx_instant: None, } } @@ -206,16 +205,18 @@ impl<'conn, 'a> Tx<'conn, 'a> { partition_map: &'a PartitionMap, schema: &'a Schema, mentat_id_count: i64, + tx_id: KnownEntid, temp_ids: InternSet, lookup_refs: InternSet, } impl<'a> InProcess<'a> { - fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> { + fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: KnownEntid) -> InProcess<'a> { InProcess { partition_map, schema, mentat_id_count: 0, + tx_id, temp_ids: InternSet::new(), lookup_refs: InternSet::new(), } @@ -269,6 +270,11 @@ impl<'conn, 'a> Tx<'conn, 'a> { Ok(Either::Left(e)) }, + // Special case: current tx ID. + entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => { + Ok(Either::Left(self.tx_id)) + }, + entmod::EntidOrLookupRefOrTempId::TempId(e) => { Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e)))) }, @@ -334,7 +340,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map); + let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, KnownEntid(self.tx_id)); // We want to handle entities in the order they're given to us, while also "exploding" some // entities into many. We therefore push the initial entities onto the back of the deque, @@ -593,6 +599,8 @@ impl<'conn, 'a> Tx<'conn, 'a> { final_populations.allocated, inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + let tx_instant; + { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. // Assertions that are :db.cardinality/one and not :db.fulltext. @@ -615,14 +623,27 @@ impl<'conn, 'a> Tx<'conn, 'a> { // TODO: use something like Clojure's group_by to do this. for term in final_terms { match term { - Term::AddOrRetract(op, e, a, v) => { + Term::AddOrRetract(op, KnownEntid(e), a, v) => { let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; if entids::might_update_metadata(a) { tx_might_update_metadata = true; } let added = op == OpType::Add; - let reduced = (e.0, a, attribute, v, added); + + // We take the last encountered :db/txInstant value. + if added && + e == self.tx_id && + a == entids::DB_TX_INSTANT { + if let TypedValue::Instant(instant) = v { + self.tx_instant = Some(instant); + } else { + // The type error has been caught earlier. + unreachable!() + } + } + + let reduced = (e, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { (false, true) => non_fts_many.push(reduced), (false, false) => non_fts_one.push(reduced), @@ -633,13 +654,16 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. - non_fts_one.push((self.tx_id, - entids::DB_TX_INSTANT, - self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), - TypedValue::Instant(self.tx_instant), - true)); + tx_instant = self.tx_instant.unwrap_or(::now()); + + // Transact [:db/add :db/txInstant NOW :db/tx] if it doesn't exist. + if self.tx_instant == None { + non_fts_one.push((self.tx_id, + entids::DB_TX_INSTANT, + self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), + TypedValue::Instant(tx_instant), + true)); + } if !non_fts_one.is_empty() { self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?; @@ -683,7 +707,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { Ok(TxReport { tx_id: self.tx_id, - tx_instant: self.tx_instant, + tx_instant, tempids: tempids, }) } @@ -694,12 +718,11 @@ fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema) -> Result> { - let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. let tx_id = partition_map.allocate_entid(":db.part/tx"); conn.begin_tx_application()?; - Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant)) + Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id)) } fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option)> { diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 875303e3..111b5e65 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, { }); def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, { - Tx::entid().map(EntidOrLookupRefOrTempId::Entid) + Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId) + .or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid)) .or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef)) .or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId)) }); +def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx"); + +def_parser!(Tx, db_tx, TempId, { + Tx::literal_db_tx().map(|_| TempId::Tx) +}); + def_parser!(Tx, temp_id, TempId, { satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External)) }); diff --git a/tx/src/entities.rs b/tx/src/entities.rs index dddf5a5a..0da757e9 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -23,12 +23,14 @@ use self::edn::symbols::NamespacedKeyword; pub enum TempId { External(String), Internal(i64), + Tx, // Special identifier used to refer to the current transaction. } impl TempId { pub fn into_external(self) -> Option { match self { TempId::External(s) => Some(s), + TempId::Tx | TempId::Internal(_) => None, } } @@ -36,6 +38,7 @@ impl TempId { pub fn into_internal(self) -> Option { match self { TempId::Internal(x) => Some(x), + TempId::Tx | TempId::External(_) => None, } } @@ -46,6 +49,7 @@ impl fmt::Display for TempId { match self { &TempId::External(ref s) => write!(f, "{}", s), &TempId::Internal(x) => write!(f, "", x), + &TempId::Tx => write!(f, ""), } } }