diff --git a/core/src/intern_set.rs b/core/src/intern_set.rs new file mode 100644 index 00000000..490a337b --- /dev/null +++ b/core/src/intern_set.rs @@ -0,0 +1,45 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +use std::collections::HashSet; +use std::hash::Hash; +use std::rc::Rc; + +/// An `InternSet` allows to "intern" some potentially large values, maintaining a single value +/// instance owned by the `InternSet` and leaving consumers with lightweight ref-counted handles to +/// the large owned value. This can avoid expensive clone() operations. +/// +/// In Mentat, such large values might be strings or arbitrary [a v] pairs. +/// +/// See https://en.wikipedia.org/wiki/String_interning for discussion. +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct InternSet where T: Eq + Hash { + pub inner: HashSet>, +} + +impl InternSet where T: Eq + Hash { + pub fn new() -> InternSet { + InternSet { + inner: HashSet::new(), + } + } + + /// Intern a value, providing a ref-counted handle to the interned value. 
+ pub fn intern(&mut self, value: T) -> Rc { + let key = Rc::new(value); + if self.inner.insert(key.clone()) { + key + } else { + self.inner.get(&key).unwrap().clone() + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index c85b5534..7ec9162d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -272,3 +272,5 @@ mod test { assert!(attr2.flags() & AttributeBitFlags::UniqueValue as u8 != 0); } } + +pub mod intern_set; diff --git a/db/src/db.rs b/db/src/db.rs index 788c8898..9aa6b105 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -10,19 +10,21 @@ #![allow(dead_code)] -use std::iter::once; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::fmt::Display; +use std::iter::{once, repeat}; +use std::ops::Range; use std::path::Path; use itertools; use itertools::Itertools; use rusqlite; use rusqlite::types::{ToSql, ToSqlOutput}; -use time; -use ::{repeat_values, to_namespaced_keyword}; +use ::{now, repeat_values, to_namespaced_keyword}; use bootstrap; use edn::types::Value; -use entids; use mentat_core::{ Attribute, AttributeBitFlags, @@ -32,11 +34,18 @@ use mentat_core::{ TypedValue, ValueType, }; -use mentat_tx::entities as entmod; -use mentat_tx::entities::{Entity, OpType}; +use mentat_tx::entities::Entity; use errors::{ErrorKind, Result, ResultExt}; -use types::{DB, Partition, PartitionMap}; use schema::SchemaBuilding; +use types::{ + AVMap, + AVPair, + DB, + Partition, + PartitionMap, + TxReport, +}; +use tx::Tx; pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { @@ -153,6 +162,7 @@ lazy_static! { r#"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value BLOB NOT NULL, value_type_tag SMALLINT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))"#, r#"CREATE INDEX idx_schema_unique ON schema (ident, attr, value, value_type_tag)"#, + // TODO: store entid instead of ident for partition name. 
r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"#, ] }; @@ -180,7 +190,7 @@ fn get_user_version(conn: &rusqlite::Connection) -> Result { } // TODO: rename "SQL" functions to align with "datoms" functions. -pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { +pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { let tx = conn.transaction()?; for statement in (&V2_STATEMENTS).iter() { @@ -191,20 +201,21 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // TODO: think more carefully about allocating new parts and bitmasking part ranges. // TODO: install these using bootstrap assertions. It's tricky because the part ranges are implicit. // TODO: one insert, chunk into 999/3 sections, for safety. + // This is necessary: `transact` will only UPDATE parts, not INSERT them if they're missing. for (part, partition) in bootstrap_partition_map.iter() { // TODO: Convert "keyword" part to SQL using Value conversion. tx.execute("INSERT INTO parts VALUES (?, ?, ?)", &[part, &partition.start, &partition.index])?; } - let bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); - bootstrap_db.transact_internal(&tx, &bootstrap::bootstrap_entities()[..], bootstrap::TX0)?; + // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. + let mut bootstrap_db = DB::new(bootstrap_partition_map, bootstrap::bootstrap_schema()); + bootstrap_db.transact(&tx, bootstrap::bootstrap_entities())?; set_user_version(&tx, CURRENT_VERSION)?; - let user_version = get_user_version(&tx)?; // TODO: use the drop semantics to do this automagically? 
tx.commit()?; - Ok(user_version) + Ok(bootstrap_db) } // (def v2-statements v1-statements) @@ -305,12 +316,12 @@ pub fn update_from_version(conn: &mut rusqlite::Connection, current_version: i32 Ok(user_version) } -pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result { +pub fn ensure_current_version(conn: &mut rusqlite::Connection) -> Result { let user_version = get_user_version(&conn)?; match user_version { - CURRENT_VERSION => Ok(user_version), 0 => create_current_version(conn), - v => update_from_version(conn, v), + // TODO: support updating or re-opening an existing store. + v => bail!(ErrorKind::NotYetImplemented(format!("Opening databases with Mentat version: {}", v))), } } @@ -429,7 +440,7 @@ pub fn read_db(conn: &rusqlite::Connection) -> Result { } /// Internal representation of an [e a v added] datom, ready to be transacted against the store. -type ReducedEntity = (i64, i64, TypedValue, bool); +pub type ReducedEntity = (i64, i64, TypedValue, bool); #[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] pub enum SearchType { @@ -463,6 +474,77 @@ impl DB { } } + /// Given a slice of [a v] lookup-refs, look up the corresponding [e a v] triples. + /// + /// It is assumed that the attribute `a` in each lookup-ref is `:db/unique`, so that at most one + /// matching [e a v] triple exists. (If this is not true, some matching entid `e` will be + /// chosen non-deterministically, if one exists.) + /// + /// Returns a map &(a, v) -> e, to avoid cloning potentially large values. The keys of the map + /// are exactly those (a, v) pairs that have an assertion [e a v] in the datom store. + pub fn resolve_avs<'a>(&self, conn: &rusqlite::Connection, avs: &'a [&'a AVPair]) -> Result> { + // Start search_id's at some identifiable number. + let initial_search_id = 2000; + let bindings_per_statement = 4; + + // We map [a v] -> numeric search_id -> e, and then we use the search_id lookups to finally + // produce the map [a v] -> e. 
+ // + // TODO: `collect` into a HashSet so that any (a, v) is resolved at most once. + let chunks: itertools::IntoChunks<_> = avs.into_iter().enumerate().chunks(::SQLITE_MAX_VARIABLE_NUMBER / 4); + + // We'd like to `flat_map` here, but it's not obvious how to `flat_map` across `Result`. + // Alternatively, this is a `fold`, and it might be wise to express it as such. + let results: Result>> = chunks.into_iter().map(|chunk| -> Result> { + let mut count = 0; + + // We must keep these computed values somewhere to reference them later, so we can't + // combine this `map` and the subsequent `flat_map`. + let block: Vec<(i64, i64, ToSqlOutput<'a>, i32)> = chunk.map(|(index, &&(a, ref v))| { + count += 1; + let search_id: i64 = initial_search_id + index as i64; + let (value, value_type_tag) = v.to_sql_value_pair(); + (search_id, a, value, value_type_tag) + }).collect(); + + // `params` reference computed values in `block`. + let params: Vec<&ToSql> = block.iter().flat_map(|&(ref searchid, ref a, ref value, ref value_type_tag)| { + // Avoid inner heap allocation. + once(searchid as &ToSql) + .chain(once(a as &ToSql) + .chain(once(value as &ToSql) + .chain(once(value_type_tag as &ToSql)))) + }).collect(); + + // TODO: cache these statements for selected values of `count`. + // TODO: query against `datoms` and UNION ALL with `fulltext_datoms` rather than + // querying against `all_datoms`. We know all the attributes, and in the common case, + // where most unique attributes will not be fulltext-indexed, we'll be querying just + // `datoms`, which will be much faster. 
+ let values: String = repeat_values(bindings_per_statement, count); + let s: String = format!("WITH t(search_id, a, v, value_type_tag) AS (VALUES {}) SELECT t.search_id, d.e \ + FROM t, all_datoms AS d \ + WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v", + values); + let mut stmt: rusqlite::Statement = conn.prepare(s.as_str())?; + + let m: Result> = stmt.query_and_then(¶ms, |row| -> Result<(i64, Entid)> { + Ok((row.get_checked(0)?, row.get_checked(1)?)) + })?.collect(); + m + }).collect::>>>(); + + // Flatten. + let results: Vec<(i64, Entid)> = results?.as_slice().concat(); + + // Create map [a v] -> e. + let m: HashMap<&'a AVPair, Entid> = results.into_iter().map(|(search_id, entid)| { + let index: usize = (search_id - initial_search_id) as usize; + (avs[index], entid) + }).collect(); + Ok(m) + } + /// Create empty temporary tables for search parameters and search results. fn create_temp_tables(&self, conn: &rusqlite::Connection) -> Result<()> { // We can't do this in one shot, since we can't prepare a batch statement. @@ -528,7 +610,7 @@ impl DB { /// /// Eventually, the details of this approach will be captured in /// https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. - fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> { + pub fn insert_non_fts_searches<'a>(&self, conn: &rusqlite::Connection, entities: &'a [ReducedEntity], tx: Entid, search_type: SearchType) -> Result<()> { let bindings_per_statement = 7; let chunks: itertools::IntoChunks<_> = entities.into_iter().chunks(::SQLITE_MAX_VARIABLE_NUMBER / bindings_per_statement); @@ -592,7 +674,7 @@ impl DB { /// Take search rows and complete `temp.search_results`. /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. 
- fn search(&self, conn: &rusqlite::Connection) -> Result<()> { + pub fn search(&self, conn: &rusqlite::Connection) -> Result<()> { // First is fast, only one table walk: lookup by exact eav. // Second is slower, but still only one table walk: lookup old value by ea. let s = r#" @@ -625,7 +707,7 @@ impl DB { /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. // TODO: capture `conn` in a `TxInternal` structure. - fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + pub fn insert_transaction(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { let s = r#" INSERT INTO transactions (e, a, v, tx, added, value_type_tag) SELECT e0, a0, v0, ?, 1, value_type_tag0 @@ -659,7 +741,7 @@ impl DB { /// /// See https://github.com/mozilla/mentat/wiki/Transacting:-entity-to-SQL-translation. // TODO: capture `conn` in a `TxInternal` structure. - fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { + pub fn update_datoms(&self, conn: &rusqlite::Connection, tx: Entid) -> Result<()> { // Delete datoms that were retracted, or those that were :db.cardinality/one and will be // replaced. let s = r#" @@ -699,130 +781,67 @@ impl DB { Ok(()) } + /// Update the current partition map materialized view. + // TODO: only update changed partitions. + pub fn update_partition_map(&self, conn: &rusqlite::Connection) -> Result<()> { + let values_per_statement = 2; + let max_partitions = ::SQLITE_MAX_VARIABLE_NUMBER / values_per_statement; + if self.partition_map.len() > max_partitions { + bail!(ErrorKind::NotYetImplemented(format!("No more than {} partitions are supported", max_partitions))); + } + + // Like "UPDATE parts SET idx = CASE WHEN part = ? THEN ? WHEN part = ? THEN ? ELSE idx END". + let s = format!("UPDATE parts SET idx = CASE {} ELSE idx END", + repeat("WHEN part = ? 
THEN ?").take(self.partition_map.len()).join(" ")); + + let params: Vec<&ToSql> = self.partition_map.iter().flat_map(|(name, partition)| { + once(name as &ToSql) + .chain(once(&partition.index as &ToSql)) + }).collect(); + + // TODO: only cache the latest of these statements. Changing the set of partitions isn't + // supported in the Clojure implementation at all, and might not be supported in Mentat soon, + // so this is very low priority. + let mut stmt = conn.prepare_cached(s.as_str())?; + stmt.execute(¶ms[..]) + .map(|_c| ()) + .chain_err(|| "Could not update partition map") + } + + /// Allocate a single fresh entid in the given `partition`. + pub fn allocate_entid(&mut self, partition: &S) -> i64 where String: Borrow { + self.allocate_entids(partition, 1).start + } + + /// Allocate `n` fresh entids in the given `partition`. + pub fn allocate_entids(&mut self, partition: &S, n: usize) -> Range where String: Borrow { + match self.partition_map.get_mut(partition) { + Some(mut partition) => { + let idx = partition.index; + partition.index += n as i64; + idx..partition.index + }, + // This is a programming error. + None => panic!("Cannot allocate entid from unknown partition: {}", partition), + } + } + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in /// `self.DB`. /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact_internal(&self, conn: &rusqlite::Connection, entities: &[Entity], tx: Entid) -> Result<()>{ - // TODO: push these into an internal transaction report? + pub fn transact(&mut self, conn: &rusqlite::Connection, entities: I) -> Result where I: IntoIterator { + // Eventually, this function will be responsible for managing a SQLite transaction. For + // now, it's just about the tx details. - /// Assertions that are :db.cardinality/one and not :db.fulltext. 
- let mut non_fts_one: Vec = vec![]; - - /// Assertions that are :db.cardinality/many and not :db.fulltext. - let mut non_fts_many: Vec = vec![]; - - // Transact [:db/add :db/txInstant NOW :db/tx]. - // TODO: allow this to be present in the transaction data. - let now = time::get_time(); - let tx_instant = (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000)); - non_fts_one.push((tx, - entids::DB_TX_INSTANT, - TypedValue::Long(tx_instant), - true)); - - // Right now, this could be a for loop, saving some mapping, collecting, and type - // annotations. However, I expect it to be a multi-stage map as we start to transform the - // underlying entities, in which case this expression is more natural than for loops. - let r: Vec> = entities.into_iter().map(|entity: &Entity| -> Result<()> { - match *entity { - Entity::AddOrRetract { - op: OpType::Add, - e: entmod::EntidOrLookupRef::Entid(ref e_), - a: ref a_, - v: entmod::ValueOrLookupRef::Value(ref v_)} => { - - let e: i64 = match e_ { - &entmod::Entid::Entid(ref e__) => *e__, - &entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?, - }; - - let a: i64 = match a_ { - &entmod::Entid::Entid(ref a__) => *a__, - &entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?, - }; - - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - if attribute.fulltext { - bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity))) - } - - // This is our chance to do schema-aware typechecking: to either assert that the - // given value is in the attribute's value set, or (in limited cases) to coerce - // the value into the attribute's value set. 
- let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?; - - let added = true; - if attribute.multival { - non_fts_many.push((e, a, typed_value, added)); - } else { - non_fts_one.push((e, a, typed_value, added)); - } - Ok(()) - }, - - Entity::AddOrRetract { - op: OpType::Retract, - e: entmod::EntidOrLookupRef::Entid(ref e_), - a: ref a_, - v: entmod::ValueOrLookupRef::Value(ref v_) } => { - - let e: i64 = match e_ { - &entmod::Entid::Entid(ref e__) => *e__, - &entmod::Entid::Ident(ref e__) => self.schema.require_entid(&e__.to_string())?, - }; - - let a: i64 = match a_ { - &entmod::Entid::Entid(ref a__) => *a__, - &entmod::Entid::Ident(ref a__) => self.schema.require_entid(&a__.to_string())?, - }; - - let attribute: &Attribute = self.schema.require_attribute_for_entid(a)?; - if attribute.fulltext { - bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented: {:?}", entity))) - } - - // This is our chance to do schema-aware typechecking: to either assert that the - // given value is in the attribute's value set, or (in limited cases) to coerce - // the value into the attribute's value set. - let typed_value: TypedValue = self.to_typed_value(v_, &attribute)?; - - let added = false; - - if attribute.multival { - non_fts_many.push((e, a, typed_value, added)); - } else { - non_fts_one.push((e, a, typed_value, added)); - } - Ok(()) - }, - - _ => bail!(ErrorKind::NotYetImplemented(format!("Transacting this entity is not yet implemented: {:?}", entity))) - } - }).collect(); - - let r: Result> = r.into_iter().collect(); - r?; + let tx_instant = now(); // Label the transaction with the timestamp when we first see it: leading edge. 
+ let tx_id = self.allocate_entid(":db.part/tx"); self.create_temp_tables(conn)?; - if !non_fts_one.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_one[..], tx, SearchType::Inexact)?; - } - - if !non_fts_many.is_empty() { - self.insert_non_fts_searches(conn, &non_fts_many[..], tx, SearchType::Exact)?; - } - - self.search(conn)?; - - self.insert_transaction(conn, tx)?; - self.update_datoms(conn, tx)?; - - // TODO: update parts, idents, schema materialized views. - - Ok(()) + let mut tx = Tx::new(self, conn, tx_id, tx_instant); + tx.transact_entities(entities) } } @@ -869,7 +888,7 @@ mod tests { /// There is some magic here about transaction numbering that I don't want to commit to or /// document just yet. The end state might be much more general pattern matching syntax, rather /// than the targeted transaction ID and timestamp replacement we have right now. - fn assert_transactions(conn: &rusqlite::Connection, db: &DB, transactions: &Vec) { + fn assert_transactions(conn: &rusqlite::Connection, db: &mut DB, transactions: &Vec) { for (index, transaction) in transactions.into_iter().enumerate() { let index = index as i64; let transaction = transaction.as_map().unwrap(); @@ -877,22 +896,40 @@ mod tests { let assertions: edn::Value = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "assertions"))).unwrap().clone(); let expected_transaction: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-transaction"))); let expected_datoms: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-datoms"))); + let expected_error_message: Option<&edn::Value> = transaction.get(&edn::Value::NamespacedKeyword(symbols::NamespacedKeyword::new("test", "expected-error-message"))); let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap(); - db.transact_internal(&conn, &entities[..], 
bootstrap::TX0 + index + 1).unwrap(); + + let maybe_report = db.transact(&conn, entities); if let Some(expected_transaction) = expected_transaction { + if expected_transaction.is_nil() { + assert!(maybe_report.is_err()); + + if let Some(expected_error_message) = expected_error_message { + let expected_error_message = expected_error_message.as_text(); + assert!(expected_error_message.is_some(), "Expected error message to be text:\n{:?}", expected_error_message); + let error_message = maybe_report.unwrap_err().to_string(); + assert!(error_message.contains(expected_error_message.unwrap()), "Expected error message:\n{}\nto contain:\n{}", error_message, expected_error_message.unwrap()); + } + continue + } + + let report = maybe_report.unwrap(); + assert_eq!(report.tx_id, bootstrap::TX0 + index + 1); + let transactions = debug::transactions_after(&conn, &db, bootstrap::TX0 + index).unwrap(); - assert_eq!(transactions.0[0].into_edn(), - *expected_transaction, - "\n{} - expected transaction:\n{}\n{}", label, transactions.0[0].into_edn(), *expected_transaction); + assert_eq!(transactions.0.len(), 1); + assert_eq!(*expected_transaction, + transactions.0[0].into_edn(), + "\n{} - expected transaction:\n{}\nbut got transaction:\n{}", label, *expected_transaction, transactions.0[0].into_edn()); } if let Some(expected_datoms) = expected_datoms { let datoms = debug::datoms_after(&conn, &db, bootstrap::TX0).unwrap(); - assert_eq!(datoms.into_edn(), - *expected_datoms, - "\n{} - expected datoms:\n{}\n{}", label, datoms.into_edn(), *expected_datoms); + assert_eq!(*expected_datoms, + datoms.into_edn(), + "\n{} - expected datoms:\n{}\nbut got datoms:\n{}", label, *expected_datoms, datoms.into_edn()) } // Don't allow empty tests. 
This will need to change if we allow transacting schema @@ -905,16 +942,14 @@ mod tests { #[test] fn test_add() { let mut conn = new_connection("").expect("Couldn't open in-memory db"); - assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION); - - let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema()); + let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. - let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); @@ -922,28 +957,46 @@ mod tests { let value = edn::parse::value(include_str!("../../tx/fixtures/test_add.edn")).unwrap(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &bootstrap_db, transactions); + assert_transactions(&conn, &mut db, transactions); } #[test] fn test_retract() { let mut conn = new_connection("").expect("Couldn't open in-memory db"); - assert_eq!(ensure_current_version(&mut conn).unwrap(), CURRENT_VERSION); - - let bootstrap_db = DB::new(bootstrap::bootstrap_partition_map(), bootstrap::bootstrap_schema()); + let mut db = ensure_current_version(&mut conn).unwrap(); // Does not include :db/txInstant. - let datoms = debug::datoms_after(&conn, &bootstrap_db, 0).unwrap(); + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); assert_eq!(datoms.0.len(), 88); // Includes :db/txInstant. 
- let transactions = debug::transactions_after(&conn, &bootstrap_db, 0).unwrap(); + let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); assert_eq!(transactions.0.len(), 1); assert_eq!(transactions.0[0].0.len(), 89); let value = edn::parse::value(include_str!("../../tx/fixtures/test_retract.edn")).unwrap(); let transactions = value.as_vector().unwrap(); - assert_transactions(&conn, &bootstrap_db, transactions); + assert_transactions(&conn, &mut db, transactions); + } + + #[test] + fn test_upsert_vector() { + let mut conn = new_connection("").expect("Couldn't open in-memory db"); + let mut db = ensure_current_version(&mut conn).unwrap(); + + // Does not include :db/txInstant. + let datoms = debug::datoms_after(&conn, &db, 0).unwrap(); + assert_eq!(datoms.0.len(), 88); + + // Includes :db/txInstant. + let transactions = debug::transactions_after(&conn, &db, 0).unwrap(); + assert_eq!(transactions.0.len(), 1); + assert_eq!(transactions.0[0].0.len(), 89); + + let value = edn::parse::value(include_str!("../../tx/fixtures/test_upsert_vector.edn")).unwrap(); + + let transactions = value.as_vector().unwrap(); + assert_transactions(&conn, &mut db, transactions); } } diff --git a/db/src/internal_types.rs b/db/src/internal_types.rs new file mode 100644 index 00000000..f71677f8 --- /dev/null +++ b/db/src/internal_types.rs @@ -0,0 +1,85 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +//! 
Types used only within the transactor. These should not be exposed outside of this crate. + +use std; +use std::collections::HashMap; +use std::rc::Rc; + +use errors; +use errors::ErrorKind; +use types::*; +use mentat_tx::entities::OpType; + +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub enum Term { + AddOrRetract(OpType, E, Entid, V), +} + +pub type EntidOr = std::result::Result; +pub type TypedValueOr = std::result::Result; + +pub type TempId = Rc; +pub type TempIdMap = HashMap; + +pub type LookupRef = Rc; + +/// Internal representation of an entid on its way to resolution. We either have the simple case (a +/// numeric entid), a lookup-ref that still needs to be resolved (an atomized [a v] pair), or a temp +/// ID that needs to be upserted or allocated (an atomized tempid). +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub enum LookupRefOrTempId { + LookupRef(LookupRef), + TempId(TempId) +} + +pub type TermWithTempIdsAndLookupRefs = Term, TypedValueOr>; +pub type TermWithTempIds = Term, TypedValueOr>; +pub type TermWithoutTempIds = Term; +pub type Population = Vec; + +impl TermWithTempIds { + // These have no tempids by definition, and just need to be unwrapped. This operation might + // also be called "lowering" or "level lowering", but the concept of "unwrapping" is common in + // Rust and seems appropriate here. + pub fn unwrap(self) -> TermWithoutTempIds { + match self { + Term::AddOrRetract(op, Ok(n), a, Ok(v)) => Term::AddOrRetract(op, n, a, v), + _ => unreachable!(), + } + } +} + +/// Given an `EntidOr` or a `TypedValueOr`, replace any internal `LookupRef` with the entid from +/// the given map. Fail if any `LookupRef` cannot be replaced. +/// +/// `lift` allows to specify how the entid found is mapped into the output type. (This could +/// also be an `Into` or `From` requirement.) 
+/// +/// The reason for this awkward expression is that we're parameterizing over the _type constructor_ +/// (`EntidOr` or `TypedValueOr`), which is not trivial to express in Rust. This only works because +/// they're both the same `Result<...>` type with different parameterizations. +pub fn replace_lookup_ref(lookup_map: &AVMap, desired_or: Result, lift: U) -> errors::Result> where U: FnOnce(Entid) -> T { + match desired_or { + Ok(desired) => Ok(Ok(desired)), // N.b., must unwrap here -- the ::Ok types are different! + Err(other) => { + match other { + LookupRefOrTempId::TempId(t) => Ok(Err(t)), + LookupRefOrTempId::LookupRef(av) => lookup_map.get(&*av) + .map(|x| lift(*x)).map(Ok) + // XXX TODO: fix this error kind! + .ok_or_else(|| ErrorKind::UnrecognizedIdent(format!("couldn't lookup [a v]: {:?}", (*av).clone())).into()), + } + } + } +} diff --git a/db/src/lib.rs b/db/src/lib.rs index 463e5bd0..fd8dca7a 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -33,7 +33,10 @@ mod entids; mod errors; mod schema; mod types; +mod internal_types; +mod upsert_resolution; mod values; +mod tx; pub use types::DB; @@ -73,3 +76,11 @@ pub fn repeat_values(values_per_tuple: usize, tuples: usize) -> String { let values: String = repeat(inner).take(tuples).join(", "); values } + +/// Return the current time in milliseconds after the Unix epoch according to the local clock. +/// +/// Compare `Date.now()` in JavaScript, `System.currentTimeMillis` in Java. +pub fn now() -> i64 { + let now = time::get_time(); + (now.sec as i64 * 1_000) + (now.nsec as i64 / (1_000_000)) +} diff --git a/db/src/tx.rs b/db/src/tx.rs new file mode 100644 index 00000000..14933ce5 --- /dev/null +++ b/db/src/tx.rs @@ -0,0 +1,309 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. 
You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +//! This module implements the transaction application algorithm described at +//! https://github.com/mozilla/mentat/wiki/Transacting and its children pages. +//! +//! The implementation proceeds in four main stages, labeled "Pipeline stage 1" through "Pipeline +//! stage 4". _Pipeline_ may be a misnomer, since the stages as written **cannot** be interleaved +//! in parallel. That is, a single transacted entity cannot flow through all the stages without its +//! sibling entities. +//! +//! This unintuitive architectural decision was made because the second and third stages (resolving +//! lookup refs and tempids, respectively) operate _in bulk_ to minimize the number of expensive +//! SQLite queries by processing many in one SQLite invocation. Pipeline stage 2 doesn't need to +//! operate like this: it is easy to handle each transacted entity independently of all the others +//! (and earlier, less efficient, implementations did this). However, Pipeline stage 3 appears to +//! require processing multiple elements at the same time, since there can be arbitrarily complex +//! graph relationships between tempids. Pipeline stage 4 (inserting elements into the SQL store) +//! could also be expressed as an independent operation per transacted entity, but there are +//! non-trivial uniqueness relationships inside a single transaction that need to be enforced. +//! Therefore, some multi-entity processing is required, and a per-entity pipeline becomes less +//! attractive. +//! +//! A note on the types in the implementation. 
The pipeline stages are strongly typed: each stage +//! accepts and produces a subset of the previous. We hope this will reduce errors as data moves +//! through the system. In contrast the Clojure implementation rewrote the fundamental entity type +//! in place and suffered bugs where particular code paths missed cases. +//! +//! The type hierarchy accepts `Entity` instances from the transaction parser and flows `Term` +//! instances through the term-rewriting transaction applier. `Term` is a general `[:db/add e a v]` +//! with restrictions on the `e` and `v` components. The hierarchy is expressed using `Result` to +//! model either/or, and layers of `Result` are stripped -- we might say the `Term` instances are +//! _lowered_ as they flow through the pipeline. This type hierarchy could have been expressed by +//! combinatorially increasing `enum` cases, but this makes it difficult to handle the `e` and `v` +//! components symmetrically. Hence, layers of `Result` type aliases. Hopefully the explanatory +//! names -- `TermWithTempIdsAndLookupRefs`, anyone? -- and strongly typed stage functions will help +//! keep everything straight. + +use std; +use std::collections::BTreeSet; + +use db::{ReducedEntity, SearchType}; +use entids; +use errors::*; +use internal_types::{ + LookupRefOrTempId, + TempId, + TempIdMap, + Term, + TermWithTempIdsAndLookupRefs, + TermWithTempIds, + TermWithoutTempIds, + replace_lookup_ref}; +use mentat_core::intern_set; +use mentat_tx::entities as entmod; +use mentat_tx::entities::{Entity, OpType}; +use rusqlite; +use schema::SchemaBuilding; +use types::*; +use upsert_resolution::Generation; + +/// A transaction on its way to being applied. +#[derive(Debug)] +pub struct Tx<'conn> { + /// The metadata to use to interpret the transaction entities with. + pub db: &'conn mut DB, + + /// The SQLite connection to apply against. In the future, this will be a Mentat connection. 
+ pub conn: &'conn rusqlite::Connection, + + /// The transaction ID of the transaction. + pub tx_id: Entid, + + /// The timestamp when the transaction began to be committed. + /// + /// This is milliseconds after the Unix epoch according to the transactor's local clock. + // TODO: :db.type/instant. + pub tx_instant: i64, +} + +impl<'conn> Tx<'conn> { + pub fn new(db: &'conn mut DB, conn: &'conn rusqlite::Connection, tx_id: Entid, tx_instant: i64) -> Tx<'conn> { + Tx { + db: db, + conn: conn, + tx_id: tx_id, + tx_instant: tx_instant, + } + } + + /// Given a collection of tempids and the [a v] pairs that they might upsert to, resolve exactly + /// which [a v] pairs do upsert to entids, and map each tempid that upserts to the upserted + /// entid. The keys of the resulting map are exactly those tempids that upserted. + pub fn resolve_temp_id_avs<'a>(&self, conn: &rusqlite::Connection, temp_id_avs: &'a [(TempId, AVPair)]) -> Result { + if temp_id_avs.is_empty() { + return Ok(TempIdMap::default()); + } + + // Map [a v]->entid. + let mut av_pairs: Vec<&AVPair> = vec![]; + for i in 0..temp_id_avs.len() { + av_pairs.push(&temp_id_avs[i].1); + } + + // Lookup in the store. + let av_map: AVMap = self.db.resolve_avs(conn, &av_pairs[..])?; + + // Map id->entid. + let mut temp_id_map: TempIdMap = TempIdMap::default(); + for &(ref temp_id, ref av_pair) in temp_id_avs { + if let Some(n) = av_map.get(&av_pair) { + if let Some(previous_n) = temp_id_map.get(&*temp_id) { + if n != previous_n { + // Conflicting upsert! TODO: collect conflicts and give more details on what failed this transaction. + bail!(ErrorKind::NotYetImplemented(format!("Conflicting upsert: tempid '{}' resolves to more than one entid: {:?}, {:?}", temp_id, previous_n, n))) // XXX + } + } + temp_id_map.insert(temp_id.clone(), *n); + } + } + + Ok((temp_id_map)) + } + + /// Pipeline stage 1: convert `Entity` instances into `Term` instances, ready for term + /// rewriting. 
+ /// + /// The `Term` instances produce share interned TempId and LookupRef handles. + fn entities_into_terms_with_temp_ids_and_lookup_refs(&self, entities: I) -> Result> where I: IntoIterator { + let mut temp_ids = intern_set::InternSet::new(); + + entities.into_iter() + .map(|entity: Entity| -> Result { + match entity { + Entity::AddOrRetract { op, e, a, v } => { + let a: i64 = match a { + entmod::Entid::Entid(ref a) => *a, + entmod::Entid::Ident(ref a) => self.db.schema.require_entid(&a.to_string())?, + }; + + let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + + let e = match e { + entmod::EntidOrLookupRefOrTempId::Entid(e) => { + let e: i64 = match e { + entmod::Entid::Entid(ref e) => *e, + entmod::Entid::Ident(ref e) => self.db.schema.require_entid(&e.to_string())?, + }; + std::result::Result::Ok(e) + }, + + entmod::EntidOrLookupRefOrTempId::TempId(e) => { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e))) + }, + + entmod::EntidOrLookupRefOrTempId::LookupRef(_) => { + // TODO: reference entity and initial input. + bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + }, + }; + + let v = { + if attribute.value_type == ValueType::Ref && v.is_text() { + std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone()))) + } else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 { + bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented"))) + } else { + // Here is where we do schema-aware typechecking: we either assert that + // the given value is in the attribute's value set, or (in limited + // cases) coerce the value into the attribute's value set. 
+ let typed_value: TypedValue = self.db.to_typed_value(&v, &attribute)?; + + std::result::Result::Ok(typed_value) + } + }; + + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }) + .collect::>>() + } + + /// Pipeline stage 2: rewrite `Term` instances with lookup refs into `Term` instances without + /// lookup refs. + /// + /// The `Term` instances produce share interned TempId handles and have no LookupRef references. + fn resolve_lookup_refs(&self, lookup_ref_map: &AVMap, terms: I) -> Result> where I: IntoIterator { + terms.into_iter().map(|term: TermWithTempIdsAndLookupRefs| -> Result { + match term { + Term::AddOrRetract(op, e, a, v) => { + let e = replace_lookup_ref(&lookup_ref_map, e, |x| x)?; + let v = replace_lookup_ref(&lookup_ref_map, v, |x| TypedValue::Ref(x))?; + Ok(Term::AddOrRetract(op, e, a, v)) + }, + } + }).collect::>>() + } + + /// Transact the given `entities` against the given SQLite `conn`, using the metadata in + /// `self.DB`. + /// + /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. + // TODO: move this to the transactor layer. + pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator { + // TODO: push these into an internal transaction report? + + /// Assertions that are :db.cardinality/one and not :db.fulltext. + let mut non_fts_one: Vec = vec![]; + + /// Assertions that are :db.cardinality/many and not :db.fulltext. + let mut non_fts_many: Vec = vec![]; + + // Transact [:db/add :db/txInstant NOW :db/tx]. + // TODO: allow this to be present in the transaction data. + non_fts_one.push((self.tx_id, + entids::DB_TX_INSTANT, + TypedValue::Long(self.tx_instant), + true)); + + // We don't yet support lookup refs, so this isn't mutable. Later, it'll be mutable. + let lookup_refs: intern_set::InternSet = intern_set::InternSet::new(); + + // TODO: extract the tempids set as well. + // Pipeline stage 1: entities -> terms with tempids and lookup refs. 
+ let terms_with_temp_ids_and_lookup_refs = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; + + // Pipeline stage 2: resolve lookup refs -> terms with tempids. + let lookup_ref_avs: Vec<&(i64, TypedValue)> = lookup_refs.inner.iter().map(|rc| &**rc).collect(); + let lookup_ref_map: AVMap = self.db.resolve_avs(self.conn, &lookup_ref_avs[..])?; + + let terms_with_temp_ids = self.resolve_lookup_refs(&lookup_ref_map, terms_with_temp_ids_and_lookup_refs)?; + + // Pipeline stage 3: upsert tempids -> terms without tempids or lookup refs. + // Now we can collect upsert populations. + let (mut generation, inert_terms) = Generation::from(terms_with_temp_ids, &self.db.schema)?; + + // And evolve them forward. + while generation.can_evolve() { + // Evolve further. + let temp_id_map = self.resolve_temp_id_avs(self.conn, &generation.temp_id_avs()[..])?; + generation = generation.evolve_one_step(&temp_id_map); + } + + // Allocate entids for tempids that didn't upsert. BTreeSet rather than HashSet so this is deterministic. + let unresolved_temp_ids: BTreeSet = generation.temp_ids_in_allocations(); + + // TODO: track partitions for temporary IDs. + let entids = self.db.allocate_entids(":db.part/user", unresolved_temp_ids.len()); + + let temp_id_allocations: TempIdMap = unresolved_temp_ids.into_iter().zip(entids).collect(); + + let final_populations = generation.into_final_populations(&temp_id_allocations)?; + let final_terms: Vec = [final_populations.resolved, + final_populations.allocated, + inert_terms.into_iter().map(|term| term.unwrap()).collect()].concat(); + + // Pipeline stage 4: final terms (after rewriting) -> DB insertions. + // Collect into non_fts_*. + // TODO: use something like Clojure's group_by to do this. 
+ for term in final_terms { + match term { + Term::AddOrRetract(op, e, a, v) => { + let attribute: &Attribute = self.db.schema.require_attribute_for_entid(a)?; + if attribute.fulltext { + bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult! + } + + let added = op == OpType::Add; + if attribute.multival { + non_fts_many.push((e, a, v, added)); + } else { + non_fts_one.push((e, a, v, added)); + } + }, + } + } + + if !non_fts_one.is_empty() { + self.db.insert_non_fts_searches(self.conn, &non_fts_one[..], self.tx_id, SearchType::Inexact)?; + } + + if !non_fts_many.is_empty() { + self.db.insert_non_fts_searches(self.conn, &non_fts_many[..], self.tx_id, SearchType::Exact)?; + } + + self.db.search(self.conn)?; + + self.db.insert_transaction(self.conn, self.tx_id)?; + self.db.update_datoms(self.conn, self.tx_id)?; + + // TODO: update idents and schema materialized views. + self.db.update_partition_map(self.conn)?; + + Ok(TxReport { + tx_id: self.tx_id, + tx_instant: self.tx_instant, + }) + } +} diff --git a/db/src/types.rs b/db/src/types.rs index dbc3823c..0403f47a 100644 --- a/db/src/types.rs +++ b/db/src/types.rs @@ -10,7 +10,8 @@ #![allow(dead_code)] -use std::collections::{BTreeMap}; +use std::collections::HashMap; +use std::collections::BTreeMap; extern crate mentat_core; @@ -64,3 +65,27 @@ impl DB { } } } + +/// A pair [a v] in the store. +/// +/// Used to represent lookup-refs and [TEMPID a v] upserts as they are resolved. +pub type AVPair = (Entid, TypedValue); + +/// Map [a v] pairs to existing entids. +/// +/// Used to resolve lookup-refs and upserts. +pub type AVMap<'a> = HashMap<&'a AVPair, Entid>; + +/// A transaction report summarizes an applied transaction. +// TODO: include map of resolved tempids. +#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialOrd, PartialEq)] +pub struct TxReport { + /// The transaction ID of the transaction. 
+ pub tx_id: Entid, + + /// The timestamp when the transaction began to be committed. + /// + /// This is milliseconds after the Unix epoch according to the transactor's local clock. + // TODO: :db.type/instant. + pub tx_instant: i64, +} diff --git a/db/src/upsert_resolution.rs b/db/src/upsert_resolution.rs new file mode 100644 index 00000000..1262821b --- /dev/null +++ b/db/src/upsert_resolution.rs @@ -0,0 +1,265 @@ +// Copyright 2016 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +//! This module implements the upsert resolution algorithm described at +//! https://github.com/mozilla/mentat/wiki/Transacting:-upsert-resolution-algorithm. + +use std::collections::BTreeSet; + +use mentat_tx::entities::OpType; +use errors; +use errors::ErrorKind; +use types::{Attribute, AVPair, Entid, Schema, TypedValue}; +use internal_types::*; +use schema::SchemaBuilding; + +/// A "Simple upsert" that looks like [:db/add TEMPID a v], where a is :db.unique/identity. +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +struct UpsertE(TempId, Entid, TypedValue); + +/// A "Complex upsert" that looks like [:db/add TEMPID a OTHERID], where a is :db.unique/identity +#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)] +struct UpsertEV(TempId, Entid, TempId); + +/// A generation collects entities into populations at a single evolutionary step in the upsert +/// resolution evolution process. 
+/// +/// The upsert resolution process is only concerned with [:db/add ...] entities until the final +/// entid allocations. That's why we separate into special simple and complex upsert types +/// immediately, and then collect the more general term types for final resolution. +#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub struct Generation { + /// "Simple upserts" that look like [:db/add TEMPID a v], where a is :db.unique/identity. + upserts_e: Vec, + + /// "Complex upserts" that look like [:db/add TEMPID a OTHERID], where a is :db.unique/identity + upserts_ev: Vec, + + /// Entities that look like: + /// - [:db/add TEMPID b OTHERID], where b is not :db.unique/identity; + /// - [:db/add TEMPID b v], where b is not :db.unique/identity. + /// - [:db/add e b OTHERID]. + allocations: Vec, + + /// Entities that upserted and no longer reference tempids. These assertions are guaranteed to + /// be in the store. + upserted: Vec, + + /// Entities that resolved due to other upserts and no longer reference tempids. These + /// assertions may or may not be in the store. + resolved: Vec, +} + +#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)] +pub struct FinalPopulations { + /// Upserts that upserted. + pub upserted: Vec, + + /// Allocations that resolved due to other upserts. + pub resolved: Vec, + + /// Allocations that required new entid allocations. + pub allocated: Vec, +} + +impl Generation { + /// Split entities into a generation of populations that need to evolve to have their tempids + /// resolved or allocated, and a population of inert entities that do not reference tempids. 
+ pub fn from(terms: I, schema: &Schema) -> errors::Result<(Generation, Population)> where I: IntoIterator { + let mut generation = Generation::default(); + let mut inert = vec![]; + + let is_unique = |a: Entid| -> errors::Result { + let attribute: &Attribute = schema.require_attribute_for_entid(a)?; + Ok(attribute.unique_identity) + }; + + for term in terms.into_iter() { + match term { + Term::AddOrRetract(op, Err(e), a, Err(v)) => { + if op == OpType::Add && is_unique(a)? { + generation.upserts_ev.push(UpsertEV(e, a, v)); + } else { + generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Err(v))); + } + }, + Term::AddOrRetract(op, Err(e), a, Ok(v)) => { + if op == OpType::Add && is_unique(a)? { + generation.upserts_e.push(UpsertE(e, a, v)); + } else { + generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Ok(v))); + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(v)) => { + generation.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(v))); + }, + Term::AddOrRetract(op, Ok(e), a, Ok(v)) => { + inert.push(Term::AddOrRetract(op, Ok(e), a, Ok(v))); + }, + } + } + + Ok((generation, inert)) + } + + /// Return true if it's possible to evolve this generation further. + /// + /// There can be complex upserts but no simple upserts to help resolve them. We accept the + /// overhead of having the database try to resolve an empty set of simple upserts, to avoid + /// having to special case complex upserts at entid allocation time. + pub fn can_evolve(&self) -> bool { + !self.upserts_e.is_empty() || !self.upserts_ev.is_empty() + } + + /// Evolve this generation one step further by rewriting the existing :db/add entities using the + /// given temporary IDs. + /// + /// TODO: Considering doing this in place; the function already consumes `self`. 
+ pub fn evolve_one_step(self, temp_id_map: &TempIdMap) -> Generation { + let mut next = Generation::default(); + + for UpsertE(t, a, v) in self.upserts_e { + match temp_id_map.get(&*t) { + Some(&n) => next.upserted.push(Term::AddOrRetract(OpType::Add, n, a, v)), + None => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t), a, Ok(v))), + } + } + + for UpsertEV(t1, a, t2) in self.upserts_ev { + match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2))), + (None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2))), + (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Ok(n1), a, Err(t2))), + (None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t1), a, Err(t2))), + } + } + + // There's no particular need to separate resolved from allocations right here and right + // now, although it is convenient. + for term in self.allocations { + // TODO: find an expression that destructures less? I still expect this to be efficient + // but it's a little verbose. 
+ match term { + Term::AddOrRetract(op, Err(t1), a, Err(t2)) => { + match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2))), + (None, Some(&n2)) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Ok(TypedValue::Ref(n2)))), + (Some(&n1), None) => next.allocations.push(Term::AddOrRetract(op, Ok(n1), a, Err(t2))), + (None, None) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Err(t2))), + } + }, + Term::AddOrRetract(op, Err(t), a, Ok(v)) => { + match temp_id_map.get(&*t) { + Some(&n) => next.resolved.push(Term::AddOrRetract(op, n, a, v)), + None => next.allocations.push(Term::AddOrRetract(op, Err(t), a, Ok(v))), + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(t)) => { + match temp_id_map.get(&*t) { + Some(&n) => next.resolved.push(Term::AddOrRetract(op, e, a, TypedValue::Ref(n))), + None => next.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(t))), + } + }, + Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), + } + } + + next + } + + // Collect id->[a v] pairs that might upsert at this evolutionary step. + pub fn temp_id_avs<'a>(&'a self) -> Vec<(TempId, AVPair)> { + let mut temp_id_avs: Vec<(TempId, AVPair)> = vec![]; + // TODO: map/collect. + for &UpsertE(ref t, ref a, ref v) in &self.upserts_e { + // TODO: figure out how to make this less expensive, i.e., don't require + // clone() of an arbitrary value. + temp_id_avs.push((t.clone(), (*a, v.clone()))); + } + temp_id_avs + } + + /// After evolution is complete, yield the set of tempids that require entid allocation. These + /// are the tempids that appeared in [:db/add ...] entities, but that didn't upsert to existing + /// entids. 
+ pub fn temp_ids_in_allocations(&self) -> BTreeSet { + assert!(self.upserts_e.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!"); + assert!(self.upserts_ev.is_empty(), "All upserts should have been upserted, resolved, or moved to the allocated population!"); + + let mut temp_ids: BTreeSet = BTreeSet::default(); + + for term in self.allocations.iter() { + match term { + &Term::AddOrRetract(OpType::Add, Err(ref t1), _, Err(ref t2)) => { + temp_ids.insert(t1.clone()); + temp_ids.insert(t2.clone()); + }, + &Term::AddOrRetract(OpType::Add, Err(ref t), _, Ok(_)) => { + temp_ids.insert(t.clone()); + }, + &Term::AddOrRetract(OpType::Add, Ok(_), _, Err(ref t)) => { + temp_ids.insert(t.clone()); + }, + &Term::AddOrRetract(OpType::Add, Ok(_), _, Ok(_)) => unreachable!(), + &Term::AddOrRetract(OpType::Retract, _, _, _) => { + // [:db/retract ...] entities never allocate entids; they have to resolve due to + // other upserts (or they fail the transaction). + }, + } + } + + temp_ids + } + + /// After evolution is complete, use the provided allocated entids to segment `self` into + /// populations, each with no references to tempids. + pub fn into_final_populations(self, temp_id_map: &TempIdMap) -> errors::Result { + assert!(self.upserts_e.is_empty()); + assert!(self.upserts_ev.is_empty()); + + let mut populations = FinalPopulations::default(); + + populations.upserted = self.upserted; + populations.resolved = self.resolved; + + for term in self.allocations { + let allocated = match term { + // TODO: consider require implementing require on temp_id_map. + Term::AddOrRetract(op, Err(t1), a, Err(t2)) => { + match (op, temp_id_map.get(&*t1), temp_id_map.get(&*t2)) { + (op, Some(&n1), Some(&n2)) => Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2)), + (OpType::Add, _, _) => unreachable!(), // This is a coding error -- every tempid in a :db/add entity should resolve or be allocated. 
+ (OpType::Retract, _, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: one of {}, {}", t1, t2))), + } + }, + Term::AddOrRetract(op, Err(t), a, Ok(v)) => { + match (op, temp_id_map.get(&*t)) { + (op, Some(&n)) => Term::AddOrRetract(op, n, a, v), + (OpType::Add, _) => unreachable!(), // This is a coding error. + (OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))), + } + }, + Term::AddOrRetract(op, Ok(e), a, Err(t)) => { + match (op, temp_id_map.get(&*t)) { + (op, Some(&n)) => Term::AddOrRetract(op, e, a, TypedValue::Ref(n)), + (OpType::Add, _) => unreachable!(), // This is a coding error. + (OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))), + } + }, + Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), // This is a coding error -- these should not be in allocations. 
+ }; + populations.allocated.push(allocated); + } + + Ok(populations) + } +} diff --git a/tx-parser/src/lib.rs b/tx-parser/src/lib.rs index 598f7eef..1fc96375 100644 --- a/tx-parser/src/lib.rs +++ b/tx-parser/src/lib.rs @@ -21,7 +21,7 @@ use combine::{any, eof, many, parser, satisfy_map, token, Parser, ParseResult, S use combine::combinator::{Expected, FnParser}; use edn::symbols::NamespacedKeyword; use edn::types::Value; -use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, LookupRef, OpType, ValueOrLookupRef}; +use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType}; use mentat_parser_utils::ResultParser; pub struct Tx(::std::marker::PhantomData I>); @@ -64,20 +64,25 @@ def_parser_fn!(Tx, lookup_ref, Value, LookupRef, input, { .parse_stream(input) }); -def_parser_fn!(Tx, entid_or_lookup_ref, Value, EntidOrLookupRef, input, { - Tx::::entid() - .map(|x| EntidOrLookupRef::Entid(x)) - .or(Tx::::lookup_ref().map(|x| EntidOrLookupRef::LookupRef(x))) +def_parser_fn!(Tx, entid_or_lookup_ref_or_temp_id, Value, EntidOrLookupRefOrTempId, input, { + Tx::::entid().map(|x| EntidOrLookupRefOrTempId::Entid(x)) + .or(Tx::::lookup_ref().map(|x| EntidOrLookupRefOrTempId::LookupRef(x))) + .or(Tx::::temp_id().map(|x| EntidOrLookupRefOrTempId::TempId(x))) .parse_lazy(input) .into() }); +def_parser_fn!(Tx, temp_id, Value, String, input, { + satisfy_map(|x: Value| x.into_text()) + .parse_stream(input) +}); + // TODO: abstract the "match Vector, parse internal stream" pattern to remove this boilerplate. def_parser_fn!(Tx, add, Value, Entity, input, { satisfy_map(|x: Value| -> Option { if let Value::Vector(y) = x { let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "add"))), - Tx::<&[Value]>::entid_or_lookup_ref(), + Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(), Tx::<&[Value]>::entid(), // TODO: handle lookup-ref. 
any(), @@ -87,7 +92,7 @@ def_parser_fn!(Tx, add, Value, Entity, input, { op: OpType::Add, e: e, a: a, - v: ValueOrLookupRef::Value(v), + v: v, } }); // TODO: use ok() with a type annotation rather than explicit match. @@ -106,7 +111,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, { satisfy_map(|x: Value| -> Option { if let Value::Vector(y) = x { let mut p = (token(Value::NamespacedKeyword(NamespacedKeyword::new("db", "retract"))), - Tx::<&[Value]>::entid_or_lookup_ref(), + Tx::<&[Value]>::entid_or_lookup_ref_or_temp_id(), Tx::<&[Value]>::entid(), // TODO: handle lookup-ref. any(), @@ -116,7 +121,7 @@ def_parser_fn!(Tx, retract, Value, Entity, input, { op: OpType::Retract, e: e, a: a, - v: ValueOrLookupRef::Value(v), + v: v, } }); // TODO: use ok() with a type annotation rather than explicit match. @@ -170,6 +175,7 @@ mod tests { use combine::Parser; use edn::symbols::NamespacedKeyword; use edn::types::Value; + use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, LookupRef, OpType}; fn kw(namespace: &str, name: &str) -> Value { Value::NamespacedKeyword(NamespacedKeyword::new(namespace, name)) @@ -186,10 +192,10 @@ mod tests { assert_eq!(result, Ok((Entity::AddOrRetract { op: OpType::Add, - e: EntidOrLookupRef::Entid(Entid::Ident(NamespacedKeyword::new("test", - "entid"))), + e: EntidOrLookupRefOrTempId::Entid(Entid::Ident(NamespacedKeyword::new("test", + "entid"))), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } @@ -205,9 +211,9 @@ mod tests { assert_eq!(result, Ok((Entity::AddOrRetract { op: OpType::Retract, - e: EntidOrLookupRef::Entid(Entid::Entid(101)), + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } @@ -224,12 +230,12 @@ mod tests { assert_eq!(result, 
Ok((Entity::AddOrRetract { op: OpType::Add, - e: EntidOrLookupRef::LookupRef(LookupRef { + e: EntidOrLookupRefOrTempId::LookupRef(LookupRef { a: Entid::Ident(NamespacedKeyword::new("test", "a1")), v: Value::Text("v1".into()), }), a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), + v: Value::Text("v".into()), }, &[][..]))); } diff --git a/tx-parser/tests/parser.rs b/tx-parser/tests/parser.rs index 66d29e43..f0a7add4 100644 --- a/tx-parser/tests/parser.rs +++ b/tx-parser/tests/parser.rs @@ -16,15 +16,15 @@ extern crate mentat_tx_parser; use edn::parse; use edn::symbols::NamespacedKeyword; use edn::types::Value; -use mentat_tx::entities::{Entid, EntidOrLookupRef, Entity, OpType, ValueOrLookupRef}; +use mentat_tx::entities::{Entid, EntidOrLookupRefOrTempId, Entity, OpType}; use mentat_tx_parser::Tx; #[test] fn test_entities() { - - // TODO: align with whitespace after the EDN parser ignores more whitespace. - let input = r#"[[:db/add 101 :test/a "v"] -[:db/retract 102 :test/b "w"]]"#; + let input = r#" +[[:db/add 101 :test/a "v"] + [:db/add "tempid" :test/a "v"] + [:db/retract 102 :test/b "w"]]"#; let edn = parse::value(input).unwrap(); let input = [edn]; @@ -33,16 +33,22 @@ fn test_entities() { assert_eq!(result, Ok(vec![ Entity::AddOrRetract { - e: EntidOrLookupRef::Entid(Entid::Entid(101)), - a: Entid::Ident(NamespacedKeyword::new("test", "a")), - v: ValueOrLookupRef::Value(Value::Text("v".into())), op: OpType::Add, + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(101)), + a: Entid::Ident(NamespacedKeyword::new("test", "a")), + v: Value::Text("v".into()), + }, + Entity::AddOrRetract { + op: OpType::Add, + e: EntidOrLookupRefOrTempId::TempId("tempid".into()), + a: Entid::Ident(NamespacedKeyword::new("test", "a")), + v: Value::Text("v".into()), }, Entity::AddOrRetract { - e: EntidOrLookupRef::Entid(Entid::Entid(102)), - a: Entid::Ident(NamespacedKeyword::new("test", "b")), - v: 
ValueOrLookupRef::Value(Value::Text("w".into())), op: OpType::Retract, + e: EntidOrLookupRefOrTempId::Entid(Entid::Entid(102)), + a: Entid::Ident(NamespacedKeyword::new("test", "b")), + v: Value::Text("w".into()), }, ])); } diff --git a/tx/fixtures/test_upsert_vector.edn b/tx/fixtures/test_upsert_vector.edn new file mode 100644 index 00000000..41a55c73 --- /dev/null +++ b/tx/fixtures/test_upsert_vector.edn @@ -0,0 +1,121 @@ +[{:test/label ":db.cardinality/one, insert" + :test/assertions + [[:db/add 100 :db/ident :name/Ivan] + [:db/add 101 :db/ident :name/Petr]] + :test/expected-transaction + #{[100 :db/ident :name/Ivan ?tx1 true] + [101 :db/ident :name/Petr ?tx1 true] + [?tx1 :db/txInstant ?ms1 ?tx1 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr]}} + + {:test/label "upsert two tempids to same entid" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + [:db/add "t1" :db.schema/attribute 100] + [:db/add "t2" :db/ident :name/Petr] + [:db/add "t2" :db.schema/attribute 101]] + :test/expected-transaction + #{[100 :db.schema/attribute 100 ?tx2 true] + [101 :db.schema/attribute 101 ?tx2 true] + [?tx2 :db/txInstant ?ms2 ?tx2 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr] + [100 :db.schema/attribute 100] + [101 :db.schema/attribute 101]} + :test/expected-tempids + {"t1" 100 + "t2" 101}} + + {:test/label "upsert with tempid" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + ;; Ref doesn't have to exist (at this time). Can't reuse due to :db/unique :db.unique/value. 
+ [:db/add "t1" :db.schema/attribute 102]] + :test/expected-transaction + #{[100 :db.schema/attribute 102 ?tx3 true] + [?tx3 :db/txInstant ?ms3 ?tx3 true]} + :test/expected-datoms + #{[100 :db/ident :name/Ivan] + [101 :db/ident :name/Petr] + [100 :db.schema/attribute 100] + [100 :db.schema/attribute 102] + [101 :db.schema/attribute 101]} + :test/expected-tempids + {"t1" 100}} + + ;; TODO: don't hard-code allocated entids. + {:test/label "single complex upsert allocates new entid" + :test/assertions + [[:db/add "t1" :db.schema/attribute "t2"]] + :test/expected-transaction + #{[65536 :db.schema/attribute 65537 ?tx4 true] + [?tx4 :db/txInstant ?ms4 ?tx4 true]} + :test/expected-tempids + {"t1" 65536 + "t2" 65537}} + + {:test/label "conflicting upserts fail" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + [:db/add "t1" :db/ident :name/Petr]] + :test/expected-transaction + nil + :test/expected-error-message + "Conflicting upsert" + ;; nil + } + + {:test/label "tempids in :db/retract that do upsert are fine" + :test/assertions + [[:db/add "t1" :db/ident :name/Ivan] + ;; This ref doesn't exist, so the assertion will be ignored. + [:db/retract "t1" :db.schema/attribute 103]] + :test/expected-transaction + #{[?tx6 :db/txInstant ?ms6 ?tx6 true]} + :test/expected-error-message + "" + :test/expected-tempids + {}} + + {:test/label "tempids in :db/retract that don't upsert fail" + :test/assertions + [[:db/retract "t1" :db/ident :name/Anonymous]] + :test/expected-transaction + nil + :test/expected-error-message + ""} + + ;; The upsert algorithm will first try to resolve "t1", fail, and then allocate both "t1" and "t2". 
+ {:test/label "multistep, both allocated" + :test/assertions + [[:db/add "t1" :db/ident :name/Josef] + [:db/add "t2" :db.schema/attribute "t1"]] + :test/expected-transaction + #{[65538 :db/ident :name/Josef ?tx8 true] + [65539 :db.schema/attribute 65538 ?tx8 true] + [?tx8 :db/txInstant ?ms8 ?tx8 true]} + :test/expected-error-message + "" + :test/expected-tempids + {"t1" 65538 + "t2" 65539}} + + ;; Can't quite test this without more schema elements. + ;; ;; This time, we can resolve both, but we have to try "t1", succeed, and then resolve "t2". + ;; {:test/label "multistep, upserted allocated" + ;; :test/assertions + ;; [[:db/add "t1" :db/ident :name/Josef] + ;; [:db/add "t2" :db/ident "t1"]] + ;; :test/expected-transaction + ;; #{[65538 :db/ident :name/Josef] + ;; [65538 :db/ident :name/Karl] + ;; [?tx8 :db/txInstant ?ms8 ?tx8 true]} + ;; :test/expected-error-message + ;; "" + ;; :test/expected-tempids + ;; {"t1" 65538 + ;; "t2" 65539}} + ] diff --git a/tx/src/entities.rs b/tx/src/entities.rs index c714cd3a..04a6126f 100644 --- a/tx/src/entities.rs +++ b/tx/src/entities.rs @@ -35,12 +35,13 @@ pub enum EntidOrLookupRef { } #[derive(Clone, Debug, PartialEq)] -pub enum ValueOrLookupRef { - Value(Value), +pub enum EntidOrLookupRefOrTempId { + Entid(Entid), LookupRef(LookupRef), + TempId(String), } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)] pub enum OpType { Add, Retract, @@ -50,8 +51,8 @@ pub enum OpType { pub enum Entity { AddOrRetract { op: OpType, - e: EntidOrLookupRef, + e: EntidOrLookupRefOrTempId, a: Entid, - v: ValueOrLookupRef, + v: Value, }, }