Implement upsert resolution algorithm. (#186, #283). r=rnewman, f=jsantell

* Pre: Implement batch [a v] pair lookup.

* Pre: Add InternSet for sharing ref-counted handles to large values.

* Pre: Derive more for Entity.

* Pre: Return DB from creating; return TxReport from transact.

I explicitly am not supporting opening existing databases yet, let
alone upgrading databases from earlier versions.  That can follow fast
once basic transactions are supported.

* Pre: Parse string temporary ID entities; remove ValueOrLookupRef.

This adds TempId entities, but we can't disambiguate String temporary
IDs from values without the use of the schema, so there's no new value
branch.  Similarly, we can't disambiguate lookup-ref values from two
element list values without a schema, so we remove this entirely.
We'll handle the ambiguity later in the transactor.

* Persist partitions to SQL store; allocate transaction ID. (#186)

* Post: Test upserting with vectors.

This converts an existing test to EDN:
84a80f40f5/test/datomish/db_test.cljc (L193).

* Implement tempid upsert resolution algorithm. (#184)

* Post: Separate Tx out of DB.

This is very preliminary, since we don't have a real connection type
to manage transactions and their metadata yet.

* Post: Comment on implementation choices in the transactor.

* Review comment: Put long use lists on separate lines.

* Review comment: Accept String: Borrow<S> instead of just String.

* Review comment: Address nits.
This commit is contained in:
Nick Alexander 2017-02-14 16:50:40 -08:00 committed by GitHub
parent bfb62302cb
commit 16e9740d8a
12 changed files with 1117 additions and 188 deletions

45
core/src/intern_set.rs Normal file
View file

@ -0,0 +1,45 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::HashSet;
use std::hash::Hash;
use std::rc::Rc;
/// An `InternSet` allows to "intern" some potentially large values, maintaining a single value
/// instance owned by the `InternSet` and leaving consumers with lightweight ref-counted handles to
/// the large owned value. This can avoid expensive clone() operations.
///
/// In Mentat, such large values might be strings or arbitrary [a v] pairs.
///
/// See https://en.wikipedia.org/wiki/String_interning for discussion.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct InternSet<T> where T: Eq + Hash {
pub inner: HashSet<Rc<T>>,
}
impl<T> InternSet<T> where T: Eq + Hash {
pub fn new() -> InternSet<T> {
InternSet {
inner: HashSet::new(),
}
}
/// Intern a value, providing a ref-counted handle to the interned value.
pub fn intern(&mut self, value: T) -> Rc<T> {
let key = Rc::new(value);
if self.inner.insert(key.clone()) {
key
} else {
self.inner.get(&key).unwrap().clone()
}
}
}

View file

@ -272,3 +272,5 @@ mod test {
assert!(attr2.flags() & AttributeBitFlags::UniqueValue as u8 != 0);
}
}
pub mod intern_set;

View file

@ -10,19 +10,21 @@
#![allow(dead_code)]
use std::iter::once;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::Display;
use std::iter::{once, repeat};
use std::ops::Range;
use std::path::Path;
use itertools;
use itertools::Itertools;
use rusqlite;
use rusqlite::types::{ToSql, ToSqlOutput};
use time;
use ::{repeat_values, to_namespaced_keyword};
use ::{now, repeat_values, to_namespaced_keyword};
use bootstrap;
use edn::types::Value;
use entids;
use mentat_core::{
Attribute,
AttributeBitFlags,
@ -32,11 +34,18 @@ use mentat_core::{
TypedValue,
ValueType,
};
use mentat_tx::entities as entmod;
use mentat_tx::entities::{Entity, OpType};
use mentat_tx::entities::Entity;
use errors::{ErrorKind, Result, ResultExt};
use types::{DB, Partition, PartitionMap};
use schema::SchemaBuilding;
use types::{
AVMap,
AVPair,
DB,
Partition,
PartitionMap,
TxReport,
};
use tx::Tx;
pub fn new_connection<T>(uri: T) -> rusqlite::Result<rusqlite::Connection> where T: AsRef<Path> {
let conn = match uri.as_ref().to_string_lossy().len() {
@ -153,6 +162,7 @@ lazy_static! {
r#"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value BLOB NOT NULL, value_type_tag SMALLINT NOT NULL,
FOREIGN KEY (ident) REFERENCES idents (ident))"#,
r#"CREATE INDEX idx_schema_unique ON schema (ident, attr, value, value_type_tag)"#,
// TODO: store entid instead of ident for partition name.
r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"#,
]
};
@ -180,7 +190,7 @@ fn get_user_version(conn: &rusqlite::Connection) -> Result<i32> {
}
// TODO: rename "SQL" functions to align with "datoms" functions.
pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<i32> {
pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
let tx = conn.transaction()?;
for statement in (&V2_STATEMENTS).iter() {
@ -191,20 +201,21 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<i32> {
// TODO: think more carefully about allocating new parts and bitmasking part ranges.
// TODO: install these using bootstrap assertions. It's tricky because the part ranges are implicit.
// TODO: one insert, chunk into 999/3 sections, for safety.
// This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing.
for (part, partition) in bootstrap_partition_map.iter() {
// TODO: Convert "keyword" part to SQL using Value conversion.
tx.execute("INSERT INTO parts VALUES (?, ?, ?)", &[part, &partition.start, &partition.index])?;
}
let bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema());
bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0)?;
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema());
bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?;
set_user_version(&tx, CURRENT_VERSION)?;
let user_version = get_user_version(&tx)?;
// TODO: use the drop semantics to do this automagically?
tx.commit()?;
Ok(user_version)
Ok(bootstrap_db)
}
// (def v2-statements v1-statements)
@ -305,12 +316,12 @@ pub fn update_from_version(conn: &mut rusqlite::Connection, current_version: i32
Ok(user_version)
}
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<i32> {
pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
let user_version = get_user_version(&conn)?;
match user_version {
CURRENT_VERSION => Ok(user_version),
0 => create_current_version(conn),
v => update_from_version(conn, v),
// TODO: support updating or re-opening an existing store.
v => bail!(ErrorKind::NotYetImplemented(format!("Opening databases with Mentat version: {}", v))),
}
}
@ -429,7 +440,7 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result<DB> {
}
/// Internal representation of an [e a v added] datom, ready to be transacted against the store.
type ReducedEntity = (i64, i64, TypedValue, bool);
pub type ReducedEntity = (i64, i64, TypedValue, bool);
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub enum SearchType {
@ -463,6 +474,77 @@ impl DB {
}
}
/// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples.
///
/// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one
/// matching [e a v] triple exists. (If this is not true, some matching entid `e` will be
/// chosen non-deterministically, if one exists.)
///
/// Returns a map &(a, v) -> e, to avoid cloning potentially large values. The keys of the map
/// are exactly those (a, v) pairs that have an assertion [e a v] in the datom store.
pub fn resolve_avs<'a>(&self, conn: &rusqlite::Connection, avs: &'a [&'a AVPair]) -> Result<AVMap<'a>> {
// Start search_id's at some identifiable number.
let initial_search_id = 2000;
let bindings_per_statement = 4;
// We map [a v] -> numeric search_id -> e, and then we use the search_id lookups to finally
// produce the map [a v] -> e.
//
// TODO: `collect` into a HashSet so that any (a, v) is resolved at most once.
let chunks: itertools::IntoChunks<_> = avs.into_iter().enumerate().chunks(::SQLITE_MAX_VARIABLE_NUMBER / 4);
// We'd like to `flat_map` here, but it's not obvious how to `flat_map` across `Result`.
// Alternatively, this is a `fold`, and it might be wise to express it as such.
let results: Result<Vec<Vec<_>>> = chunks.into_iter().map(|chunk| -> Result<Vec<_>> {
let mut count = 0;
// We must keep these computed values somewhere to reference them later, so we can't
// combine this `map` and the subsequent `flat_map`.
let block: Vec<(i64, i64, ToSqlOutput<'a>, i32)> = chunk.map(|(index, &&(a, ref v))| {
count += 1;
let search_id: i64 = initial_search_id + index as i64;
let (value, value_type_tag) = v.to_sql_value_pair();
(search_id, a, value, value_type_tag)
}).collect();
// `params` reference computed values in `block`.
let params: Vec<&ToSql> = block.iter().flat_map(|&(ref searchid, ref a, ref value, ref value_type_tag)| {
// Avoid inner heap allocation.
once(searchid as &ToSql)
.chain(once(a as &ToSql)
.chain(once(value as &ToSql)
.chain(once(value_type_tag as &ToSql))))
}).collect();
// TODO: cache these statements for selected values of `count`.
// TODO: query against `datoms` and UNION ALL with `fulltext_datoms` rather than
// querying against `all_datoms`. We know all the attributes, and in the common case,
// where most unique attributes will not be fulltext-indexed, we'll be querying just
// `datoms`, which will be much faster.
let values: String = repeat_values(bindings_per_statement, count);
let s: String = format!("WITH t(search_id, a, v, value_type_tag) AS (VALUES {}) SELECT t.search_id, d.e \
FROM t, all_datoms AS d \
WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v",
values);
let mut stmt: rusqlite::Statement = conn.prepare(s.as_str())?;
let m: Result<Vec<(i64, Entid)>> = stmt.query_and_then(&params, |row| -> Result<(i64, Entid)> {
Ok((row.get_checked(0)?, row.get_checked(1)?))
})?.collect();
m
}).collect::<Result<Vec<Vec<(i64, Entid)>>>>();
// Flatten.
let results: Vec<(i64, Entid)> = results?.as_slice().concat();
// Create map [a v] -> e.
let m: HashMap<&'a AVPair, Entid> = results.into_iter().map(|(search_id, entid)| {
let index: usize = (search_id - initial_search_id) as usize;
(avs[index], entid)
}).collect();
Ok(m)
}
/// Create empty temporary tables for search parameters and search results.
fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> {
// We can't do this in one shot, since we can't prepare a batch statement.
@ -528,7 +610,7 @@ impl DB {
///
/// Eventually, the details of this approach will be captured in
/// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> {
pub fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> {
let bindings_per_statement = 7;
let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement);
@ -592,7 +674,7 @@ impl DB {
/// Take search rows and complete `temp.search_results`.
///
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
fn search(&self, conn: &rusqlite::Connection) -> Result<()> {
pub fn search(&self, conn: &rusqlite::Connection) -> Result<()> {
// First is fast, only one table walk: lookup by exact eav.
// Second is slower, but still only one table walk: lookup old value by ea.
let s = r#"
@ -625,7 +707,7 @@ impl DB {
///
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
// TODO: capture `conn` in a `TxInternal` structure.
fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
pub fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
let s = r#"
INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
SELECT e0, a0, v0, ?, 1, value_type_tag0
@ -659,7 +741,7 @@ impl DB {
///
/// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation.
// TODO: capture `conn` in a `TxInternal` structure.
fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
pub fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> {
// Delete datoms that were retracted, or those that were :db.cardinality/one and will be
// replaced.
let s = r#"
@ -699,130 +781,67 @@ impl DB {
Ok(())
}
/// Update the current partition map materialized view.
// TODO: only update changed partitions.
pub fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> {
let values_per_statement = 2;
let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement;
if self.partition_map.len() > max_partitions {
bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions)));
}
// Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END".
let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END",
repeat("WHEN part = ? THEN ?").take(self.partition_map.len()).join(" "));
let params: Vec<&ToSql> = self.partition_map.iter().flat_map(|(name, partition)| {
once(name as &ToSql)
.chain(once(&partition.index as &ToSql))
}).collect();
// TODO: only cache the latest of these statements. Changing the set of partitions isn't
// supported in the Clojure implementation at all, and might not be supported in Mentat soon,
// so this is very low priority.
let mut stmt = conn.prepare_cached(s.as_str())?;
stmt.execute(&params[..])
.map(|_c| ())
.chain_err(|| "Could not update partition map")
}
/// Allocate a single fresh entid in the given `partition`.
pub fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S> {
self.allocate_entids(partition, 1).start
}
/// Allocate `n` fresh entids in the given `partition`.
pub fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S> {
match self.partition_map.get_mut(partition) {
Some(mut partition) => {
let idx = partition.index;
partition.index += n as i64;
idx..partition.index
},
// This is a programming error.
None => panic!("Cannot allocate entid from unknown partition: {}", partition),
}
}
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
/// `self.DB`.
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx: Entid) -> Result<()>{
// TODO: push these into an internal transaction report?
pub fn transact<I>(&mut self, conn: &rusqlite::Connection, entities: I) -> Result<TxReport> where I: IntoIterator<Item=Entity> {
// Eventually, this function will be responsible for managing a SQLite transaction. For
// now, it's just about the tx details.
/// Assertions that are :db.cardinality/one and not :db.fulltext.
let mut non_fts_one: Vec<ReducedEntity> = vec![];
/// Assertions that are :db.cardinality/many and not :db.fulltext.
let mut non_fts_many: Vec<ReducedEntity> = vec![];
// Transact [:db/add :db/txInstant NOW :db/tx].
// TODO: allow this to be present in the transaction data.
let now = time::get_time();
let tx_instant = (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000));
non_fts_one.push((tx,
entids::DB_TX_INSTANT,
TypedValue::Long(tx_instant),
true));
// Right now, this could be a for loop, saving some mapping, collecting, and type
// annotations. However, I expect it to be a multi-stage map as we start to transform the
// underlying entities, in which case this expression is more natural than for loops.
let r: Vec<Result<()>> = entities.into_iter().map(|entity: &Entity| -> Result<()> {
match *entity {
Entity::AddOrRetract {
op: OpType::Add,
e: entmod::EntidOrLookupRef::Entid(ref e_),
a: ref a_,
v: entmod::ValueOrLookupRef::Value(ref v_)} => {
let e: i64 = match e_ {
&entmod::Entid::Entid(ref e__) => *e__,
&entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?,
};
let a: i64 = match a_ {
&entmod::Entid::Entid(ref a__) => *a__,
&entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?,
};
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
if attribute.fulltext {
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity)))
}
// This is our chance to do schema-aware typechecking: to either assert that the
// given value is in the attribute's value set, or (in limited cases) to coerce
// the value into the attribute's value set.
let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?;
let added = true;
if attribute.multival {
non_fts_many.push((e, a, typed_value, added));
} else {
non_fts_one.push((e, a, typed_value, added));
}
Ok(())
},
Entity::AddOrRetract {
op: OpType::Retract,
e: entmod::EntidOrLookupRef::Entid(ref e_),
a: ref a_,
v: entmod::ValueOrLookupRef::Value(ref v_) } => {
let e: i64 = match e_ {
&entmod::Entid::Entid(ref e__) => *e__,
&entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?,
};
let a: i64 = match a_ {
&entmod::Entid::Entid(ref a__) => *a__,
&entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?,
};
let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?;
if attribute.fulltext {
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity)))
}
// This is our chance to do schema-aware typechecking: to either assert that the
// given value is in the attribute's value set, or (in limited cases) to coerce
// the value into the attribute's value set.
let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?;
let added = false;
if attribute.multival {
non_fts_many.push((e, a, typed_value, added));
} else {
non_fts_one.push((e, a, typed_value, added));
}
Ok(())
},
_ => bail!(ErrorKind::NotYetImplemented(format!("Transacting this entity is not yet implemented: {:?}", entity)))
}
}).collect();
let r: Result<Vec<()>> = r.into_iter().collect();
r?;
let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge.
let tx_id = self.allocate_entid(":db.part/tx");
self.create_temp_tables(conn)?;
if !non_fts_one.is_empty() {
self.insert_non_fts_searches(conn, &non_fts_one[..], tx, SearchType::Inexact)?;
}
if !non_fts_many.is_empty() {
self.insert_non_fts_searches(conn, &non_fts_many[..], tx, SearchType::Exact)?;
}
self.search(conn)?;
self.insert_transaction(conn, tx)?;
self.update_datoms(conn, tx)?;
// TODO: update parts, idents, schema materialized views.
Ok(())
let mut tx = Tx::new(self, conn, tx_id, tx_instant);
tx.transact_entities(entities)
}
}
@ -869,7 +888,7 @@ mod tests {
/// There is some magic here about transaction numbering that I don't want to commit to or
/// document just yet. The end state might be much more general pattern matching syntax, rather
/// than the targeted transaction ID and timestamp replacement we have right now.
fn assert_transactions(conn: &rusqlite::Connection, db: &DB, transactions: &Vec<edn::Value>) {
fn assert_transactions(conn: &rusqlite::Connection, db: &mut DB, transactions: &Vec<edn::Value>) {
for (index, transaction) in transactions.into_iter().enumerate() {
let index = index as i64;
let transaction = transaction.as_map().unwrap();
@ -877,22 +896,40 @@ mod tests {
let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone();
let expected_transaction: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-transaction")));
let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-datoms")));
let expected_error_message: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-error-message")));
let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap();
db.transact_internal(&conn, &entities[..], bootstrap::TX0 + index + 1).unwrap();
let maybe_report = db.transact(&conn, entities);
if let Some(expected_transaction) = expected_transaction {
if expected_transaction.is_nil() {
assert!(maybe_report.is_err());
if let Some(expected_error_message) = expected_error_message {
let expected_error_message = expected_error_message.as_text();
assert!(expected_error_message.is_some(), "Expected error message to be text:\n{:?}", expected_error_message);
let error_message = maybe_report.unwrap_err().to_string();
assert!(error_message.contains(expected_error_message.unwrap()), "Expected error message:\n{}\nto contain:\n{}", error_message, expected_error_message.unwrap());
}
continue
}
let report = maybe_report.unwrap();
assert_eq!(report.tx_id, bootstrap::TX0 + index + 1);
let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap();
assert_eq!(transactions.0[0].into_edn(),
*expected_transaction,
"\n{} - expected transaction:\n{}\n{}", label, transactions.0[0].into_edn(), *expected_transaction);
assert_eq!(transactions.0.len(), 1);
assert_eq!(*expected_transaction,
transactions.0[0].into_edn(),
"\n{} - expected transaction:\n{}\nbut got transaction:\n{}", label, *expected_transaction, transactions.0[0].into_edn());
}
if let Some(expected_datoms) = expected_datoms {
let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap();
assert_eq!(datoms.into_edn(),
*expected_datoms,
"\n{} - expected datoms:\n{}\n{}", label, datoms.into_edn(), *expected_datoms);
assert_eq!(*expected_datoms,
datoms.into_edn(),
"\n{} - expected datoms:\n{}\nbut got datoms:\n{}", label, *expected_datoms, datoms.into_edn())
}
// Don't allow empty tests. This will need to change if we allow transacting schema
@ -905,16 +942,14 @@ mod tests {
#[test]
fn test_add() {
let mut conn = new_connection("").expect("Couldn't open in-memory db");
assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION);
let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema());
let mut db = ensure_current_version(&mut conn).unwrap();
// Does not include :db/txInstant.
let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap();
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
assert_eq!(datoms.0.len(), 88);
// Includes :db/txInstant.
let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap();
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
assert_eq!(transactions.0.len(), 1);
assert_eq!(transactions.0[0].0.len(), 89);
@ -922,28 +957,46 @@ mod tests {
let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap();
let transactions = value.as_vector().unwrap();
assert_transactions(&conn, &bootstrap_db, transactions);
assert_transactions(&conn, &mut db, transactions);
}
#[test]
fn test_retract() {
let mut conn = new_connection("").expect("Couldn't open in-memory db");
assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION);
let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema());
let mut db = ensure_current_version(&mut conn).unwrap();
// Does not include :db/txInstant.
let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap();
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
assert_eq!(datoms.0.len(), 88);
// Includes :db/txInstant.
let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap();
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
assert_eq!(transactions.0.len(), 1);
assert_eq!(transactions.0[0].0.len(), 89);
let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap();
let transactions = value.as_vector().unwrap();
assert_transactions(&conn, &bootstrap_db, transactions);
assert_transactions(&conn, &mut db, transactions);
}
#[test]
fn test_upsert_vector() {
let mut conn = new_connection("").expect("Couldn't open in-memory db");
let mut db = ensure_current_version(&mut conn).unwrap();
// Does not include :db/txInstant.
let datoms = debug::datoms_after(&conn, &db, 0).unwrap();
assert_eq!(datoms.0.len(), 88);
// Includes :db/txInstant.
let transactions = debug::transactions_after(&conn, &db, 0).unwrap();
assert_eq!(transactions.0.len(), 1);
assert_eq!(transactions.0[0].0.len(), 89);
let value = edn::parse::value(include_str!("../../tx/fixtures/test_upsert_vector.edn")).unwrap();
let transactions = value.as_vector().unwrap();
assert_transactions(&conn, &mut db, transactions);
}
}

85
db/src/internal_types.rs Normal file
View file

@ -0,0 +1,85 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
//! Types used only within the transactor. These should not be exposed outside of this crate.
use std;
use std::collections::HashMap;
use std::rc::Rc;
use errors;
use errors::ErrorKind;
use types::*;
use mentat_tx::entities::OpType;
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub enum Term<E, V> {
AddOrRetract(OpType, E, Entid, V),
}
pub type EntidOr<T> = std::result::Result<Entid, T>;
pub type TypedValueOr<T> = std::result::Result<TypedValue, T>;
pub type TempId = Rc<String>;
pub type TempIdMap = HashMap<TempId, Entid>;
pub type LookupRef = Rc<AVPair>;
/// Internal representation of an entid on its way to resolution. We either have the simple case (a
/// numeric entid), a lookup-ref that still needs to be resolved (an atomized [a v] pair), or a temp
/// ID that needs to be upserted or allocated (an atomized tempid).
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub enum LookupRefOrTempId {
LookupRef(LookupRef),
TempId(TempId)
}
pub type TermWithTempIdsAndLookupRefs = Term<EntidOr<LookupRefOrTempId>, TypedValueOr<LookupRefOrTempId>>;
pub type TermWithTempIds = Term<EntidOr<TempId>, TypedValueOr<TempId>>;
pub type TermWithoutTempIds = Term<Entid, TypedValue>;
pub type Population = Vec<TermWithTempIds>;
impl TermWithTempIds {
// These have no tempids by definition, and just need to be unwrapped. This operation might
// also be called "lowering" or "level lowering", but the concept of "unwrapping" is common in
// Rust and seems appropriate here.
pub fn unwrap(self) -> TermWithoutTempIds {
match self {
Term::AddOrRetract(op, Ok(n), a, Ok(v)) => Term::AddOrRetract(op, n, a, v),
_ => unreachable!(),
}
}
}
/// Given an `EntidOr` or a `TypedValueOr`, replace any internal `LookupRef` with the entid from
/// the given map. Fail if any `LookupRef` cannot be replaced.
///
/// `lift` allows to specify how the entid found is mapped into the output type. (This could
/// also be an `Into` or `From` requirement.)
///
/// The reason for this awkward expression is that we're parameterizing over the _type constructor_
/// (`EntidOr` or `TypedValueOr`), which is not trivial to express in Rust. This only works because
/// they're both the same `Result<...>` type with different parameterizations.
pub fn replace_lookup_ref<T, U>(lookup_map: &AVMap, desired_or: Result<T, LookupRefOrTempId>, lift: U) -> errors::Result<Result<T, TempId>> where U: FnOnce(Entid) -> T {
match desired_or {
Ok(desired) => Ok(Ok(desired)), // N.b., must unwrap here -- the ::Ok types are different!
Err(other) => {
match other {
LookupRefOrTempId::TempId(t) => Ok(Err(t)),
LookupRefOrTempId::LookupRef(av) => lookup_map.get(&*av)
.map(|x| lift(*x)).map(Ok)
// XXX TODO: fix this error kind!
.ok_or_else(|| ErrorKind::UnrecognizedIdent(format!("couldn't lookup [a v]: {:?}", (*av).clone())).into()),
}
}
}
}

View file

@ -33,7 +33,10 @@ mod entids;
mod errors;
mod schema;
mod types;
mod internal_types;
mod upsert_resolution;
mod values;
mod tx;
pub use types::DB;
@ -73,3 +76,11 @@ pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String {
let values: String = repeat(inner).take(tuples).join(", ");
values
}
/// Return the current time in milliseconds after the Unix epoch according to the local clock.
///
/// Compare `Date.now()` in JavaScript, `System.currentTimeMillis` in Java.
pub fn now() -> i64 {
let now = time::get_time();
(now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000))
}

309
db/src/tx.rs Normal file
View file

@ -0,0 +1,309 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
//! This module implements the transaction application algorithm described at
//! https://github.com/mozilla/mentat/wiki/Transacting and its children pages.
//!
//! The implementation proceeds in four main stages, labeled "Pipeline stage 1" through "Pipeline
//! stage 4". _Pipeline_ may be a misnomer, since the stages as written **cannot** be interleaved
//! in parallel. That is, a single transacted entity cannot flow through all the stages without its
//! sibling entities.
//!
//! This unintuitive architectural decision was made because the second and third stages (resolving
//! lookup refs and tempids, respectively) operate _in bulk_ to minimize the number of expensive
//! SQLite queries by processing many in one SQLite invocation. Pipeline stage 2 doesn't need to
//! operate like this: it is easy to handle each transacted entity independently of all the others
//! (and earlier, less efficient, implementations did this). However, Pipeline stage 3 appears to
//! require processing multiple elements at the same time, since there can be arbitrarily complex
//! graph relationships between tempids. Pipeline stage 4 (inserting elements into the SQL store)
//! could also be expressed as an independent operation per transacted entity, but there are
//! non-trivial uniqueness relationships inside a single transaction that need to enforced.
//! Therefore, some multi-entity processing is required, and a per-entity pipeline becomes less
//! attractive.
//!
//! A note on the types in the implementation. The pipeline stages are strongly typed: each stage
//! accepts and produces a subset of the previous. We hope this will reduce errors as data moves
//! through the system. In contrast the Clojure implementation rewrote the fundamental entity type
//! in place and suffered bugs where particular code paths missed cases.
//!
//! The type hierarchy accepts `Entity` instances from the transaction parser and flows `Term`
//! instances through the term-rewriting transaction applier. `Term` is a general `[:db/add e a v]`
//! with restrictions on the `e` and `v` components. The hierarchy is expressed using `Result` to
//! model either/or, and layers of `Result` are stripped -- we might say the `Term` instances are
//! _lowered_ as they flow through the pipeline. This type hierarchy could have been expressed by
//! combinatorially increasing `enum` cases, but this makes it difficult to handle the `e` and `v`
//! components symmetrically. Hence, layers of `Result` type aliases. Hopefully the explanatory
//! names -- `TermWithTempIdsAndLookupRefs`, anyone? -- and strongly typed stage functions will help
//! keep everything straight.
use std;
use std::collections::BTreeSet;
use db::{ReducedEntity, SearchType};
use entids;
use errors::*;
use internal_types::{
LookupRefOrTempId,
TempId,
TempIdMap,
Term,
TermWithTempIdsAndLookupRefs,
TermWithTempIds,
TermWithoutTempIds,
replace_lookup_ref};
use mentat_core::intern_set;
use mentat_tx::entities as entmod;
use mentat_tx::entities::{Entity, OpType};
use rusqlite;
use schema::SchemaBuilding;
use types::*;
use upsert_resolution::Generation;
/// A transaction on its way to being applied.
#[derive(Debug)]
pub struct Tx<'conn> {
/// The metadata to use to interpret the transaction entities with.
pub db: &'conn mut DB,
/// The SQLite connection to apply against. In the future, this will be a Mentat connection.
pub conn: &'conn rusqlite::Connection,
/// The transaction ID of the transaction.
pub tx_id: Entid,
/// The timestamp when the transaction began to be committed.
///
/// This is milliseconds after the Unix epoch according to the transactor's local clock.
// TODO: :db.type/instant.
pub tx_instant: i64,
}
impl<'conn> Tx<'conn> {
pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> {
Tx {
db: db,
conn: conn,
tx_id: tx_id,
tx_instant: tx_instant,
}
}
/// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly
/// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted
/// entid. The keys of the resulting map are exactly those tempids that upserted.
pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result<TempIdMap> {
if temp_id_avs.is_empty() {
return Ok(TempIdMap::default());
}
// Map [a v]->entid.
let mut av_pairs: Vec<&AVPair> = vec![];
for i in 0..temp_id_avs.len() {
av_pairs.push(&temp_id_avs[i].1);
}
// Lookup in the store.
let av_map: AVMap = self.db.resolve_avs(conn, &av_pairs[..])?;
// Map id->entid.
let mut temp_id_map: TempIdMap = TempIdMap::default();
for &(ref temp_id, ref av_pair) in temp_id_avs {
if let Some(n) = av_map.get(&av_pair) {
if let Some(previous_n) = temp_id_map.get(&*temp_id) {
if n != previous_n {
// Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction.
bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX
}
}
temp_id_map.insert(temp_id.clone(), *n);
}
}
Ok((temp_id_map))
}
/// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term
/// rewriting.
///
/// The `Term` instances produce share interned TempId and LookupRef handles.
fn entities_into_terms_with_temp_ids_and_lookup_refs<I>(&self, entities: I) -> Result<Vec<TermWithTempIdsAndLookupRefs>> where I: IntoIterator<Item=Entity> {
let mut temp_ids = intern_set::InternSet::new();
entities.into_iter()
.map(|entity: Entity| -> Result<TermWithTempIdsAndLookupRefs> {
match entity {
Entity::AddOrRetract { op, e, a, v } => {
let a: i64 = match a {
entmod::Entid::Entid(ref a) => *a,
entmod::Entid::Ident(ref a) => self.db.schema.require_entid(&a.to_string())?,
};
let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?;
let e = match e {
entmod::EntidOrLookupRefOrTempId::Entid(e) => {
let e: i64 = match e {
entmod::Entid::Entid(ref e) => *e,
entmod::Entid::Ident(ref e) => self.db.schema.require_entid(&e.to_string())?,
};
std::result::Result::Ok(e)
},
entmod::EntidOrLookupRefOrTempId::TempId(e) => {
std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e)))
},
entmod::EntidOrLookupRefOrTempId::LookupRef(_) => {
// TODO: reference entity and initial input.
bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented")))
},
};
let v = {
if attribute.value_type == ValueType::Ref && v.is_text() {
std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone())))
} else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 {
bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented")))
} else {
// Here is where we do schema-aware typechecking: we either assert that
// the given value is in the attribute's value set, or (in limited
// cases) coerce the value into the attribute's value set.
let typed_value: TypedValue = self.db.to_typed_value(&v, &attribute)?;
std::result::Result::Ok(typed_value)
}
};
Ok(Term::AddOrRetract(op, e, a, v))
},
}
})
.collect::<Result<Vec<_>>>()
}
/// Pipeline stage 2: rewrite `Term` instances with lookup refs into `Term` instances without
/// lookup refs.
///
/// The `Term` instances produce share interned TempId handles and have no LookupRef references.
fn resolve_lookup_refs<I>(&self, lookup_ref_map: &AVMap, terms: I) -> Result<Vec<TermWithTempIds>> where I: IntoIterator<Item=TermWithTempIdsAndLookupRefs> {
terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result<TermWithTempIds> {
match term {
Term::AddOrRetract(op, e, a, v) => {
let e = replace_lookup_ref(&lookup_ref_map, e, |x| x)?;
let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?;
Ok(Term::AddOrRetract(op, e, a, v))
},
}
}).collect::<Result<Vec<_>>>()
}
/// Transact the given `entities` against the given SQLite `conn`, using the metadata in
/// `self.DB`.
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport> where I: IntoIterator<Item=Entity> {
// TODO: push these into an internal transaction report?
/// Assertions that are :db.cardinality/one and not :db.fulltext.
let mut non_fts_one: Vec<ReducedEntity> = vec![];
/// Assertions that are :db.cardinality/many and not :db.fulltext.
let mut non_fts_many: Vec<ReducedEntity> = vec![];
// Transact [:db/add :db/txInstant NOW :db/tx].
// TODO: allow this to be present in the transaction data.
non_fts_one.push((self.tx_id,
entids::DB_TX_INSTANT,
TypedValue::Long(self.tx_instant),
true));
// We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable.
let lookup_refs: intern_set::InternSet<AVPair> = intern_set::InternSet::new();
// TODO: extract the tempids set as well.
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
let terms_with_temp_ids_and_lookup_refs = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
// Pipeline stage 2: resolve lookup refs -> terms with tempids.
let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect();
let lookup_ref_map: AVMap = self.db.resolve_avs(self.conn, &lookup_ref_avs[..])?;
let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?;
// Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs.
// Now we can collect upsert populations.
let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.db.schema)?;
// And evolve them forward.
while generation.can_evolve() {
// Evolve further.
let temp_id_map = self.resolve_temp_id_avs(self.conn, &generation.temp_id_avs()[..])?;
generation = generation.evolve_one_step(&temp_id_map);
}
// Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic.
let unresolved_temp_ids: BTreeSet<TempId> = generation.temp_ids_in_allocations();
// TODO: track partitions for temporary IDs.
let entids = self.db.allocate_entids(":db.part/user", unresolved_temp_ids.len());
let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect();
let final_populations = generation.into_final_populations(&temp_id_allocations)?;
let final_terms: Vec<TermWithoutTempIds> = [final_populations.resolved,
final_populations.allocated,
inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat();
// Pipeline stage 4: final terms (after rewriting) -> DB insertions.
// Collect into non_fts_*.
// TODO: use something like Clojure's group_by to do this.
for term in final_terms {
match term {
Term::AddOrRetract(op, e, a, v) => {
let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?;
if attribute.fulltext {
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult!
}
let added = op == OpType::Add;
if attribute.multival {
non_fts_many.push((e, a, v, added));
} else {
non_fts_one.push((e, a, v, added));
}
},
}
}
if !non_fts_one.is_empty() {
self.db.insert_non_fts_searches(self.conn, &non_fts_one[..], self.tx_id, SearchType::Inexact)?;
}
if !non_fts_many.is_empty() {
self.db.insert_non_fts_searches(self.conn, &non_fts_many[..], self.tx_id, SearchType::Exact)?;
}
self.db.search(self.conn)?;
self.db.insert_transaction(self.conn, self.tx_id)?;
self.db.update_datoms(self.conn, self.tx_id)?;
// TODO: update idents and schema materialized views.
self.db.update_partition_map(self.conn)?;
Ok(TxReport {
tx_id: self.tx_id,
tx_instant: self.tx_instant,
})
}
}

