Schema alteration. Fixes #294 and #295. (#370) r=rnewman

* Pre: Don't retract :db/ident in test.

Datomic (and eventually Mentat) doesn't allow retracting :db/ident in
this way, so this test runs afoul of future work to support mutating
metadata.

* Pre: s/VALUETYPE/VALUE_TYPE/.

This is consistent with the source capitalization (which is
"valueType") and with the other identifier.

* Pre: Remove some single quotes from error output.

* Part 1: Make materialized views be uniform [e a v value_type_tag].

This looks ahead to a time when we could support arbitrary
user-defined materialized views.  For now, the "idents" materialized
view is those datoms of the form [e :db/ident :namespaced/keyword] and
the "schema" materialized view is those datoms of the form [e a v]
where a is in a particular set of attributes that will become clear in
the following commits.

This change is not backwards compatible, so I'm removing the
`test_open_current_version` (really, v2) test.  It'll be reinstated
when we get to https://github.com/mozilla/mentat/issues/194.
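
The uniform row shape can be sketched as follows.  This is a
simplification: `Entid`, `TypedValue`, and the tag values here are
illustrative stand-ins, not Mentat's actual definitions.

```rust
// A minimal sketch of the uniform [e a v value_type_tag] row shape used by
// every materialized view. Types and tag values are hypothetical stand-ins.
type Entid = i64;

#[derive(Debug, Clone, PartialEq)]
enum TypedValue {
    Ref(Entid),
    Keyword(String),
    Long(i64),
}

impl TypedValue {
    // Hypothetical tag assignment; the real tags live in the SQL layer.
    fn value_type_tag(&self) -> i32 {
        match *self {
            TypedValue::Ref(_) => 0,
            TypedValue::Long(_) => 5,
            TypedValue::Keyword(_) => 13,
        }
    }
}

// Every materialized view row has the same shape: [e a v value_type_tag].
fn to_row(e: Entid, a: Entid, v: &TypedValue) -> (Entid, Entid, TypedValue, i32) {
    (e, a, v.clone(), v.value_type_tag())
}

fn main() {
    // An "idents" datom: [e :db/ident :namespaced/keyword].
    let ident_row = to_row(100, 1, &TypedValue::Keyword(":person/name".into()));
    // A "schema" datom: [e :db/valueType :db.type/ref], stored as a Ref.
    let schema_row = to_row(100, 40, &TypedValue::Ref(23));
    assert_eq!(ident_row.3, 13);
    assert_eq!(schema_row.3, 0);
    println!("{:?}\n{:?}", ident_row, schema_row);
}
```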

* Pre: Map TypedValue::Ref to TypedValue::Keyword in debug output.

* Part 3: Separate `schema_to_mutate` from the `schema` used to interpret.

This is just to keep track of the expected changes during
bootstrapping.  I want bootstrap metadata mutations to flow through
the same code path as metadata mutations during regular transactions;
by differentiating the schema used for interpretation from the schema
that will be updated, I expect to be able to apply bootstrap metadata
mutations to an empty schema and have things like materialized views
created (using the regular code paths).

This commit has been re-ordered for conceptual clarity, but it won't
compile because it references the metadata module.  It's possible to
make it compile -- the functionality is there in the schema module --
but it's not worth the rebasing effort until after review (and
possibly not even then, since we'll squash down to a single commit to
land).

* Part 2: Maintain entids separately from idents.

In order to support historical idents, we need to distinguish the
"current" entid -> ident map from the "complete historical"
ident -> entid map.  This is what Datomic does; in Datomic, an ident is
never retracted (although it can be replaced).  This approach is an
important part of allowing multiple consumers to share a schema
fragment as it migrates forward.

This fixes a limitation of the Clojure implementation, which did not
handle historical idents across knowledge base close and re-open.

The "entids" materialized view is naturally a slice of the "datoms"
table.  The "idents" materialized view is a slice of the
"transactions" table.  I hope that representing the data in this way,
and casting the problem in this light, might generalize to future
materialized views.
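
The two maps can be sketched as below.  The type and method names are
illustrative, not Mentat's actual API, and this is the Part 2 design as
described here (the review commits later in this series partially
revert it).

```rust
use std::collections::BTreeMap;

type Entid = i64;
type Ident = &'static str;

// A sketch of the two maps described above: the "current" entid -> ident
// view, and the "complete historical" ident -> entid map, in which an
// ident can be replaced but is never removed.
#[derive(Default)]
struct IdentMaps {
    current: BTreeMap<Entid, Ident>,
    historical: BTreeMap<Ident, Entid>,
}

impl IdentMaps {
    fn assert_ident(&mut self, e: Entid, ident: Ident) {
        // The current view tracks only the latest ident for each entid...
        self.current.insert(e, ident);
        // ...while the historical map accumulates every ident ever used.
        self.historical.insert(ident, e);
    }
}

fn main() {
    let mut maps = IdentMaps::default();
    maps.assert_ident(65536, ":person/name");
    // Replacing the ident updates the current view...
    maps.assert_ident(65536, ":person/full-name");
    assert_eq!(maps.current.get(&65536), Some(&":person/full-name"));
    // ...but the old ident still resolves, so a consumer holding an older
    // version of the schema fragment can still look it up.
    assert_eq!(maps.historical.get(":person/name"), Some(&65536));
    assert_eq!(maps.historical.get(":person/full-name"), Some(&65536));
}
```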

* Pre: Add DiffSet.

* Part 4: Collect mutations to a `Schema`.

I haven't acted on your review comment about consuming
AttributeBuilder during each fluent function.  If you read my response
and still want this, I'm happy to do it in review.

* Part 5: Handle :db/ident and :db.{install,alter}/attribute.

This "loops" the committed datoms out of the SQL store and back
through the metadata (schema, but in future also partition map)
processor.  The metadata processor updates the schema and produces a
report of what changed; that report is then used to update the SQL
store.  That update includes:
- the materialized views ("entids", "idents", and "schema");
- if needed, a subset of the datoms themselves (as flags change).

I've left a TODO for handling attribute retraction in the cases where
it makes sense.  I expect that to be straightforward.

* Review comment: Rename DiffSet to AddRetractAlterSet.

Also adds a little more commentary and a simple test.

* Review comment: Use ToIdent trait.

* Review comment: partially revert "Part 2: Maintain entids separately from idents."

This reverts commit 23a91df9c35e14398f2ddbd1ba25315821e67401.

Following our discussion, this removes the "entids" materialized
view.  The next commit will remove historical idents from the "idents"
materialized view.

* Post: Use custom Either rather than std::result::Result.

This is not necessary, but it was suggested that we might be paying
overhead creating Err instances while using error_chain.  That turns
out not to be the case, but this change shows that we don't actually
use any of the Result helper methods, so there's no reason to overload
Result.  This change might avoid some future confusion, so I'm going
to land it anyway.
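
A minimal sketch of such an Either: a plain two-variant sum type with
no success/failure connotation and none of Result's helper methods.
The variant and function names here are illustrative, not the ones in
the patch.

```rust
// A custom Either, mirroring the change described above. Unlike Result,
// neither variant is "the error case", and there are no combinators to
// suggest otherwise; callers just pattern-match.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

// A hypothetical consumer: classify an entid-or-message outcome.
fn classify(n: i64) -> Either<i64, String> {
    if n >= 0 {
        Either::Left(n)
    } else {
        Either::Right(format!("negative: {}", n))
    }
}

fn main() {
    assert_eq!(classify(7), Either::Left(7));
    assert_eq!(classify(-2), Either::Right("negative: -2".to_string()));
}
```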

Signed-off-by: Nick Alexander <nalexander@mozilla.com>

* Review comment: Don't preserve historical idents.

* Review comment: More prepared statements when updating materialized views.

* Post: Test altering :db/cardinality and :db/unique.

These tests fail due to a Datomic limitation, namely that the marker
flag :db.alter/attribute can only be asserted once for an attribute!
That is, [:db.part/db :db.alter/attribute :attribute] will only be
transacted at most once.  Since older versions of Datomic required the
:db.alter/attribute flag, I can only imagine they either never wrote
:db.alter/attribute to the store, or they handled it specially.  I'll
need to remove the marker flag system from Mentat in order to address
this fundamental limitation.

* Post: Remove some more single quotes from error output.

* Post: Add assert_transact! macro to unwrap safely.

I was finding it very difficult to track unwrapping errors while
making changes, due to an underlying Mac OS X symbolication issue that
makes running tests with RUST_BACKTRACE=1 so slow that they all time
out.

* Post: Don't expect or recognize :db.{install,alter}/attribute.

I had this all working... except we will never see a repeated
`[:db.part/db :db.alter/attribute :attribute]` assertion in the store!
That means my approach would let you alter an attribute at most one
time.  It's not worth hacking around this; it's better to just stop
expecting (and recognizing) the marker flags.  (We have all the data
to distinguish the various cases that we need without the marker
flags.)
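
The underlying problem is ordinary set semantics for datoms.  A toy
illustration (real datoms also carry tx and added fields, elided here):

```rust
use std::collections::BTreeSet;

// A sketch of why the marker-flag approach fails: datoms form a set, so
// asserting the same [e a v] twice leaves a single datom, and the second
// [:db.part/db :db.alter/attribute :attribute] assertion never reaches
// the store. Keywords are represented as plain strings for brevity.
type Datom = (&'static str, &'static str, &'static str);

fn main() {
    let mut datoms: BTreeSet<Datom> = BTreeSet::new();
    // First alteration: the marker datom is added.
    assert!(datoms.insert((":db.part/db", ":db.alter/attribute", ":attribute")));
    // Second alteration: the identical assertion is a no-op...
    assert!(!datoms.insert((":db.part/db", ":db.alter/attribute", ":attribute")));
    // ...so nothing in the store records that a second alteration happened.
    assert_eq!(datoms.len(), 1);
}
```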

This brings Mentat in line with the thrust of newer Datomic versions,
but isn't compatible with Datomic, because (if I understand correctly)
Datomic automatically adds :db.{install,alter}/attribute assertions to
transactions.

I haven't purged the corresponding :db/ident and schema fragments just
yet:
- we might want them back
- we might want them in order to upgrade v1 and v2 databases to the
  new on-disk layout we're fleshing out (v3?).

* Post: Don't make :db/unique :db.unique/* imply :db/index true.

This patch avoids a potential bug with the "schema" materialized view.
If :db/unique :db.unique/value implies :db/index true, then what
happens when you _retract_ :db.unique/value?  I think Datomic defines
this in some way, but I really want the "schema" materialized view to
be a slice of "datoms" and not have these sorts of ambiguities and
persistent effects.  Therefore, to ensure that we don't retract a
schema characteristic and accidentally change more than we intended
to, this patch stops having any schema characteristic imply any other
schema characteristic(s).  To achieve that, I added an
Option<attribute::Unique> field to Attribute; this helps with this
patch, and also looks ahead to when we allow retracting :db/unique
attributes.
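
The independence argument can be sketched with simplified stand-ins
for Attribute and attribute::Unique (only the two relevant fields are
modelled here):

```rust
// Because `unique` is its own Option-typed characteristic, retracting
// :db/unique touches only that field; `index` stays exactly as asserted,
// with no implied state to unwind.
#[derive(Debug, PartialEq)]
enum Unique {
    Value,
    Identity,
}

#[derive(Debug, PartialEq)]
struct Attribute {
    index: bool,
    unique: Option<Unique>,
}

fn main() {
    // :db/unique :db.unique/value no longer implies :db/index true; both
    // characteristics are asserted explicitly and independently.
    let mut attr = Attribute { index: true, unique: Some(Unique::Value) };
    // Retracting :db/unique resets only the unique characteristic...
    attr.unique = None;
    // ...leaving the index flag as it was asserted.
    assert_eq!(attr, Attribute { index: true, unique: None });
}
```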

* Post: Allow retracting :db/ident.

* Post: Include more details about invalid schema changes.

The tests use strings, so they hide the chained errors, which do in
fact provide more detail.

* Review comment: Fix outdated comment.

* Review comment: s/_SET/_SQL_LIST/.

* Review comment: Use a sub-select for checking cardinality.

This might be faster in practice.

* Review comment: Put `attribute::Unique` into its own namespace.

Commit 15b4195a6e, parent 8beea55e39.
Nick Alexander, 2017-03-20 13:18:59 -07:00, committed by GitHub.
15 changed files with 1225 additions and 333 deletions.


@@ -74,7 +74,6 @@ impl TypedValue {
&TypedValue::Keyword(_) => ValueType::Keyword,
}
}
}
// Put this here rather than in `db` simply because it's widely needed.
@@ -139,6 +138,14 @@ pub enum AttributeBitFlags {
UniqueValue = 1 << 3,
}
pub mod attribute {
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub enum Unique {
Value,
Identity,
}
}
/// A Mentat schema attribute has a value type and several other flags determining how assertions
/// with the attribute are interpreted.
///
@@ -153,19 +160,22 @@ pub struct Attribute {
/// is `:db/cardinality :db.cardinality/one`.
pub multival: bool,
/// `true` if this attribute is unique-value, i.e., it is `:db/unique :db.unique/value`.
/// `None` if this attribute is neither unique-value nor unique-identity.
///
/// `Some(attribute::Unique::Value)` if this attribute is unique-value, i.e., it is `:db/unique
/// :db.unique/value`.
///
/// *Unique-value* means that there is at most one assertion with the attribute and a
/// particular value in the datom store.
pub unique_value: bool,
/// `true` if this attribute is unique-identity, i.e., it is `:db/unique :db.unique/identity`.
/// particular value in the datom store. Unique-value attributes can be used in lookup-refs.
///
/// `Some(attribute::Unique::Identity)` if this attribute is unique-identity, i.e., it is `:db/unique
/// :db.unique/identity`.
///
/// Unique-identity attributes always have value type `Ref`.
///
/// *Unique-identity* means that the attribute is *unique-value* and that it can be used in
/// lookup-refs and will automatically upsert where appropriate.
pub unique_identity: bool,
pub unique: Option<attribute::Unique>,
/// `true` if this attribute is automatically indexed, i.e., it is `:db/index true`.
pub index: bool,
@@ -198,7 +208,7 @@ impl Attribute {
if self.fulltext {
flags |= AttributeBitFlags::IndexFulltext as u8;
}
if self.unique_value {
if self.unique.is_some() {
flags |= AttributeBitFlags::UniqueValue as u8;
}
flags
@@ -213,8 +223,7 @@ impl Default for Attribute {
fulltext: false,
index: false,
multival: false,
unique_value: false,
unique_identity: false,
unique: None,
component: false,
}
}
@@ -294,9 +303,8 @@ mod test {
index: true,
value_type: ValueType::Ref,
fulltext: false,
unique_value: false,
unique: None,
multival: false,
unique_identity: false,
component: false,
};
@@ -309,9 +317,8 @@ mod test {
index: false,
value_type: ValueType::Boolean,
fulltext: true,
unique_value: true,
unique: Some(attribute::Unique::Value),
multival: false,
unique_identity: false,
component: false,
};
@@ -319,6 +326,20 @@ mod test {
assert!(attr2.flags() & AttributeBitFlags::IndexVAET as u8 == 0);
assert!(attr2.flags() & AttributeBitFlags::IndexFulltext as u8 != 0);
assert!(attr2.flags() & AttributeBitFlags::UniqueValue as u8 != 0);
let attr3 = Attribute {
index: false,
value_type: ValueType::Boolean,
fulltext: true,
unique: Some(attribute::Unique::Identity),
multival: false,
component: false,
};
assert!(attr3.flags() & AttributeBitFlags::IndexAVET as u8 == 0);
assert!(attr3.flags() & AttributeBitFlags::IndexVAET as u8 == 0);
assert!(attr3.flags() & AttributeBitFlags::IndexFulltext as u8 != 0);
assert!(attr3.flags() & AttributeBitFlags::UniqueValue as u8 != 0);
}
}


@@ -0,0 +1,85 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
use std::collections::BTreeMap;
/// Witness assertions and retractions, folding (assertion, retraction) pairs into alterations.
/// Assumes that no assertion or retraction will be witnessed more than once.
///
/// This keeps track of when we see a :db/add, a :db/retract, or both :db/add and :db/retract in
/// some order.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub struct AddRetractAlterSet<K, V> {
pub asserted: BTreeMap<K, V>,
pub retracted: BTreeMap<K, V>,
pub altered: BTreeMap<K, (V, V)>,
}
impl<K, V> Default for AddRetractAlterSet<K, V> where K: Ord {
fn default() -> AddRetractAlterSet<K, V> {
AddRetractAlterSet {
asserted: BTreeMap::default(),
retracted: BTreeMap::default(),
altered: BTreeMap::default(),
}
}
}
impl<K, V> AddRetractAlterSet<K, V> where K: Ord {
pub fn witness(&mut self, key: K, value: V, added: bool) {
if added {
if let Some(retracted_value) = self.retracted.remove(&key) {
self.altered.insert(key, (retracted_value, value));
} else {
self.asserted.insert(key, value);
}
} else {
if let Some(asserted_value) = self.asserted.remove(&key) {
self.altered.insert(key, (value, asserted_value));
} else {
self.retracted.insert(key, value);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test() {
let mut set: AddRetractAlterSet<i64, char> = AddRetractAlterSet::default();
// Assertion.
set.witness(1, 'a', true);
// Retraction.
set.witness(2, 'b', false);
// Alteration.
set.witness(3, 'c', true);
set.witness(3, 'd', false);
// Alteration, witnessed with the retraction before the assertion.
set.witness(4, 'e', false);
set.witness(4, 'f', true);
let mut asserted = BTreeMap::default();
asserted.insert(1, 'a');
let mut retracted = BTreeMap::default();
retracted.insert(2, 'b');
let mut altered = BTreeMap::default();
altered.insert(3, ('d', 'c'));
altered.insert(4, ('e', 'f'));
assert_eq!(set.asserted, asserted);
assert_eq!(set.retracted, retracted);
assert_eq!(set.altered, altered);
}
}


@@ -38,7 +38,7 @@ lazy_static! {
(ns_keyword!("db.part", "db"), entids::DB_PART_DB),
(ns_keyword!("db", "txInstant"), entids::DB_TX_INSTANT),
(ns_keyword!("db.install", "partition"), entids::DB_INSTALL_PARTITION),
(ns_keyword!("db.install", "valueType"), entids::DB_INSTALL_VALUETYPE),
(ns_keyword!("db.install", "valueType"), entids::DB_INSTALL_VALUE_TYPE),
(ns_keyword!("db.install", "attribute"), entids::DB_INSTALL_ATTRIBUTE),
(ns_keyword!("db", "valueType"), entids::DB_VALUE_TYPE),
(ns_keyword!("db", "cardinality"), entids::DB_CARDINALITY),
@@ -97,6 +97,7 @@ lazy_static! {
let s = r#"
{:db/ident {:db/valueType :db.type/keyword
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity}
:db.install/partition {:db/valueType :db.type/ref
:db/cardinality :db.cardinality/many}
@@ -142,6 +143,7 @@ lazy_static! {
;; unique-value because an attribute can only belong to a single
;; schema fragment.
:db.schema/attribute {:db/valueType :db.type/ref
:db/index true
:db/unique :db.unique/value
:db/cardinality :db.cardinality/many}}"#;
let right = edn::parse::value(s)
@@ -169,8 +171,8 @@ fn idents_to_assertions(idents: &[(symbols::NamespacedKeyword, i64)]) -> Vec<Val
/// Convert {:ident {:key :value ...} ...} to
/// vec![(symbols::NamespacedKeyword(:ident), symbols::NamespacedKeyword(:key), TypedValue(:value)), ...].
///
/// Such triples are closer to what the transactor will produce when processing
/// :db.install/attribute assertions.
/// Such triples are closer to what the transactor will produce when processing attribute
/// assertions.
fn symbolic_schema_to_triples(ident_map: &IdentMap, symbolic_schema: &Value) -> Result<Vec<(symbols::NamespacedKeyword, symbols::NamespacedKeyword, TypedValue)>> {
// Failure here is a coding error, not a runtime error.
let mut triples: Vec<(symbols::NamespacedKeyword, symbols::NamespacedKeyword, TypedValue)> = vec![];
@@ -221,17 +223,12 @@ fn symbolic_schema_to_triples(ident_map: &IdentMap, symbolic_schema: &Value) ->
}
/// Convert {IDENT {:key :value ...} ...} to [[:db/add IDENT :key :value] ...].
/// In addition, add [:db.add :db.part/db :db.install/attribute IDENT] installation assertions.
fn symbolic_schema_to_assertions(symbolic_schema: &Value) -> Result<Vec<Value>> {
// Failure here is a coding error, not a runtime error.
let mut assertions: Vec<Value> = vec![];
match *symbolic_schema {
Value::Map(ref m) => {
for (ident, mp) in m {
assertions.push(Value::Vector(vec![values::DB_ADD.clone(),
values::DB_PART_DB.clone(),
values::DB_INSTALL_ATTRIBUTE.clone(),
ident.clone()]));
match *mp {
Value::Map(ref mpp) => {
for (attr, value) in mpp {


@@ -26,18 +26,23 @@ use rusqlite::limits::Limit;
use ::{repeat_values, to_namespaced_keyword};
use bootstrap;
use edn::types::Value;
use edn::symbols;
use entids;
use mentat_core::{
attribute,
Attribute,
AttributeBitFlags,
Entid,
IdentMap,
Schema,
SchemaMap,
TypedValue,
ValueType,
};
use errors::{ErrorKind, Result, ResultExt};
use schema::SchemaBuilding;
use metadata;
use schema::{
SchemaBuilding,
};
use types::{
AVMap,
AVPair,
@@ -157,11 +162,11 @@ lazy_static! {
SELECT e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value
FROM fulltext_datoms"#,
// Materialized views of the schema.
r#"CREATE TABLE idents (ident TEXT NOT NULL PRIMARY KEY, entid INTEGER UNIQUE NOT NULL)"#,
r#"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value BLOB NOT NULL, value_type_tag SMALLINT NOT NULL,
FOREIGN KEY (ident) REFERENCES idents (ident))"#,
r#"CREATE INDEX idx_schema_unique ON schema (ident, attr, value, value_type_tag)"#,
// Materialized views of the metadata.
r#"CREATE TABLE idents (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#,
r#"CREATE INDEX idx_idents_unique ON idents (e, a, v, value_type_tag)"#,
r#"CREATE TABLE schema (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, value_type_tag SMALLINT NOT NULL)"#,
r#"CREATE INDEX idx_schema_unique ON schema (e, a, v, value_type_tag)"#,
// TODO: store entid instead of ident for partition name.
r#"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"#,
]
@@ -209,10 +214,14 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
let bootstrap_schema = bootstrap::bootstrap_schema();
let (_report, next_partition_map, next_schema) = transact(&tx, bootstrap_partition_map, &bootstrap_schema, bootstrap::bootstrap_entities())?;
if next_schema.is_some() {
// TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117
bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema")));
let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema.
let (_report, next_partition_map, next_schema) = transact(&tx, bootstrap_partition_map, &bootstrap_schema_for_mutation, &bootstrap_schema, bootstrap::bootstrap_entities())?;
// TODO: validate metadata mutations that aren't schema related, like additional partitions.
if let Some(next_schema) = next_schema {
if next_schema != bootstrap_schema {
// TODO Use custom ErrorKind https://github.com/brson/error-chain/issues/117
bail!(ErrorKind::NotYetImplemented(format!("Initial bootstrap transaction did not produce expected bootstrap schema")));
}
}
set_user_version(&tx, CURRENT_VERSION)?;
@@ -220,7 +229,6 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// TODO: use the drop semantics to do this automagically?
tx.commit()?;
// TODO: ensure that schema is not changed by bootstrap transaction.
let bootstrap_db = DB::new(next_partition_map, bootstrap_schema);
Ok(bootstrap_db)
}
@@ -401,18 +409,23 @@ impl TypedSQLValue for TypedValue {
}
}
/// Read the ident map materialized view from the given SQL store.
pub fn read_ident_map(conn: &rusqlite::Connection) -> Result<IdentMap> {
let mut stmt: rusqlite::Statement = conn.prepare("SELECT ident, entid FROM idents")?;
let m = stmt.query_and_then(&[], |row| -> Result<(symbols::NamespacedKeyword, Entid)> {
let ident: String = row.get(0);
to_namespaced_keyword(&ident).map(|i| (i, row.get(1)))
/// Read an arbitrary [e a v value_type_tag] materialized view from the given table in the SQL
/// store.
fn read_materialized_view(conn: &rusqlite::Connection, table: &str) -> Result<Vec<(Entid, Entid, TypedValue)>> {
let mut stmt: rusqlite::Statement = conn.prepare(format!("SELECT e, a, v, value_type_tag FROM {}", table).as_str())?;
let m: Result<Vec<(Entid, Entid, TypedValue)>> = stmt.query_and_then(&[], |row| {
let e: Entid = row.get_checked(0)?;
let a: Entid = row.get_checked(1)?;
let v: rusqlite::types::Value = row.get_checked(2)?;
let value_type_tag: i32 = row.get_checked(3)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
Ok((e, a, typed_value))
})?.collect();
m
}
/// Read the partition map materialized view from the given SQL store.
pub fn read_partition_map(conn: &rusqlite::Connection) -> Result<PartitionMap> {
fn read_partition_map(conn: &rusqlite::Connection) -> Result<PartitionMap> {
let mut stmt: rusqlite::Statement = conn.prepare("SELECT part, start, idx FROM parts")?;
let m = stmt.query_and_then(&[], |row| -> Result<(String, Partition)> {
Ok((row.get_checked(0)?, Partition::new(row.get_checked(1)?, row.get_checked(2)?)))
@@ -420,29 +433,27 @@ pub fn read_partition_map(conn: &rusqlite::Connection) -> Result<PartitionMap> {
m
}
/// Read the schema materialized view from the given SQL store.
pub fn read_schema(conn: &rusqlite::Connection, ident_map: &IdentMap) -> Result<Schema> {
let mut stmt: rusqlite::Statement = conn.prepare("SELECT ident, attr, value, value_type_tag FROM schema")?;
let r: Result<Vec<(symbols::NamespacedKeyword, symbols::NamespacedKeyword, TypedValue)>> = stmt.query_and_then(&[], |row| {
// Each row looks like :db/index|:db/valueType|28|0. Observe that 28|0 represents a
// :db.type/ref to entid 28, which needs to be converted to a TypedValue.
// TODO: don't use textual ident and attr; just use entids directly.
let symbolic_ident: String = row.get_checked(0)?;
let symbolic_attr: String = row.get_checked(1)?;
let v: rusqlite::types::Value = row.get_checked(2)?;
let value_type_tag: i32 = row.get_checked(3)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
let ident = to_namespaced_keyword(&symbolic_ident);
let attr = to_namespaced_keyword(&symbolic_attr);
match (ident, attr, typed_value) {
(Ok(ident), Ok(attr), typed_value) => Ok((ident, attr, typed_value)),
(Err(e), _, _) => Err(e),
(_, Err(e), _) => Err(e),
/// Read the ident map materialized view from the given SQL store.
fn read_ident_map(conn: &rusqlite::Connection) -> Result<IdentMap> {
let v = read_materialized_view(conn, "idents")?;
v.into_iter().map(|(e, a, typed_value)| {
if a != entids::DB_IDENT {
bail!(ErrorKind::NotYetImplemented(format!("bad idents materialized view: expected :db/ident but got {}", a)));
}
})?.collect();
if let TypedValue::Keyword(keyword) = typed_value {
Ok((keyword, e))
} else {
bail!(ErrorKind::NotYetImplemented(format!("bad idents materialized view: expected [entid :db/ident keyword] but got [entid :db/ident {:?}]", typed_value)));
}
}).collect()
}
r.and_then(|triples| Schema::from_ident_map_and_triples(ident_map.clone(), triples))
/// Read the schema materialized view from the given SQL store.
fn read_schema_map(conn: &rusqlite::Connection) -> Result<SchemaMap> {
let entid_triples = read_materialized_view(conn, "schema")?;
let mut schema_map = SchemaMap::default();
metadata::update_schema_map_from_entid_triples(&mut schema_map, entid_triples)?;
Ok(schema_map)
}
/// Read the materialized views from the given SQL store and return a Mentat `DB` for querying and
@@ -450,7 +461,8 @@ pub fn read_schema(conn: &rusqlite::Connection, ident_map: &IdentMap) -> Result<
pub fn read_db(conn: &rusqlite::Connection) -> Result<DB> {
let partition_map = read_partition_map(conn)?;
let ident_map = read_ident_map(conn)?;
let schema = read_schema(conn, &ident_map)?;
let schema_map = read_schema_map(conn)?;
let schema = Schema::from_ident_map_and_schema_map(ident_map, schema_map)?;
Ok(DB::new(partition_map, schema))
}
@@ -494,6 +506,9 @@ pub trait MentatStoring {
/// Use this to finalize temporary tables, complete indices, revert pragmas, etc, after the
/// final `insert_non_fts_searches` invocation.
fn commit_transaction(&self, tx_id: Entid) -> Result<()>;
/// Extract metadata-related [e a typed_value added] datoms committed in the given transaction.
fn committed_metadata_assertions(&self, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>>;
}
/// Take search rows and complete `temp.search_results`.
@@ -797,6 +812,19 @@ impl MentatStoring for rusqlite::Connection {
update_datoms(&self, tx_id)?;
Ok(())
}
fn committed_metadata_assertions(&self, tx_id: Entid) -> Result<Vec<(Entid, Entid, TypedValue, bool)>> {
// TODO: use concat! to avoid creating String instances.
let mut stmt = self.prepare_cached(format!("SELECT e, a, v, value_type_tag, added FROM transactions WHERE tx = ? AND a IN {} ORDER BY e, a, v, value_type_tag, added", entids::METADATA_SQL_LIST.as_str()).as_str())?;
let params = [&tx_id as &ToSql];
let m: Result<Vec<_>> = stmt.query_and_then(&params[..], |row| -> Result<(Entid, Entid, TypedValue, bool)> {
Ok((row.get_checked(0)?,
row.get_checked(1)?,
TypedValue::from_sql_value_pair(row.get_checked(2)?, row.get_checked(3)?)?,
row.get_checked(4)?))
})?.collect();
m
}
}
/// Update the current partition map materialized view.
@@ -827,6 +855,91 @@ pub fn update_partition_map(conn: &rusqlite::Connection, partition_map: &Partiti
.chain_err(|| "Could not update partition map")
}
/// Update the metadata materialized views based on the given metadata report.
///
/// This updates the "entids", "idents", and "schema" materialized views, copying directly from the
/// "datoms" and "transactions" table as appropriate.
pub fn update_metadata(conn: &rusqlite::Connection, _old_schema: &Schema, new_schema: &Schema, metadata_report: &metadata::MetadataReport) -> Result<()>
{
use metadata::AttributeAlteration::*;
// Populate the materialized view directly from datoms (and, potentially in the future,
// transactions). This might generalize nicely as we expand the set of materialized views.
// TODO: consider doing this in fewer SQLite execute() invocations.
// TODO: use concat! to avoid creating String instances.
if !metadata_report.idents_altered.is_empty() {
// Idents is the materialized view of the [entid :db/ident ident] slice of datoms.
conn.execute(format!("DELETE FROM idents").as_str(),
&[])?;
conn.execute(format!("INSERT INTO idents SELECT e, a, v, value_type_tag FROM datoms WHERE a IN {}", entids::IDENTS_SQL_LIST.as_str()).as_str(),
&[])?;
}
let mut stmt = conn.prepare(format!("INSERT INTO schema SELECT e, a, v, value_type_tag FROM datoms WHERE e = ? AND a IN {}", entids::SCHEMA_SQL_LIST.as_str()).as_str())?;
for &entid in &metadata_report.attributes_installed {
stmt.execute(&[&entid as &ToSql])?;
}
let mut delete_stmt = conn.prepare(format!("DELETE FROM schema WHERE e = ? AND a IN {}", entids::SCHEMA_SQL_LIST.as_str()).as_str())?;
let mut insert_stmt = conn.prepare(format!("INSERT INTO schema SELECT e, a, v, value_type_tag FROM datoms WHERE e = ? AND a IN {}", entids::SCHEMA_SQL_LIST.as_str()).as_str())?;
let mut index_stmt = conn.prepare("UPDATE datoms SET index_avet = ? WHERE a = ?")?;
let mut unique_value_stmt = conn.prepare("UPDATE datoms SET unique_value = ? WHERE a = ?")?;
let mut cardinality_stmt = conn.prepare(r#"
SELECT EXISTS
(SELECT 1
FROM datoms AS left, datoms AS right
WHERE left.a = ? AND
left.a = right.a AND
left.e = right.e AND
left.v <> right.v)"#)?;
for (&entid, alterations) in &metadata_report.attributes_altered {
delete_stmt.execute(&[&entid as &ToSql])?;
insert_stmt.execute(&[&entid as &ToSql])?;
let attribute = new_schema.require_attribute_for_entid(entid)?;
for alteration in alterations {
match alteration {
&Index => {
// This should always succeed.
index_stmt.execute(&[&attribute.index, &entid as &ToSql])?;
},
&Unique => {
// TODO: This can fail if there are conflicting values; give a more helpful
// error message in this case.
if unique_value_stmt.execute(&[to_bool_ref(attribute.unique.is_some()), &entid as &ToSql]).is_err() {
match attribute.unique {
Some(attribute::Unique::Value) => bail!(ErrorKind::NotYetImplemented(format!("Cannot alter schema attribute {} to be :db.unique/value", entid))),
Some(attribute::Unique::Identity) => bail!(ErrorKind::NotYetImplemented(format!("Cannot alter schema attribute {} to be :db.unique/identity", entid))),
None => unreachable!(), // This shouldn't happen, even after we support removing :db/unique.
}
}
},
&Cardinality => {
// We can always go from :db.cardinality/one to :db.cardinality/many. It's
// :db.cardinality/many to :db.cardinality/one that can fail.
//
// TODO: improve the failure message. Perhaps try to mimic what Datomic says in
// this case?
if !attribute.multival {
let mut rows = cardinality_stmt.query(&[&entid as &ToSql])?;
if rows.next().is_some() {
bail!(ErrorKind::NotYetImplemented(format!("Cannot alter schema attribute {} to be :db.cardinality/one", entid)));
}
}
},
&NoHistory | &IsComponent => {
// There's no on disk change required for either of these.
},
}
}
}
Ok(())
}
pub trait PartitionMapping {
fn allocate_entid<S: ?Sized + Ord + Display>(&mut self, partition: &S) -> i64 where String: Borrow<S>;
fn allocate_entids<S: ?Sized + Ord + Display>(&mut self, partition: &S, n: usize) -> Range<i64> where String: Borrow<S>;
@@ -860,9 +973,10 @@ mod tests {
use edn;
use mentat_tx_parser;
use rusqlite;
use std::collections::BTreeMap;
use std::collections::{
BTreeMap,
};
use types::TxReport;
use tx;
// Macro to parse a `Borrow<str>` to an `edn::Value` and assert the given `edn::Value` `matches`
// against it.
@@ -879,6 +993,21 @@ mod tests {
}}
}
// Transact $input against the given $conn, expecting success or a `Result<TxReport, String>`.
//
// This unwraps safely and makes asserting errors pleasant.
macro_rules! assert_transact {
( $conn: expr, $input: expr, $expected: expr ) => {{
let result = $conn.transact($input).map_err(|e| e.to_string());
assert_eq!(result, $expected.map_err(|e| e.to_string()));
}};
( $conn: expr, $input: expr ) => {{
let result = $conn.transact($input);
assert!(result.is_ok(), "Expected Ok(_), got `{}`", result.unwrap_err());
result.unwrap()
}};
}
// A connection that doesn't try to be clever about possibly sharing its `Schema`. Compare to
// `mentat::Conn`.
struct TestConn {
@@ -888,17 +1017,37 @@ mod tests {
}
impl TestConn {
fn assert_materialized_views(&self) {
let materialized_ident_map = read_ident_map(&self.sqlite).expect("ident map");
let materialized_schema_map = read_schema_map(&self.sqlite).expect("schema map");
let materialized_schema = Schema::from_ident_map_and_schema_map(materialized_ident_map, materialized_schema_map).expect("schema");
assert_eq!(materialized_schema, self.schema);
}
fn transact<I>(&mut self, transaction: I) -> Result<TxReport> where I: Borrow<str> {
// Failure to parse the transaction is a coding error, so we unwrap.
let assertions = edn::parse::value(transaction.borrow()).unwrap().without_spans();
let entities: Vec<_> = mentat_tx_parser::Tx::parse(&[assertions][..]).unwrap();
// Applying the transaction can fail, so we don't unwrap.
let details = tx::transact(&self.sqlite, self.partition_map.clone(), &self.schema, entities)?;
let details = {
// The block scopes the borrow of self.sqlite.
let tx = self.sqlite.transaction()?;
// Applying the transaction can fail, so we don't unwrap.
let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
tx.commit()?;
details
};
let (report, next_partition_map, next_schema) = details;
self.partition_map = next_partition_map;
if let Some(next_schema) = next_schema {
self.schema = next_schema;
}
// Verify that we've updated the materialized views during transacting.
self.assert_materialized_views();
Ok(report)
}
@@ -922,16 +1071,22 @@ mod tests {
// Does not include :db/txInstant.
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
assert_eq!(datoms.0.len(), 88);
assert_eq!(datoms.0.len(), 74);
// Includes :db/txInstant.
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
assert_eq!(transactions.0.len(), 1);
assert_eq!(transactions.0[0].0.len(), 89);
assert_eq!(transactions.0[0].0.len(), 75);
TestConn { sqlite: conn,
partition_map: db.partition_map,
schema: db.schema }
let test_conn = TestConn {
sqlite: conn,
partition_map: db.partition_map,
schema: db.schema };
// Verify that we've created the materialized views during bootstrapping.
test_conn.assert_materialized_views();
test_conn
}
}
@ -943,94 +1098,70 @@ mod tests {
edn::Value::Map(map)
}
#[test]
fn test_open_current_version() {
// TODO: figure out how to reference the fixtures directory for real. For now, assume we're
// executing `cargo test` in `db/`.
let conn = rusqlite::Connection::open("../fixtures/v2empty.db").unwrap();
let ident_map = read_ident_map(&conn).unwrap();
assert_eq!(ident_map, bootstrap::bootstrap_ident_map());
let schema = read_schema(&conn, &ident_map).unwrap();
assert_eq!(schema, bootstrap::bootstrap_schema());
let db = read_db(&conn).unwrap();
// Does not include :db/txInstant.
let datoms = debug::datoms_after(&conn, &db.schema, 0).unwrap();
assert_eq!(datoms.0.len(), 88);
// Includes :db/txInstant.
let transactions = debug::transactions_after(&conn, &db.schema, 0).unwrap();
assert_eq!(transactions.0.len(), 1);
assert_eq!(transactions.0[0].0.len(), 89);
}
#[test]
fn test_add() {
let mut conn = TestConn::default();
// Test inserting :db.cardinality/one elements.
conn.transact("[[:db/add 100 :db/ident :keyword/value1]
[:db/add 101 :db/ident :keyword/value2]]").unwrap();
assert_transact!(conn, "[[:db/add 100 :db.schema/version 1]
[:db/add 101 :db.schema/version 2]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :keyword/value1 ?tx true]
[101 :db/ident :keyword/value2 ?tx true]
"[[100 :db.schema/version 1 ?tx true]
[101 :db.schema/version 2 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value1]
[101 :db/ident :keyword/value2]]");
"[[100 :db.schema/version 1]
[101 :db.schema/version 2]]");
// Test inserting :db.cardinality/many elements.
conn.transact("[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]").unwrap();
assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]");
assert_matches!(conn.last_transaction(),
"[[200 :db.schema/attribute 100 ?tx true]
[200 :db.schema/attribute 101 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value1]
[101 :db/ident :keyword/value2]
"[[100 :db.schema/version 1]
[101 :db.schema/version 2]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
// Test replacing existing :db.cardinality/one elements.
conn.transact("[[:db/add 100 :db/ident :keyword/value11]
[:db/add 101 :db/ident :keyword/value22]]").unwrap();
assert_transact!(conn, "[[:db/add 100 :db.schema/version 11]
[:db/add 101 :db.schema/version 22]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :keyword/value1 ?tx false]
[100 :db/ident :keyword/value11 ?tx true]
[101 :db/ident :keyword/value2 ?tx false]
[101 :db/ident :keyword/value22 ?tx true]
"[[100 :db.schema/version 1 ?tx false]
[100 :db.schema/version 11 ?tx true]
[101 :db.schema/version 2 ?tx false]
[101 :db.schema/version 22 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value11]
[101 :db/ident :keyword/value22]
"[[100 :db.schema/version 11]
[101 :db.schema/version 22]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
// Test that asserting existing :db.cardinality/one elements doesn't change the store.
conn.transact("[[:db/add 100 :db/ident :keyword/value11]
[:db/add 101 :db/ident :keyword/value22]]").unwrap();
assert_transact!(conn, "[[:db/add 100 :db.schema/version 11]
[:db/add 101 :db.schema/version 22]]");
assert_matches!(conn.last_transaction(),
"[[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value11]
[101 :db/ident :keyword/value22]
"[[100 :db.schema/version 11]
[101 :db.schema/version 22]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
// Test that asserting existing :db.cardinality/many elements doesn't change the store.
conn.transact("[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]").unwrap();
assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]");
assert_matches!(conn.last_transaction(),
"[[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value11]
[101 :db/ident :keyword/value22]
"[[100 :db.schema/version 11]
[101 :db.schema/version 22]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
}
@ -1040,66 +1171,67 @@ mod tests {
let mut conn = TestConn::default();
// Insert a few :db.cardinality/one elements.
conn.transact("[[:db/add 100 :db/ident :keyword/value1]
[:db/add 101 :db/ident :keyword/value2]]").unwrap();
assert_transact!(conn, "[[:db/add 100 :db.schema/version 1]
[:db/add 101 :db.schema/version 2]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :keyword/value1 ?tx true]
[101 :db/ident :keyword/value2 ?tx true]
"[[100 :db.schema/version 1 ?tx true]
[101 :db.schema/version 2 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value1]
[101 :db/ident :keyword/value2]]");
"[[100 :db.schema/version 1]
[101 :db.schema/version 2]]");
// And a few :db.cardinality/many elements.
conn.transact("[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]").unwrap();
assert_transact!(conn, "[[:db/add 200 :db.schema/attribute 100]
[:db/add 200 :db.schema/attribute 101]]");
assert_matches!(conn.last_transaction(),
"[[200 :db.schema/attribute 100 ?tx true]
[200 :db.schema/attribute 101 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :keyword/value1]
[101 :db/ident :keyword/value2]
"[[100 :db.schema/version 1]
[101 :db.schema/version 2]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
// Test that we can retract :db.cardinality/one elements.
conn.transact("[[:db/retract 100 :db/ident :keyword/value1]]").unwrap();
assert_transact!(conn, "[[:db/retract 100 :db.schema/version 1]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :keyword/value1 ?tx false]
"[[100 :db.schema/version 1 ?tx false]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[101 :db/ident :keyword/value2]
"[[101 :db.schema/version 2]
[200 :db.schema/attribute 100]
[200 :db.schema/attribute 101]]");
// Test that we can retract :db.cardinality/many elements.
conn.transact("[[:db/retract 200 :db.schema/attribute 100]]").unwrap();
assert_transact!(conn, "[[:db/retract 200 :db.schema/attribute 100]]");
assert_matches!(conn.last_transaction(),
"[[200 :db.schema/attribute 100 ?tx false]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[101 :db/ident :keyword/value2]
"[[101 :db.schema/version 2]
[200 :db.schema/attribute 101]]");
// Verify that retracting :db.cardinality/{one,many} elements that are not present doesn't
// change the store.
conn.transact("[[:db/retract 100 :db/ident :keyword/value1]
[:db/retract 200 :db.schema/attribute 100]]").unwrap();
assert_transact!(conn, "[[:db/retract 100 :db.schema/version 1]
[:db/retract 200 :db.schema/attribute 100]]");
assert_matches!(conn.last_transaction(),
"[[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[101 :db/ident :keyword/value2]
"[[101 :db.schema/version 2]
[200 :db.schema/attribute 101]]");
}
// TODO: don't use :db/ident to test upserts!
#[test]
fn test_upsert_vector() {
let mut conn = TestConn::default();
// Insert some :db.unique/identity elements.
conn.transact("[[:db/add 100 :db/ident :name/Ivan]
[:db/add 101 :db/ident :name/Petr]]").unwrap();
assert_transact!(conn, "[[:db/add 100 :db/ident :name/Ivan]
[:db/add 101 :db/ident :name/Petr]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :name/Ivan ?tx true]
[101 :db/ident :name/Petr ?tx true]
@ -1109,42 +1241,41 @@ mod tests {
[101 :db/ident :name/Petr]]");
// Upserting two tempids to the same entid works.
let report = conn.transact("[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db.schema/attribute 100]
[:db/add \"t2\" :db/ident :name/Petr]
[:db/add \"t2\" :db.schema/attribute 101]]").unwrap();
let report = assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db.schema/attribute 100]
[:db/add \"t2\" :db/ident :name/Petr]
[:db/add \"t2\" :db.schema/attribute 101]]");
assert_matches!(conn.last_transaction(),
"[[100 :db.schema/attribute 100 ?tx true]
[101 :db.schema/attribute 101 ?tx true]
"[[100 :db.schema/attribute :name/Ivan ?tx true]
[101 :db.schema/attribute :name/Petr ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Ivan]
[100 :db.schema/attribute 100]
[100 :db.schema/attribute :name/Ivan]
[101 :db/ident :name/Petr]
[101 :db.schema/attribute 101]]");
[101 :db.schema/attribute :name/Petr]]");
assert_matches!(tempids(&report),
"{\"t1\" 100
\"t2\" 101}");
// Upserting a tempid works. The ref doesn't have to exist (at this time), but we can't
// reuse an existing ref due to :db/unique :db.unique/value.
let report = conn.transact("[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db.schema/attribute 102]]").unwrap();
let report = assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db.schema/attribute 102]]");
assert_matches!(conn.last_transaction(),
"[[100 :db.schema/attribute 102 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Ivan]
[100 :db.schema/attribute 100]
[100 :db.schema/attribute :name/Ivan]
[100 :db.schema/attribute 102]
[101 :db/ident :name/Petr]
[101 :db.schema/attribute 101]]");
[101 :db.schema/attribute :name/Petr]]");
assert_matches!(tempids(&report),
"{\"t1\" 100}");
// A single complex upsert allocates a new entid.
let report = conn.transact("[[:db/add \"t1\" :db.schema/attribute \"t2\"]]").unwrap();
let report = assert_transact!(conn, "[[:db/add \"t1\" :db.schema/attribute \"t2\"]]");
assert_matches!(conn.last_transaction(),
"[[65536 :db.schema/attribute 65537 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
@ -1153,18 +1284,18 @@ mod tests {
\"t2\" 65537}");
// Conflicting upserts fail.
let err = conn.transact("[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db/ident :name/Petr]]").unwrap_err().to_string();
assert_eq!(err, "not yet implemented: Conflicting upsert: tempid \'t1\' resolves to more than one entid: 100, 101");
assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/add \"t1\" :db/ident :name/Petr]]",
Err("not yet implemented: Conflicting upsert: tempid \'t1\' resolves to more than one entid: 100, 101"));
// tempids in :db/retract that don't upsert fail.
let err = conn.transact("[[:db/retract \"t1\" :db/ident :name/Anonymous]]").unwrap_err().to_string();
assert_eq!(err, "not yet implemented: [:db/retract ...] entity referenced tempid that did not upsert: t1");
assert_transact!(conn, "[[:db/retract \"t1\" :db/ident :name/Anonymous]]",
Err("not yet implemented: [:db/retract ...] entity referenced tempid that did not upsert: t1"));
// tempids in :db/retract that do upsert are retracted. The ref given doesn't exist, so the
// assertion will be ignored.
let report = conn.transact("[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/retract \"t1\" :db.schema/attribute 103]]").unwrap();
let report = assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Ivan]
[:db/retract \"t1\" :db.schema/attribute 103]]");
assert_matches!(conn.last_transaction(),
"[[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(tempids(&report),
@ -1172,11 +1303,11 @@ mod tests {
// A multistep upsert. The upsert algorithm will first try to resolve "t1", fail, and then
// allocate both "t1" and "t2".
let report = conn.transact("[[:db/add \"t1\" :db/ident :name/Josef]
[:db/add \"t2\" :db.schema/attribute \"t1\"]]").unwrap();
let report = assert_transact!(conn, "[[:db/add \"t1\" :db/ident :name/Josef]
[:db/add \"t2\" :db.schema/attribute \"t1\"]]");
assert_matches!(conn.last_transaction(),
"[[65538 :db/ident :name/Josef ?tx true]
[65539 :db.schema/attribute 65538 ?tx true]
[65539 :db.schema/attribute :name/Josef ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(tempids(&report),
"{\"t1\" 65538
@ -1204,4 +1335,216 @@ mod tests {
conn.set_limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER, 222);
assert_eq!(222, conn.limit(Limit::SQLITE_LIMIT_VARIABLE_NUMBER));
}
#[test]
fn test_db_install() {
let mut conn = TestConn::default();
// We can assert a new attribute.
assert_transact!(conn, "[[:db/add 100 :db/ident :test/ident]
[:db/add 100 :db/valueType :db.type/long]
[:db/add 100 :db/cardinality :db.cardinality/many]]");
assert_eq!(conn.schema.entid_map.get(&100).cloned().unwrap(), to_namespaced_keyword(":test/ident").unwrap());
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":test/ident").unwrap()).cloned().unwrap(), 100);
let attribute = conn.schema.attribute_for_entid(100).unwrap().clone();
assert_eq!(attribute.value_type, ValueType::Long);
assert_eq!(attribute.multival, true);
assert_eq!(attribute.fulltext, false);
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :test/ident ?tx true]
[100 :db/valueType :db.type/long ?tx true]
[100 :db/cardinality :db.cardinality/many ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :test/ident]
[100 :db/valueType :db.type/long]
[100 :db/cardinality :db.cardinality/many]]");
// Let's check we actually have the schema characteristics we expect.
let attribute = conn.schema.attribute_for_entid(100).unwrap().clone();
assert_eq!(attribute.value_type, ValueType::Long);
assert_eq!(attribute.multival, true);
assert_eq!(attribute.fulltext, false);
// Let's check that we can use the freshly installed attribute.
assert_transact!(conn, "[[:db/add 101 100 -10]
[:db/add 101 :test/ident -9]]");
assert_matches!(conn.last_transaction(),
"[[101 :test/ident -10 ?tx true]
[101 :test/ident -9 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
// Cannot retract a characteristic of an installed attribute.
assert_transact!(conn,
"[[:db/retract 100 :db/cardinality :db.cardinality/many]]",
Err("not yet implemented: Retracting metadata attribute assertions not yet implemented: retracted [e a] pairs [[100 8]]"));
// Trying to install an attribute without a :db/ident is allowed.
assert_transact!(conn, "[[:db/add 101 :db/valueType :db.type/long]
[:db/add 101 :db/cardinality :db.cardinality/many]]");
}
#[test]
fn test_db_alter() {
let mut conn = TestConn::default();
// Start by installing a :db.cardinality/one attribute.
assert_transact!(conn, "[[:db/add 100 :db/ident :test/ident]
[:db/add 100 :db/valueType :db.type/keyword]
[:db/add 100 :db/cardinality :db.cardinality/one]]");
// Trying to alter the :db/valueType will fail.
assert_transact!(conn, "[[:db/add 100 :db/valueType :db.type/long]]",
Err("bad schema assertion: Schema alteration for existing attribute with entid 100 is not valid"));
// But we can alter the cardinality.
assert_transact!(conn, "[[:db/add 100 :db/cardinality :db.cardinality/many]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/cardinality :db.cardinality/one ?tx false]
[100 :db/cardinality :db.cardinality/many ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :test/ident]
[100 :db/valueType :db.type/keyword]
[100 :db/cardinality :db.cardinality/many]]");
// Let's check we actually have the schema characteristics we expect.
let attribute = conn.schema.attribute_for_entid(100).unwrap().clone();
assert_eq!(attribute.value_type, ValueType::Keyword);
assert_eq!(attribute.multival, true);
assert_eq!(attribute.fulltext, false);
// Let's check that we can use the freshly altered attribute's new characteristic.
assert_transact!(conn, "[[:db/add 101 100 :test/value1]
[:db/add 101 :test/ident :test/value2]]");
assert_matches!(conn.last_transaction(),
"[[101 :test/ident :test/value1 ?tx true]
[101 :test/ident :test/value2 ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
}
#[test]
fn test_db_ident() {
let mut conn = TestConn::default();
// We can assert a new :db/ident.
assert_transact!(conn, "[[:db/add 100 :db/ident :name/Ivan]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :name/Ivan ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Ivan]]");
assert_eq!(conn.schema.entid_map.get(&100).cloned().unwrap(), to_namespaced_keyword(":name/Ivan").unwrap());
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Ivan").unwrap()).cloned().unwrap(), 100);
// We can re-assert an existing :db/ident.
assert_transact!(conn, "[[:db/add 100 :db/ident :name/Ivan]]");
assert_matches!(conn.last_transaction(),
"[[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Ivan]]");
assert_eq!(conn.schema.entid_map.get(&100).cloned().unwrap(), to_namespaced_keyword(":name/Ivan").unwrap());
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Ivan").unwrap()).cloned().unwrap(), 100);
// We can alter an existing :db/ident to have a new keyword.
assert_transact!(conn, "[[:db/add :name/Ivan :db/ident :name/Petr]]");
assert_matches!(conn.last_transaction(),
"[[100 :db/ident :name/Ivan ?tx false]
[100 :db/ident :name/Petr ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Petr]]");
// Entid map is updated.
assert_eq!(conn.schema.entid_map.get(&100).cloned().unwrap(), to_namespaced_keyword(":name/Petr").unwrap());
// Ident map contains the new ident.
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Petr").unwrap()).cloned().unwrap(), 100);
// Ident map no longer contains the old ident.
assert!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Ivan").unwrap()).is_none());
// We can re-purpose an old ident.
assert_transact!(conn, "[[:db/add 101 :db/ident :name/Ivan]]");
assert_matches!(conn.last_transaction(),
"[[101 :db/ident :name/Ivan ?tx true]
[?tx :db/txInstant ?ms ?tx true]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :name/Petr]
[101 :db/ident :name/Ivan]]");
// Entid map contains both entids.
assert_eq!(conn.schema.entid_map.get(&100).cloned().unwrap(), to_namespaced_keyword(":name/Petr").unwrap());
assert_eq!(conn.schema.entid_map.get(&101).cloned().unwrap(), to_namespaced_keyword(":name/Ivan").unwrap());
// Ident map contains the new ident.
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Petr").unwrap()).cloned().unwrap(), 100);
// Ident map contains the old ident, but re-purposed to the new entid.
assert_eq!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Ivan").unwrap()).cloned().unwrap(), 101);
// We can retract an existing :db/ident.
assert_transact!(conn, "[[:db/retract :name/Petr :db/ident :name/Petr]]");
// It's really gone.
assert!(conn.schema.entid_map.get(&100).is_none());
assert!(conn.schema.ident_map.get(&to_namespaced_keyword(":name/Petr").unwrap()).is_none());
}
#[test]
fn test_db_alter_cardinality() {
let mut conn = TestConn::default();
// Start by installing a :db.cardinality/one attribute.
assert_transact!(conn, "[[:db/add 100 :db/ident :test/ident]
[:db/add 100 :db/valueType :db.type/long]
[:db/add 100 :db/cardinality :db.cardinality/one]]");
assert_transact!(conn, "[[:db/add 200 :test/ident 1]]");
// We can always go from :db.cardinality/one to :db.cardinality/many.
assert_transact!(conn, "[[:db/add 100 :db/cardinality :db.cardinality/many]]");
assert_transact!(conn, "[[:db/add 200 :test/ident 2]]");
assert_matches!(conn.datoms(),
"[[100 :db/ident :test/ident]
[100 :db/valueType :db.type/long]
[100 :db/cardinality :db.cardinality/many]
[200 :test/ident 1]
[200 :test/ident 2]]");
// We can't always go from :db.cardinality/many to :db.cardinality/one.
assert_transact!(conn, "[[:db/add 100 :db/cardinality :db.cardinality/one]]",
// TODO: give more helpful error details.
Err("not yet implemented: Cannot alter schema attribute 100 to be :db.cardinality/one"));
}
#[test]
fn test_db_alter_unique_value() {
let mut conn = TestConn::default();
// Start by installing a :db.cardinality/one attribute.
assert_transact!(conn, "[[:db/add 100 :db/ident :test/ident]
[:db/add 100 :db/valueType :db.type/long]
[:db/add 100 :db/cardinality :db.cardinality/one]]");
assert_transact!(conn, "[[:db/add 200 :test/ident 1]
[:db/add 201 :test/ident 1]]");
// We can't always migrate to be :db.unique/value.
assert_transact!(conn, "[[:db/add :test/ident :db/unique :db.unique/value]]",
// TODO: give more helpful error details.
Err("not yet implemented: Cannot alter schema attribute 100 to be :db.unique/value"));
// Not even indirectly!
assert_transact!(conn, "[[:db/add :test/ident :db/unique :db.unique/identity]]",
// TODO: give more helpful error details.
Err("not yet implemented: Cannot alter schema attribute 100 to be :db.unique/identity"));
// But we can if we make sure there's no repeated [a v] pair.
assert_transact!(conn, "[[:db/add 201 :test/ident 2]]");
assert_transact!(conn, "[[:db/add :test/ident :db/index true]
[:db/add :test/ident :db/unique :db.unique/value]
[:db/add :db.part/db :db.alter/attribute 100]]");
}
}


@ -86,6 +86,21 @@ impl Transactions {
}
}
/// Turn TypedValue::Ref into TypedValue::Keyword when possible.
trait ToIdent {
fn map_ident(self, schema: &Schema) -> Self;
}
impl ToIdent for TypedValue {
fn map_ident(self, schema: &Schema) -> Self {
if let TypedValue::Ref(e) = self {
schema.get_ident(e).cloned().map(TypedValue::Keyword).unwrap_or(TypedValue::Ref(e))
} else {
self
}
}
}
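The `map_ident` conversion above can be restated without Mentat's types. The following is a standalone sketch in which a plain `HashMap` stands in for the real `Schema` and `TypedValue` is trimmed to the two interesting variants:

```rust
// Standalone sketch of the Ref -> Keyword debug mapping; the HashMap and the
// two-variant TypedValue are simplifications, not Mentat's actual types.
use std::collections::HashMap;

type Entid = i64;

#[derive(Clone, Debug, PartialEq)]
enum TypedValue {
    Ref(Entid),
    Keyword(String),
}

fn map_ident(value: TypedValue, ident_map: &HashMap<Entid, String>) -> TypedValue {
    // Only Ref values can name an ident; every other value passes through.
    if let TypedValue::Ref(e) = value {
        ident_map.get(&e).cloned().map(TypedValue::Keyword).unwrap_or(TypedValue::Ref(e))
    } else {
        value
    }
}

fn main() {
    let mut idents = HashMap::new();
    idents.insert(1, ":db/ident".to_string());
    // A recognized entid renders as its keyword ...
    assert_eq!(map_ident(TypedValue::Ref(1), &idents),
               TypedValue::Keyword(":db/ident".to_string()));
    // ... while an unrecognized entid stays numeric.
    assert_eq!(map_ident(TypedValue::Ref(99), &idents), TypedValue::Ref(99));
}
```

This is why the test expectations in this commit change from datoms like `[100 :db.schema/attribute 100]` to `[100 :db.schema/attribute :name/Ivan]`: debug output now prints idents wherever a ref resolves to one.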
/// Convert a numeric entid to an ident `Entid` if possible, otherwise a numeric `Entid`.
fn to_entid(schema: &Schema, entid: i64) -> Entid {
schema.get_ident(entid).map_or(Entid::Entid(entid), |ident| Entid::Ident(ident.clone()))
@ -102,6 +117,8 @@ pub fn datoms<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S) -> Res
///
/// The datom set returned does not include any datoms of the form [... :db/txInstant ...].
pub fn datoms_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result<Datoms> {
let borrowed_schema = schema.borrow();
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx FROM datoms WHERE tx > ? ORDER BY e ASC, a ASC, value_type_tag ASC, v ASC, tx ASC")?;
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
@ -115,12 +132,11 @@ pub fn datoms_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S,
let v: rusqlite::types::Value = row.get_checked(2)?;
let value_type_tag: i32 = row.get_checked(3)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema);
let (value, _) = typed_value.to_edn_value_pair();
let tx: i64 = row.get_checked(4)?;
let borrowed_schema = schema.borrow();
Ok(Some(Datom {
e: Entid::Entid(e),
a: to_entid(borrowed_schema, a),
@ -138,6 +154,8 @@ pub fn datoms_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S,
///
/// Each transaction returned includes the [:db/tx :db/txInstant ...] datom.
pub fn transactions_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema: &S, tx: i64) -> Result<Transactions> {
let borrowed_schema = schema.borrow();
let mut stmt: rusqlite::Statement = conn.prepare("SELECT e, a, v, value_type_tag, tx, added FROM transactions WHERE tx > ? ORDER BY tx ASC, e ASC, a ASC, value_type_tag ASC, v ASC, added ASC")?;
let r: Result<Vec<_>> = stmt.query_and_then(&[&tx], |row| {
@ -147,13 +165,12 @@ pub fn transactions_after<S: Borrow<Schema>>(conn: &rusqlite::Connection, schema
let v: rusqlite::types::Value = row.get_checked(2)?;
let value_type_tag: i32 = row.get_checked(3)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?;
let typed_value = TypedValue::from_sql_value_pair(v, value_type_tag)?.map_ident(borrowed_schema);
let (value, _) = typed_value.to_edn_value_pair();
let tx: i64 = row.get_checked(4)?;
let added: bool = row.get_checked(5)?;
let borrowed_schema = schema.borrow();
Ok(Datom {
e: Entid::Entid(e),
a: to_entid(borrowed_schema, a),


@ -21,7 +21,7 @@ pub const DB_IDENT: Entid = 1;
pub const DB_PART_DB: Entid = 2;
pub const DB_TX_INSTANT: Entid = 3;
pub const DB_INSTALL_PARTITION: Entid = 4;
pub const DB_INSTALL_VALUETYPE: Entid = 5;
pub const DB_INSTALL_VALUE_TYPE: Entid = 5;
pub const DB_INSTALL_ATTRIBUTE: Entid = 6;
pub const DB_VALUE_TYPE: Entid = 7;
pub const DB_CARDINALITY: Entid = 8;
@ -56,3 +56,58 @@ pub const DB_DOC: Entid = 35;
// Added in SQL schema v2.
pub const DB_SCHEMA_VERSION: Entid = 36;
pub const DB_SCHEMA_ATTRIBUTE: Entid = 37;
/// Return `true` if the given attribute might change the metadata: recognized idents, schema
/// attributes, or partitions in the partition map.
pub fn might_update_metadata(attribute: Entid) -> bool {
if attribute > DB_DOC {
return false
}
match attribute {
// Idents.
DB_IDENT |
// Schema.
DB_CARDINALITY |
DB_DOC |
DB_FULLTEXT |
DB_INDEX |
DB_IS_COMPONENT |
DB_UNIQUE |
DB_VALUE_TYPE =>
true,
_ => false,
}
}
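The guard above works because every metadata attribute has a small, bootstrap-assigned entid, so a single comparison rejects the common case before the match runs. The following is a sketch with only three of the constants inlined (their values copied from this diff; the remaining schema entids are elided):

```rust
// Sketch of might_update_metadata with a subset of the metadata entids
// reproduced from the diff (DB_IDENT = 1, DB_CARDINALITY = 8, DB_DOC = 35).
type Entid = i64;

const DB_IDENT: Entid = 1;
const DB_CARDINALITY: Entid = 8;
const DB_DOC: Entid = 35;

fn might_update_metadata(attribute: Entid) -> bool {
    // DB_DOC is the largest metadata entid, so user-installed attributes
    // (which get much larger entids) fail this test immediately.
    if attribute > DB_DOC {
        return false;
    }
    match attribute {
        DB_IDENT | DB_CARDINALITY | DB_DOC => true,
        _ => false,
    }
}

fn main() {
    assert!(might_update_metadata(DB_IDENT));
    assert!(might_update_metadata(DB_CARDINALITY));
    // A user-installed attribute never triggers metadata processing.
    assert!(!might_update_metadata(100));
}
```

Note that bootstrap entids below `DB_DOC` that are not metadata attributes (for example `:db/txInstant`) also return `false`, which the early-exit alone would not catch.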
lazy_static! {
/// Attributes that are "ident related". These might change the "idents" materialized view.
pub static ref IDENTS_SQL_LIST: String = {
format!("({})",
DB_IDENT)
};
/// Attributes that are "schema related". These might change the "schema" materialized view.
pub static ref SCHEMA_SQL_LIST: String = {
format!("({}, {}, {}, {}, {}, {}, {})",
DB_CARDINALITY,
DB_DOC,
DB_FULLTEXT,
DB_INDEX,
DB_IS_COMPONENT,
DB_UNIQUE,
DB_VALUE_TYPE)
};
/// Attributes that are "metadata" related. These might change one of the materialized views.
pub static ref METADATA_SQL_LIST: String = {
format!("({}, {}, {}, {}, {}, {}, {}, {})",
DB_CARDINALITY,
DB_DOC,
DB_FULLTEXT,
DB_IDENT,
DB_INDEX,
DB_IS_COMPONENT,
DB_UNIQUE,
DB_VALUE_TYPE)
};
}


@ -55,25 +55,25 @@ error_chain! {
/// a runtime error.
BadBootstrapDefinition(t: String) {
description("bad bootstrap definition")
display("bad bootstrap definition: '{}'", t)
display("bad bootstrap definition: {}", t)
}
/// A schema assertion couldn't be parsed.
BadSchemaAssertion(t: String) {
description("bad schema assertion")
display("bad schema assertion: '{}'", t)
display("bad schema assertion: {}", t)
}
/// An ident->entid mapping failed.
UnrecognizedIdent(ident: String) {
description("no entid found for ident")
display("no entid found for ident: '{}'", ident)
display("no entid found for ident: {}", ident)
}
/// An entid->ident mapping failed.
UnrecognizedEntid(entid: Entid) {
description("no ident found for entid")
display("no ident found for entid: '{}'", entid)
display("no ident found for entid: {}", entid)
}
}
}


@ -12,7 +12,6 @@
//! Types used only within the transactor. These should not be exposed outside of this crate.
use std;
use std::collections::HashMap;
use std::rc::Rc;
@ -31,8 +30,16 @@ pub enum Term<E, V> {
AddOrRetract(OpType, E, Entid, V),
}
pub type EntidOr<T> = std::result::Result<Entid, T>;
pub type TypedValueOr<T> = std::result::Result<TypedValue, T>;
#[derive(Clone,Debug,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub enum Either<L, R> {
Left(L),
Right(R),
}
use self::Either::*;
pub type EntidOr<T> = Either<Entid, T>;
pub type TypedValueOr<T> = Either<TypedValue, T>;
pub type TempId = Rc<String>;
pub type TempIdMap = HashMap<TempId, Entid>;
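The switch from `std::result::Result` to a purpose-built `Either` keeps "still an unresolved tempid" from masquerading as an error: `Right` is an ordinary state, not a failure, so it can't be accidentally propagated with `?`. A sketch of how the alias reads at a use site, with simplified types (`TempId` as `String` rather than `Rc<String>`):

```rust
// Standalone sketch of the EntidOr alias; types are simplified assumptions.
use std::collections::HashMap;

#[derive(Clone, Debug, Eq, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

type Entid = i64;
type TempId = String;
type EntidOr<T> = Either<Entid, T>;

// Resolve a tempid against known upserts, leaving unknown tempids pending.
fn resolve(e: EntidOr<TempId>, known: &HashMap<TempId, Entid>) -> EntidOr<TempId> {
    match e {
        Either::Left(n) => Either::Left(n),
        Either::Right(t) => match known.get(&t) {
            Some(n) => Either::Left(*n),
            None => Either::Right(t),
        },
    }
}

fn main() {
    let mut known = HashMap::new();
    known.insert("t1".to_string(), 100);
    assert_eq!(resolve(Either::Right("t1".to_string()), &known), Either::Left(100));
    // An unknown tempid stays Right: pending, but not an error.
    assert_eq!(resolve(Either::Right("t2".to_string()), &known),
               Either::Right("t2".to_string()));
    assert_eq!(resolve(Either::Left(5), &known), Either::Left(5));
}
```

The cost is losing `Result`'s combinators, which is why `replace_lookup_ref` below spells out the `Left`/`Right` plumbing by hand.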
@ -59,7 +66,7 @@ impl TermWithTempIds {
// Rust and seems appropriate here.
pub fn unwrap(self) -> TermWithoutTempIds {
match self {
Term::AddOrRetract(op, Ok(n), a, Ok(v)) => Term::AddOrRetract(op, n, a, v),
Term::AddOrRetract(op, Left(n), a, Left(v)) => Term::AddOrRetract(op, n, a, v),
_ => unreachable!(),
}
}
@ -74,14 +81,14 @@ impl TermWithTempIds {
/// The reason for this awkward expression is that we're parameterizing over the _type constructor_
/// (`EntidOr` or `TypedValueOr`), which is not trivial to express in Rust. This only works because
they're both the same `Either<...>` type with different parameterizations.
pub fn replace_lookup_ref<T, U>(lookup_map: &AVMap, desired_or: Result<T, LookupRefOrTempId>, lift: U) -> errors::Result<Result<T, TempId>> where U: FnOnce(Entid) -> T {
pub fn replace_lookup_ref<T, U>(lookup_map: &AVMap, desired_or: Either<T, LookupRefOrTempId>, lift: U) -> errors::Result<Either<T, TempId>> where U: FnOnce(Entid) -> T {
match desired_or {
Ok(desired) => Ok(Ok(desired)), // N.b., must unwrap here -- the ::Ok types are different!
Err(other) => {
Left(desired) => Ok(Left(desired)), // N.b., must unwrap here -- the ::Left types are different!
Right(other) => {
match other {
LookupRefOrTempId::TempId(t) => Ok(Err(t)),
LookupRefOrTempId::TempId(t) => Ok(Right(t)),
LookupRefOrTempId::LookupRef(av) => lookup_map.get(&*av)
.map(|x| lift(*x)).map(Ok)
.map(|x| lift(*x)).map(Left)
// XXX TODO: fix this error kind!
.ok_or_else(|| ErrorKind::UnrecognizedIdent(format!("couldn't lookup [a v]: {:?}", (*av).clone())).into()),
}


@ -31,8 +31,10 @@ pub use errors::{Error, ErrorKind, ResultExt, Result};
pub mod db;
mod bootstrap;
pub mod debug;
mod add_retract_alter_set;
mod entids;
pub mod errors;
mod metadata;
mod schema;
mod types;
mod internal_types;

db/src/metadata.rs (new file, 274 lines)

@ -0,0 +1,274 @@
// Copyright 2016 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
//! Most transactions can mutate the Mentat metadata by transacting assertions:
//!
//! - they can add (and, eventually, retract and alter) recognized idents using the `:db/ident`
//! attribute;
//!
//! - they can add (and, eventually, retract and alter) schema attributes using various `:db/*`
//! attributes;
//!
//! - eventually, they will be able to add (and possibly retract) entid partitions using a Mentat
//! equivalent (perhaps :db/partition or :db.partition/start) to Datomic's `:db.install/partition`
//! attribute.
//!
//! This module recognizes, validates, applies, and reports on these mutations.
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use itertools::Itertools; // For join().
use add_retract_alter_set::{
AddRetractAlterSet,
};
use edn::symbols;
use entids;
use errors::{
ErrorKind,
Result,
ResultExt,
};
use mentat_core::{
attribute,
Entid,
Schema,
SchemaMap,
TypedValue,
ValueType,
};
use schema::{
AttributeBuilder,
};
/// An alteration to an attribute.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum AttributeAlteration {
/// From http://blog.datomic.com/2014/01/schema-alteration.html:
/// - rename attributes
/// - rename your own programmatic identities (uses of :db/ident)
/// - add or remove indexes
Index,
/// - add or remove uniqueness constraints
Unique,
/// - change attribute cardinality
Cardinality,
/// - change whether history is retained for an attribute
NoHistory,
/// - change whether an attribute is treated as a component
IsComponent,
}
/// An alteration to an ident.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum IdentAlteration {
Ident(symbols::NamespacedKeyword),
}
/// Summarizes changes to metadata such as a `Schema` and (in the future) a `PartitionMap`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub struct MetadataReport {
// Entids that were not present in the original `SchemaMap` that was mutated.
pub attributes_installed: BTreeSet<Entid>,
// Entids that were present in the original `SchemaMap` that was mutated, together with a
// representation of the mutations that were applied.
pub attributes_altered: BTreeMap<Entid, Vec<AttributeAlteration>>,
// Idents that were installed into the `SchemaMap`.
pub idents_altered: BTreeMap<Entid, IdentAlteration>,
}
/// Update a `SchemaMap` in place from the given `[e a typed_value]` triples.
///
/// This is suitable for producing a `SchemaMap` from the `schema` materialized view, which does not
/// contain install and alter markers.
///
/// Returns a report summarizing the mutations that were applied.
pub fn update_schema_map_from_entid_triples<U>(schema_map: &mut SchemaMap, assertions: U) -> Result<MetadataReport>
where U: IntoIterator<Item=(Entid, Entid, TypedValue)> {
// Group mutations by impacted entid.
let mut builders: BTreeMap<Entid, AttributeBuilder> = BTreeMap::new();
for (entid, attr, ref value) in assertions.into_iter() {
let builder = builders.entry(entid).or_insert(AttributeBuilder::default());
// TODO: improve error messages throughout.
match attr {
entids::DB_DOC => {
match *value {
TypedValue::String(_) => {},
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/doc \"string value\"] but got [... :db/doc {:?}] for entid {} and attribute {}", value, entid, attr)))
}
},
entids::DB_VALUE_TYPE => {
match *value {
TypedValue::Ref(entids::DB_TYPE_REF) => { builder.value_type(ValueType::Ref); },
TypedValue::Ref(entids::DB_TYPE_BOOLEAN) => { builder.value_type(ValueType::Boolean); },
TypedValue::Ref(entids::DB_TYPE_DOUBLE) => { builder.value_type(ValueType::Double); },
TypedValue::Ref(entids::DB_TYPE_LONG) => { builder.value_type(ValueType::Long); },
TypedValue::Ref(entids::DB_TYPE_STRING) => { builder.value_type(ValueType::String); },
TypedValue::Ref(entids::DB_TYPE_KEYWORD) => { builder.value_type(ValueType::Keyword); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/valueType :db.type/*] but got [... :db/valueType {:?}] for entid {} and attribute {}", value, entid, attr)))
}
},
entids::DB_CARDINALITY => {
match *value {
TypedValue::Ref(entids::DB_CARDINALITY_MANY) => { builder.multival(true); },
TypedValue::Ref(entids::DB_CARDINALITY_ONE) => { builder.multival(false); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/cardinality :db.cardinality/many|:db.cardinality/one] but got [... :db/cardinality {:?}]", value)))
}
},
entids::DB_UNIQUE => {
match *value {
// TODO: accept nil in some form.
// TypedValue::Nil => {
// builder.unique_value(false);
// builder.unique_identity(false);
// },
TypedValue::Ref(entids::DB_UNIQUE_VALUE) => { builder.unique(Some(attribute::Unique::Value)); },
TypedValue::Ref(entids::DB_UNIQUE_IDENTITY) => { builder.unique(Some(attribute::Unique::Identity)); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/unique :db.unique/value|:db.unique/identity] but got [... :db/unique {:?}]", value)))
}
},
entids::DB_INDEX => {
match *value {
TypedValue::Boolean(x) => { builder.index(x); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/index true|false] but got [... :db/index {:?}]", value)))
}
},
entids::DB_FULLTEXT => {
match *value {
TypedValue::Boolean(x) => { builder.fulltext(x); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/fulltext true|false] but got [... :db/fulltext {:?}]", value)))
}
},
entids::DB_IS_COMPONENT => {
match *value {
TypedValue::Boolean(x) => { builder.component(x); },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/isComponent true|false] but got [... :db/isComponent {:?}]", value)))
}
},
_ => {
bail!(ErrorKind::BadSchemaAssertion(format!("Do not recognize attribute {} for entid {}", attr, entid)))
}
}
};
let mut attributes_installed: BTreeSet<Entid> = BTreeSet::default();
let mut attributes_altered: BTreeMap<Entid, Vec<AttributeAlteration>> = BTreeMap::default();
for (entid, builder) in builders.into_iter() {
match schema_map.entry(entid) {
Entry::Vacant(entry) => {
builder.validate_install_attribute()
.chain_err(|| ErrorKind::BadSchemaAssertion(format!("Schema alteration for new attribute with entid {} is not valid", entid)))?;
entry.insert(builder.build());
attributes_installed.insert(entid);
},
Entry::Occupied(mut entry) => {
builder.validate_alter_attribute()
.chain_err(|| ErrorKind::BadSchemaAssertion(format!("Schema alteration for existing attribute with entid {} is not valid", entid)))?;
let mutations = builder.mutate(entry.get_mut());
attributes_altered.insert(entid, mutations);
},
}
}
Ok(MetadataReport {
attributes_installed: attributes_installed,
attributes_altered: attributes_altered,
idents_altered: BTreeMap::default(),
})
}
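The install-versus-alter dispatch in the loop above hinges on `BTreeMap`'s entry API: a vacant entry means the entid names a brand-new attribute, while an occupied entry means an existing attribute is being altered. A minimal, self-contained sketch of that pattern (with `i32` standing in for an attribute and the validation steps elided):

```rust
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;

// Vacant entry => install a new attribute; occupied entry => alter the
// existing one in place. Validation and alteration reporting are elided.
fn install_or_alter(map: &mut BTreeMap<i64, i32>, entid: i64, new: i32) -> &'static str {
    match map.entry(entid) {
        Entry::Vacant(entry) => {
            entry.insert(new);
            "installed"
        },
        Entry::Occupied(mut entry) => {
            *entry.get_mut() = new;
            "altered"
        },
    }
}
```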
/// Update a `Schema` in place from the given `[e a typed_value added]` quadruples.
///
/// This layer enforces that ident assertions of the form [entid :db/ident ...] (as distinct from
/// attribute assertions) are present and correct.
///
/// This is suitable for mutating a `Schema` from an applied transaction.
///
/// Returns a report summarizing the mutations that were applied.
pub fn update_schema_from_entid_quadruples<U>(schema: &mut Schema, assertions: U) -> Result<MetadataReport>
where U: IntoIterator<Item=(Entid, Entid, TypedValue, bool)> {
// Group attribute assertions into asserted, retracted, and altered. We assume all our
// attribute assertions are :db/cardinality :db.cardinality/one (so each is asserted or
// retracted at most once), which means all attribute alterations are simple changes from an
// old value to a new value.
let mut attribute_set: AddRetractAlterSet<(Entid, Entid), TypedValue> = AddRetractAlterSet::default();
let mut ident_set: AddRetractAlterSet<Entid, symbols::NamespacedKeyword> = AddRetractAlterSet::default();
for (e, a, typed_value, added) in assertions.into_iter() {
// Here we handle :db/ident assertions.
if a == entids::DB_IDENT {
if let TypedValue::Keyword(ref keyword) = typed_value {
ident_set.witness(e, keyword.clone(), added);
continue
} else {
// Something is terribly wrong: the schema ensures we have a keyword.
unreachable!();
}
}
attribute_set.witness((e, a), typed_value, added);
}
// Datomic does not allow retracting attributes or idents. For now, Mentat follows suit.
if !attribute_set.retracted.is_empty() {
bail!(ErrorKind::NotYetImplemented(format!("Retracting metadata attribute assertions not yet implemented: retracted [e a] pairs [{}]",
attribute_set.retracted.keys().map(|&(e, a)| format!("[{} {}]", e, a)).join(", "))));
}
// Collect triples.
let asserted_triples = attribute_set.asserted.into_iter().map(|((e, a), typed_value)| (e, a, typed_value));
let altered_triples = attribute_set.altered.into_iter().map(|((e, a), (_old_value, new_value))| (e, a, new_value));
let report = update_schema_map_from_entid_triples(&mut schema.schema_map, asserted_triples.chain(altered_triples))?;
let mut idents_altered: BTreeMap<Entid, IdentAlteration> = BTreeMap::new();
// Asserted, altered, or retracted :db/ident assertions update the relevant ident maps.
for (entid, ident) in ident_set.asserted {
schema.entid_map.insert(entid, ident.clone());
schema.ident_map.insert(ident.clone(), entid);
idents_altered.insert(entid, IdentAlteration::Ident(ident.clone()));
}
for (entid, (old_ident, new_ident)) in ident_set.altered {
schema.entid_map.insert(entid, new_ident.clone()); // Overwrite existing.
schema.ident_map.remove(&old_ident); // Remove old.
schema.ident_map.insert(new_ident.clone(), entid); // Insert new.
idents_altered.insert(entid, IdentAlteration::Ident(new_ident.clone()));
}
for (entid, ident) in ident_set.retracted {
schema.entid_map.remove(&entid);
schema.ident_map.remove(&ident);
idents_altered.insert(entid, IdentAlteration::Ident(ident.clone()));
}
Ok(MetadataReport {
idents_altered: idents_altered,
.. report
})
}
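The `AddRetractAlterSet` used above comes from the `add_retract_alter_set` module, which is not shown in this diff. A simplified sketch of the bookkeeping it implies, under the cardinality-one assumption stated above (a retract and an add of the same key pair up into an alteration), might look like:

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for add_retract_alter_set::AddRetractAlterSet.
struct AddRetractAlterSet<K: Ord, V> {
    asserted: BTreeMap<K, V>,
    retracted: BTreeMap<K, V>,
    altered: BTreeMap<K, (V, V)>, // (old value, new value)
}

impl<K: Ord, V> AddRetractAlterSet<K, V> {
    fn new() -> Self {
        AddRetractAlterSet {
            asserted: BTreeMap::new(),
            retracted: BTreeMap::new(),
            altered: BTreeMap::new(),
        }
    }

    fn witness(&mut self, key: K, value: V, added: bool) {
        if added {
            // An add paired with an earlier retract of the same key is an alteration.
            if let Some(old) = self.retracted.remove(&key) {
                self.altered.insert(key, (old, value));
            } else {
                self.asserted.insert(key, value);
            }
        } else {
            // A retract paired with an earlier add of the same key is an alteration.
            if let Some(new) = self.asserted.remove(&key) {
                self.altered.insert(key, (value, new));
            } else {
                self.retracted.insert(key, value);
            }
        }
    }
}
```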


@@ -12,10 +12,10 @@
use db::TypedSQLValue;
use edn;
use entids;
use errors::{ErrorKind, Result};
use edn::symbols;
use mentat_core::{
attribute,
Attribute,
Entid,
EntidMap,
@@ -25,24 +25,29 @@ use mentat_core::{
TypedValue,
ValueType,
};
use metadata;
use metadata::{
AttributeAlteration,
};
/// Return `Ok(())` if `schema_map` defines a valid Mentat schema.
fn validate_schema_map(entid_map: &EntidMap, schema_map: &SchemaMap) -> Result<()> {
for (entid, attribute) in schema_map {
let ident = entid_map.get(entid).ok_or(ErrorKind::BadSchemaAssertion(format!("Could not get ident for entid: {}", entid)))?;
if attribute.unique_value && !attribute.index {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/unique :db/unique_value true without :db/index true for entid: {}", ident)))
let ident = || entid_map.get(entid).map(|ident| ident.to_string()).unwrap_or(entid.to_string());
if attribute.unique == Some(attribute::Unique::Value) && !attribute.index {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/unique :db/unique_value without :db/index true for entid: {}", ident())))
}
if attribute.unique_identity && !attribute.unique_value {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/unique :db/unique_identity without :db/unique :db/unique_value for entid: {}", ident)))
if attribute.unique == Some(attribute::Unique::Identity) && !attribute.index {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/unique :db/unique_identity without :db/index true for entid: {}", ident())))
}
if attribute.fulltext && attribute.value_type != ValueType::String {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/fulltext true without :db/valueType :db.type/string for entid: {}", ident)))
bail!(ErrorKind::BadSchemaAssertion(format!(":db/fulltext true without :db/valueType :db.type/string for entid: {}", ident())))
}
if attribute.fulltext && !attribute.index {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/fulltext true without :db/index true for entid: {}", ident())))
}
if attribute.component && attribute.value_type != ValueType::Ref {
bail!(ErrorKind::BadSchemaAssertion(format!(":db/isComponent true without :db/valueType :db.type/ref for entid: {}", ident)))
bail!(ErrorKind::BadSchemaAssertion(format!(":db/isComponent true without :db/valueType :db.type/ref for entid: {}", ident())))
}
// TODO: consider warning if we have :db/index true for :db/valueType :db.type/string,
// since this may be inefficient. More generally, we should try to drive complex
@@ -52,6 +57,119 @@ fn validate_schema_map(entid_map: &EntidMap, schema_map: &SchemaMap) -> Result<(
Ok(())
}
#[derive(Clone,Debug,Default,Eq,Hash,Ord,PartialOrd,PartialEq)]
pub struct AttributeBuilder {
value_type: Option<ValueType>,
multival: Option<bool>,
unique: Option<Option<attribute::Unique>>,
index: Option<bool>,
fulltext: Option<bool>,
component: Option<bool>,
}
impl AttributeBuilder {
pub fn value_type<'a>(&'a mut self, value_type: ValueType) -> &'a mut Self {
self.value_type = Some(value_type);
self
}
pub fn multival<'a>(&'a mut self, multival: bool) -> &'a mut Self {
self.multival = Some(multival);
self
}
pub fn unique<'a>(&'a mut self, unique: Option<attribute::Unique>) -> &'a mut Self {
self.unique = Some(unique);
self
}
pub fn index<'a>(&'a mut self, index: bool) -> &'a mut Self {
self.index = Some(index);
self
}
pub fn fulltext<'a>(&'a mut self, fulltext: bool) -> &'a mut Self {
self.fulltext = Some(fulltext);
self
}
pub fn component<'a>(&'a mut self, component: bool) -> &'a mut Self {
self.component = Some(component);
self
}
pub fn validate_install_attribute(&self) -> Result<()> {
if self.value_type.is_none() {
bail!(ErrorKind::BadSchemaAssertion("Schema attribute for new attribute does not set :db/valueType".into()));
}
Ok(())
}
pub fn validate_alter_attribute(&self) -> Result<()> {
if self.value_type.is_some() {
bail!(ErrorKind::BadSchemaAssertion("Schema alteration must not set :db/valueType".into()));
}
if self.fulltext.is_some() {
bail!(ErrorKind::BadSchemaAssertion("Schema alteration must not set :db/fulltext".into()));
}
Ok(())
}
pub fn build(&self) -> Attribute {
let mut attribute = Attribute::default();
if let Some(value_type) = self.value_type {
attribute.value_type = value_type;
}
if let Some(fulltext) = self.fulltext {
attribute.fulltext = fulltext;
}
if let Some(multival) = self.multival {
attribute.multival = multival;
}
if let Some(ref unique) = self.unique {
attribute.unique = unique.clone();
}
if let Some(index) = self.index {
attribute.index = index;
}
if let Some(component) = self.component {
attribute.component = component;
}
attribute
}
pub fn mutate(&self, attribute: &mut Attribute) -> Vec<AttributeAlteration> {
let mut mutations = Vec::new();
if let Some(multival) = self.multival {
if multival != attribute.multival {
attribute.multival = multival;
mutations.push(AttributeAlteration::Cardinality);
}
}
if let Some(ref unique) = self.unique {
if *unique != attribute.unique {
attribute.unique = unique.clone();
mutations.push(AttributeAlteration::Unique);
}
}
if let Some(index) = self.index {
if index != attribute.index {
attribute.index = index;
mutations.push(AttributeAlteration::Index);
}
}
if let Some(component) = self.component {
if component != attribute.component {
attribute.component = component;
mutations.push(AttributeAlteration::IsComponent);
}
}
mutations
}
}
pub trait SchemaBuilding {
fn require_ident(&self, entid: Entid) -> Result<&symbols::NamespacedKeyword>;
fn require_entid(&self, ident: &symbols::NamespacedKeyword) -> Result<Entid>;
@@ -90,93 +208,15 @@ impl SchemaBuilding for Schema {
/// Turn vec![(NamespacedKeyword(:ident), NamespacedKeyword(:key), TypedValue(:value)), ...] into a Mentat `Schema`.
fn from_ident_map_and_triples<U>(ident_map: IdentMap, assertions: U) -> Result<Schema>
where U: IntoIterator<Item=(symbols::NamespacedKeyword, symbols::NamespacedKeyword, TypedValue)>{
let mut schema_map = SchemaMap::new();
for (ref symbolic_ident, ref symbolic_attr, ref value) in assertions.into_iter() {
let ident: i64 = *ident_map.get(symbolic_ident).ok_or(ErrorKind::UnrecognizedIdent(symbolic_ident.to_string()))?;
let attr: i64 = *ident_map.get(symbolic_attr).ok_or(ErrorKind::UnrecognizedIdent(symbolic_attr.to_string()))?;
let attributes = schema_map.entry(ident).or_insert(Attribute::default());
// TODO: improve error messages throughout.
match attr {
entids::DB_VALUE_TYPE => {
match *value {
TypedValue::Ref(entids::DB_TYPE_REF) => { attributes.value_type = ValueType::Ref; },
TypedValue::Ref(entids::DB_TYPE_BOOLEAN) => { attributes.value_type = ValueType::Boolean; },
TypedValue::Ref(entids::DB_TYPE_LONG) => { attributes.value_type = ValueType::Long; },
TypedValue::Ref(entids::DB_TYPE_STRING) => { attributes.value_type = ValueType::String; },
TypedValue::Ref(entids::DB_TYPE_KEYWORD) => { attributes.value_type = ValueType::Keyword; },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/valueType :db.type/*] but got [... :db/valueType {:?}] for ident '{}' and attribute '{}'", value, ident, attr)))
}
},
entids::DB_CARDINALITY => {
match *value {
TypedValue::Ref(entids::DB_CARDINALITY_MANY) => { attributes.multival = true; },
TypedValue::Ref(entids::DB_CARDINALITY_ONE) => { attributes.multival = false; },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/cardinality :db.cardinality/many|:db.cardinality/one] but got [... :db/cardinality {:?}]", value)))
}
},
entids::DB_UNIQUE => {
match *value {
TypedValue::Ref(entids::DB_UNIQUE_VALUE) => {
attributes.unique_value = true;
attributes.index = true;
},
TypedValue::Ref(entids::DB_UNIQUE_IDENTITY) => {
attributes.unique_value = true;
attributes.unique_identity = true;
attributes.index = true;
},
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/unique :db.unique/value|:db.unique/identity] but got [... :db/unique {:?}]", value)))
}
},
entids::DB_INDEX => {
match *value {
TypedValue::Boolean(x) => { attributes.index = x },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/index true|false] but got [... :db/index {:?}]", value)))
}
},
entids::DB_FULLTEXT => {
match *value {
TypedValue::Boolean(x) => {
attributes.fulltext = x;
if attributes.fulltext {
attributes.index = true;
}
},
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/fulltext true|false] but got [... :db/fulltext {:?}]", value)))
}
},
entids::DB_IS_COMPONENT => {
match *value {
TypedValue::Boolean(x) => { attributes.component = x },
_ => bail!(ErrorKind::BadSchemaAssertion(format!("Expected [... :db/isComponent true|false] but got [... :db/isComponent {:?}]", value)))
}
},
entids::DB_DOC => {
// Nothing for now.
},
entids::DB_IDENT => {
// Nothing for now.
},
entids::DB_INSTALL_ATTRIBUTE => {
// Nothing for now.
},
_ => {
bail!(ErrorKind::BadSchemaAssertion(format!("Do not recognize attribute '{}' for ident '{}'", attr, ident)))
}
}
};
Schema::from_ident_map_and_schema_map(ident_map.clone(), schema_map)
let entid_assertions: Result<Vec<(Entid, Entid, TypedValue)>> = assertions.into_iter().map(|(symbolic_ident, symbolic_attr, value)| {
let ident: i64 = *ident_map.get(&symbolic_ident).ok_or(ErrorKind::UnrecognizedIdent(symbolic_ident.to_string()))?;
let attr: i64 = *ident_map.get(&symbolic_attr).ok_or(ErrorKind::UnrecognizedIdent(symbolic_attr.to_string()))?;
Ok((ident, attr, value))
}).collect();
let mut schema = Schema::from_ident_map_and_schema_map(ident_map, SchemaMap::default())?;
metadata::update_schema_map_from_entid_triples(&mut schema.schema_map, entid_assertions?)?;
Ok(schema)
}
}
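The rewritten `from_ident_map_and_triples` first translates symbolic idents into numeric entids before delegating to `metadata::update_schema_map_from_entid_triples`. That translation step can be sketched in isolation (the names here are illustrative stand-ins, not Mentat's actual types):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for Mentat's Entid and IdentMap.
type Entid = i64;
type IdentMap = BTreeMap<String, Entid>;

/// Translate (ident, attr) keyword pairs into (entid, entid) pairs,
/// failing on the first unrecognized ident.
fn resolve_pairs(ident_map: &IdentMap, pairs: &[(String, String)]) -> Result<Vec<(Entid, Entid)>, String> {
    pairs.iter().map(|&(ref i, ref a)| {
        let e = *ident_map.get(i).ok_or_else(|| format!("Unrecognized ident: {}", i))?;
        let attr = *ident_map.get(a).ok_or_else(|| format!("Unrecognized ident: {}", a))?;
        Ok((e, attr))
    }).collect()
}
```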


@@ -45,14 +45,12 @@
//! names -- `TermWithTempIdsAndLookupRefs`, anyone? -- and strongly typed stage functions will help
//! keep everything straight.
use std;
use std::borrow::Cow;
use std::collections::{
BTreeMap,
BTreeSet,
};
use ::{to_namespaced_keyword};
use db;
use db::{
MentatStoring,
@@ -61,6 +59,7 @@ use db::{
use entids;
use errors::{ErrorKind, Result};
use internal_types::{
Either,
LookupRefOrTempId,
TempId,
TempIdMap,
@@ -75,6 +74,7 @@ use mentat_core::{
};
use mentat_tx::entities as entmod;
use mentat_tx::entities::{Entity, OpType};
use metadata;
use rusqlite;
use schema::{
SchemaBuilding,
@@ -104,10 +104,16 @@ pub struct Tx<'conn, 'a> {
/// allocates at least one tx ID, so we own and modify our own partition map.
partition_map: PartitionMap,
/// The schema to update from the transaction entities.
///
/// Transactions only update the schema infrequently, so we borrow this schema until we need to
/// modify it.
schema_for_mutation: Cow<'a, Schema>,
/// The schema to use when interpreting the transaction entities.
///
/// The schema is infrequently updated, so we borrow a schema until we need to modify it.
schema: Cow<'a, Schema>,
/// This schema is not updated, so we just borrow it.
schema: &'a Schema,
/// The transaction ID of the transaction.
tx_id: Entid,
@@ -120,11 +126,18 @@ pub struct Tx<'conn, 'a> {
}
impl<'conn, 'a> Tx<'conn, 'a> {
pub fn new(store: &'conn rusqlite::Connection, partition_map: PartitionMap, schema: &'a Schema, tx_id: Entid, tx_instant: i64) -> Tx<'conn, 'a> {
pub fn new(
store: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
tx_id: Entid,
tx_instant: i64) -> Tx<'conn, 'a> {
Tx {
store: store,
partition_map: partition_map,
schema: Cow::Borrowed(schema),
schema_for_mutation: Cow::Borrowed(schema_for_mutation),
schema: schema,
tx_id: tx_id,
tx_instant: tx_instant,
}
@@ -191,11 +204,11 @@ impl<'conn, 'a> Tx<'conn, 'a> {
entmod::Entid::Entid(ref e) => *e,
entmod::Entid::Ident(ref e) => self.schema.require_entid(&e)?,
};
std::result::Result::Ok(e)
Either::Left(e)
},
entmod::EntidOrLookupRefOrTempId::TempId(e) => {
std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(e)))
Either::Right(LookupRefOrTempId::TempId(temp_ids.intern(e)))
},
entmod::EntidOrLookupRefOrTempId::LookupRef(_) => {
@@ -206,7 +219,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
let v = {
if attribute.value_type == ValueType::Ref && v.is_text() {
std::result::Result::Err(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone())))
Either::Right(LookupRefOrTempId::TempId(temp_ids.intern(v.as_text().unwrap().clone())))
} else if attribute.value_type == ValueType::Ref && v.is_vector() && v.as_vector().unwrap().len() == 2 {
bail!(ErrorKind::NotYetImplemented(format!("Transacting lookup-refs is not yet implemented")))
} else {
@@ -215,7 +228,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
// cases) coerce the value into the attribute's value set.
let typed_value: TypedValue = self.schema.to_typed_value(&v, &attribute)?;
std::result::Result::Ok(typed_value)
Either::Left(typed_value)
}
};
@@ -310,6 +323,14 @@ impl<'conn, 'a> Tx<'conn, 'a> {
assert!(tempids.contains_key(&**tempid));
}
// A transaction might try to add or retract :db/ident assertions or other metadata-mutating
// assertions, but those assertions might not make it to the store. If we see a possible
// metadata mutation, we will figure out if any assertions made it through later. This is
// strictly an optimization: it would be correct to _always_ check what made it to the
// store.
let mut tx_might_update_metadata = false;
{
/// Assertions that are :db.cardinality/one and not :db.fulltext.
let mut non_fts_one: Vec<db::ReducedEntity> = vec![];
@@ -331,6 +352,10 @@ impl<'conn, 'a> Tx<'conn, 'a> {
bail!(ErrorKind::NotYetImplemented(format!("Transacting :db/fulltext entities is not yet implemented"))) // TODO: reference original input. Difficult!
}
if entids::might_update_metadata(a) {
tx_might_update_metadata = true;
}
let added = op == OpType::Add;
if attribute.multival {
non_fts_many.push((e, a, attribute, v, added));
@@ -345,8 +370,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
// TODO: allow this to be present in the transaction data.
non_fts_one.push((self.tx_id,
entids::DB_TX_INSTANT,
// TODO: extract this to a constant.
self.schema.require_attribute_for_entid(self.schema.require_entid(&to_namespaced_keyword(":db/txInstant").unwrap())?)?,
self.schema.require_attribute_for_entid(entids::DB_TX_INSTANT).unwrap(),
TypedValue::Long(self.tx_instant),
true));
@@ -359,10 +383,29 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
self.store.commit_transaction(self.tx_id)?;
}
// TODO: update idents and schema materialized views.
db::update_partition_map(self.store, &self.partition_map)?;
if tx_might_update_metadata {
// Extract changes to metadata from the store.
let metadata_assertions = self.store.committed_metadata_assertions(self.tx_id)?;
let mut new_schema = (*self.schema_for_mutation).clone(); // Clone the underlying Schema for modification.
let metadata_report = metadata::update_schema_from_entid_quadruples(&mut new_schema, metadata_assertions)?;
// We might not have made any changes to the schema, even though it looked like we
// would. This should not happen, even during bootstrapping: we mutate an empty
// `Schema` in this case specifically to run the bootstrapped assertions through the
// regular transactor code paths, updating the schema and materialized views uniformly.
// But, belt-and-braces: handle it gracefully.
if new_schema != *self.schema_for_mutation {
let old_schema = (*self.schema_for_mutation).clone(); // Clone the original Schema for comparison.
*self.schema_for_mutation.to_mut() = new_schema; // Store the new Schema.
db::update_metadata(self.store, &old_schema, &*self.schema_for_mutation, &metadata_report)?;
}
}
Ok(TxReport {
tx_id: self.tx_id,
tx_instant: self.tx_instant,
@@ -375,7 +418,12 @@ impl<'conn, 'a> Tx<'conn, 'a> {
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema: &'a Schema, entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)> where I: IntoIterator<Item=Entity> {
pub fn transact<'conn, 'a, I>(
conn: &'conn rusqlite::Connection,
mut partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)> where I: IntoIterator<Item=Entity> {
// Eventually, this function will be responsible for managing a SQLite transaction. For
// now, it's just about the tx details.
@@ -384,12 +432,12 @@ pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, mut partition_m
conn.begin_transaction()?;
let mut tx = Tx::new(conn, partition_map, schema, tx_id, tx_instant);
let mut tx = Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id, tx_instant);
let report = tx.transact_entities(entities)?;
// If the schema has moved on, return it.
let next_schema = match tx.schema {
let next_schema = match tx.schema_for_mutation {
Cow::Borrowed(_) => None,
Cow::Owned(next_schema) => Some(next_schema),
};
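The `Cow<'a, Schema>` idiom used for `schema_for_mutation` keeps the common case cheap: the schema stays borrowed until the first mutation forces a clone, and matching on the `Cow` afterwards reveals whether anything changed. A self-contained sketch of the same pattern with a toy `Schema` stand-in:

```rust
use std::borrow::Cow;

// Toy stand-in for Mentat's Schema: any Clone + PartialEq type works.
#[derive(Clone, PartialEq, Debug)]
struct Schema { version: u32 }

/// Bump the schema version only when `mutate` is true; return the new
/// schema only if it actually moved on, mirroring `transact`'s contract.
fn maybe_mutate(original: &Schema, mutate: bool) -> Option<Schema> {
    let mut cow: Cow<Schema> = Cow::Borrowed(original);
    if mutate {
        cow.to_mut().version += 1; // The first mutation clones the borrowed schema.
    }
    match cow {
        Cow::Borrowed(_) => None,       // Nothing changed; the caller keeps its schema.
        Cow::Owned(next) => Some(next), // The schema moved on; hand back the clone.
    }
}
```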


@@ -15,15 +15,10 @@
use std::collections::BTreeSet;
use mentat_tx::entities::OpType;
use errors;
use errors::ErrorKind;
use types::{
Attribute,
AVPair,
Entid,
Schema,
TypedValue,
};
use internal_types::{
Population,
@@ -33,6 +28,15 @@ use internal_types::{
TermWithoutTempIds,
TermWithTempIds,
};
use internal_types::Either::*;
use mentat_core::{
attribute,
Attribute,
Entid,
Schema,
TypedValue,
};
use mentat_tx::entities::OpType;
use schema::SchemaBuilding;
/// A "Simple upsert" that looks like [:db/add TEMPID a v], where a is :db.unique/identity.
@@ -93,30 +97,30 @@ impl Generation {
let is_unique = |a: Entid| -> errors::Result<bool> {
let attribute: &Attribute = schema.require_attribute_for_entid(a)?;
Ok(attribute.unique_identity)
Ok(attribute.unique == Some(attribute::Unique::Identity))
};
for term in terms.into_iter() {
match term {
Term::AddOrRetract(op, Err(e), a, Err(v)) => {
Term::AddOrRetract(op, Right(e), a, Right(v)) => {
if op == OpType::Add && is_unique(a)? {
generation.upserts_ev.push(UpsertEV(e, a, v));
} else {
generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Err(v)));
generation.allocations.push(Term::AddOrRetract(op, Right(e), a, Right(v)));
}
},
Term::AddOrRetract(op, Err(e), a, Ok(v)) => {
Term::AddOrRetract(op, Right(e), a, Left(v)) => {
if op == OpType::Add && is_unique(a)? {
generation.upserts_e.push(UpsertE(e, a, v));
} else {
generation.allocations.push(Term::AddOrRetract(op, Err(e), a, Ok(v)));
generation.allocations.push(Term::AddOrRetract(op, Right(e), a, Left(v)));
}
},
Term::AddOrRetract(op, Ok(e), a, Err(v)) => {
generation.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(v)));
Term::AddOrRetract(op, Left(e), a, Right(v)) => {
generation.allocations.push(Term::AddOrRetract(op, Left(e), a, Right(v)));
},
Term::AddOrRetract(op, Ok(e), a, Ok(v)) => {
inert.push(Term::AddOrRetract(op, Ok(e), a, Ok(v)));
Term::AddOrRetract(op, Left(e), a, Left(v)) => {
inert.push(Term::AddOrRetract(op, Left(e), a, Left(v)));
},
}
}
@@ -143,7 +147,7 @@ impl Generation {
for UpsertE(t, a, v) in self.upserts_e {
match temp_id_map.get(&*t) {
Some(&n) => next.upserted.push(Term::AddOrRetract(OpType::Add, n, a, v)),
None => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t), a, Ok(v))),
None => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t), a, Left(v))),
}
}
@@ -151,8 +155,8 @@ impl Generation {
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(OpType::Add, n1, a, TypedValue::Ref(n2))),
(None, Some(&n2)) => next.upserts_e.push(UpsertE(t1, a, TypedValue::Ref(n2))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Ok(n1), a, Err(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Err(t1), a, Err(t2))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Left(n1), a, Right(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(OpType::Add, Right(t1), a, Right(t2))),
}
}
@@ -162,27 +166,27 @@ impl Generation {
// TODO: find an expression that destructures less? I still expect this to be efficient
// but it's a little verbose.
match term {
Term::AddOrRetract(op, Err(t1), a, Err(t2)) => {
Term::AddOrRetract(op, Right(t1), a, Right(t2)) => {
match (temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(Some(&n1), Some(&n2)) => next.resolved.push(Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2))),
(None, Some(&n2)) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Ok(TypedValue::Ref(n2)))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(op, Ok(n1), a, Err(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(op, Err(t1), a, Err(t2))),
(None, Some(&n2)) => next.allocations.push(Term::AddOrRetract(op, Right(t1), a, Left(TypedValue::Ref(n2)))),
(Some(&n1), None) => next.allocations.push(Term::AddOrRetract(op, Left(n1), a, Right(t2))),
(None, None) => next.allocations.push(Term::AddOrRetract(op, Right(t1), a, Right(t2))),
}
},
Term::AddOrRetract(op, Err(t), a, Ok(v)) => {
Term::AddOrRetract(op, Right(t), a, Left(v)) => {
match temp_id_map.get(&*t) {
Some(&n) => next.resolved.push(Term::AddOrRetract(op, n, a, v)),
None => next.allocations.push(Term::AddOrRetract(op, Err(t), a, Ok(v))),
None => next.allocations.push(Term::AddOrRetract(op, Right(t), a, Left(v))),
}
},
Term::AddOrRetract(op, Ok(e), a, Err(t)) => {
Term::AddOrRetract(op, Left(e), a, Right(t)) => {
match temp_id_map.get(&*t) {
Some(&n) => next.resolved.push(Term::AddOrRetract(op, e, a, TypedValue::Ref(n))),
None => next.allocations.push(Term::AddOrRetract(op, Ok(e), a, Err(t))),
None => next.allocations.push(Term::AddOrRetract(op, Left(e), a, Right(t))),
}
},
Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(),
Term::AddOrRetract(_, Left(_), _, Left(_)) => unreachable!(),
}
}
@@ -212,17 +216,17 @@ impl Generation {
for term in self.allocations.iter() {
match term {
&Term::AddOrRetract(OpType::Add, Err(ref t1), _, Err(ref t2)) => {
&Term::AddOrRetract(OpType::Add, Right(ref t1), _, Right(ref t2)) => {
temp_ids.insert(t1.clone());
temp_ids.insert(t2.clone());
},
&Term::AddOrRetract(OpType::Add, Err(ref t), _, Ok(_)) => {
&Term::AddOrRetract(OpType::Add, Right(ref t), _, Left(_)) => {
temp_ids.insert(t.clone());
},
&Term::AddOrRetract(OpType::Add, Ok(_), _, Err(ref t)) => {
&Term::AddOrRetract(OpType::Add, Left(_), _, Right(ref t)) => {
temp_ids.insert(t.clone());
},
&Term::AddOrRetract(OpType::Add, Ok(_), _, Ok(_)) => unreachable!(),
&Term::AddOrRetract(OpType::Add, Left(_), _, Left(_)) => unreachable!(),
&Term::AddOrRetract(OpType::Retract, _, _, _) => {
// [:db/retract ...] entities never allocate entids; they have to resolve due to
// other upserts (or they fail the transaction).
@@ -247,28 +251,28 @@ impl Generation {
for term in self.allocations {
let allocated = match term {
// TODO: consider implementing require on temp_id_map.
Term::AddOrRetract(op, Err(t1), a, Err(t2)) => {
Term::AddOrRetract(op, Right(t1), a, Right(t2)) => {
match (op, temp_id_map.get(&*t1), temp_id_map.get(&*t2)) {
(op, Some(&n1), Some(&n2)) => Term::AddOrRetract(op, n1, a, TypedValue::Ref(n2)),
(OpType::Add, _, _) => unreachable!(), // This is a coding error -- every tempid in a :db/add entity should resolve or be allocated.
(OpType::Retract, _, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: one of {}, {}", t1, t2))),
}
},
Term::AddOrRetract(op, Err(t), a, Ok(v)) => {
Term::AddOrRetract(op, Right(t), a, Left(v)) => {
match (op, temp_id_map.get(&*t)) {
(op, Some(&n)) => Term::AddOrRetract(op, n, a, v),
(OpType::Add, _) => unreachable!(), // This is a coding error.
(OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))),
}
},
Term::AddOrRetract(op, Ok(e), a, Err(t)) => {
Term::AddOrRetract(op, Left(e), a, Right(t)) => {
match (op, temp_id_map.get(&*t)) {
(op, Some(&n)) => Term::AddOrRetract(op, e, a, TypedValue::Ref(n)),
(OpType::Add, _) => unreachable!(), // This is a coding error.
(OpType::Retract, _) => bail!(ErrorKind::NotYetImplemented(format!("[:db/retract ...] entity referenced tempid that did not upsert: {}", t))),
}
},
Term::AddOrRetract(_, Ok(_), _, Ok(_)) => unreachable!(), // This is a coding error -- these should not be in allocations.
Term::AddOrRetract(_, Left(_), _, Left(_)) => unreachable!(), // This is a coding error -- these should not be in allocations.
};
populations.allocated.push(allocated);
}
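These hunks replace `std::result::Result`'s `Ok`/`Err` with an `internal_types::Either` whose `Left`/`Right` variants don't connote success or failure: a tempid that hasn't resolved yet is not an error. The real definition lives in `internal_types` and isn't shown in this diff; a minimal sketch of the shape and its use in tempid resolution:

```rust
use std::collections::BTreeMap;

// Hypothetical minimal Either, standing in for internal_types::Either.
// Left holds a resolved entid; Right holds a still-unresolved tempid.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

type Entid = i64;
type TempId = String;

/// Resolve a tempid against the map built during upsert resolution,
/// leaving it Right if no entid has been allocated for it yet.
fn resolve(temp_id_map: &BTreeMap<TempId, Entid>, e: Either<Entid, TempId>) -> Either<Entid, TempId> {
    match e {
        Either::Left(e) => Either::Left(e),
        Either::Right(t) => match temp_id_map.get(&t) {
            Some(&n) => Either::Left(n),
            None => Either::Right(t),
        },
    }
}
```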


@@ -1178,7 +1178,6 @@ mod testing {
});
add_attribute(&mut schema, 98, Attribute {
value_type: ValueType::String,
unique_identity: true,
..Default::default()
});


@@ -147,7 +147,7 @@ impl Conn {
};
// The transaction is processed while the mutex is not held.
let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, entities)?;
let (report, next_partition_map, next_schema) = transact(&tx, current_partition_map, &*current_schema, &*current_schema, entities)?;
{
// The mutex is taken during this block.