Compare commits

...

3 commits

Author SHA1 Message Date
Richard Newman
96dbf26d38 Review comment: take the last seen txInstant and transact it once. 2018-01-22 13:18:24 -08:00
Richard Newman
68b4d51e1b Review comment: name all enum cases. 2018-01-22 13:11:09 -08:00
Edouard Oger
87dd6a711e Allow customers to assert facts about the current transaction.
Fixes #225
2018-01-22 13:52:54 -05:00
4 changed files with 66 additions and 7 deletions

View file

@ -1320,6 +1320,30 @@ mod tests {
[200 :db.schema/attribute 101]]"); [200 :db.schema/attribute 101]]");
} }
#[test]
fn test_tx_assertions() {
let mut conn = TestConn::default();
// Test that txInstant can be asserted.
assert_transact!(conn, "[[:db/add :db/tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\"]
[:db/add 100 :db.schema/version 1]
[:db/add 101 :db.schema/version 2]]");
assert_matches!(conn.last_transaction(),
"[[100 :db.schema/version 1 ?tx true]
[101 :db.schema/version 2 ?tx true]
[?tx :db/txInstant #inst \"2017-06-16T00:56:41.257Z\" ?tx true]]");
// Test other tx assertion.
assert_transact!(conn, "[[:db/add :db/tx :db.schema/version 7]
[:db/add 200 :db.schema/version 2]
[:db/add 201 :db.schema/version 3]]");
assert_matches!(conn.last_transaction(),
"[[200 :db.schema/version 2 ?tx true]
[201 :db.schema/version 3 ?tx true]
[?tx :db/txInstant ?ms ?tx true]
[?tx :db.schema/version 7 ?tx true]]");
}
#[test] #[test]
fn test_retract() { fn test_retract() {
let mut conn = TestConn::default(); let mut conn = TestConn::default();

View file

@ -204,16 +204,18 @@ impl<'conn, 'a> Tx<'conn, 'a> {
partition_map: &'a PartitionMap, partition_map: &'a PartitionMap,
schema: &'a Schema, schema: &'a Schema,
mentat_id_count: i64, mentat_id_count: i64,
tx_id: Entid,
temp_ids: intern_set::InternSet<TempId>, temp_ids: intern_set::InternSet<TempId>,
lookup_refs: intern_set::InternSet<AVPair>, lookup_refs: intern_set::InternSet<AVPair>,
} }
impl<'a> InProcess<'a> { impl<'a> InProcess<'a> {
fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap) -> InProcess<'a> { fn with_schema_and_partition_map(schema: &'a Schema, partition_map: &'a PartitionMap, tx_id: Entid) -> InProcess<'a> {
InProcess { InProcess {
partition_map, partition_map,
schema, schema,
mentat_id_count: 0, mentat_id_count: 0,
tx_id,
temp_ids: intern_set::InternSet::new(), temp_ids: intern_set::InternSet::new(),
lookup_refs: intern_set::InternSet::new(), lookup_refs: intern_set::InternSet::new(),
} }
@ -268,6 +270,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
Ok(Either::Left(e)) Ok(Either::Left(e))
}, },
// Special case: current tx ID.
entmod::EntidOrLookupRefOrTempId::TempId(TempId::Tx) => {
Ok(Either::Left(KnownEntid(self.tx_id)))
},
entmod::EntidOrLookupRefOrTempId::TempId(e) => { entmod::EntidOrLookupRefOrTempId::TempId(e) => {
Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e)))) Ok(Either::Right(LookupRefOrTempId::TempId(self.intern_temp_id(e))))
}, },
@ -333,7 +340,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
} }
} }
let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map); let mut in_process = InProcess::with_schema_and_partition_map(&self.schema, &self.partition_map, self.tx_id);
// We want to handle entities in the order they're given to us, while also "exploding" some // We want to handle entities in the order they're given to us, while also "exploding" some
// entities into many. We therefore push the initial entities onto the back of the deque, // entities into many. We therefore push the initial entities onto the back of the deque,
@ -610,14 +617,32 @@ impl<'conn, 'a> Tx<'conn, 'a> {
// TODO: use something like Clojure's group_by to do this. // TODO: use something like Clojure's group_by to do this.
for term in final_terms { for term in final_terms {
match term { match term {
Term::AddOrRetract(op, e, a, v) => { Term::AddOrRetract(op, KnownEntid(e), a, v) => {
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
if entids::might_update_metadata(a) { if entids::might_update_metadata(a) {
tx_might_update_metadata = true; tx_might_update_metadata = true;
} }
let added = op == OpType::Add; let added = op == OpType::Add;
let reduced = (e.0, a, attribute, v, added);
// We take the last encountered :db/txInstant value.
if added &&
e == self.tx_id &&
a == entids::DB_TX_INSTANT {
if let TypedValue::Instant(instant) = v {
// TODO: `instant` should be strictly after the last transaction
// timestamp and strictly before the current transactor timestamp.
self.tx_instant = instant;
} else {
panic!("This type error should have been caught earlier.");
}
// We don't need to add the datom here: we'll do so for the current
// timestamp at the end of the loop.
continue;
}
let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) { match (attribute.fulltext, attribute.multival) {
(false, true) => non_fts_many.push(reduced), (false, true) => non_fts_many.push(reduced),
(false, false) => non_fts_one.push(reduced), (false, false) => non_fts_one.push(reduced),
@ -628,8 +653,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
} }
} }
// Transact [:db/add :db/txInstant NOW :db/tx]. // Transact [:db/add :db/txInstant NOW :db/tx] if it doesn't exist.
// TODO: allow this to be present in the transaction data.
non_fts_one.push((self.tx_id, non_fts_one.push((self.tx_id,
entids::DB_TX_INSTANT, entids::DB_TX_INSTANT,
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(), self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),

View file

@ -89,11 +89,18 @@ def_parser!(Tx, lookup_ref, LookupRef, {
}); });
def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, { def_parser!(Tx, entid_or_lookup_ref_or_temp_id, EntidOrLookupRefOrTempId, {
Tx::entid().map(EntidOrLookupRefOrTempId::Entid) Tx::db_tx().map(EntidOrLookupRefOrTempId::TempId)
.or(Tx::entid().map(EntidOrLookupRefOrTempId::Entid))
.or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef)) .or(Tx::lookup_ref().map(EntidOrLookupRefOrTempId::LookupRef))
.or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId)) .or(Tx::temp_id().map(EntidOrLookupRefOrTempId::TempId))
}); });
def_matches_namespaced_keyword!(Tx, literal_db_tx, "db", "tx");
def_parser!(Tx, db_tx, TempId, {
Tx::literal_db_tx().map(|_| TempId::Tx)
});
def_parser!(Tx, temp_id, TempId, { def_parser!(Tx, temp_id, TempId, {
satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External)) satisfy_map(|x: &'a edn::ValueAndSpan| x.as_text().cloned().map(TempId::External))
}); });