View file

@ -10,7 +10,8 @@
#![allow(dead_code)]
use std::collections::{BTreeMap};
use std::collections::HashMap;
use std::collections::BTreeMap;
extern crate mentat_core;
@ -64,3 +65,27 @@ impl DB {
}
}
}
/// A pair [a v] in the store.
///
/// Used to represent lookup-refs and [TEMPID a v] upserts as they are resolved.
pub type AVPair = (Entid, TypedValue);
/// Map [a v] pairs to existing entids.
///
/// Used to resolve lookup-refs and upserts.
pub type AVMap<'a> = HashMap<&'a AVPair, Entid>;
/// A transaction report summarizes an applied transaction.
// TODO: include map of resolved tempids.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub struct TxReport {
/// The transaction ID of the transaction.
pub tx_id: Entid,
/// The timestamp when the transaction began to be committed.
///
/// This is milliseconds after the Unix epoch according to the transactor's local clock.
// TODO: :db.type/instant.
pub tx_instant: i64,
}

265
db/src/upsert_resolution.rs Normal file
View file

@ -0,0 +1,265 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
//! This module implements the upsert resolution algorithm described at
//! https://github.com/mozilla/mentat/wiki/Transacting:-upsert-resolution-algorithm.
use std::collections::BTreeSet;
use mentat_tx::entities::OpType;
use errors;
use errors::ErrorKind;
use types::{Attribute, AVPair, Entid, Schema, TypedValue};
use internal_types::*;
use schema::SchemaBuilding;
/// A "Simple upsert" that looks like [:db/add TEMPID a v], where a is :db.unique/identity.
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
struct UpsertE(TempId, Entid, TypedValue);
/// A "Complex upsert" that looks like [:db/add TEMPID a OTHERID], where a is :db.unique/identity
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
struct UpsertEV(TempId, Entid, TempId);
/// A generation collects entities into populations at a single evolutionary step in the upsert
/// resolution evolution process.
///
/// The upsert resolution process is only concerned with [:db/add ...] entities until the final
/// entid allocations. That's why we separate into special simple and complex upsert types
/// immediately, and then collect the more general term types for final resolution.
#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub struct Generation {
/// "Simple upserts" that look like [:db/add TEMPID a v], where a is :db.unique/identity.
upserts_e: Vec<UpsertE>,
/// "Complex upserts" that look like [:db/add TEMPID a OTHERID], where a is :db.unique/identity
upserts_ev: Vec<UpsertEV>,
/// Entities that look like:
/// - [:db/add TEMPID b OTHERID], where b is not :db.unique/identity;
/// - [:db/add TEMPID b v], where b is not :db.unique/identity.
/// - [:db/add e b OTHERID].
allocations: Vec<TermWithTempIds>,
/// Entities that upserted and no longer reference tempids. These assertions are guaranteed to
/// be in the store.
upserted: Vec<TermWithoutTempIds>,
/// Entities that resolved due to other upserts and no longer reference tempids. These
/// assertions may or may not be in the store.
resolved: Vec<TermWithoutTempIds>,
}
#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub struct FinalPopulations {
/// Upserts that upserted.
pub upserted: Vec<TermWithoutTempIds>,
/// Allocations that resolved due to other upserts.
pub resolved: Vec<TermWithoutTempIds>,
/// Allocations that required new entid allocations.
pub allocated: Vec<TermWithoutTempIds>,
}
impl Generation {
/// Split entities into a generation of populations that need to evolve to have their tempids
/// resolved or allocated, and a population of inert entities that do not reference tempids.
pub fn from<I>(terms: I, schema: &Schema) -> errors::Result<(Generation, Population)> where I: IntoIterator<Item=TermWithTempIds> {
let mut generation = Generation::default();
let mut inert = vec![];
let is_unique = |a: Entid| -> errors::Result<bool> {
let attribute: &Attribute = schema.require_attribute_for_entid(a)?;
Ok(attribute.unique_identity)
};
for term in terms.into_iter() {
match term {
Term::AddOrRetract(op, Err(e), a, Err(v)) => {
if op == OpType::Add && is_unique(a)? {
generation.upserts_ev.push(UpsertEV(e, a, v));
} else {
generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Err(v)));
}
},
Term::AddOrRetract(op, Err(e), a, Ok(v)) => {
if op == OpType::Add && is_unique(a)? {
generation.upserts_e.push(UpsertE(e, a, v));
} else {
generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Ok(v)));
}
},
Term::AddOrRetract(op, Ok(e), a, Err(v)) => {
generation.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(v)));
},
Term::AddOrRetract(op, Ok(e), a, Ok(v)) => {
inert.push(Term::AddOrRetract(op, Ok(e), a, Ok(v)));
},
}
}
Ok((generation, inert))
}
/// Return true if it's possible to evolve this generation further.
///
/// There can be complex upserts but no simple upserts to help resolve them. We accept the
/// overhead of having the database try to resolve an empty set of simple upserts, to avoid
/// having to special case complex upserts at entid allocation time.
pub fn can_evolve(&self) -> bool {
!self.upserts_e.is_empty() || !self.upserts_ev.is_empty()
}
/// Evolve this generation one step further by rewriting the existing :db/add entities using the
/// given temporary IDs.
///
/// TODO: Considering doing this in place; the function already consumes `self`.
pub fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation {
let mut next = Generation::default();
for UpsertE(t, a, v) in self.upserts_e {
match temp_id_map.get(&*t) {
Some(&n) => next.upserted.push(Term::AddOrRetract(OpType::Add, n, a, v)),
None => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t), a, Ok(v))),
}
}
for UpsertEV(t1, a, t2) in self.upserts_ev {
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2))),
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Ok(n1), a, Err(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t1), a, Err(t2))),
}
}
// There's no particular need to separate resolved from allocations right here and right
// now, although it is convenient.
for term in self.allocations {
// TODO: find an expression that destructures less? I still expect this to be efficient
// but it's a little verbose.
match term {
Term::AddOrRetract(op, Err(t1), a, Err(t2)) => {
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2))),
(None, Some(&n2)) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Ok(TypedValue::Ref(n2)))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(op, Ok(n1), a, Err(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Err(t2))),
}
},
Term::AddOrRetract(op, Err(t), a, Ok(v)) => {
match temp_id_map.get(&*t) {
Some(&n) => next.resolved.push(Term::AddOrRetract(op, n, a, v)),
None => next.allocations.push(Term::AddOrRetract(op, Err(t), a, Ok(v))),
}
},
Term::AddOrRetract(op, Ok(e), a, Err(t)) => {
match temp_id_map.get(&*t) {
Some(&n) => next.resolved.push(Term::AddOrRetract(op, e, a, TypedValue::Ref(n))),
None => next.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(t))),
}
},
Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(),
}
}
next
}
// Collect id->[a v] pairs that might upsert at this evolutionary step.
pub fn temp_id_avs<'a>(&'a self) -> Vec<(TempId, AVPair)> {
let mut temp_id_avs: Vec<(TempId, AVPair)> = vec![];
// TODO: map/collect.
for &UpsertE(ref t, ref a, ref v) in &self.upserts_e {
// TODO: figure out how to make this less expensive, i.e., don't require
// clone() of an arbitrary value.
temp_id_avs.push((t.clone(), (*a, v.clone())));
}
temp_id_avs
}
/// After evolution is complete, yield the set of tempids that require entid allocation. These
/// are the tempids that appeared in [:db/add ...] entities, but that didn't upsert to existing
/// entids.
pub fn temp_ids_in_allocations(&self) -> BTreeSet<TempId> {
assert!(self.upserts_e.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!");
assert!(self.upserts_ev.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!");
let mut temp_ids: BTreeSet<TempId> = BTreeSet::default();
for term in self.allocations.iter() {
match term {
&Term::AddOrRetract(OpType::Add, Err(ref t1), _, Err(ref t2)) => {
temp_ids.insert(t1.clone());
temp_ids.insert(t2.clone());
},
&Term::AddOrRetract(OpType::Add, Err(ref t), _, Ok(_)) => {
temp_ids.insert(t.clone());
},
&Term::AddOrRetract(OpType::Add, Ok(_), _, Err(ref t)) => {
temp_ids.insert(t.clone());
},
&Term::AddOrRetract(OpType::Add, Ok(_), _, Ok(_)) => unreachable!(),
&Term::AddOrRetract(OpType::Retract, _, _, _) => {
// [:db/retract ...] entities never allocate entids; they have to resolve due to
// other upserts (or they fail the transaction).
},
}
}
temp_ids
}
/// After evolution is complete, use the provided allocated entids to segment `self` into
/// populations, each with no references to tempids.
pub fn into_final_populations(self, temp_id_map: &TempIdMap) -> errors::Result<FinalPopulations> {
assert!(self.upserts_e.is_empty());
assert!(self.upserts_ev.is_empty());
let mut populations = FinalPopulations::default();
populations.upserted = self.upserted;
populations.resolved = self.resolved;
for term in self.allocations {
let allocated = match term {
// TODO: consider require implementing require on temp_id_map.
Term::AddOrRetract(op, Err(t1), a, Err(t2)) => {
match (op, temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(op, Some(&n1), Some(&n2)) => Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2)),
(OpType::Add, _, _) => unreachable!(), // This is a coding error -- every tempid in a :db/add entity should resolve or be allocated.
(OpType::Retract, _, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: one of {}, {}", t1, t2))),
}
},
Term::AddOrRetract(op, Err(t), a, Ok(v)) => {
match (op, temp_id_map.get(&*t)) {
(op, Some(&n)) => Term::AddOrRetract(op, n, a, v),
(OpType::Add, _) => unreachable!(), // This is a coding error.
(OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))),
}
},
Term::AddOrRetract(op, Ok(e), a, Err(t)) => {
match (op, temp_id_map.get(&*t)) {
(op, Some(&n)) => Term::AddOrRetract(op, e, a, TypedValue::Ref(n)),
(OpType::Add, _) => unreachable!(), // This is a coding error.
(OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))),
}
},
Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), // This is a coding error -- these should not be in allocations.
};
populations.allocated.push(allocated);
}
Ok(populations)
}
}

