Allow customers to assert facts about the current transaction.

Fixes #225
This commit is contained in:
Edouard Oger 2018-01-22 13:17:12 -05:00 committed by Richard Newman
parent 98502eb68f
commit 55d196094a
4 changed files with 90 additions and 19 deletions

View file

@ -1327,6 +1327,43 @@ mod tests {
[200 :db.schema/attribute 101]]");
}
#[test]
fn test_tx_assertions() {
let mut conn = TestConn::default();
// Test that txInstant can be asserted.
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"]
[:db/add 100 :db/ident :name/Ivan]
[:db/add 101 :db/ident :name/Petr]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :name/Ivan ?tx true]
[101 :db/ident :name/Petr ?tx true]
[?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]");
// Test multiple txInstant with different values should fail.
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.752Z\"]
[:db/add 102 :db/ident :name/Vlad]]",
Err("Could not insert non-fts one statements into temporary search table!"));
// Test multiple txInstants with the same value.
// Test disabled: depends on #535.
// assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
// [:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
// [:db/add 103 :db/ident :name/Dimitri]
// [:db/add 104 :db/ident :name/Anton]]");
// assert_matches!(conn.last_transaction(),
// "[[103 :db/ident :name/Dimitri ?tx true]
// [104 :db/ident :name/Anton ?tx true]
// [?tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\" ?tx true]]");
// Test txInstant retraction
// Test disabled: retracting a datom that doesn't exist should fail.
// assert_transact!(conn, "[[:db/retract :db/tx :db/txInstant #inst \"2017-06-16T00:59:11.257Z\"]
// [:db/add 105 :db/ident :name/Vadim]]",
// Err("Should fail!"));
}
#[test]
fn test_retract() {
let mut conn = TestConn::default();

View file

@ -141,7 +141,7 @@ pub struct Tx<'conn, 'a> {
tx_id: Entid,
/// The timestamp when the transaction began to be committed.
tx_instant: DateTime<Utc>,
tx_instant: Option<DateTime<Utc>>,
}
impl<'conn, 'a> Tx<'conn, 'a> {
@ -150,15 +150,14 @@ impl<'conn, 'a> Tx<'conn, 'a> {
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
tx_id: Entid,
tx_instant: DateTime<Utc>) -> Tx<'conn, 'a> {
tx_id: Entid) -> Tx<'conn, 'a> {
Tx {
store: store,
partition_map: partition_map,
schema_for_mutation: Cow::Borrowed(schema_for_mutation),
schema: schema,
tx_id: tx_id,
tx_instant: tx_instant,
tx_instant: None,
}
}
@ -206,16 +205,18 @@ impl<'conn, 'a> Tx<'conn, 'a> {
partition_map: &'a PartitionMap,
schema: &'a Schema,
mentat_id_count: i64,
tx_id: KnownEntid,
temp_ids: InternSet<TempId>,
lookup_refs: InternSet<AVPair>,
}
impl<'a> InProcess<'a> {
fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> {
fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: KnownEntid) -> InProcess<'a> {
InProcess {
partition_map,
schema,
mentat_id_count: 0,
tx_id,
temp_ids: InternSet::new(),
lookup_refs: InternSet::new(),
}
@ -269,6 +270,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
Ok(Either::Left(e))
},
// Special case: current tx ID.
entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => {
Ok(Either::Left(self.tx_id))
},
entmod::EntidOrLookupRefOrTempId::TempId(e) => {
Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e))))
},
@ -334,7 +340,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
}
let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map);
let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, KnownEntid(self.tx_id));
// We want to handle entities in the order they're given to us, while also "exploding" some
// entities into many. We therefore push the initial entities onto the back of the deque,
@ -593,6 +599,8 @@ impl<'conn, 'a> Tx<'conn, 'a> {
final_populations.allocated,
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
let tx_instant;
{ // TODO: Don't use this block to scope borrowing the schema; instead, extract a helper function.
// Assertions that are :db.cardinality/one and not :db.fulltext.
@ -615,14 +623,27 @@ impl<'conn, 'a> Tx<'conn, 'a> {
// TODO: use something like Clojure's group_by to do this.
for term in final_terms {
match term {
Term::AddOrRetract(op, e, a, v) => {
Term::AddOrRetract(op, KnownEntid(e), a, v) => {
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
if entids::might_update_metadata(a) {
tx_might_update_metadata = true;
}
let added = op == OpType::Add;
let reduced = (e.0, a, attribute, v, added);
// We take the last encountered :db/txInstant value.
if added &&
e == self.tx_id &&
a == entids::DB_TX_INSTANT {
if let TypedValue::Instant(instant) = v {
self.tx_instant = Some(instant);
} else {
// The type error has been caught earlier.
unreachable!()
}
}
let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) {
(false, true) => non_fts_many.push(reduced),
(false, false) => non_fts_one.push(reduced),
@ -633,13 +654,16 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
}
// Transact [:db/add :db/txInstant NOW :db/tx].
// TODO: allow this to be present in the transaction data.
non_fts_one.push((self.tx_id,
entids::DB_TX_INSTANT,
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),
TypedValue::Instant(self.tx_instant),
true));
tx_instant = self.tx_instant.unwrap_or(::now());
// Transact [:db/add :db/txInstant NOW :db/tx] if it doesn't exist.
if self.tx_instant == None {
non_fts_one.push((self.tx_id,
entids::DB_TX_INSTANT,
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),
TypedValue::Instant(tx_instant),
true));
}
if !non_fts_one.is_empty() {
self.store.insert_non_fts_searches(&non_fts_one[..], db::SearchType::Inexact)?;
@ -683,7 +707,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
Ok(TxReport {
tx_id: self.tx_id,
tx_instant: self.tx_instant,
tx_instant,
tempids: tempids,
})
}
@ -694,12 +718,11 @@ fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection,
mut partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema) -> Result<Tx<'conn, 'a>> {
let tx_instant = ::now(); // Label the transaction with the timestamp when we first see it: leading edge.
let tx_id = partition_map.allocate_entid(":db.part/tx");
conn.begin_tx_application()?;
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant))
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id))
}
fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option<Schema>)> {

View file

@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, {
});
def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, {
Tx::entid().map(EntidOrLookupRefOrTempId::Entid)
Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId)
.or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid))
.or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef))
.or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId))
});
def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx");
def_parser!(Tx, db_tx, TempId, {
Tx::literal_db_tx().map(|_| TempId::Tx)
});
def_parser!(Tx, temp_id, TempId, {
satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External))
});

View file

@ -23,12 +23,14 @@ use self::edn::symbols::NamespacedKeyword;
pub enum TempId {
External(String),
Internal(i64),
Tx, // Special identifier used to refer to the current transaction.
}
impl TempId {
pub fn into_external(self) -> Option<String> {
match self {
TempId::External(s) => Some(s),
TempId::Tx |
TempId::Internal(_) => None,
}
}
@ -36,6 +38,7 @@ impl TempId {
pub fn into_internal(self) -> Option<i64> {
match self {
TempId::Internal(x) => Some(x),
TempId::Tx |
TempId::External(_) => None,
}
}
@ -46,6 +49,7 @@ impl fmt::Display for TempId {
match self {
&TempId::External(ref s) => write!(f, "{}", s),
&TempId::Internal(x) => write!(f, "<tempid {}>", x),
&TempId::Tx => write!(f, "<Tx>"),
}
}
}