Implement vocabulary-driven schema upgrades.

This commit is contained in:
Richard Newman 2018-04-03 15:04:04 -07:00
parent 29ccbee911
commit 65e7252b56
2 changed files with 377 additions and 66 deletions

View file

@ -85,7 +85,9 @@ use std::collections::BTreeMap;
pub use mentat_core::attribute;
use mentat_core::attribute::Unique;
use mentat_core::KnownEntid;
use mentat_core::{
KnownEntid,
};
use ::{
CORE_SCHEMA_VERSION,
@ -126,7 +128,7 @@ pub type Datom = (Entid, Entid, TypedValue);
/// its version number, we need to know the attributes that the application cares about -- it's
/// not enough to know the name and version. Indeed, we even care about the details of each attribute,
/// because that's how we'll detect errors.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Definition {
pub name: NamespacedKeyword,
pub version: Version,
@ -243,7 +245,7 @@ impl<T> HasCoreSchema for T where T: HasSchema {
}
impl Definition {
fn description_for_attributes<'s, T, R>(&'s self, attributes: &[R], via: &T) -> Result<Terms>
fn description_for_attributes<'s, T, R>(&'s self, attributes: &[R], via: &T, diff: Option<BTreeMap<NamespacedKeyword, Attribute>>) -> Result<Terms>
where T: HasCoreSchema,
R: ::std::borrow::Borrow<(NamespacedKeyword, Attribute)> {
@ -279,13 +281,10 @@ impl Definition {
// Describe each of its attributes.
// This is a lot like Schema::to_edn_value; at some point we should tidy this up.
for ref r in attributes.iter() {
let &(ref name, ref attr) = r.borrow();
let &(ref kw, ref attr) = r.borrow();
// Note that we allow tempid resolution to find an existing entity, if it
// exists. We don't yet support upgrades, which will involve producing
// alteration statements.
let tempid = builder.named_tempid(name.to_string());
let name: TypedValue = name.clone().into();
let tempid = builder.named_tempid(kw.to_string());
let name: TypedValue = kw.clone().into();
builder.add(tempid.clone(), a_ident, name)?;
builder.add(schema.clone(), a_attr, tempid.clone())?;
@ -299,18 +298,12 @@ impl Definition {
};
builder.add(tempid.clone(), a_cardinality, c)?;
if attr.index {
builder.add(tempid.clone(), a_index, TypedValue::Boolean(true))?;
}
if attr.fulltext {
builder.add(tempid.clone(), a_fulltext, TypedValue::Boolean(true))?;
}
if attr.component {
builder.add(tempid.clone(), a_is_component, TypedValue::Boolean(true))?;
}
if attr.no_history {
builder.add(tempid.clone(), a_no_history, TypedValue::Boolean(true))?;
}
// These are all unconditional because we use attribute descriptions to _alter_, not
// just to _add_, and so absence is distinct from negation!
builder.add(tempid.clone(), a_index, TypedValue::Boolean(attr.index))?;
builder.add(tempid.clone(), a_fulltext, TypedValue::Boolean(attr.fulltext))?;
builder.add(tempid.clone(), a_is_component, TypedValue::Boolean(attr.component))?;
builder.add(tempid.clone(), a_no_history, TypedValue::Boolean(attr.no_history))?;
if let Some(u) = attr.unique {
let uu = match u {
@ -318,15 +311,49 @@ impl Definition {
Unique::Value => v_unique_value,
};
builder.add(tempid.clone(), a_unique, uu)?;
} else {
let existing_unique =
if let Some(ref diff) = diff {
diff.get(kw).and_then(|a| a.unique)
} else {
None
};
match existing_unique {
None => {
// Nothing to do.
},
Some(Unique::Identity) => {
builder.retract(tempid.clone(), a_unique, v_unique_identity.clone())?;
},
Some(Unique::Value) => {
builder.retract(tempid.clone(), a_unique, v_unique_value.clone())?;
},
}
}
}
builder.build()
}
/// Return a sequence of terms that describes this vocabulary definition and its attributes.
fn description_diff<T>(&self, via: &T, from: &Vocabulary) -> Result<Terms> where T: HasSchema {
let relevant = self.attributes.iter()
.filter_map(|(ref keyword, _)|
// Look up the keyword to see if it's currently in use.
via.get_entid(keyword)
// If so, map it to the existing attribute.
.and_then(|e| from.find(e).cloned())
// Collect enough that we can do lookups.
.map(|e| (keyword.clone(), e)))
.collect();
self.description_for_attributes(self.attributes.as_slice(), via, Some(relevant))
}
/// Return a sequence of terms that describes this vocabulary definition and its attributes.
fn description<T>(&self, via: &T) -> Result<Terms> where T: HasSchema {
self.description_for_attributes(self.attributes.as_slice(), via)
self.description_for_attributes(self.attributes.as_slice(), via, None)
}
}
@ -361,46 +388,8 @@ pub trait HasVocabularies {
fn read_vocabulary_named(&self, name: &NamespacedKeyword) -> Result<Option<Vocabulary>>;
}
pub trait VersionedStore {
pub trait VersionedStore: HasVocabularies + HasSchema {
/// Check whether the vocabulary described by the provided metadata is present in the store.
fn check_vocabulary<'definition>(&self, definition: &'definition Definition) -> Result<VocabularyCheck<'definition>>;
/// Check whether the provided vocabulary is present in the store. If it isn't, make it so.
fn ensure_vocabulary(&mut self, definition: &Definition) -> Result<VocabularyOutcome>;
/// Make sure that our expectations of the core vocabulary -- basic types and attributes -- are met.
fn verify_core_schema(&self) -> Result<()>;
}
trait VocabularyMechanics {
fn install_vocabulary(&mut self, definition: &Definition) -> Result<VocabularyOutcome>;
fn install_attributes_for<'definition>(&mut self, definition: &'definition Definition, attributes: Vec<&'definition (NamespacedKeyword, Attribute)>) -> Result<VocabularyOutcome>;
fn upgrade_vocabulary(&mut self, definition: &Definition, from_version: Vocabulary) -> Result<VocabularyOutcome>;
}
impl Vocabulary {
// TODO: don't do linear search!
fn find<T>(&self, entid: T) -> Option<&Attribute> where T: Into<Entid> {
let to_find = entid.into();
self.attributes.iter().find(|&&(e, _)| e == to_find).map(|&(_, ref a)| a)
}
}
impl<'a, 'c> VersionedStore for InProgress<'a, 'c> {
fn verify_core_schema(&self) -> Result<()> {
if let Some(core) = self.read_vocabulary_named(&DB_SCHEMA_CORE)? {
if core.version != CORE_SCHEMA_VERSION {
bail!(ErrorKind::UnexpectedCoreSchema(Some(core.version)));
}
// TODO: check things other than the version.
} else {
// This would be seriously messed up.
bail!(ErrorKind::UnexpectedCoreSchema(None));
}
Ok(())
}
fn check_vocabulary<'definition>(&self, definition: &'definition Definition) -> Result<VocabularyCheck<'definition>> {
if let Some(vocabulary) = self.read_vocabulary_named(&definition.name)? {
// The name is present.
@ -449,6 +438,49 @@ impl<'a, 'c> VersionedStore for InProgress<'a, 'c> {
}
}
/// Check whether the provided vocabulary is present in the store. If it isn't, make it so.
///
/// Returns a `VocabularyOutcome` describing what, if anything, had to be done. No `pre` or
/// `post` functions are involved; use `ensure_vocabularies` if you need them.
///
/// The `InProgress` implementation fails with `ExistingVocabularyTooNew` when the store
/// already holds a newer version of the vocabulary than `definition` describes.
fn ensure_vocabulary(&mut self, definition: &Definition) -> Result<VocabularyOutcome>;
/// Check whether the provided vocabularies are present in the store at the correct
/// version and with all defined attributes. If any are not, invoke the `pre`
/// function on the provided `VocabularyProvider`, install or upgrade the necessary vocabularies,
/// then invoke `post`. Returns `Ok` if all of these steps succeed.
///
/// On success the returned map holds one `VocabularyOutcome` per vocabulary, keyed by the
/// vocabulary's name.
///
/// Use this function instead of calling `ensure_vocabulary` if you need to have pre/post
/// functions invoked when vocabulary changes are necessary.
fn ensure_vocabularies(&mut self, vocabularies: &VocabularyProvider) -> Result<BTreeMap<NamespacedKeyword, VocabularyOutcome>>;
/// Make sure that our expectations of the core vocabulary -- basic types and attributes -- are met.
fn verify_core_schema(&self) -> Result<()> {
if let Some(core) = self.read_vocabulary_named(&DB_SCHEMA_CORE)? {
if core.version != CORE_SCHEMA_VERSION {
bail!(ErrorKind::UnexpectedCoreSchema(Some(core.version)));
}
// TODO: check things other than the version.
} else {
// This would be seriously messed up.
bail!(ErrorKind::UnexpectedCoreSchema(None));
}
Ok(())
}
}
/// Private helpers that perform the actual store changes on behalf of
/// `ensure_vocabulary` and `ensure_vocabularies`.
trait VocabularyMechanics {
/// Install a vocabulary that is not yet present in the store. `ensure_vocabularies` calls
/// this for every definition that `check_vocabulary` reports as `NotPresent`.
fn install_vocabulary(&mut self, definition: &Definition) -> Result<VocabularyOutcome>;
/// Describe the given subset of `definition`'s attributes as terms and transact them.
/// Used when the vocabulary is present at the right version but lacks some attributes.
fn install_attributes_for<'definition>(&mut self, definition: &'definition Definition, attributes: Vec<&'definition (NamespacedKeyword, Attribute)>) -> Result<VocabularyOutcome>;
/// Upgrade the stored vocabulary `from_version` to match `definition` by transacting the
/// description of `definition` computed relative to `from_version`.
fn upgrade_vocabulary(&mut self, definition: &Definition, from_version: Vocabulary) -> Result<VocabularyOutcome>;
}
impl Vocabulary {
    /// Return the attribute this vocabulary records for `entid`, or `None` if the
    /// vocabulary has no attribute with that entid.
    // TODO: don't do linear search!
    fn find<T>(&self, entid: T) -> Option<&Attribute> where T: Into<Entid> {
        let wanted: Entid = entid.into();
        for &(candidate, ref attribute) in self.attributes.iter() {
            if candidate == wanted {
                return Some(attribute);
            }
        }
        None
    }
}
impl<'a, 'c> VersionedStore for InProgress<'a, 'c> {
fn ensure_vocabulary(&mut self, definition: &Definition) -> Result<VocabularyOutcome> {
match self.check_vocabulary(definition)? {
VocabularyCheck::Present => Ok(VocabularyOutcome::Existed),
@ -458,6 +490,59 @@ impl<'a, 'c> VersionedStore for InProgress<'a, 'c> {
VocabularyCheck::PresentButTooNew { newer_version } => Err(ErrorKind::ExistingVocabularyTooNew(definition.name.to_string(), newer_version.version, definition.version).into()),
}
}
/// Check every definition supplied by `vocabularies` and bring the store up to date.
///
/// Definitions that are already present are recorded as `Existed`. If nothing else needs
/// doing, the `pre` and `post` functions are never called. Otherwise `pre` runs first, then
/// all installs, then all upgrades, then all missing-attribute additions, and finally `post`.
/// A vocabulary that is newer in the store than in its definition aborts the whole operation
/// with `ExistingVocabularyTooNew` before any change is made.
fn ensure_vocabularies(&mut self, vocabularies: &VocabularyProvider) -> Result<BTreeMap<NamespacedKeyword, VocabularyOutcome>> {
    // Outcome per vocabulary name; this is what we hand back to the caller.
    let mut outcomes = BTreeMap::new();

    // Work queues, one per kind of change. They are drained in this order below.
    let mut to_install = Vec::new();
    let mut to_upgrade = Vec::new();
    let mut to_extend = Vec::new();

    // First pass: classify every definition without touching the store.
    for definition in &vocabularies.definitions {
        match self.check_vocabulary(definition)? {
            VocabularyCheck::Present => {
                outcomes.insert(definition.name.clone(), VocabularyOutcome::Existed);
            },
            VocabularyCheck::NotPresent => to_install.push(definition),
            VocabularyCheck::PresentButNeedsUpdate { older_version } => to_upgrade.push((definition, older_version)),
            VocabularyCheck::PresentButMissingAttributes { attributes } => to_extend.push((definition, attributes)),
            VocabularyCheck::PresentButTooNew { newer_version } => {
                bail!(ErrorKind::ExistingVocabularyTooNew(definition.name.to_string(), newer_version.version, definition.version));
            },
        }
    }

    let work_pending = !(to_install.is_empty() && to_upgrade.is_empty() && to_extend.is_empty());
    if !work_pending {
        return Ok(outcomes);
    }

    // If any work needs to be done, run pre/post.
    (vocabularies.pre)(self)?;

    for definition in to_install {
        let outcome = self.install_vocabulary(definition)?;
        outcomes.insert(definition.name.clone(), outcome);
    }

    for (definition, stored) in to_upgrade {
        let outcome = self.upgrade_vocabulary(definition, stored)?;
        outcomes.insert(definition.name.clone(), outcome);
    }

    for (definition, attributes) in to_extend {
        let outcome = self.install_attributes_for(definition, attributes)?;
        outcomes.insert(definition.name.clone(), outcome);
    }

    (vocabularies.post)(self)?;
    Ok(outcomes)
}
}
/// A collection of vocabulary definitions together with functions to run before and after
/// the store is changed to match them. Consumed by `VersionedStore::ensure_vocabularies`.
pub struct VocabularyProvider {
/// Run once, before any vocabulary is installed or upgraded, and only if at least one of
/// `definitions` requires work. An error aborts `ensure_vocabularies`.
pub pre: fn(&mut InProgress) -> Result<()>,
/// Run once, after every required install or upgrade has succeeded. Not run if no work
/// was required.
pub post: fn(&mut InProgress) -> Result<()>,
/// The vocabularies that should be present in the store.
pub definitions: Vec<Definition>,
}
impl<'a, 'c> VocabularyMechanics for InProgress<'a, 'c> {
@ -469,17 +554,23 @@ impl<'a, 'c> VocabularyMechanics for InProgress<'a, 'c> {
}
fn install_attributes_for<'definition>(&mut self, definition: &'definition Definition, attributes: Vec<&'definition (NamespacedKeyword, Attribute)>) -> Result<VocabularyOutcome> {
let (terms, tempids) = definition.description_for_attributes(&attributes, self)?;
let (terms, tempids) = definition.description_for_attributes(&attributes, self, None)?;
self.transact_terms(terms, tempids)?;
Ok(VocabularyOutcome::InstalledMissingAttributes)
}
/// Turn the declarative parts of the vocabulary into alterations and transact the changes.
/// The 'pre' and 'post' steps are not run here: `ensure_vocabularies` runs them around the upgrade.
fn upgrade_vocabulary(&mut self, _definition: &Definition, _from_version: Vocabulary) -> Result<VocabularyOutcome> {
unimplemented!();
// TODO
// Ok(VocabularyOutcome::Installed)
fn upgrade_vocabulary(&mut self, definition: &Definition, from_version: Vocabulary) -> Result<VocabularyOutcome> {
// It's sufficient for us to generate the datom form of each attribute and transact that.
// We trust that the vocabulary will implement a 'pre' function that cleans up data for any
// failable conversion (e.g., cardinality-many to cardinality-one).
// TODO: don't do work for attributes that are unchanged. Here we rely on the transactor
// to elide duplicate datoms.
let (terms, tempids) = definition.description_diff(self, &from_version)?;
self.transact_terms(terms, tempids)?;
Ok(VocabularyOutcome::Upgraded)
}
}

View file

@ -23,6 +23,7 @@ use mentat::vocabulary::{
VersionedStore,
VocabularyCheck,
VocabularyOutcome,
VocabularyProvider,
};
use mentat::query::IntoResult;
@ -38,6 +39,7 @@ use mentat::{
Conn,
NamespacedKeyword,
Queryable,
Store,
TypedValue,
ValueType,
};
@ -291,4 +293,222 @@ fn test_add_vocab() {
_ => panic!(),
}
}
// Some alterations -- cardinality/one to cardinality/many, unique to weaker unique or
// no unique, unindexed to indexed -- can be applied automatically, so long as you
// bump the version number.
let multival_bar = vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::Instant)
.multival(true)
.index(true)
.build();
let multival_bar_and_baz = vec![
(kw!(:foo/bar), multival_bar),
(kw!(:foo/baz), baz.clone()),
];
let altered_vocabulary = vocabulary::Definition {
name: kw!(:org.mozilla/foo),
version: 2,
attributes: multival_bar_and_baz,
};
// foo/bar starts single-valued.
assert_eq!(false, conn.current_schema().attribute_for_ident(&kw!(:foo/bar)).expect("attribute").0.multival);
// Scoped borrow of `conn`.
{
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("begun successfully");
assert_eq!(in_progress.ensure_vocabulary(&altered_vocabulary).expect("success"),
VocabularyOutcome::Upgraded);
in_progress.commit().expect("commit succeeded");
}
// Now it's multi-valued.
assert_eq!(true, conn.current_schema().attribute_for_ident(&kw!(:foo/bar)).expect("attribute").0.multival);
}
// This is a real-world-style test that evolves a schema with data changes.
// We start with a basic vocabulary in three parts:
//
// Part 1 describes foods by name.
// Part 2 describes movies by title.
// Part 3 describes people: their names and heights, and their likes.
//
// We simulate five common migrations:
// - We made a trivial modeling error: movie names should not be unique.
// - We made a less trivial modeling error, one that can fail: food names should be unique so that
// we can more easily refer to them during writes.
// In order for this migration to succeed, we need to merge duplicates, then alter the schema --
// which we will do by introducing a new property in the same vocabulary, deprecating the old one
// -- then transact the transformed data.
// - We need to normalize some non-unique data: we recorded heights in inches when they should be
// in centimeters.
// - We need to normalize some unique data: food names should all be lowercase. Again, that can fail
// because of a uniqueness constraint. (We might know that it can't fail thanks to application
// restrictions, in which case we can treat this as we did the height alteration.)
// - We made a more significant modeling error: we used 'like' to identify both movies and foods,
// and we have decided that food preferences and movie preferences should be different attributes.
// We wish to split these up and deprecate the old attribute. In order to do so we need to retract
// all of the datoms that use the old attribute, transact new attributes _in both movies and foods_,
// then re-assert the data.
// Installs v1 of the food, movies and people vocabularies through `ensure_vocabularies`,
// loads sample data, then upgrades the movies vocabulary to v2 (dropping the uniqueness
// constraint on :movie/title) and checks that two movies can now share a title.
// As written, only that first migration is exercised by this test body.
#[test]
fn test_upgrade_with_functions() {
// NOTE(review): an empty path presumably opens an in-memory store -- confirm.
let mut store = Store::open("").expect("open");
// Part 1: foods, identified only by a (non-unique) name.
let food_v1 = vocabulary::Definition {
name: kw!(:org.mozilla/food),
version: 1,
attributes: vec![
(kw!(:food/name),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.build()),
],
};
// Part 2: movies, with a release year and a title declared unique/identity.
let movies_v1 = vocabulary::Definition {
name: kw!(:org.mozilla/movies),
version: 1,
attributes: vec![
(kw!(:movie/year),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::Long) // No need for Instant here.
.multival(false)
.build()),
(kw!(:movie/title),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.unique(vocabulary::attribute::Unique::Identity)
.index(true)
.build()),
],
};
// Part 3: people, with a unique name, a height, and multi-valued references to liked things.
let people_v1 = vocabulary::Definition {
name: kw!(:org.mozilla/people),
version: 1,
attributes: vec![
(kw!(:person/name),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.unique(vocabulary::attribute::Unique::Identity)
.index(true)
.build()),
(kw!(:person/height),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::Long)
.multival(false)
.build()),
(kw!(:person/likes),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::Ref)
.multival(true)
.build()),
],
};
// Apply v1 of each.
// The pre/post hooks do nothing: a first install needs no data migration.
let v1_provider = VocabularyProvider {
pre: |_ip| Ok(()),
definitions: vec![
food_v1.clone(),
movies_v1.clone(),
people_v1.clone(),
],
post: |_ip| Ok(()),
};
// Mutable borrow of store.
{
let mut in_progress = store.begin_transaction().expect("began");
in_progress.ensure_vocabularies(&v1_provider).expect("success");
// Also add some data. We do this in one transaction 'cos -- thanks to the modeling errors
// we are about to fix! -- it's a little awkward to make references to entities without
// unique attributes.
in_progress.transact(r#"[
{:movie/title "John Wick"
:movie/year 2014
:db/id "mjw"}
{:movie/title "Terminator 2: Judgment Day"
:movie/year 1991
:db/id "mt2"}
{:movie/title "Dune"
:db/id "md"
:movie/year 1984}
{:movie/title "Upstream Color"
:movie/year 2013
:db/id "muc"}
{:movie/title "Primer"
:db/id "mp"
:movie/year 2004}
;; No year: not yet released.
{:movie/title "The Modern Ocean"
:db/id "mtmo"}
{:food/name "Carrots" :db/id "fc"}
{:food/name "Weird blue worms" :db/id "fwbw"}
{:food/name "Spice" :db/id "fS"}
{:food/name "spice" :db/id "fs"}
;; Sam likes action movies, carrots, and lowercase spice.
{:person/name "Sam"
:person/height 64
:person/likes ["mjw", "mt2", "fc", "fs"]}
;; Beth likes thoughtful and weird movies, weird blue worms, and Spice.
{:person/name "Beth"
:person/height 68
:person/likes ["muc", "mp", "md", "fwbw", "fS"]}
]"#).expect("transacted");
in_progress.commit().expect("commit succeeded");
}
// Mutable borrow of store.
{
// Crap, there are several movies named Dune. We need to de-uniqify that attribute.
// Only :movie/title is listed in this v2 definition; :movie/year is not redeclared.
let movies_v2 = vocabulary::Definition {
name: kw!(:org.mozilla/movies),
version: 2,
attributes: vec![
(kw!(:movie/title),
vocabulary::AttributeBuilder::helpful()
.value_type(ValueType::String)
.multival(false)
.non_unique()
.index(true)
.build()),
],
};
let mut in_progress = store.begin_transaction().expect("began");
// The version bump from 1 to 2 is what makes this an upgrade rather than a no-op.
in_progress.ensure_vocabulary(&movies_v2).expect("success");
// We can now add another Dune movie: Denis Villeneuve's 2019 version.
// (Let's just pretend that it's been released, here in 2018!)
in_progress.transact(r#"[
{:movie/title "Dune"
:movie/year 2019}
]"#).expect("transact succeeded");
// And we can query both.
let years =
in_progress.q_once(r#"[:find [?year ...]
:where [?movie :movie/title "Dune"]
[?movie :movie/year ?year]
:order (asc ?year)]"#, None)
.into_coll_result()
.expect("coll");
// Both the 1984 movie from the initial data and the new 2019 one must be found.
assert_eq!(years, vec![TypedValue::Long(1984), TypedValue::Long(2019)]);
in_progress.commit().expect("commit succeeded");
}
}