View file

@ -21,7 +21,7 @@ use combine::{any, eof, many, parser, satisfy_map, token, Parser, ParseResult, S
use combine::combinator::{Expected, FnParser};
use edn::symbols::NamespacedKeyword;
use edn::types::Value;
use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, LookupRef, OpType, ValueOrLookupRef};
use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType};
use mentat_parser_utils::ResultParser;
pub struct Tx<I>(::std::marker::PhantomData<fn(I) -> I>);
@ -64,20 +64,25 @@ def_parser_fn!(Tx, lookup_ref, Value, LookupRef, input, {
.parse_stream(input)
});
def_parser_fn!(Tx, entid_or_lookup_ref, Value, EntidOrLookupRef, input, {
Tx::<I>::entid()
.map(|x| EntidOrLookupRef::Entid(x))
.or(Tx::<I>::lookup_ref().map(|x| EntidOrLookupRef::LookupRef(x)))
def_parser_fn!(Tx, entid_or_lookup_ref_or_temp_id, Value, EntidOrLookupRefOrTempId, input, {
Tx::<I>::entid().map(|x| EntidOrLookupRefOrTempId::Entid(x))
.or(Tx::<I>::lookup_ref().map(|x| EntidOrLookupRefOrTempId::LookupRef(x)))
.or(Tx::<I>::temp_id().map(|x| EntidOrLookupRefOrTempId::TempId(x)))
.parse_lazy(input)
.into()
});
def_parser_fn!(Tx, temp_id, Value, String, input, {
satisfy_map(|x: Value| x.into_text())
.parse_stream(input)
});
// TODO: abstract the "match Vector, parse internal stream" pattern to remove this boilerplate.
def_parser_fn!(Tx, add, Value, Entity, input, {
satisfy_map(|x: Value| -> Option<Entity> {
if let Value::Vector(y) = x {
let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "add"))),
Tx::<&[Value]>::entid_or_lookup_ref(),
Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(),
Tx::<&[Value]>::entid(),
// TODO: handle lookup-ref.
any(),
@ -87,7 +92,7 @@ def_parser_fn!(Tx, add, Value, Entity, input, {
op: OpType::Add,
e: e,
a: a,
v: ValueOrLookupRef::Value(v),
v: v,
}
});
// TODO: use ok() with a type annotation rather than explicit match.
@ -106,7 +111,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, {
satisfy_map(|x: Value| -> Option<Entity> {
if let Value::Vector(y) = x {
let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "retract"))),
Tx::<&[Value]>::entid_or_lookup_ref(),
Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(),
Tx::<&[Value]>::entid(),
// TODO: handle lookup-ref.
any(),
@ -116,7 +121,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, {
op: OpType::Retract,
e: e,
a: a,
v: ValueOrLookupRef::Value(v),
v: v,
}
});
// TODO: use ok() with a type annotation rather than explicit match.
@ -170,6 +175,7 @@ mod tests {
use combine::Parser;
use edn::symbols::NamespacedKeyword;
use edn::types::Value;
use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType};
fn kw(namespace: &str, name: &str) -> Value {
Value::NamespacedKeyword(NamespacedKeyword::new(namespace, name))
@ -186,10 +192,10 @@ mod tests {
assert_eq!(result,
Ok((Entity::AddOrRetract {
op: OpType::Add,
e: EntidOrLookupRef::Entid(Entid::Ident(NamespacedKeyword::new("test",
"entid"))),
e: EntidOrLookupRefOrTempId::Entid(Entid::Ident(NamespacedKeyword::new("test",
"entid"))),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: ValueOrLookupRef::Value(Value::Text("v".into())),
v: Value::Text("v".into()),
},
&[][..])));
}
@ -205,9 +211,9 @@ mod tests {
assert_eq!(result,
Ok((Entity::AddOrRetract {
op: OpType::Retract,
e: EntidOrLookupRef::Entid(Entid::Entid(101)),
e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: ValueOrLookupRef::Value(Value::Text("v".into())),
v: Value::Text("v".into()),
},
&[][..])));
}
@ -224,12 +230,12 @@ mod tests {
assert_eq!(result,
Ok((Entity::AddOrRetract {
op: OpType::Add,
e: EntidOrLookupRef::LookupRef(LookupRef {
e: EntidOrLookupRefOrTempId::LookupRef(LookupRef {
a: Entid::Ident(NamespacedKeyword::new("test", "a1")),
v: Value::Text("v1".into()),
}),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: ValueOrLookupRef::Value(Value::Text("v".into())),
v: Value::Text("v".into()),
},
&[][..])));
}

View file

@ -16,15 +16,15 @@ extern crate mentat_tx_parser;
use edn::parse;
use edn::symbols::NamespacedKeyword;
use edn::types::Value;
use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, OpType, ValueOrLookupRef};
use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, OpType};
use mentat_tx_parser::Tx;
#[test]
fn test_entities() {
// TODO: align with whitespace after the EDN parser ignores more whitespace.
let input = r#"[[:db/add 101 :test/a "v"]
[:db/retract 102 :test/b "w"]]"#;
let input = r#"
[[:db/add 101 :test/a "v"]
[:db/add "tempid" :test/a "v"]
[:db/retract 102 :test/b "w"]]"#;
let edn = parse::value(input).unwrap();
let input = [edn];
@ -33,16 +33,22 @@ fn test_entities() {
assert_eq!(result,
Ok(vec![
Entity::AddOrRetract {
e: EntidOrLookupRef::Entid(Entid::Entid(101)),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: ValueOrLookupRef::Value(Value::Text("v".into())),
op: OpType::Add,
e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: Value::Text("v".into()),
},
Entity::AddOrRetract {
op: OpType::Add,
e: EntidOrLookupRefOrTempId::TempId("tempid".into()),
a: Entid::Ident(NamespacedKeyword::new("test", "a")),
v: Value::Text("v".into()),
},
Entity::AddOrRetract {
e: EntidOrLookupRef::Entid(Entid::Entid(102)),
a: Entid::Ident(NamespacedKeyword::new("test", "b")),
v: ValueOrLookupRef::Value(Value::Text("w".into())),
op: OpType::Retract,
e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(102)),
a: Entid::Ident(NamespacedKeyword::new("test", "b")),
v: Value::Text("w".into()),
},
]));
}

View file

@ -0,0 +1,121 @@
[{:test/label ":db.cardinality/one, insert"
:test/assertions
[[:db/add 100 :db/ident :name/Ivan]
[:db/add 101 :db/ident :name/Petr]]
:test/expected-transaction
#{[100 :db/ident :name/Ivan ?tx1 true]
[101 :db/ident :name/Petr ?tx1 true]
[?tx1 :db/txInstant ?ms1 ?tx1 true]}
:test/expected-datoms
#{[100 :db/ident :name/Ivan]
[101 :db/ident :name/Petr]}}
{:test/label "upsert two tempids to same entid"
:test/assertions
[[:db/add "t1" :db/ident :name/Ivan]
[:db/add "t1" :db.schema/attribute 100]
[:db/add "t2" :db/ident :name/Petr]
[:db/add "t2" :db.schema/attribute 101]]
:test/expected-transaction
#{[100 :db.schema/attribute 100 ?tx2 true]
[101 :db.schema/attribute 101 ?tx2 true]
[?tx2 :db/txInstant ?ms2 ?tx2 true]}
:test/expected-datoms
#{[100 :db/ident :name/Ivan]
[101 :db/ident :name/Petr]
[100 :db.schema/attribute 100]
[101 :db.schema/attribute 101]}
:test/expected-tempids
{"t1" 100
"t2" 101}}
{:test/label "upsert with tempid"
:test/assertions
[[:db/add "t1" :db/ident :name/Ivan]
;; Ref doesn't have to exist (at this time). Can't reuse due to :db/unique :db.unique/value.
[:db/add "t1" :db.schema/attribute 102]]
:test/expected-transaction
#{[100 :db.schema/attribute 102 ?tx3 true]
[?tx3 :db/txInstant ?ms3 ?tx3 true]}
:test/expected-datoms
#{[100 :db/ident :name/Ivan]
[101 :db/ident :name/Petr]
[100 :db.schema/attribute 100]
[100 :db.schema/attribute 102]
[101 :db.schema/attribute 101]}
:test/expected-tempids
{"t1" 100}}
;; TODO: don't hard-code allocated entids.
{:test/label "single complex upsert allocates new entid"
:test/assertions
[[:db/add "t1" :db.schema/attribute "t2"]]
:test/expected-transaction
#{[65536 :db.schema/attribute 65537 ?tx4 true]
[?tx4 :db/txInstant ?ms4 ?tx4 true]}
:test/expected-tempids
{"t1" 65536
"t2" 65537}}
{:test/label "conflicting upserts fail"
:test/assertions
[[:db/add "t1" :db/ident :name/Ivan]
[:db/add "t1" :db/ident :name/Petr]]
:test/expected-transaction
nil
:test/expected-error-message
"Conflicting upsert"
;; nil
}
{:test/label "tempids in :db/retract that do upsert are fine"
:test/assertions
[[:db/add "t1" :db/ident :name/Ivan]
;; This ref doesn't exist, so the assertion will be ignored.
[:db/retract "t1" :db.schema/attribute 103]]
:test/expected-transaction
#{[?tx6 :db/txInstant ?ms6 ?tx6 true]}
:test/expected-error-message
""
:test/expected-tempids
{}}
{:test/label "tempids in :db/retract that don't upsert fail"
:test/assertions
[[:db/retract "t1" :db/ident :name/Anonymous]]
:test/expected-transaction
nil
:test/expected-error-message
""}
;; The upsert algorithm will first try to resolve "t1", fail, and then allocate both "t1" and "t2".
{:test/label "multistep, both allocated"
:test/assertions
[[:db/add "t1" :db/ident :name/Josef]
[:db/add "t2" :db.schema/attribute "t1"]]
:test/expected-transaction
#{[65538 :db/ident :name/Josef ?tx8 true]
[65539 :db.schema/attribute 65538 ?tx8 true]
[?tx8 :db/txInstant ?ms8 ?tx8 true]}
:test/expected-error-message
""
:test/expected-tempids
{"t1" 65538
"t2" 65539}}
;; Can't quite test this without more schema elements.
;; ;; This time, we can resolve both, but we have to try "t1", succeed, and then resolve "t2".
;; {:test/label "multistep, upserted allocated"
;; :test/assertions
;; [[:db/add "t1" :db/ident :name/Josef]
;; [:db/add "t2" :db/ident "t1"]]
;; :test/expected-transaction
;; #{[65538 :db/ident :name/Josef]
;; [65538 :db/ident :name/Karl]
;; [?tx8 :db/txInstant ?ms8 ?tx8 true]}
;; :test/expected-error-message
;; ""
;; :test/expected-tempids
;; {"t1" 65538
;; "t2" 65539}}
]

View file

@ -35,12 +35,13 @@ pub enum EntidOrLookupRef {
}
#[derive(Clone, Debug, PartialEq)]
pub enum ValueOrLookupRef {
Value(Value),
pub enum EntidOrLookupRefOrTempId {
Entid(Entid),
LookupRef(LookupRef),
TempId(String),
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum OpType {
Add,
Retract,
@ -50,8 +51,8 @@ pub enum OpType {
pub enum Entity {
AddOrRetract {
op: OpType,
e: EntidOrLookupRef,
e: EntidOrLookupRefOrTempId,
a: Entid,
v: ValueOrLookupRef,
v: Value,
},
}