View file

@ -23,12 +23,14 @@ use self::edn::symbols::NamespacedKeyword;
pub enum TempId { pub enum TempId {
External(String), External(String),
Internal(i64), Internal(i64),
Tx, // Special identifier used to refer to the current transaction.
} }
impl TempId { impl TempId {
pub fn into_external(self) -> Option<String> { pub fn into_external(self) -> Option<String> {
match self { match self {
TempId::External(s) => Some(s), TempId::External(s) => Some(s),
TempId::Tx |
TempId::Internal(_) => None, TempId::Internal(_) => None,
} }
} }
@ -36,6 +38,7 @@ impl TempId {
pub fn into_internal(self) -> Option<i64> { pub fn into_internal(self) -> Option<i64> {
match self { match self {
TempId::Internal(x) => Some(x), TempId::Internal(x) => Some(x),
TempId::Tx |
TempId::External(_) => None, TempId::External(_) => None,
} }
} }
@ -46,6 +49,7 @@ impl fmt::Display for TempId {
match self { match self {
&TempId::External(ref s) => write!(f, "{}", s), &TempId::External(ref s) => write!(f, "{}", s),
&TempId::Internal(x) => write!(f, "<tempid {}>", x), &TempId::Internal(x) => write!(f, "<tempid {}>", x),
&TempId::Tx => write!(f, "<Tx>"),
} }
} }
} }