diff --git a/core/src/lib.rs b/core/src/lib.rs index 4c1e8738..9d42e69c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -233,6 +233,28 @@ impl TypedValue { } } +trait MicrosecondPrecision { + /// Truncate the provided `DateTime` to microsecond precision. + fn microsecond_precision(self) -> Self; +} + +impl MicrosecondPrecision for DateTime { + fn microsecond_precision(self) -> DateTime { + let nanoseconds = self.nanosecond(); + if nanoseconds % 1000 == 0 { + return self; + } + let microseconds = nanoseconds / 1000; + let truncated = microseconds * 1000; + self.with_nanosecond(truncated).expect("valid timestamp") + } +} + +/// Return the current time as a UTC `DateTime` instance with microsecond precision. +pub fn now() -> DateTime { + Utc::now().microsecond_precision() +} + // We don't do From or From 'cos it's ambiguous. impl From for TypedValue { @@ -245,9 +267,7 @@ impl From for TypedValue { /// `TypedValue::Instant`. impl From> for TypedValue { fn from(value: DateTime) -> TypedValue { - let microseconds = value.nanosecond() / 1000; - let truncated = microseconds * 1000; - TypedValue::Instant(value.with_nanosecond(truncated).expect("valid timestamp")) + TypedValue::Instant(value.microsecond_precision()) } } diff --git a/db/src/db.rs b/db/src/db.rs index c9cf8336..7df3138e 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -1327,6 +1327,42 @@ mod tests { [200 :db.schema/attribute 101]]"); } + #[test] + fn test_tx_assertions() { + let mut conn = TestConn::default(); + + // Test that txInstant can be asserted. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"] + [:db/add 100 :db/ident :name/Ivan] + [:db/add 101 :db/ident :name/Petr]]"); + assert_matches!(conn.last_transaction(), + "[[100 :db/ident :name/Ivan ?tx true] + [101 :db/ident :name/Petr ?tx true] + [?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]"); + + // Test multiple txInstant with different values should fail. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + [:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.752Z\"] + [:db/add 102 :db/ident :name/Vlad]]", + Err("conflicting datoms in tx")); + + // Test multiple txInstants with the same value. + assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + [:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + [:db/add 103 :db/ident :name/Dimitri] + [:db/add 104 :db/ident :name/Anton]]"); + assert_matches!(conn.last_transaction(), + "[[103 :db/ident :name/Dimitri ?tx true] + [104 :db/ident :name/Anton ?tx true] + [?tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\" ?tx true]]"); + + // Test txInstant retraction + // Test disabled: retracting a datom that doesn't exist should fail. + // assert_transact!(conn, "[[:db/retract :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"] + // [:db/add 105 :db/ident :name/Vadim]]", + // Err("Should fail!")); + } + #[test] fn test_retract() { let mut conn = TestConn::default(); diff --git a/db/src/errors.rs b/db/src/errors.rs index 62a263d0..15b8614c 100644 --- a/db/src/errors.rs +++ b/db/src/errors.rs @@ -82,5 +82,10 @@ error_chain! { description("unrecognized or no ident found for entid") display("unrecognized or no ident found for entid: {}", entid) } + + ConflictingDatoms { + description("conflicting datoms in tx") + display("conflicting datoms in tx") + } } } diff --git a/db/src/lib.rs b/db/src/lib.rs index dc1f6b7f..2c9ca700 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -8,6 +8,9 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. +// Oh, error_chain. +#![recursion_limit="128"] + #[macro_use] extern crate error_chain; extern crate itertools; @@ -28,11 +31,6 @@ use std::iter::repeat; use itertools::Itertools; -use mentat_core::{ - DateTime, - Utc, -}; - pub use errors::{Error, ErrorKind, ResultExt, Result}; pub mod db; @@ -116,8 +114,3 @@ pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String { let values: String = repeat(inner).take(tuples).join(", "); values } - -/// Return the current time as a UTC `DateTime` instance. -pub fn now() -> DateTime { - Utc::now() -} diff --git a/db/src/tx.rs b/db/src/tx.rs index 48280411..8adac2a5 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -85,6 +85,7 @@ use mentat_core::{ Schema, Utc, attribute, + now, }; use mentat_core::intern_set::InternSet; @@ -141,7 +142,7 @@ pub struct Tx<'conn, 'a> { tx_id: Entid, /// The timestamp when the transaction began to be committed. - tx_instant: DateTime, + tx_instant: Option>, } impl<'conn, 'a> Tx<'conn, 'a> { @@ -150,15 +151,14 @@ impl<'conn, 'a> Tx<'conn, 'a> { partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema, - tx_id: Entid, - tx_instant: DateTime) -> Tx<'conn, 'a> { + tx_id: Entid) -> Tx<'conn, 'a> { Tx { store: store, partition_map: partition_map, schema_for_mutation: Cow::Borrowed(schema_for_mutation), schema: schema, tx_id: tx_id, - tx_instant: tx_instant, + tx_instant: None, } } @@ -206,16 +206,18 @@ impl<'conn, 'a> Tx<'conn, 'a> { partition_map: &'a PartitionMap, schema: &'a Schema, mentat_id_count: i64, + tx_id: KnownEntid, temp_ids: InternSet, lookup_refs: InternSet, } impl<'a> InProcess<'a> { - fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> { + fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: KnownEntid) -> InProcess<'a> { InProcess { partition_map, schema, mentat_id_count: 0, + tx_id, temp_ids: InternSet::new(), lookup_refs: InternSet::new(), } @@ -269,6 +271,11 @@ impl<'conn, 'a> Tx<'conn, 'a> { Ok(Either::Left(e)) }, + // Special case: current tx ID. + entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => { + Ok(Either::Left(self.tx_id)) + }, + entmod::EntidOrLookupRefOrTempId::TempId(e) => { Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e)))) }, @@ -334,7 +341,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map); + let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, KnownEntid(self.tx_id)); // We want to handle entities in the order they're given to us, while also "exploding" some // entities into many. We therefore push the initial entities onto the back of the deque, @@ -593,6 +600,8 @@ impl<'conn, 'a> Tx<'conn, 'a> { final_populations.allocated, inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + let tx_instant; + { // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function. // Assertions that are :db.cardinality/one and not :db.fulltext. @@ -615,14 +624,37 @@ impl<'conn, 'a> Tx<'conn, 'a> { // TODO: use something like Clojure's group_by to do this. for term in final_terms { match term { - Term::AddOrRetract(op, e, a, v) => { + Term::AddOrRetract(op, KnownEntid(e), a, v) => { let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; if entids::might_update_metadata(a) { tx_might_update_metadata = true; } let added = op == OpType::Add; - let reduced = (e.0, a, attribute, v, added); + + // We take the last encountered :db/txInstant value. + // If more than one is provided, the transactor will fail. + if added && + e == self.tx_id && + a == entids::DB_TX_INSTANT { + if let TypedValue::Instant(instant) = v { + if let Some(ts) = self.tx_instant { + if ts == instant { + // Dupes are fine. + } else { + bail!(ErrorKind::ConflictingDatoms); + } + } else { + self.tx_instant = Some(instant); + } + continue; + } else { + // The type error has been caught earlier. + unreachable!() + } + } + + let reduced = (e, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { (false, true) => non_fts_many.push(reduced), (false, false) => non_fts_one.push(reduced), @@ -633,12 +665,13 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. + tx_instant = self.tx_instant.unwrap_or_else(now); + + // Transact [:db/add :db/txInstant tx_instant :db/tx]. non_fts_one.push((self.tx_id, entids::DB_TX_INSTANT, self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), - TypedValue::Instant(self.tx_instant), + tx_instant.into(), true)); if !non_fts_one.is_empty() { @@ -683,7 +716,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { Ok(TxReport { tx_id: self.tx_id, - tx_instant: self.tx_instant, + tx_instant, tempids: tempids, }) } @@ -694,12 +727,11 @@ fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema) -> Result> { - let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge. let tx_id = partition_map.allocate_entid(":db.part/tx"); conn.begin_tx_application()?; - Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant)) + Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id)) } fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option)> { diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 875303e3..111b5e65 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, { }); def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, { - Tx::entid().map(EntidOrLookupRefOrTempId::Entid) + Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId) + .or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid)) .or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef)) .or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId)) }); +def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx"); + +def_parser!(Tx, db_tx, TempId, { + Tx::literal_db_tx().map(|_| TempId::Tx) +}); + def_parser!(Tx, temp_id, TempId, { satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External)) }); diff --git a/tx/src/entities.rs b/tx/src/entities.rs index dddf5a5a..0da757e9 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -23,12 +23,14 @@ use self::edn::symbols::NamespacedKeyword; pub enum TempId { External(String), Internal(i64), + Tx, // Special identifier used to refer to the current transaction. } impl TempId { pub fn into_external(self) -> Option { match self { TempId::External(s) => Some(s), + TempId::Tx | TempId::Internal(_) => None, } } @@ -36,6 +38,7 @@ impl TempId { pub fn into_internal(self) -> Option { match self { TempId::Internal(x) => Some(x), + TempId::Tx | TempId::External(_) => None, } } @@ -46,6 +49,7 @@ impl fmt::Display for TempId { match self { &TempId::External(ref s) => write!(f, "{}", s), &TempId::Internal(x) => write!(f, "", x), + &TempId::Tx => write!(f, ""), } } }