Allow customers to assert facts about the current transaction. (#225) r=rnewman
Also move `now` into core, implement microsecond truncation. This is so we don't return a more granular -- and thus subtly different -- timestamp in a `TxReport` than we put into the store.
This commit is contained in:
parent
98502eb68f
commit
3bf7459315
7 changed files with 125 additions and 28 deletions
|
@ -233,6 +233,28 @@ impl TypedValue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait MicrosecondPrecision {
|
||||||
|
/// Truncate the provided `DateTime` to microsecond precision.
|
||||||
|
fn microsecond_precision(self) -> Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MicrosecondPrecision for DateTime<Utc> {
|
||||||
|
fn microsecond_precision(self) -> DateTime<Utc> {
|
||||||
|
let nanoseconds = self.nanosecond();
|
||||||
|
if nanoseconds % 1000 == 0 {
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
let microseconds = nanoseconds / 1000;
|
||||||
|
let truncated = microseconds * 1000;
|
||||||
|
self.with_nanosecond(truncated).expect("valid timestamp")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the current time as a UTC `DateTime` instance with microsecond precision.
|
||||||
|
pub fn now() -> DateTime<Utc> {
|
||||||
|
Utc::now().microsecond_precision()
|
||||||
|
}
|
||||||
|
|
||||||
// We don't do From<i64> or From<Entid> 'cos it's ambiguous.
|
// We don't do From<i64> or From<Entid> 'cos it's ambiguous.
|
||||||
|
|
||||||
impl From<bool> for TypedValue {
|
impl From<bool> for TypedValue {
|
||||||
|
@ -245,9 +267,7 @@ impl From<bool> for TypedValue {
|
||||||
/// `TypedValue::Instant`.
|
/// `TypedValue::Instant`.
|
||||||
impl From<DateTime<Utc>> for TypedValue {
|
impl From<DateTime<Utc>> for TypedValue {
|
||||||
fn from(value: DateTime<Utc>) -> TypedValue {
|
fn from(value: DateTime<Utc>) -> TypedValue {
|
||||||
let microseconds = value.nanosecond() / 1000;
|
TypedValue::Instant(value.microsecond_precision())
|
||||||
let truncated = microseconds * 1000;
|
|
||||||
TypedValue::Instant(value.with_nanosecond(truncated).expect("valid timestamp"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
36
db/src/db.rs
36
db/src/db.rs
|
@ -1327,6 +1327,42 @@ mod tests {
|
||||||
[200 :db.schema/attribute 101]]");
|
[200 :db.schema/attribute 101]]");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_tx_assertions() {
|
||||||
|
let mut conn = TestConn::default();
|
||||||
|
|
||||||
|
// Test that txInstant can be asserted.
|
||||||
|
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"]
|
||||||
|
[:db/add 100 :db/ident :name/Ivan]
|
||||||
|
[:db/add 101 :db/ident :name/Petr]]");
|
||||||
|
assert_matches!(conn.last_transaction(),
|
||||||
|
"[[100 :db/ident :name/Ivan ?tx true]
|
||||||
|
[101 :db/ident :name/Petr ?tx true]
|
||||||
|
[?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]");
|
||||||
|
|
||||||
|
// Test multiple txInstant with different values should fail.
|
||||||
|
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
|
||||||
|
[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.752Z\"]
|
||||||
|
[:db/add 102 :db/ident :name/Vlad]]",
|
||||||
|
Err("conflicting datoms in tx"));
|
||||||
|
|
||||||
|
// Test multiple txInstants with the same value.
|
||||||
|
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
|
||||||
|
[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
|
||||||
|
[:db/add 103 :db/ident :name/Dimitri]
|
||||||
|
[:db/add 104 :db/ident :name/Anton]]");
|
||||||
|
assert_matches!(conn.last_transaction(),
|
||||||
|
"[[103 :db/ident :name/Dimitri ?tx true]
|
||||||
|
[104 :db/ident :name/Anton ?tx true]
|
||||||
|
[?tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\" ?tx true]]");
|
||||||
|
|
||||||
|
// Test txInstant retraction
|
||||||
|
// Test disabled: retracting a datom that doesn't exist should fail.
|
||||||
|
// assert_transact!(conn, "[[:db/retract :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
|
||||||
|
// [:db/add 105 :db/ident :name/Vadim]]",
|
||||||
|
// Err("Should fail!"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_retract() {
|
fn test_retract() {
|
||||||
let mut conn = TestConn::default();
|
let mut conn = TestConn::default();
|
||||||
|
|
|
@ -82,5 +82,10 @@ error_chain! {
|
||||||
description("unrecognized or no ident found for entid")
|
description("unrecognized or no ident found for entid")
|
||||||
display("unrecognized or no ident found for entid: {}", entid)
|
display("unrecognized or no ident found for entid: {}", entid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConflictingDatoms {
|
||||||
|
description("conflicting datoms in tx")
|
||||||
|
display("conflicting datoms in tx")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,9 @@
|
||||||
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
// specific language governing permissions and limitations under the License.
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
// Oh, error_chain.
|
||||||
|
#![recursion_limit="128"]
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate error_chain;
|
extern crate error_chain;
|
||||||
extern crate itertools;
|
extern crate itertools;
|
||||||
|
@ -28,11 +31,6 @@ use std::iter::repeat;
|
||||||
|
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
|
|
||||||
use mentat_core::{
|
|
||||||
DateTime,
|
|
||||||
Utc,
|
|
||||||
};
|
|
||||||
|
|
||||||
pub use errors::{Error, ErrorKind, ResultExt, Result};
|
pub use errors::{Error, ErrorKind, ResultExt, Result};
|
||||||
|
|
||||||
pub mod db;
|
pub mod db;
|
||||||
|
@ -116,8 +114,3 @@ pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String {
|
||||||
let values: String = repeat(inner).take(tuples).join(", ");
|
let values: String = repeat(inner).take(tuples).join(", ");
|
||||||
values
|
values
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the current time as a UTC `DateTime` instance.
|
|
||||||
pub fn now() -> DateTime<Utc> {
|
|
||||||
Utc::now()
|
|
||||||
}
|
|
||||||
|
|
60
db/src/tx.rs
60
db/src/tx.rs
|
@ -85,6 +85,7 @@ use mentat_core::{
|
||||||
Schema,
|
Schema,
|
||||||
Utc,
|
Utc,
|
||||||
attribute,
|
attribute,
|
||||||
|
now,
|
||||||
};
|
};
|
||||||
|
|
||||||
use mentat_core::intern_set::InternSet;
|
use mentat_core::intern_set::InternSet;
|
||||||
|
@ -141,7 +142,7 @@ pub struct Tx<'conn, 'a> {
|
||||||
tx_id: Entid,
|
tx_id: Entid,
|
||||||
|
|
||||||
/// The timestamp when the transaction began to be committed.
|
/// The timestamp when the transaction began to be committed.
|
||||||
tx_instant: DateTime<Utc>,
|
tx_instant: Option<DateTime<Utc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'conn, 'a> Tx<'conn, 'a> {
|
impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
|
@ -150,15 +151,14 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema_for_mutation: &'a Schema,
|
schema_for_mutation: &'a Schema,
|
||||||
schema: &'a Schema,
|
schema: &'a Schema,
|
||||||
tx_id: Entid,
|
tx_id: Entid) -> Tx<'conn, 'a> {
|
||||||
tx_instant: DateTime<Utc>) -> Tx<'conn, 'a> {
|
|
||||||
Tx {
|
Tx {
|
||||||
store: store,
|
store: store,
|
||||||
partition_map: partition_map,
|
partition_map: partition_map,
|
||||||
schema_for_mutation: Cow::Borrowed(schema_for_mutation),
|
schema_for_mutation: Cow::Borrowed(schema_for_mutation),
|
||||||
schema: schema,
|
schema: schema,
|
||||||
tx_id: tx_id,
|
tx_id: tx_id,
|
||||||
tx_instant: tx_instant,
|
tx_instant: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,16 +206,18 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
partition_map: &'a PartitionMap,
|
partition_map: &'a PartitionMap,
|
||||||
schema: &'a Schema,
|
schema: &'a Schema,
|
||||||
mentat_id_count: i64,
|
mentat_id_count: i64,
|
||||||
|
tx_id: KnownEntid,
|
||||||
temp_ids: InternSet<TempId>,
|
temp_ids: InternSet<TempId>,
|
||||||
lookup_refs: InternSet<AVPair>,
|
lookup_refs: InternSet<AVPair>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> InProcess<'a> {
|
impl<'a> InProcess<'a> {
|
||||||
fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> {
|
fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: KnownEntid) -> InProcess<'a> {
|
||||||
InProcess {
|
InProcess {
|
||||||
partition_map,
|
partition_map,
|
||||||
schema,
|
schema,
|
||||||
mentat_id_count: 0,
|
mentat_id_count: 0,
|
||||||
|
tx_id,
|
||||||
temp_ids: InternSet::new(),
|
temp_ids: InternSet::new(),
|
||||||
lookup_refs: InternSet::new(),
|
lookup_refs: InternSet::new(),
|
||||||
}
|
}
|
||||||
|
@ -269,6 +271,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
Ok(Either::Left(e))
|
Ok(Either::Left(e))
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// Special case: current tx ID.
|
||||||
|
entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => {
|
||||||
|
Ok(Either::Left(self.tx_id))
|
||||||
|
},
|
||||||
|
|
||||||
entmod::EntidOrLookupRefOrTempId::TempId(e) => {
|
entmod::EntidOrLookupRefOrTempId::TempId(e) => {
|
||||||
Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e))))
|
Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e))))
|
||||||
},
|
},
|
||||||
|
@ -334,7 +341,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map);
|
let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, KnownEntid(self.tx_id));
|
||||||
|
|
||||||
// We want to handle entities in the order they're given to us, while also "exploding" some
|
// We want to handle entities in the order they're given to us, while also "exploding" some
|
||||||
// entities into many. We therefore push the initial entities onto the back of the deque,
|
// entities into many. We therefore push the initial entities onto the back of the deque,
|
||||||
|
@ -593,6 +600,8 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
final_populations.allocated,
|
final_populations.allocated,
|
||||||
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
|
||||||
|
|
||||||
|
let tx_instant;
|
||||||
|
|
||||||
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
|
||||||
|
|
||||||
// Assertions that are :db.cardinality/one and not :db.fulltext.
|
// Assertions that are :db.cardinality/one and not :db.fulltext.
|
||||||
|
@ -615,14 +624,37 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
// TODO: use something like Clojure's group_by to do this.
|
// TODO: use something like Clojure's group_by to do this.
|
||||||
for term in final_terms {
|
for term in final_terms {
|
||||||
match term {
|
match term {
|
||||||
Term::AddOrRetract(op, e, a, v) => {
|
Term::AddOrRetract(op, KnownEntid(e), a, v) => {
|
||||||
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
|
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
|
||||||
if entids::might_update_metadata(a) {
|
if entids::might_update_metadata(a) {
|
||||||
tx_might_update_metadata = true;
|
tx_might_update_metadata = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
let added = op == OpType::Add;
|
let added = op == OpType::Add;
|
||||||
let reduced = (e.0, a, attribute, v, added);
|
|
||||||
|
// We take the last encountered :db/txInstant value.
|
||||||
|
// If more than one is provided, the transactor will fail.
|
||||||
|
if added &&
|
||||||
|
e == self.tx_id &&
|
||||||
|
a == entids::DB_TX_INSTANT {
|
||||||
|
if let TypedValue::Instant(instant) = v {
|
||||||
|
if let Some(ts) = self.tx_instant {
|
||||||
|
if ts == instant {
|
||||||
|
// Dupes are fine.
|
||||||
|
} else {
|
||||||
|
bail!(ErrorKind::ConflictingDatoms);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.tx_instant = Some(instant);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
// The type error has been caught earlier.
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let reduced = (e, a, attribute, v, added);
|
||||||
match (attribute.fulltext, attribute.multival) {
|
match (attribute.fulltext, attribute.multival) {
|
||||||
(false, true) => non_fts_many.push(reduced),
|
(false, true) => non_fts_many.push(reduced),
|
||||||
(false, false) => non_fts_one.push(reduced),
|
(false, false) => non_fts_one.push(reduced),
|
||||||
|
@ -633,12 +665,13 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transact [:db/add :db/txInstant NOW :db/tx].
|
tx_instant = self.tx_instant.unwrap_or_else(now);
|
||||||
// TODO: allow this to be present in the transaction data.
|
|
||||||
|
// Transact [:db/add :db/txInstant tx_instant :db/tx].
|
||||||
non_fts_one.push((self.tx_id,
|
non_fts_one.push((self.tx_id,
|
||||||
entids::DB_TX_INSTANT,
|
entids::DB_TX_INSTANT,
|
||||||
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),
|
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),
|
||||||
TypedValue::Instant(self.tx_instant),
|
tx_instant.into(),
|
||||||
true));
|
true));
|
||||||
|
|
||||||
if !non_fts_one.is_empty() {
|
if !non_fts_one.is_empty() {
|
||||||
|
@ -683,7 +716,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
|
||||||
|
|
||||||
Ok(TxReport {
|
Ok(TxReport {
|
||||||
tx_id: self.tx_id,
|
tx_id: self.tx_id,
|
||||||
tx_instant: self.tx_instant,
|
tx_instant,
|
||||||
tempids: tempids,
|
tempids: tempids,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -694,12 +727,11 @@ fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection,
|
||||||
mut partition_map: PartitionMap,
|
mut partition_map: PartitionMap,
|
||||||
schema_for_mutation: &'a Schema,
|
schema_for_mutation: &'a Schema,
|
||||||
schema: &'a Schema) -> Result<Tx<'conn, 'a>> {
|
schema: &'a Schema) -> Result<Tx<'conn, 'a>> {
|
||||||
let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge.
|
|
||||||
let tx_id = partition_map.allocate_entid(":db.part/tx");
|
let tx_id = partition_map.allocate_entid(":db.part/tx");
|
||||||
|
|
||||||
conn.begin_tx_application()?;
|
conn.begin_tx_application()?;
|
||||||
|
|
||||||
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant))
|
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option<Schema>)> {
|
fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option<Schema>)> {
|
||||||
|
|
|
@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, {
|
||||||
});
|
});
|
||||||
|
|
||||||
def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, {
|
def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, {
|
||||||
Tx::entid().map(EntidOrLookupRefOrTempId::Entid)
|
Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId)
|
||||||
|
.or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid))
|
||||||
.or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef))
|
.or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef))
|
||||||
.or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId))
|
.or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx");
|
||||||
|
|
||||||
|
def_parser!(Tx, db_tx, TempId, {
|
||||||
|
Tx::literal_db_tx().map(|_| TempId::Tx)
|
||||||
|
});
|
||||||
|
|
||||||
def_parser!(Tx, temp_id, TempId, {
|
def_parser!(Tx, temp_id, TempId, {
|
||||||
satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External))
|
satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External))
|
||||||
});
|
});
|
||||||
|
|
|
@ -23,12 +23,14 @@ use self::edn::symbols::NamespacedKeyword;
|
||||||
pub enum TempId {
|
pub enum TempId {
|
||||||
External(String),
|
External(String),
|
||||||
Internal(i64),
|
Internal(i64),
|
||||||
|
Tx, // Special identifier used to refer to the current transaction.
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TempId {
|
impl TempId {
|
||||||
pub fn into_external(self) -> Option<String> {
|
pub fn into_external(self) -> Option<String> {
|
||||||
match self {
|
match self {
|
||||||
TempId::External(s) => Some(s),
|
TempId::External(s) => Some(s),
|
||||||
|
TempId::Tx |
|
||||||
TempId::Internal(_) => None,
|
TempId::Internal(_) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,6 +38,7 @@ impl TempId {
|
||||||
pub fn into_internal(self) -> Option<i64> {
|
pub fn into_internal(self) -> Option<i64> {
|
||||||
match self {
|
match self {
|
||||||
TempId::Internal(x) => Some(x),
|
TempId::Internal(x) => Some(x),
|
||||||
|
TempId::Tx |
|
||||||
TempId::External(_) => None,
|
TempId::External(_) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,6 +49,7 @@ impl fmt::Display for TempId {
|
||||||
match self {
|
match self {
|
||||||
&TempId::External(ref s) => write!(f, "{}", s),
|
&TempId::External(ref s) => write!(f, "{}", s),
|
||||||
&TempId::Internal(x) => write!(f, "<tempid {}>", x),
|
&TempId::Internal(x) => write!(f, "<tempid {}>", x),
|
||||||
|
&TempId::Tx => write!(f, "<Tx>"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue