diff --git a/core/src/cache.rs b/core/src/cache.rs index de07e846..32eda719 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -23,6 +23,8 @@ use ::{ pub trait CachedAttributes { fn is_attribute_cached_reverse(&self, entid: Entid) -> bool; fn is_attribute_cached_forward(&self, entid: Entid) -> bool; + fn has_cached_attributes(&self) -> bool; + fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec>; fn get_value_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&TypedValue>; @@ -30,3 +32,9 @@ pub trait CachedAttributes { fn get_entid_for_value(&self, attribute: Entid, value: &TypedValue) -> Option; fn get_entids_for_value(&self, attribute: Entid, value: &TypedValue) -> Option<&BTreeSet>; } + +pub trait UpdateableCache { + type Error; + fn update(&mut self, schema: &Schema, retractions: I, assertions: I) -> Result<(), Self::Error> + where I: Iterator; +} diff --git a/core/src/lib.rs b/core/src/lib.rs index bbbf02e5..b36c85d0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -51,7 +51,10 @@ pub use edn::{ Utc, }; -pub use cache::CachedAttributes; +pub use cache::{ + CachedAttributes, + UpdateableCache, +}; /// Core types defining a Mentat knowledge base. diff --git a/db/Cargo.toml b/db/Cargo.toml index 7ace370c..60d1e735 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -7,6 +7,7 @@ workspace = ".." error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } itertools = "0.7" lazy_static = "0.2" +num = "0.1" ordered-float = "0.5" time = "0.1" diff --git a/db/src/cache.rs b/db/src/cache.rs index 01e3d0e2..bcb3c12f 100644 --- a/db/src/cache.rs +++ b/db/src/cache.rs @@ -1,4 +1,4 @@ -// Copyright 2016 Mozilla +// Copyright 2018 Mozilla // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use // this file except in compliance with the License. 
You may obtain a copy of the @@ -8,13 +8,74 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. +///! An implementation of attribute caching. +///! Attribute caching means storing the entities and values for a given attribute, in the current +///! state of the world, in one or both directions (forward or reverse). +///! +///! One might use a reverse cache to implement fast in-memory lookup of unique identities. One +///! might use a forward cache to implement fast in-memory lookup of common properties. +///! +///! These caches are specialized wrappers around maps. We have four: single/multi forward, and +///! unique/non-unique reverse. There are traits to provide access. +///! +///! A little tower of functions allows for multiple caches to be updated when provided with a +///! single SQLite cursor over `(a, e, v)`s, sorted appropriately. +///! +///! Much of the complexity in this module is to support copy-on-write. +///! +///! When a transaction begins, we expect: +///! +///! - Existing references to the `Conn`'s attribute cache to still be valid. +///! - Isolation to be preserved: that cache will return the same answers until the transaction +///! commits and a fresh reference to the cache is obtained. +///! - Assertions and retractions within the transaction to be reflected in the cache. +///! - Retractions apply first, then assertions. +///! - No writes = limited memory allocation for the cache. +///! - New attributes can be cached, and existing attributes uncached, during the transaction. +///! These changes are isolated, too. +///! +///! All of this means that we need a decent copy-on-write layer that can represent retractions. +///! +///! This is `InProgressSQLiteAttributeCache`. It listens for committed transactions, and handles +///! changes to the cached attribute set, maintaining a reference back to the stable cache. When +///! 
necessary it copies and modifies. Retractions are modeled via a `None` option. +///! +///! When we're done, we take each of the four caches, and each cached attribute that changed, and +///! absorb them back into the stable cache. This uses `Arc::make_mut`, so if nobody is looking at +///! the old cache, we modify it in place. +///! +///! Most of the tests for this module are actually in `conn.rs`, where we can set up transactions +///! and test the external API. + use std::collections::{ BTreeMap, BTreeSet, + HashSet, }; +use std::collections::btree_map::{ + Entry, +}; + +use std::collections::btree_map::Entry::{ + Occupied, + Vacant, +}; + +use std::iter::{ + once, +}; + +use std::mem; + +use std::rc::Rc; + +use std::sync::Arc; + use std::iter::Peekable; +use num; + use rusqlite; use mentat_core::{ @@ -23,6 +84,15 @@ use mentat_core::{ HasSchema, Schema, TypedValue, + UpdateableCache, +}; + +use mentat_core::util::{ + Either, +}; + +use mentat_tx::entities::{ + OpType, }; use db::{ @@ -34,25 +104,132 @@ use errors::{ Result, }; -pub type Aev = (Entid, Entid, TypedValue); - -fn row_to_aev(row: &rusqlite::Row) -> Aev { - let a: Entid = row.get(0); - let e: Entid = row.get(1); - let value_type_tag: i32 = row.get(3); - let v = TypedValue::from_sql_value_pair(row.get(2), value_type_tag).map(|x| x).unwrap(); - (a, e, v) -} +use watcher::{ + TransactWatcher, +}; +// Right now we use BTreeMap, because we expect few cached attributes. pub type CacheMap = BTreeMap; -pub struct AevRows<'conn> { - rows: rusqlite::MappedRows<'conn, fn(&rusqlite::Row) -> Aev>, +trait Remove where T: PartialEq { + fn remove_every(&mut self, item: &T) -> usize; +} + +impl Remove for Vec where T: PartialEq { + /// Remove all occurrences from a vector in-place, by equality. + /// Eventually replace with unstable feature: #40062. 
+ fn remove_every(&mut self, item: &T) -> usize { + let mut removed = 0; + let range = num::range_step_inclusive(self.len() as isize - 1, 0, -1); + for i in range { + if self.get(i as usize).map_or(false, |x| x == item) { + self.remove(i as usize); + removed += 1; + } + } + removed + } +} + +trait Absorb { + fn absorb(&mut self, other: Self); +} + +impl Absorb for CacheMap> where K: Ord { + fn absorb(&mut self, other: Self) { + for (e, v) in other.into_iter() { + match v { + None => { + // It was deleted. Remove it from our map. + self.remove(&e); + }, + s @ Some(_) => { + self.insert(e, s); + }, + } + } + } +} + +trait ExtendByAbsorbing { + /// Just like `extend`, but rather than replacing our value with the other, the other is + /// absorbed into ours. + fn extend_by_absorbing(&mut self, other: Self); +} + +impl ExtendByAbsorbing for BTreeMap where K: Ord, V: Absorb { + fn extend_by_absorbing(&mut self, other: Self) { + for (k, v) in other.into_iter() { + match self.entry(k) { + Occupied(mut entry) => { + entry.get_mut().absorb(v); + }, + Vacant(entry) => { + entry.insert(v); + }, + } + } + } +} + +// Can't currently put doc tests on traits, so here it is. +#[test] +fn test_vec_remove_item() { + let mut v = vec![1, 2, 3, 4, 5, 4, 3]; + v.remove_every(&3); + assert_eq!(v, vec![1, 2, 4, 5, 4]); + v.remove_every(&4); + assert_eq!(v, vec![1, 2, 5]); +} + +// +// The basics of attribute caching. +// + +pub type Aev = (Entid, Entid, TypedValue); + +pub struct AevFactory { + // Our own simple string-interning system. 
+ strings: HashSet>, +} + +impl AevFactory { + fn new() -> AevFactory { + AevFactory { + strings: Default::default(), + } + } + + fn intern(&mut self, v: TypedValue) -> TypedValue { + match v { + TypedValue::String(rc) => { + let existing = self.strings.get(&rc).cloned().map(TypedValue::String); + if let Some(existing) = existing { + return existing; + } + self.strings.insert(rc.clone()); + return TypedValue::String(rc); + }, + t => t, + } + } + + fn row_to_aev(&mut self, row: &rusqlite::Row) -> Aev { + let a: Entid = row.get(0); + let e: Entid = row.get(1); + let value_type_tag: i32 = row.get(3); + let v = TypedValue::from_sql_value_pair(row.get(2), value_type_tag).map(|x| x).unwrap(); + (a, e, self.intern(v)) + } +} + +pub struct AevRows<'conn, F> { + rows: rusqlite::MappedRows<'conn, F>, } /// Unwrap the Result from MappedRows. We could also use this opportunity to map_err it, but /// for now it's convenient to avoid error handling. -impl<'conn> Iterator for AevRows<'conn> { +impl<'conn, F> Iterator for AevRows<'conn, F> where F: FnMut(&rusqlite::Row) -> Aev { type Item = Aev; fn next(&mut self) -> Option { self.rows @@ -63,52 +240,119 @@ impl<'conn> Iterator for AevRows<'conn> { // The behavior of the cache is different for different kinds of attributes: // - cardinality/one doesn't need a vec -// - unique/* should have a bijective mapping (reverse lookup) +// - unique/* should ideally have a bijective mapping (reverse lookup) -trait CardinalityOneCache { +trait RemoveFromCache { + fn remove(&mut self, e: Entid, v: &TypedValue); +} + +trait ClearCache { fn clear(&mut self); +} + +trait CardinalityOneCache: RemoveFromCache + ClearCache { fn set(&mut self, e: Entid, v: TypedValue); fn get(&self, e: Entid) -> Option<&TypedValue>; } -trait CardinalityManyCache { - fn clear(&mut self); +trait CardinalityManyCache: RemoveFromCache + ClearCache { fn acc(&mut self, e: Entid, v: TypedValue); fn set(&mut self, e: Entid, vs: Vec); fn get(&self, e: Entid) -> Option<&Vec>; 
} -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] struct SingleValAttributeCache { attr: Entid, - e_v: CacheMap, + e_v: CacheMap>, } -impl CardinalityOneCache for SingleValAttributeCache { +impl Absorb for SingleValAttributeCache { + fn absorb(&mut self, other: Self) { + assert_eq!(self.attr, other.attr); + self.e_v.absorb(other.e_v); + } +} + +impl ClearCache for SingleValAttributeCache { fn clear(&mut self) { self.e_v.clear(); } +} - fn set(&mut self, e: Entid, v: TypedValue) { - self.e_v.insert(e, v); - } - - fn get(&self, e: Entid) -> Option<&TypedValue> { - self.e_v.get(&e) +impl RemoveFromCache for SingleValAttributeCache { + // We never directly remove from the cache unless we're InProgress. In that case, we + // want to leave a sentinel in place. + fn remove(&mut self, e: Entid, v: &TypedValue) { + match self.e_v.entry(e) { + Occupied(mut entry) => { + let removed = entry.insert(None); + match removed { + None => {}, // Already removed. + Some(ref r) if r == v => {}, // We removed it! + r => { + eprintln!("Cache inconsistency: should be ({}, {:?}), was ({}, {:?}).", + e, v, e, r); + } + } + }, + Vacant(entry) => { + entry.insert(None); + }, + } } } -#[derive(Debug, Default)] +impl CardinalityOneCache for SingleValAttributeCache { + fn set(&mut self, e: Entid, v: TypedValue) { + self.e_v.insert(e, Some(v)); + } + + fn get(&self, e: Entid) -> Option<&TypedValue> { + self.e_v.get(&e).and_then(|m| m.as_ref()) + } +} + +#[derive(Clone, Debug, Default)] struct MultiValAttributeCache { attr: Entid, e_vs: CacheMap>, } -impl CardinalityManyCache for MultiValAttributeCache { +impl Absorb for MultiValAttributeCache { + fn absorb(&mut self, other: Self) { + assert_eq!(self.attr, other.attr); + for (e, vs) in other.e_vs.into_iter() { + if vs.is_empty() { + self.e_vs.remove(&e); + } else { + // We always override with a whole vector, so let's just overwrite. 
+ self.e_vs.insert(e, vs); + } + } + } +} + +impl ClearCache for MultiValAttributeCache { fn clear(&mut self) { self.e_vs.clear(); } +} +impl RemoveFromCache for MultiValAttributeCache { + fn remove(&mut self, e: Entid, v: &TypedValue) { + if let Some(vec) = self.e_vs.get_mut(&e) { + let removed = vec.remove_every(v); + if removed == 0 { + eprintln!("Cache inconsistency: tried to remove ({}, {:?}), was not present.", e, v); + } + } else { + eprintln!("Cache inconsistency: tried to remove ({}, {:?}), was empty.", e, v); + } + } +} + +impl CardinalityManyCache for MultiValAttributeCache { fn acc(&mut self, e: Entid, v: TypedValue) { self.e_vs.entry(e).or_insert(vec![]).push(v) } @@ -122,37 +366,100 @@ impl CardinalityManyCache for MultiValAttributeCache { } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] struct UniqueReverseAttributeCache { attr: Entid, - v_e: CacheMap, + v_e: CacheMap>, } -impl UniqueReverseAttributeCache { +impl Absorb for UniqueReverseAttributeCache { + fn absorb(&mut self, other: Self) { + assert_eq!(self.attr, other.attr); + self.v_e.absorb(other.v_e); + } +} + +impl ClearCache for UniqueReverseAttributeCache { fn clear(&mut self) { self.v_e.clear(); } +} +impl RemoveFromCache for UniqueReverseAttributeCache { + fn remove(&mut self, e: Entid, v: &TypedValue) { + match self.v_e.entry(v.clone()) { // Future: better entry API! + Occupied(mut entry) => { + let removed = entry.insert(None); + match removed { + None => {}, // Already removed. + Some(r) if r == e => {}, // We removed it! + r => { + eprintln!("Cache inconsistency: should be ({}, {:?}), was ({}, {:?}).", e, v, e, r); + } + } + }, + Vacant(entry) => { + // It didn't already exist. 
+ entry.insert(None); + }, + } + } +} + +impl UniqueReverseAttributeCache { fn set(&mut self, e: Entid, v: TypedValue) { - self.v_e.insert(v, e); + self.v_e.insert(v, Some(e)); } fn get_e(&self, v: &TypedValue) -> Option { + self.v_e.get(v).and_then(|o| o.clone()) + } + + fn lookup(&self, v: &TypedValue) -> Option> { self.v_e.get(v).cloned() } } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] struct NonUniqueReverseAttributeCache { attr: Entid, v_es: CacheMap>, } -impl NonUniqueReverseAttributeCache { +impl Absorb for NonUniqueReverseAttributeCache { + fn absorb(&mut self, other: Self) { + assert_eq!(self.attr, other.attr); + for (v, es) in other.v_es.into_iter() { + if es.is_empty() { + self.v_es.remove(&v); + } else { + // We always override with a whole vector, so let's just overwrite. + self.v_es.insert(v, es); + } + } + } +} + +impl ClearCache for NonUniqueReverseAttributeCache { fn clear(&mut self) { self.v_es.clear(); } +} +impl RemoveFromCache for NonUniqueReverseAttributeCache { + fn remove(&mut self, e: Entid, v: &TypedValue) { + if let Some(vec) = self.v_es.get_mut(&v) { + let removed = vec.remove(&e); + if !removed { + eprintln!("Cache inconsistency: tried to remove ({}, {:?}), was not present.", e, v); + } + } else { + eprintln!("Cache inconsistency: tried to remove ({}, {:?}), was empty.", e, v); + } + } +} + +impl NonUniqueReverseAttributeCache { fn acc(&mut self, e: Entid, v: TypedValue) { self.v_es.entry(v).or_insert(BTreeSet::new()).insert(e); } @@ -162,17 +469,6 @@ impl NonUniqueReverseAttributeCache { } } -#[derive(Debug, Default)] -pub struct AttributeCaches { - reverse_cached_attributes: BTreeSet, - forward_cached_attributes: BTreeSet, - - single_vals: BTreeMap, - multi_vals: BTreeMap, - unique_reverse: BTreeMap, - non_unique_reverse: BTreeMap, -} - fn with_aev_iter(a: Entid, iter: &mut Peekable, mut f: F) where I: Iterator, F: FnMut(Entid, TypedValue) { @@ -227,6 +523,51 @@ fn accumulate_multi_val_non_unique_evs_both(a: Entid, f: 
&mut C, r: &mut N }) } +fn accumulate_removal_one(a: Entid, c: &mut C, iter: &mut Peekable) where I: Iterator, C: RemoveFromCache { + with_aev_iter(a, iter, |e, v| { + c.remove(e, &v); + }) +} + +fn accumulate_removal_both(a: Entid, f: &mut F, r: &mut R, iter: &mut Peekable) +where I: Iterator, F: RemoveFromCache, R: RemoveFromCache { + with_aev_iter(a, iter, |e, v| { + f.remove(e, &v); + r.remove(e, &v); + }) +} + + +// +// Collect four different kinds of cache together, and track what we're storing. +// + +#[derive(Copy, Clone, Eq, PartialEq)] +enum AccumulationBehavior { + Add { replacing: bool }, + Remove, +} + +impl AccumulationBehavior { + fn is_replacing(&self) -> bool { + match self { + &AccumulationBehavior::Add { replacing } => replacing, + _ => false, + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct AttributeCaches { + reverse_cached_attributes: BTreeSet, + forward_cached_attributes: BTreeSet, + + single_vals: BTreeMap, + multi_vals: BTreeMap, + unique_reverse: BTreeMap, + non_unique_reverse: BTreeMap, +} + // TODO: if an entity or attribute is ever renumbered, the cache will need to be rebuilt. impl AttributeCaches { // @@ -235,127 +576,218 @@ impl AttributeCaches { // s = single-val; m = multi-val. // u = unique; nu = non-unique. // c = cache. + // Note that each of these optionally copies the entry from a fallback cache for copy-on-write. 
#[inline] - fn fsc(&mut self, a: Entid) -> &mut SingleValAttributeCache { + fn fsc(&mut self, a: Entid, fallback: Option<&AttributeCaches>) -> &mut SingleValAttributeCache { self.single_vals .entry(a) - .or_insert_with(Default::default) + .or_insert_with(|| fallback.and_then(|c| c.single_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)) } #[inline] - fn fmc(&mut self, a: Entid) -> &mut MultiValAttributeCache { + fn fmc(&mut self, a: Entid, fallback: Option<&AttributeCaches>) -> &mut MultiValAttributeCache { self.multi_vals .entry(a) - .or_insert_with(Default::default) + .or_insert_with(|| fallback.and_then(|c| c.multi_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)) } #[inline] - fn ruc(&mut self, a: Entid) -> &mut UniqueReverseAttributeCache { + fn ruc(&mut self, a: Entid, fallback: Option<&AttributeCaches>) -> &mut UniqueReverseAttributeCache { self.unique_reverse .entry(a) - .or_insert_with(Default::default) + .or_insert_with(|| fallback.and_then(|c| c.unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default)) } #[inline] - fn rnuc(&mut self, a: Entid) -> &mut NonUniqueReverseAttributeCache { + fn rnuc(&mut self, a: Entid, fallback: Option<&AttributeCaches>) -> &mut NonUniqueReverseAttributeCache { self.non_unique_reverse .entry(a) - .or_insert_with(Default::default) + .or_insert_with(|| fallback.and_then(|c| c.non_unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default)) } #[inline] - fn both_s_u<'r>(&'r mut self, a: Entid) -> (&'r mut SingleValAttributeCache, &'r mut UniqueReverseAttributeCache) { - (self.single_vals.entry(a).or_insert_with(Default::default), - self.unique_reverse.entry(a).or_insert_with(Default::default)) + fn both_s_u<'r>(&'r mut self, a: Entid, forward_fallback: Option<&AttributeCaches>, reverse_fallback: Option<&AttributeCaches>) -> (&'r mut SingleValAttributeCache, &'r mut UniqueReverseAttributeCache) { + (self.single_vals + .entry(a) + .or_insert_with(|| forward_fallback.and_then(|c| 
c.single_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)), + self.unique_reverse + .entry(a) + .or_insert_with(|| reverse_fallback.and_then(|c| c.unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default))) } #[inline] - fn both_m_u<'r>(&'r mut self, a: Entid) -> (&'r mut MultiValAttributeCache, &'r mut UniqueReverseAttributeCache) { - (self.multi_vals.entry(a).or_insert_with(Default::default), - self.unique_reverse.entry(a).or_insert_with(Default::default)) + fn both_m_u<'r>(&'r mut self, a: Entid, forward_fallback: Option<&AttributeCaches>, reverse_fallback: Option<&AttributeCaches>) -> (&'r mut MultiValAttributeCache, &'r mut UniqueReverseAttributeCache) { + (self.multi_vals + .entry(a) + .or_insert_with(|| forward_fallback.and_then(|c| c.multi_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)), + self.unique_reverse + .entry(a) + .or_insert_with(|| reverse_fallback.and_then(|c| c.unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default))) } #[inline] - fn both_s_nu<'r>(&'r mut self, a: Entid) -> (&'r mut SingleValAttributeCache, &'r mut NonUniqueReverseAttributeCache) { - (self.single_vals.entry(a).or_insert_with(Default::default), - self.non_unique_reverse.entry(a).or_insert_with(Default::default)) + fn both_s_nu<'r>(&'r mut self, a: Entid, forward_fallback: Option<&AttributeCaches>, reverse_fallback: Option<&AttributeCaches>) -> (&'r mut SingleValAttributeCache, &'r mut NonUniqueReverseAttributeCache) { + (self.single_vals + .entry(a) + .or_insert_with(|| forward_fallback.and_then(|c| c.single_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)), + self.non_unique_reverse + .entry(a) + .or_insert_with(|| reverse_fallback.and_then(|c| c.non_unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default))) } #[inline] - fn both_m_nu<'r>(&'r mut self, a: Entid) -> (&'r mut MultiValAttributeCache, &'r mut NonUniqueReverseAttributeCache) { - (self.multi_vals.entry(a).or_insert_with(Default::default), - 
self.non_unique_reverse.entry(a).or_insert_with(Default::default)) + fn both_m_nu<'r>(&'r mut self, a: Entid, forward_fallback: Option<&AttributeCaches>, reverse_fallback: Option<&AttributeCaches>) -> (&'r mut MultiValAttributeCache, &'r mut NonUniqueReverseAttributeCache) { + (self.multi_vals + .entry(a) + .or_insert_with(|| forward_fallback.and_then(|c| c.multi_vals.get(&a).cloned()) + .unwrap_or_else(Default::default)), + self.non_unique_reverse + .entry(a) + .or_insert_with(|| reverse_fallback.and_then(|c| c.non_unique_reverse.get(&a).cloned()) + .unwrap_or_else(Default::default))) } // Process rows in `iter` that all share an attribute with the first. Leaves the iterator // advanced to the first non-matching row. - fn accumulate_evs(&mut self, schema: &Schema, iter: &mut Peekable, replace_a: bool) where I: Iterator { + fn accumulate_evs(&mut self, + fallback: Option<&AttributeCaches>, + schema: &Schema, + iter: &mut Peekable, + behavior: AccumulationBehavior) where I: Iterator { if let Some(&(a, _, _)) = iter.peek() { if let Some(attribute) = schema.attribute_for_entid(a) { - let forward = self.is_attribute_cached_forward(a); - let reverse = self.is_attribute_cached_reverse(a); + let fallback_cached_forward = fallback.map_or(false, |c| c.is_attribute_cached_forward(a)); + let fallback_cached_reverse = fallback.map_or(false, |c| c.is_attribute_cached_reverse(a)); + let now_cached_forward = self.is_attribute_cached_forward(a); + let now_cached_reverse = self.is_attribute_cached_reverse(a); + + let replace_a = behavior.is_replacing(); + let copy_forward_if_missing = now_cached_forward && fallback_cached_forward && !replace_a; + let copy_reverse_if_missing = now_cached_reverse && fallback_cached_reverse && !replace_a; + + let forward_fallback = if copy_forward_if_missing { + fallback + } else { + None + }; + let reverse_fallback = if copy_reverse_if_missing { + fallback + } else { + None + }; + let multi = attribute.multival; let unique = 
attribute.unique.is_some(); - match (forward, reverse, multi, unique) { + match (now_cached_forward, now_cached_reverse, multi, unique) { (true, true, true, true) => { - let (f, r) = self.both_m_u(a); - if replace_a { - f.clear(); - r.clear(); + let (f, r) = self.both_m_u(a, forward_fallback, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + r.clear(); + } + accumulate_multi_val_unique_evs_both(a, f, r, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_both(a, f, r, iter), } - accumulate_multi_val_unique_evs_both(a, f, r, iter); }, (true, true, true, false) => { - let (f, r) = self.both_m_nu(a); - if replace_a { - f.clear(); - r.clear(); + let (f, r) = self.both_m_nu(a, forward_fallback, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + r.clear(); + } + accumulate_multi_val_non_unique_evs_both(a, f, r, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_both(a, f, r, iter), } - accumulate_multi_val_non_unique_evs_both(a, f, r, iter); }, (true, true, false, true) => { - let (f, r) = self.both_s_u(a); - if replace_a { - f.clear(); - r.clear(); + let (f, r) = self.both_s_u(a, forward_fallback, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + r.clear(); + } + accumulate_single_val_unique_evs_both(a, f, r, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_both(a, f, r, iter), } - accumulate_single_val_unique_evs_both(a, f, r, iter); }, (true, true, false, false) => { - let (f, r) = self.both_s_nu(a); - if replace_a { - f.clear(); - r.clear(); + let (f, r) = self.both_s_nu(a, forward_fallback, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + r.clear(); + } + accumulate_single_val_non_unique_evs_both(a, f, r, iter); + }, + AccumulationBehavior::Remove => 
accumulate_removal_both(a, f, r, iter), } - accumulate_single_val_non_unique_evs_both(a, f, r, iter); }, (true, false, true, _) => { - let f = self.fmc(a); - if replace_a { - f.clear(); + let f = self.fmc(a, forward_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + } + accumulate_multi_val_evs_forward(a, f, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_one(a, f, iter), } - accumulate_multi_val_evs_forward(a, f, iter) }, (true, false, false, _) => { - let f = self.fsc(a); - if replace_a { - f.clear(); + let f = self.fsc(a, forward_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + f.clear(); + } + accumulate_single_val_evs_forward(a, f, iter) + }, + AccumulationBehavior::Remove => accumulate_removal_one(a, f, iter), } - accumulate_single_val_evs_forward(a, f, iter) }, (false, true, _, true) => { - let r = self.ruc(a); - if replace_a { - r.clear(); + let r = self.ruc(a, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + r.clear(); + } + accumulate_unique_evs_reverse(a, r, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_one(a, r, iter), } - accumulate_unique_evs_reverse(a, r, iter); }, (false, true, _, false) => { - let r = self.rnuc(a); - if replace_a { - r.clear(); + let r = self.rnuc(a, reverse_fallback); + match behavior { + AccumulationBehavior::Add { replacing } => { + if replacing { + r.clear(); + } + accumulate_non_unique_evs_reverse(a, r, iter); + }, + AccumulationBehavior::Remove => accumulate_removal_one(a, r, iter), } - accumulate_non_unique_evs_reverse(a, r, iter); }, (false, false, _, _) => { unreachable!(); // Must be cached in at least one direction! 
@@ -365,9 +797,9 @@ impl AttributeCaches { } } - fn add_to_cache(&mut self, schema: &Schema, mut iter: Peekable, replace_a: bool) -> Result<()> where I: Iterator { + fn accumulate_into_cache(&mut self, fallback: Option<&AttributeCaches>, schema: &Schema, mut iter: Peekable, behavior: AccumulationBehavior) -> Result<()> where I: Iterator { while iter.peek().is_some() { - self.accumulate_evs(schema, &mut iter, replace_a); + self.accumulate_evs(fallback, schema, &mut iter, behavior); } Ok(()) } @@ -397,6 +829,49 @@ impl AttributeCaches { } } +// We need this block for fallback. +impl AttributeCaches { + fn get_entid_for_value_if_present(&self, attribute: Entid, value: &TypedValue) -> Option> { + if self.is_attribute_cached_reverse(attribute) { + self.unique_reverse + .get(&attribute) + .and_then(|c| c.lookup(value)) + } else { + None + } + } + + fn get_value_for_entid_if_present(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option> { + if let Some(&Some(ref tv)) = self.value_pairs(schema, attribute) + .and_then(|c| c.get(&entid)) { + Some(Some(tv)) + } else { + None + } + } +} + +/// SQL stuff. +impl AttributeCaches { + fn repopulate(&mut self, + schema: &Schema, + sqlite: &rusqlite::Connection, + attribute: Entid) -> Result<()> { + let is_fulltext = schema.attribute_for_entid(attribute).map_or(false, |s| s.fulltext); + let table = if is_fulltext { "fulltext_datoms" } else { "datoms" }; + let sql = format!("SELECT a, e, v, value_type_tag FROM {} WHERE a = ? 
ORDER BY a ASC, e ASC", table); + let args: Vec<&rusqlite::types::ToSql> = vec![&attribute]; + let mut stmt = sqlite.prepare(&sql)?; + let mut aev_factory = AevFactory::new(); + let rows = stmt.query_map(&args, |row| aev_factory.row_to_aev(row))?; + let aevs = AevRows { + rows: rows, + }; + self.accumulate_into_cache(None, schema, aevs.peekable(), AccumulationBehavior::Add { replacing: true })?; + Ok(()) + } +} + impl CachedAttributes for AttributeCaches { fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec> { self.values_pairs(schema, attribute) @@ -404,8 +879,17 @@ impl CachedAttributes for AttributeCaches { } fn get_value_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&TypedValue> { - self.value_pairs(schema, attribute) - .and_then(|c| c.get(&entid)) + if let Some(&Some(ref tv)) = self.value_pairs(schema, attribute) + .and_then(|c| c.get(&entid)) { + Some(tv) + } else { + None + } + } + + fn has_cached_attributes(&self) -> bool { + !self.reverse_cached_attributes.is_empty() || + !self.forward_cached_attributes.is_empty() } fn is_attribute_cached_reverse(&self, attribute: Entid) -> bool { @@ -433,7 +917,24 @@ impl CachedAttributes for AttributeCaches { } } +impl UpdateableCache for AttributeCaches { + type Error = ::errors::Error; + fn update(&mut self, schema: &Schema, retractions: I, assertions: I) -> ::std::result::Result<(), Self::Error> + where I: Iterator { + self.update_with_fallback(None, schema, retractions, assertions) + } +} + impl AttributeCaches { + fn update_with_fallback(&mut self, fallback: Option<&AttributeCaches>, schema: &Schema, retractions: I, assertions: I) -> ::std::result::Result<(), ::errors::Error> + where I: Iterator { + let r_aevs = retractions.peekable(); + self.accumulate_into_cache(fallback, schema, r_aevs, AccumulationBehavior::Remove)?; + + let aevs = assertions.peekable(); + self.accumulate_into_cache(fallback, schema, aevs, AccumulationBehavior::Add { 
replacing: false }) + } + fn values_pairs(&self, schema: &Schema, attribute: U) -> Option<&BTreeMap>> where U: Into { let attribute = attribute.into(); @@ -448,7 +949,7 @@ impl AttributeCaches { }) } - fn value_pairs(&self, schema: &Schema, attribute: U) -> Option<&CacheMap> + fn value_pairs(&self, schema: &Schema, attribute: U) -> Option<&CacheMap>> where U: Into { let attribute = attribute.into(); schema.attribute_for_entid(attribute) @@ -463,20 +964,46 @@ impl AttributeCaches { } } -#[derive(Debug, Default)] +impl Absorb for AttributeCaches { + // Replace or insert attribute-cache pairs from `other` into `self`. + // Fold in any in-place deletions. + fn absorb(&mut self, other: Self) { + self.forward_cached_attributes.extend(other.forward_cached_attributes); + self.reverse_cached_attributes.extend(other.reverse_cached_attributes); + + self.single_vals.extend_by_absorbing(other.single_vals); + self.multi_vals.extend_by_absorbing(other.multi_vals); + self.unique_reverse.extend_by_absorbing(other.unique_reverse); + self.non_unique_reverse.extend_by_absorbing(other.non_unique_reverse); + } +} + +#[derive(Clone, Debug, Default)] pub struct SQLiteAttributeCache { - inner: AttributeCaches, + inner: Arc, } impl SQLiteAttributeCache { + fn make_mut<'s>(&'s mut self) -> &'s mut AttributeCaches { + Arc::make_mut(&mut self.inner) + } + + fn make_override(&self) -> AttributeCaches { + let mut new = AttributeCaches::default(); + new.forward_cached_attributes = self.inner.forward_cached_attributes.clone(); + new.reverse_cached_attributes = self.inner.reverse_cached_attributes.clone(); + new + } + pub fn register_forward(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> where U: Into { let a = attribute.into(); // The attribute must exist! 
let _ = schema.attribute_for_entid(a).ok_or_else(|| ErrorKind::UnknownAttribute(a))?; - self.inner.forward_cached_attributes.insert(a); - self.repopulate(schema, sqlite, a) + let caches = self.make_mut(); + caches.forward_cached_attributes.insert(a); + caches.repopulate(schema, sqlite, a) } pub fn register_reverse(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> @@ -486,8 +1013,9 @@ impl SQLiteAttributeCache { // The attribute must exist! let _ = schema.attribute_for_entid(a).ok_or_else(|| ErrorKind::UnknownAttribute(a))?; - self.inner.reverse_cached_attributes.insert(a); - self.repopulate(schema, sqlite, a) + let caches = self.make_mut(); + caches.reverse_cached_attributes.insert(a); + caches.repopulate(schema, sqlite, a) } pub fn register(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> @@ -496,30 +1024,27 @@ impl SQLiteAttributeCache { // TODO: reverse-index unique by default? - self.inner.forward_cached_attributes.insert(a); - self.inner.reverse_cached_attributes.insert(a); - self.repopulate(schema, sqlite, a) - } - - fn repopulate(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: Entid) -> Result<()> { - let sql = "SELECT a, e, v, value_type_tag FROM datoms WHERE a = ? 
ORDER BY a ASC, e ASC"; - let args: Vec<&rusqlite::types::ToSql> = vec![&attribute]; - let mut stmt = sqlite.prepare(sql)?; - let rows = stmt.query_map(&args, row_to_aev as fn(&rusqlite::Row) -> Aev)?; - let aevs = AevRows { - rows: rows, - }; - self.inner.add_to_cache(schema, aevs.peekable(), true)?; - Ok(()) + let caches = self.make_mut(); + caches.forward_cached_attributes.insert(a); + caches.reverse_cached_attributes.insert(a); + caches.repopulate(schema, sqlite, a) } pub fn unregister(&mut self, attribute: U) where U: Into { - self.inner.unregister_attribute(attribute); + self.make_mut().unregister_attribute(attribute); } pub fn unregister_all(&mut self) { - self.inner.unregister_all_attributes(); + self.make_mut().unregister_all_attributes(); + } +} + +impl UpdateableCache for SQLiteAttributeCache { + type Error = ::errors::Error; + fn update(&mut self, schema: &Schema, retractions: I, assertions: I) -> ::std::result::Result<(), Self::Error> + where I: Iterator { + self.make_mut().update(schema, retractions, assertions) } } @@ -540,6 +1065,11 @@ impl CachedAttributes for SQLiteAttributeCache { self.inner.is_attribute_cached_forward(attribute) } + fn has_cached_attributes(&self) -> bool { + !self.inner.forward_cached_attributes.is_empty() || + !self.inner.reverse_cached_attributes.is_empty() + } + fn get_entids_for_value(&self, attribute: Entid, value: &TypedValue) -> Option<&BTreeSet> { self.inner.get_entids_for_value(attribute, value) } @@ -557,8 +1087,350 @@ impl SQLiteAttributeCache { } /// Intended for use from tests. - pub fn value_pairs(&self, schema: &Schema, attribute: U) -> Option<&BTreeMap> + pub fn value_pairs(&self, schema: &Schema, attribute: U) -> Option<&BTreeMap>> where U: Into { self.inner.value_pairs(schema, attribute) } } + +/// We maintain a diff on top of the `inner` -- existing -- cache. +/// That involves tracking unregisterings and registerings. 
+#[derive(Debug, Default)] +pub struct InProgressSQLiteAttributeCache { + inner: Arc, + pub overlay: AttributeCaches, + unregistered_forward: BTreeSet, + unregistered_reverse: BTreeSet, +} + +impl InProgressSQLiteAttributeCache { + pub fn from_cache(inner: SQLiteAttributeCache) -> InProgressSQLiteAttributeCache { + let overlay = inner.make_override(); + InProgressSQLiteAttributeCache { + inner: inner.inner, + overlay: overlay, + unregistered_forward: Default::default(), + unregistered_reverse: Default::default(), + } + } + + pub fn register_forward(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> + where U: Into { + let a = attribute.into(); + + // The attribute must exist! + let _ = schema.attribute_for_entid(a).ok_or_else(|| ErrorKind::UnknownAttribute(a))?; + + if self.is_attribute_cached_forward(a) { + return Ok(()); + } + + self.unregistered_forward.remove(&a); + self.overlay.forward_cached_attributes.insert(a); + self.overlay.repopulate(schema, sqlite, a) + } + + pub fn register_reverse(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> + where U: Into { + let a = attribute.into(); + + // The attribute must exist! + let _ = schema.attribute_for_entid(a).ok_or_else(|| ErrorKind::UnknownAttribute(a))?; + + if self.is_attribute_cached_reverse(a) { + return Ok(()); + } + + self.unregistered_reverse.remove(&a); + self.overlay.reverse_cached_attributes.insert(a); + self.overlay.repopulate(schema, sqlite, a) + } + + pub fn register(&mut self, schema: &Schema, sqlite: &rusqlite::Connection, attribute: U) -> Result<()> + where U: Into { + let a = attribute.into(); + + // The attribute must exist! + let _ = schema.attribute_for_entid(a).ok_or_else(|| ErrorKind::UnknownAttribute(a))?; + + // TODO: reverse-index unique by default? 
+ let reverse_done = self.is_attribute_cached_reverse(a); + let forward_done = self.is_attribute_cached_forward(a); + + if forward_done && reverse_done { + return Ok(()); + } + + self.unregistered_forward.remove(&a); + self.unregistered_reverse.remove(&a); + if !reverse_done { + self.overlay.reverse_cached_attributes.insert(a); + } + if !forward_done { + self.overlay.forward_cached_attributes.insert(a); + } + + self.overlay.repopulate(schema, sqlite, a) + } + + + pub fn unregister(&mut self, attribute: U) + where U: Into { + let a = attribute.into(); + self.overlay.unregister_attribute(a); + self.unregistered_forward.insert(a); + self.unregistered_reverse.insert(a); + } + + pub fn unregister_all(&mut self) { + self.overlay.unregister_all_attributes(); + self.unregistered_forward.extend(self.inner.forward_cached_attributes.iter().cloned()); + self.unregistered_reverse.extend(self.inner.reverse_cached_attributes.iter().cloned()); + } +} + +impl UpdateableCache for InProgressSQLiteAttributeCache { + type Error = ::errors::Error; + fn update(&mut self, schema: &Schema, retractions: I, assertions: I) -> ::std::result::Result<(), Self::Error> + where I: Iterator { + self.overlay.update_with_fallback(Some(&self.inner), schema, retractions, assertions) + } +} + +impl CachedAttributes for InProgressSQLiteAttributeCache { + fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec> { + if self.unregistered_forward.contains(&attribute) { + None + } else { + // If it was present in `inner` but the values were deleted, there will be an empty + // array in `overlay` -- `Some(vec![])` -- and we won't fall through. + // We can safely use `or_else`. 
+ self.overlay + .get_values_for_entid(schema, attribute, entid) + .or_else(|| self.inner.get_values_for_entid(schema, attribute, entid)) + } + } + + fn get_value_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&TypedValue> { + if self.unregistered_forward.contains(&attribute) { + None + } else { + // If it was present in `inner` but the value was deleted, there will be `Some(None)` + // in `overlay`, and we won't fall through. + // We can safely use `or_else`. + match self.overlay.get_value_for_entid_if_present(schema, attribute, entid) { + Some(present) => present, + None => self.inner.get_value_for_entid(schema, attribute, entid), + } + } + } + + fn is_attribute_cached_reverse(&self, attribute: Entid) -> bool { + !self.unregistered_reverse.contains(&attribute) && + (self.inner.reverse_cached_attributes.contains(&attribute) || + self.overlay.reverse_cached_attributes.contains(&attribute)) + } + + fn is_attribute_cached_forward(&self, attribute: Entid) -> bool { + !self.unregistered_forward.contains(&attribute) && + (self.inner.forward_cached_attributes.contains(&attribute) || + self.overlay.forward_cached_attributes.contains(&attribute)) + } + + fn has_cached_attributes(&self) -> bool { + // If we've added any, we're definitely not empty. + if self.overlay.has_cached_attributes() { + return true; + } + + // If we haven't removed any, pass through to inner. + if self.unregistered_forward.is_empty() && + self.unregistered_reverse.is_empty() { + return self.inner.has_cached_attributes(); + } + + // Otherwise, we need to check whether we've removed anything that was cached. 
+ if self.inner + .forward_cached_attributes + .iter() + .filter(|a| !self.unregistered_forward.contains(a)) + .next() + .is_some() { + return true; + } + + self.inner + .reverse_cached_attributes + .iter() + .filter(|a| !self.unregistered_reverse.contains(a)) + .next() + .is_some() + } + + fn get_entids_for_value(&self, attribute: Entid, value: &TypedValue) -> Option<&BTreeSet> { + if self.unregistered_reverse.contains(&attribute) { + None + } else { + self.overlay + .get_entids_for_value(attribute, value) + .or_else(|| self.inner.get_entids_for_value(attribute, value)) + } + } + + fn get_entid_for_value(&self, attribute: Entid, value: &TypedValue) -> Option { + if self.unregistered_reverse.contains(&attribute) { + None + } else { + // If it was present in `inner` but the value was deleted, there will be `Some(None)` + // in `overlay`, and we won't fall through. + // We can safely use `or_else`. + match self.overlay.get_entid_for_value_if_present(attribute, value) { + Some(present) => present, + None => self.inner.get_entid_for_value(attribute, value), + } + } + } +} + +impl InProgressSQLiteAttributeCache { + /// Intended for use from tests. + pub fn values_pairs(&self, schema: &Schema, attribute: U) -> Option<&BTreeMap>> + where U: Into { + let a = attribute.into(); + self.overlay.values_pairs(schema, a) + .or_else(|| self.inner.values_pairs(schema, a)) + } + + /// Intended for use from tests. + pub fn value_pairs(&self, schema: &Schema, attribute: U) -> Option<&BTreeMap>> + where U: Into { + let a = attribute.into(); + self.overlay + .value_pairs(schema, a) + .or_else(|| self.inner.value_pairs(schema, a)) + } + + pub fn commit_to(self, destination: &mut SQLiteAttributeCache) { + // If the destination is empty, great: just take `overlay`. + if !destination.has_cached_attributes() { + destination.inner = Arc::new(self.overlay); + return; + } + + // If we have exclusive write access to the destination cache, update it in place. 
+ // Because the `Conn` also contains an `Arc`, this will ordinarily never be the case. + // In order to hit this code block, we need to eliminate our reference… do so by dropping + // our copy of the `Arc`. + ::std::mem::drop(self.inner); + if let Some(dest) = Arc::get_mut(&mut destination.inner) { + // Yeah, we unregister in both directions. The only way + // `unregistered_forward` won't be the same as `unregistered_reverse` is if + // our `overlay` added one direction back in. + for unregistered in self.unregistered_forward.union(&self.unregistered_reverse) { + dest.unregister_attribute(*unregistered); + } + + // Now replace each attribute's entry with `overlay`. + dest.absorb(self.overlay); + return; + } + + // If we don't, populate `self.overlay` with whatever we didn't overwrite, + // and then shim it into `destination.` + // We haven't implemented this because it does not currently occur. + // TODO: do this! Then do this: + // destination.inner = Arc::new(self.overlay); + unimplemented!(); + } +} + +pub struct InProgressCacheTransactWatcher<'a> { + // A transaction might involve attributes that we cache. Track those values here so that + // we can update the cache after we commit the transaction. + collected_assertions: BTreeMap>>, + collected_retractions: BTreeMap>>, + cache: &'a mut InProgressSQLiteAttributeCache, + active: bool, +} + +impl<'a> InProgressCacheTransactWatcher<'a> { + fn new(cache: &'a mut InProgressSQLiteAttributeCache) -> InProgressCacheTransactWatcher<'a> { + let mut w = InProgressCacheTransactWatcher { + collected_assertions: Default::default(), + collected_retractions: Default::default(), + cache: cache, + active: true, + }; + + // This won't change during a transact. 
+ w.active = w.cache.has_cached_attributes(); + w + } +} + +impl<'a> TransactWatcher for InProgressCacheTransactWatcher<'a> { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue) { + if !self.active { + return; + } + + let target = if op == OpType::Add { + &mut self.collected_assertions + } else { + &mut self.collected_retractions + }; + match target.entry(a) { + Entry::Vacant(entry) => { + let is_cached = self.cache.is_attribute_cached_forward(a) || + self.cache.is_attribute_cached_reverse(a); + if is_cached { + entry.insert(Either::Right(vec![(e, v.clone())])); + } else { + entry.insert(Either::Left(())); + } + }, + Entry::Occupied(mut entry) => { + match entry.get_mut() { + &mut Either::Left(_) => { + // Nothing to do. + }, + &mut Either::Right(ref mut vec) => { + vec.push((e, v.clone())); + }, + } + }, + } + } + + fn done(&mut self, schema: &Schema) -> Result<()> { + // Oh, I wish we had impl trait. Without it we have a six-line type signature if we + // try to break this out as a helper function. + let collected_retractions = mem::replace(&mut self.collected_retractions, Default::default()); + let collected_assertions = mem::replace(&mut self.collected_assertions, Default::default()); + let mut intermediate_expansion = + once(collected_retractions) + .chain(once(collected_assertions)) + .into_iter() + .map(move |tree| tree.into_iter() + .filter_map(move |(a, evs)| { + match evs { + // Drop the empty placeholders. + Either::Left(_) => None, + Either::Right(vec) => Some((a, vec)), + } + }) + .flat_map(move |(a, evs)| { + // Flatten into a vec of (a, e, v). 
+ evs.into_iter().map(move |(e, v)| (a, e, v)) + })); + let retractions = intermediate_expansion.next().unwrap(); + let assertions = intermediate_expansion.next().unwrap(); + self.cache.update(schema, retractions, assertions) + } +} + +impl InProgressSQLiteAttributeCache { + pub fn transact_watcher<'a>(&'a mut self) -> InProgressCacheTransactWatcher<'a> { + InProgressCacheTransactWatcher::new(self) + } +} diff --git a/db/src/db.rs b/db/src/db.rs index 3fcf4297..6318b2f6 100644 --- a/db/src/db.rs +++ b/db/src/db.rs @@ -66,6 +66,10 @@ use types::{ }; use tx::transact; +use watcher::{ + NullWatcher, +}; + pub fn new_connection(uri: T) -> rusqlite::Result where T: AsRef { let conn = match uri.as_ref().to_string_lossy().len() { 0 => rusqlite::Connection::open_in_memory()?, @@ -249,7 +253,8 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result { // TODO: return to transact_internal to self-manage the encompassing SQLite transaction. let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema. - let (_report, next_partition_map, next_schema) = transact(&tx, db.partition_map, &bootstrap_schema_for_mutation, &db.schema, bootstrap::bootstrap_entities())?; + let (_report, next_partition_map, next_schema, _watcher) = transact(&tx, db.partition_map, &bootstrap_schema_for_mutation, &db.schema, NullWatcher(), bootstrap::bootstrap_entities())?; + // TODO: validate metadata mutations that aren't schema related, like additional partitions. if let Some(next_schema) = next_schema { if next_schema != db.schema { @@ -1218,12 +1223,12 @@ mod tests { // We're about to write, so go straight ahead and get an IMMEDIATE transaction. let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?; // Applying the transaction can fail, so we don't unwrap. 
- let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, entities)?; + let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, NullWatcher(), entities)?; tx.commit()?; details }; - let (report, next_partition_map, next_schema) = details; + let (report, next_partition_map, next_schema, _watcher) = details; self.partition_map = next_partition_map; if let Some(next_schema) = next_schema { self.schema = next_schema; diff --git a/db/src/lib.rs b/db/src/lib.rs index e1b56bb1..19e9619b 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -17,6 +17,8 @@ extern crate itertools; #[macro_use] extern crate lazy_static; + +extern crate num; extern crate rusqlite; extern crate tabwriter; extern crate time; @@ -43,6 +45,7 @@ pub mod errors; pub mod internal_types; // pub because we need them for building entities programmatically. mod metadata; mod schema; +mod watcher; mod tx; pub mod types; mod upsert_resolution; @@ -73,6 +76,10 @@ pub use db::{ new_connection, }; +pub use watcher::{ + TransactWatcher, +}; + pub use tx::{ transact, transact_terms, diff --git a/db/src/tx.rs b/db/src/tx.rs index c364d0c8..d416dfc8 100644 --- a/db/src/tx.rs +++ b/db/src/tx.rs @@ -51,6 +51,7 @@ use std::collections::{ BTreeSet, VecDeque, }; + use std::rc::Rc; use db; @@ -113,11 +114,16 @@ use types::{ TxReport, ValueType, }; + +use watcher::{ + TransactWatcher, +}; + use upsert_resolution::Generation; /// A transaction on its way to being applied. #[derive(Debug)] -pub struct Tx<'conn, 'a> { +pub struct Tx<'conn, 'a, W> where W: TransactWatcher { /// The storage to apply against. In the future, this will be a Mentat connection. store: &'conn rusqlite::Connection, // TODO: db::MentatStoring, @@ -138,6 +144,8 @@ pub struct Tx<'conn, 'a> { /// This schema is not updated, so we just borrow it. schema: &'a Schema, + watcher: W, + /// The transaction ID of the transaction. 
tx_id: Entid, @@ -145,18 +153,20 @@ pub struct Tx<'conn, 'a> { tx_instant: Option>, } -impl<'conn, 'a> Tx<'conn, 'a> { +impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher { pub fn new( store: &'conn rusqlite::Connection, partition_map: PartitionMap, schema_for_mutation: &'a Schema, schema: &'a Schema, - tx_id: Entid) -> Tx<'conn, 'a> { + watcher: W, + tx_id: Entid) -> Tx<'conn, 'a, W> { Tx { store: store, partition_map: partition_map, schema_for_mutation: Cow::Borrowed(schema_for_mutation), schema: schema, + watcher: watcher, tx_id: tx_id, tx_instant: None, } @@ -516,7 +526,9 @@ impl<'conn, 'a> Tx<'conn, 'a> { /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. // TODO: move this to the transactor layer. - pub fn transact_entities(&mut self, entities: I) -> Result where I: IntoIterator { + pub fn transact_entities(&mut self, entities: I) -> Result + where I: IntoIterator, + W: TransactWatcher { // Pipeline stage 1: entities -> terms with tempids and lookup refs. let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?; @@ -529,7 +541,9 @@ impl<'conn, 'a> Tx<'conn, 'a> { self.transact_simple_terms(terms_with_temp_ids, tempid_set) } - pub fn transact_simple_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { + pub fn transact_simple_terms(&mut self, terms: I, tempid_set: InternSet) -> Result + where I: IntoIterator, + W: TransactWatcher { // TODO: push these into an internal transaction report? 
let mut tempids: BTreeMap = BTreeMap::default(); @@ -654,6 +668,8 @@ impl<'conn, 'a> Tx<'conn, 'a> { } } + self.watcher.datom(op, e, a, &v); + let reduced = (e, a, attribute, v, added); match (attribute.fulltext, attribute.multival) { (false, true) => non_fts_many.push(reduced), @@ -694,6 +710,7 @@ impl<'conn, 'a> Tx<'conn, 'a> { } db::update_partition_map(self.store, &self.partition_map)?; + self.watcher.done(self.schema)?; if tx_might_update_metadata { // Extract changes to metadata from the store. @@ -723,24 +740,27 @@ impl<'conn, 'a> Tx<'conn, 'a> { } /// Initialize a new Tx object with a new tx id and a tx instant. Kick off the SQLite conn, too. -fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection, +fn start_tx<'conn, 'a, W>(conn: &'conn rusqlite::Connection, mut partition_map: PartitionMap, schema_for_mutation: &'a Schema, - schema: &'a Schema) -> Result> { + schema: &'a Schema, + watcher: W) -> Result> + where W: TransactWatcher { let tx_id = partition_map.allocate_entid(":db.part/tx"); conn.begin_tx_application()?; - Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id)) + Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id)) } -fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option)> { +fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option, W)> +where W: TransactWatcher { // If the schema has moved on, return it. let next_schema = match tx.schema_for_mutation { Cow::Borrowed(_) => None, Cow::Owned(next_schema) => Some(next_schema), }; - Ok((report, tx.partition_map, next_schema)) + Ok((report, tx.partition_map, next_schema, tx.watcher)) } /// Transact the given `entities` against the given SQLite `conn`, using the given metadata. @@ -749,28 +769,32 @@ fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Opti /// /// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting. 
// TODO: move this to the transactor layer. -pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection, - partition_map: PartitionMap, - schema_for_mutation: &'a Schema, - schema: &'a Schema, - entities: I) -> Result<(TxReport, PartitionMap, Option)> - where I: IntoIterator { +pub fn transact<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection, + partition_map: PartitionMap, + schema_for_mutation: &'a Schema, + schema: &'a Schema, + watcher: W, + entities: I) -> Result<(TxReport, PartitionMap, Option, W)> + where I: IntoIterator, + W: TransactWatcher { - let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?; + let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?; let report = tx.transact_entities(entities)?; conclude_tx(tx, report) } /// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface. -pub fn transact_terms<'conn, 'a, I>(conn: &'conn rusqlite::Connection, - partition_map: PartitionMap, - schema_for_mutation: &'a Schema, - schema: &'a Schema, - terms: I, - tempid_set: InternSet) -> Result<(TxReport, PartitionMap, Option)> - where I: IntoIterator { +pub fn transact_terms<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection, + partition_map: PartitionMap, + schema_for_mutation: &'a Schema, + schema: &'a Schema, + watcher: W, + terms: I, + tempid_set: InternSet) -> Result<(TxReport, PartitionMap, Option, W)> + where I: IntoIterator, + W: TransactWatcher { - let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?; + let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?; let report = tx.transact_simple_terms(terms, tempid_set)?; conclude_tx(tx, report) } diff --git a/db/src/watcher.rs b/db/src/watcher.rs new file mode 100644 index 00000000..fa22cb6c --- /dev/null +++ b/db/src/watcher.rs @@ -0,0 +1,53 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file 
except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +// A trivial interface for extracting information from a transact as it happens. +// We have two situations in which we need to do this: +// +// - InProgress and Conn both have attribute caches. InProgress's is different from Conn's, +// because it needs to be able to roll back. These wish to see changes in a certain set of +// attributes in order to synchronously update the cache during a write. +// - When observers are registered we want to flip some flags as writes occur so that we can +// notify them outside the transaction. + +use mentat_core::{ + Entid, + Schema, + TypedValue, +}; + +use mentat_tx::entities::{ + OpType, +}; + +use errors::{ + Result, +}; + +pub trait TransactWatcher { + fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue); + + /// Only return an error if you want to interrupt the transact! + /// Called with the schema _prior to_ the transact -- any attributes or + /// attribute changes transacted during this transact are not reflected in + /// the schema. 
+ fn done(&mut self, schema: &Schema) -> Result<()>; +} + +pub struct NullWatcher(); + +impl TransactWatcher for NullWatcher { + fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) { + } + + fn done(&mut self, _schema: &Schema) -> Result<()> { + Ok(()) + } +} diff --git a/query-algebrizer/src/clauses/mod.rs b/query-algebrizer/src/clauses/mod.rs index 2e2d382e..1acc09c3 100644 --- a/query-algebrizer/src/clauses/mod.rs +++ b/query-algebrizer/src/clauses/mod.rs @@ -156,7 +156,7 @@ impl Intersection for BTreeMap { } } -type VariableBindings = BTreeMap; +pub type VariableBindings = BTreeMap; /// A `ConjoiningClauses` (CC) is a collection of clauses that are combined with `JOIN`. /// The topmost form in a query is a `ConjoiningClauses`. @@ -393,6 +393,10 @@ impl ConjoiningClauses { self.value_bindings.contains_key(var) } + pub fn value_bindings(&self, variables: &BTreeSet) -> VariableBindings { + self.value_bindings.with_intersected_keys(variables) + } + /// Return an interator over the variables externally bound to values. 
pub fn value_bound_variables(&self) -> ::std::collections::btree_map::Keys { self.value_bindings.keys() diff --git a/query-algebrizer/src/clauses/pattern.rs b/query-algebrizer/src/clauses/pattern.rs index 8b6aaf3d..d5104ebf 100644 --- a/query-algebrizer/src/clauses/pattern.rs +++ b/query-algebrizer/src/clauses/pattern.rs @@ -381,7 +381,6 @@ impl ConjoiningClauses { return true; }, Some(item) => { - println!("{} is known to be {:?}", var, item); self.bind_value(var, item.clone()); return true; } diff --git a/query-algebrizer/src/lib.rs b/query-algebrizer/src/lib.rs index 348a2b75..ed13ba75 100644 --- a/query-algebrizer/src/lib.rs +++ b/query-algebrizer/src/lib.rs @@ -38,6 +38,7 @@ use mentat_core::{ use mentat_core::counter::RcCounter; use mentat_query::{ + Element, FindQuery, FindSpec, Limit, @@ -55,6 +56,7 @@ pub use errors::{ pub use clauses::{ QueryInputs, + VariableBindings, }; pub use types::{ @@ -140,6 +142,23 @@ impl AlgebraicQuery { self.cc.is_known_empty() } + /// Return true if every variable in the find spec is fully bound to a single value. + pub fn is_fully_bound(&self) -> bool { + self.find_spec + .columns() + .all(|e| match e { + &Element::Variable(ref var) => self.cc.is_value_bound(var), + }) + } + + /// Return true if every variable in the find spec is fully bound to a single value, + /// and evaluating the query doesn't require running SQL. + pub fn is_fully_unit_bound(&self) -> bool { + self.cc.wheres.is_empty() && + self.is_fully_bound() + } + + /// Return a set of the input variables mentioned in the `:in` clause that have not yet been /// bound. We do this by looking at the CC. 
pub fn unbound_variables(&self) -> BTreeSet { diff --git a/query-projector/src/lib.rs b/query-projector/src/lib.rs index 6a0b8637..f44591ad 100644 --- a/query-projector/src/lib.rs +++ b/query-projector/src/lib.rs @@ -19,6 +19,10 @@ extern crate mentat_query_algebrizer; extern crate mentat_query_sql; extern crate mentat_sql; +use std::collections::{ + BTreeSet, +}; + use std::iter; use std::rc::Rc; @@ -34,6 +38,10 @@ use mentat_core::{ ValueTypeTag, }; +use mentat_core::util::{ + Either, +}; + use mentat_db::{ TypedSQLValue, }; @@ -49,6 +57,7 @@ use mentat_query_algebrizer::{ AlgebraicQuery, ColumnName, ConjoiningClauses, + VariableBindings, VariableColumn, }; @@ -86,7 +95,7 @@ pub struct QueryOutput { pub results: QueryResults, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum QueryResults { Scalar(Option), Tuple(Option>), @@ -134,6 +143,32 @@ impl QueryOutput { } } + pub fn from_constants(spec: &Rc, bindings: VariableBindings) -> QueryResults { + use self::FindSpec::*; + match &**spec { + &FindScalar(Element::Variable(ref var)) => { + let val = bindings.get(var).cloned(); + QueryResults::Scalar(val) + }, + &FindTuple(ref elements) => { + let values = elements.iter().map(|e| match e { + &Element::Variable(ref var) => bindings.get(var).cloned().expect("every var to have a binding"), + }).collect(); + QueryResults::Tuple(Some(values)) + }, + &FindColl(Element::Variable(ref var)) => { + let val = bindings.get(var).cloned().expect("every var to have a binding"); + QueryResults::Coll(vec![val]) + }, + &FindRel(ref elements) => { + let values = elements.iter().map(|e| match e { + &Element::Variable(ref var) => bindings.get(var).cloned().expect("every var to have a binding"), + }).collect(); + QueryResults::Rel(vec![values]) + }, + } + } + pub fn into_scalar(self) -> Result> { self.results.into_scalar() } @@ -350,11 +385,12 @@ fn project_elements<'a, I: IntoIterator>( pub trait Projector { fn project<'stmt>(&self, rows: Rows<'stmt>) -> 
Result; + fn columns<'s>(&'s self) -> Box + 's>; } /// A projector that produces a `QueryResult` containing fixed data. /// Takes a boxed function that should return an empty result set of the desired type. -struct ConstantProjector { +pub struct ConstantProjector { spec: Rc, results_factory: Box QueryResults>, } @@ -366,10 +402,8 @@ impl ConstantProjector { results_factory: results_factory, } } -} -impl Projector for ConstantProjector { - fn project<'stmt>(&self, _: Rows<'stmt>) -> Result { + pub fn project_without_rows<'stmt>(&self) -> Result { let results = (self.results_factory)(); let spec = self.spec.clone(); Ok(QueryOutput { @@ -379,6 +413,16 @@ impl Projector for ConstantProjector { } } +impl Projector for ConstantProjector { + fn project<'stmt>(&self, _: Rows<'stmt>) -> Result { + self.project_without_rows() + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + struct ScalarProjector { spec: Rc, template: TypedIndex, @@ -417,6 +461,10 @@ impl Projector for ScalarProjector { results: results, }) } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } } /// A tuple projector produces a single vector. It's the single-result version of rel. @@ -470,6 +518,10 @@ impl Projector for TupleProjector { results: results, }) } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } } /// A rel projector produces a vector of vectors. @@ -524,6 +576,10 @@ impl Projector for RelProjector { results: QueryResults::Rel(out), }) } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } } /// A coll projector produces a vector of values. @@ -564,6 +620,10 @@ impl Projector for CollProjector { results: QueryResults::Coll(out), }) } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } } /// Combines the two things you need to turn a query into SQL and turn its results into @@ -598,19 +658,24 @@ impl CombinedProjection { /// - The bindings established by the topmost CC. 
/// - The types known at algebrizing time. /// - The types extracted from the store for unknown attributes. -pub fn query_projection(query: &AlgebraicQuery) -> Result { +pub fn query_projection(query: &AlgebraicQuery) -> Result> { use self::FindSpec::*; let spec = query.find_spec.clone(); - if query.is_known_empty() { + if query.is_fully_unit_bound() { + // Do a few gyrations to produce empty results of the right kind for the query. + + let variables: BTreeSet = spec.columns().map(|e| match e { &Element::Variable(ref var) => var.clone() }).collect(); + + // TODO: error handling + let results = QueryOutput::from_constants(&spec, query.cc.value_bindings(&variables)); + let f = Box::new(move || {results.clone()}); + + Ok(Either::Left(ConstantProjector::new(spec, f))) + } else if query.is_known_empty() { // Do a few gyrations to produce empty results of the right kind for the query. let empty = QueryOutput::empty_factory(&spec); - let constant_projector = ConstantProjector::new(spec, empty); - Ok(CombinedProjection { - sql_projection: Projection::One, - datalog_projector: Box::new(constant_projector), - distinct: false, - }) + Ok(Either::Left(ConstantProjector::new(spec, empty))) } else { match *query.find_spec { FindColl(ref element) => { @@ -634,6 +699,6 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result { let (cols, templates) = project_elements(column_count, elements, query)?; TupleProjector::combine(spec, column_count, cols, templates) }, - } + }.map(Either::Right) } } diff --git a/query-translator/src/lib.rs b/query-translator/src/lib.rs index 003e187a..0cec2d50 100644 --- a/query-translator/src/lib.rs +++ b/query-translator/src/lib.rs @@ -24,6 +24,7 @@ pub use mentat_query_sql::{ }; pub use translate::{ + ProjectedSelect, cc_to_exists, query_to_select, }; diff --git a/query-translator/src/translate.rs b/query-translator/src/translate.rs index 75a51f00..a5594a80 100644 --- a/query-translator/src/translate.rs +++ b/query-translator/src/translate.rs @@ 
-17,7 +17,13 @@ use mentat_core::{ ValueTypeSet, }; -use mentat_query::Limit; +use mentat_core::util::{ + Either, +}; + +use mentat_query::{ + Limit, +}; use mentat_query_algebrizer::{ AlgebraicQuery, @@ -40,6 +46,7 @@ use mentat_query_algebrizer::{ use mentat_query_projector::{ CombinedProjection, + ConstantProjector, Projector, projected_column_for_var, query_projection, @@ -237,9 +244,12 @@ impl ToConstraint for ColumnConstraint { } } -pub struct ProjectedSelect{ - pub query: SelectQuery, - pub projector: Box, +pub enum ProjectedSelect { + Constant(ConstantProjector), + Query { + query: SelectQuery, + projector: Box, + }, } // Nasty little hack to let us move out of indexed context. @@ -325,6 +335,17 @@ fn table_for_computed(computed: ComputedTable, alias: TableAlias) -> TableOrSubq } } +fn empty_query() -> SelectQuery { + SelectQuery { + distinct: false, + projection: Projection::One, + from: FromClause::Nothing, + constraints: vec![], + order: vec![], + limit: Limit::None, + } +} + /// Returns a `SelectQuery` that queries for the provided `cc`. Note that this _always_ returns a /// query that runs SQL. The next level up the call stack can check for known-empty queries if /// needed. @@ -380,14 +401,7 @@ fn cc_to_select_query(projection: Projection, pub fn cc_to_exists(cc: ConjoiningClauses) -> SelectQuery { if cc.is_known_empty() { // In this case we can produce a very simple query that returns no results. - SelectQuery { - distinct: false, - projection: Projection::One, - from: FromClause::Nothing, - constraints: vec![], - order: vec![], - limit: Limit::None, - } + empty_query() } else { cc_to_select_query(Projection::One, cc, false, None, Limit::None) } @@ -398,9 +412,14 @@ pub fn cc_to_exists(cc: ConjoiningClauses) -> SelectQuery { pub fn query_to_select(query: AlgebraicQuery) -> Result { // TODO: we can't pass `query.limit` here if we aggregate during projection. // SQL-based aggregation -- `SELECT SUM(datoms00.e)` -- is fine. 
- let CombinedProjection { sql_projection, datalog_projector, distinct } = query_projection(&query)?; - Ok(ProjectedSelect { - query: cc_to_select_query(sql_projection, query.cc, distinct, query.order, query.limit), - projector: datalog_projector, - }) + query_projection(&query).map(|e| match e { + Either::Left(constant) => ProjectedSelect::Constant(constant), + Either::Right(CombinedProjection { sql_projection, datalog_projector, distinct, }) => { + let q = cc_to_select_query(sql_projection, query.cc, distinct, query.order, query.limit); + ProjectedSelect::Query { + query: q, + projector: datalog_projector, + } + }, + }).map_err(|e| e.into()) } diff --git a/query-translator/tests/translate.rs b/query-translator/tests/translate.rs index b7dafde9..8e11e9bb 100644 --- a/query-translator/tests/translate.rs +++ b/query-translator/tests/translate.rs @@ -12,6 +12,7 @@ extern crate mentat_core; extern crate mentat_query; extern crate mentat_query_algebrizer; extern crate mentat_query_parser; +extern crate mentat_query_projector; extern crate mentat_query_translator; extern crate mentat_sql; @@ -20,6 +21,7 @@ use std::collections::BTreeMap; use std::rc::Rc; use mentat_query::{ + FindSpec, NamespacedKeyword, Variable, }; @@ -39,12 +41,27 @@ use mentat_query_algebrizer::{ algebrize, algebrize_with_inputs, }; + +use mentat_query_projector::{ + ConstantProjector, +}; + use mentat_query_translator::{ + ProjectedSelect, query_to_select, }; use mentat_sql::SQLQuery; +/// Produce the appropriate `Variable` for the provided valid ?-prefixed name. +/// This lives here because we can't re-export macros: +/// https://github.com/rust-lang/rust/issues/29638. +macro_rules! var { + ( ? 
$var:ident ) => { + $crate::Variable::from_valid_name(concat!("?", stringify!($var))) + }; +} + fn associate_ident(schema: &mut Schema, i: NamespacedKeyword, e: Entid) { schema.entid_map.insert(e, i.clone()); schema.ident_map.insert(i.clone(), e); @@ -54,18 +71,56 @@ fn add_attribute(schema: &mut Schema, e: Entid, a: Attribute) { schema.attribute_map.insert(e, a); } -fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery { +fn query_to_sql(query: ProjectedSelect) -> SQLQuery { + match query { + ProjectedSelect::Query { query, projector: _projector } => { + query.to_sql_query().expect("to_sql_query to succeed") + }, + ProjectedSelect::Constant(constant) => { + panic!("ProjectedSelect wasn't ::Query! Got constant {:#?}", constant.project_without_rows()); + }, + } +} + +fn query_to_constant(query: ProjectedSelect) -> ConstantProjector { + match query { + ProjectedSelect::Constant(constant) => { + constant + }, + _ => panic!("ProjectedSelect wasn't ::Constant!"), + } +} + +fn assert_query_is_empty(query: ProjectedSelect, expected_spec: FindSpec) { + let constant = query_to_constant(query).project_without_rows().expect("constant run"); + assert_eq!(*constant.spec, expected_spec); + assert!(constant.results.is_empty()); +} + +fn inner_translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> ProjectedSelect { let known = Known::for_schema(schema); let parsed = parse_find_string(query).expect("parse to succeed"); let algebrized = algebrize_with_inputs(known, parsed, 0, inputs).expect("algebrize to succeed"); - let select = query_to_select(algebrized).expect("translate to succeed"); - select.query.to_sql_query().unwrap() + query_to_select(algebrized).expect("translate to succeed") +} + +fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery { + query_to_sql(inner_translate_with_inputs(schema, query, inputs)) } fn translate(schema: &Schema, query: &'static str) -> 
SQLQuery { translate_with_inputs(schema, query, QueryInputs::default()) } +fn translate_with_inputs_to_constant(schema: &Schema, query: &'static str, inputs: QueryInputs) -> ConstantProjector { + query_to_constant(inner_translate_with_inputs(schema, query, inputs)) +} + +fn translate_to_constant(schema: &Schema, query: &'static str) -> ConstantProjector { + translate_with_inputs_to_constant(schema, query, QueryInputs::default()) +} + + fn prepopulated_typed_schema(foo_type: ValueType) -> Schema { let mut schema = Schema::default(); associate_ident(&mut schema, NamespacedKeyword::new("foo", "bar"), 99); @@ -195,7 +250,7 @@ fn test_bound_variable_limit_affects_types() { algebrized.cc.known_type(&Variable::from_valid_name("?limit"))); let select = query_to_select(algebrized).expect("query to translate"); - let SQLQuery { sql, args } = select.query.to_sql_query().unwrap(); + let SQLQuery { sql, args } = query_to_sql(select); // TODO: this query isn't actually correct -- we don't yet algebrize for variables that are // specified in `:in` but not provided at algebrizing time. But it shows what we care about @@ -286,8 +341,7 @@ fn test_unknown_ident() { // If you insist… let select = query_to_select(algebrized).expect("query to translate"); - let sql = select.query.to_sql_query().unwrap().sql; - assert_eq!("SELECT 1 LIMIT 0", sql); + assert_query_is_empty(select, FindSpec::FindRel(vec![var!(?x).into()])); } #[test] @@ -678,16 +732,18 @@ fn test_ground_scalar() { // Verify that we accept inline constants. let query = r#"[:find ?x . :where [(ground "yyy") ?x]]"#; - let SQLQuery { sql, args } = translate(&schema, query); - assert_eq!(sql, "SELECT $v0 AS `?x` LIMIT 1"); - assert_eq!(args, vec![make_arg("$v0", "yyy")]); + let constant = translate_to_constant(&schema, query); + assert_eq!(constant.project_without_rows().unwrap() + .into_scalar().unwrap(), + Some(TypedValue::typed_string("yyy"))); // Verify that we accept bound input constants. let query = r#"[:find ?x . 
:in ?v :where [(ground ?v) ?x]]"#; let inputs = QueryInputs::with_value_sequence(vec![(Variable::from_valid_name("?v"), TypedValue::String(Rc::new("aaa".into())))]); - let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs); - assert_eq!(sql, "SELECT $v0 AS `?x` LIMIT 1"); - assert_eq!(args, vec![make_arg("$v0", "aaa"),]); + let constant = translate_with_inputs_to_constant(&schema, query, inputs); + assert_eq!(constant.project_without_rows().unwrap() + .into_scalar().unwrap(), + Some(TypedValue::typed_string("aaa"))); } #[test] @@ -696,18 +752,26 @@ fn test_ground_tuple() { // Verify that we accept inline constants. let query = r#"[:find ?x ?y :where [(ground [1 "yyy"]) [?x ?y]]]"#; - let SQLQuery { sql, args } = translate(&schema, query); - assert_eq!(sql, "SELECT DISTINCT 1 AS `?x`, $v0 AS `?y`"); - assert_eq!(args, vec![make_arg("$v0", "yyy")]); + let constant = translate_to_constant(&schema, query); + assert_eq!(constant.project_without_rows().unwrap() + .into_rel().unwrap(), + vec![vec![TypedValue::Long(1), TypedValue::typed_string("yyy")]]); // Verify that we accept bound input constants. let query = r#"[:find [?x ?y] :in ?u ?v :where [(ground [?u ?v]) [?x ?y]]]"#; let inputs = QueryInputs::with_value_sequence(vec![(Variable::from_valid_name("?u"), TypedValue::Long(2)), (Variable::from_valid_name("?v"), TypedValue::String(Rc::new("aaa".into()))),]); - let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs); + + let constant = translate_with_inputs_to_constant(&schema, query, inputs); + assert_eq!(constant.project_without_rows().unwrap() + .into_tuple().unwrap(), + Some(vec![TypedValue::Long(2), TypedValue::typed_string("aaa")])); + // TODO: treat 2 as an input variable that could be bound late, rather than eagerly binding it. 
- assert_eq!(sql, "SELECT 2 AS `?x`, $v0 AS `?y` LIMIT 1"); - assert_eq!(args, vec![make_arg("$v0", "aaa"),]); + // In that case the query wouldn't be constant, and would look more like: + // let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs); + // assert_eq!(sql, "SELECT 2 AS `?x`, $v0 AS `?y` LIMIT 1"); + // assert_eq!(args, vec![make_arg("$v0", "aaa"),]); } #[test] diff --git a/query/src/lib.rs b/query/src/lib.rs index e1eea1d7..d041ca63 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -449,6 +449,12 @@ pub enum Element { // Pull(Pull), // TODO } +impl From for Element { + fn from(x: Variable) -> Element { + Element::Variable(x) + } +} + impl std::fmt::Display for Element { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { diff --git a/src/conn.rs b/src/conn.rs index 05ed53b6..7c88d8f3 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -25,9 +25,6 @@ use std::path::{ use std::sync::{ Arc, Mutex, - RwLock, - RwLockReadGuard, - RwLockWriteGuard, }; use rusqlite; @@ -50,7 +47,11 @@ use mentat_core::{ use mentat_core::intern_set::InternSet; -use mentat_db::cache::SQLiteAttributeCache; +use mentat_db::cache::{ + InProgressSQLiteAttributeCache, + SQLiteAttributeCache, +}; + use mentat_db::db; use mentat_db::{ transact, @@ -101,15 +102,17 @@ pub struct Metadata { pub generation: u64, pub partition_map: PartitionMap, pub schema: Arc, + pub attribute_cache: SQLiteAttributeCache, } impl Metadata { // Intentionally not public. - fn new(generation: u64, partition_map: PartitionMap, schema: Arc) -> Metadata { + fn new(generation: u64, partition_map: PartitionMap, schema: Arc, cache: SQLiteAttributeCache) -> Metadata { Metadata { generation: generation, partition_map: partition_map, schema: schema, + attribute_cache: cache, } } } @@ -118,20 +121,25 @@ impl Metadata { pub struct Conn { /// `Mutex` since all reads and writes need to be exclusive. 
Internally, owned data for the /// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts - /// (schema) that we want to share across threads. A consuming thread may use a shared + /// (schema, cache) that we want to share across threads. A consuming thread may use a shared /// reference after the `Conn`'s `Metadata` has moved on. /// /// The motivating case is multiple query threads taking references to the current schema to /// perform long-running queries while a single writer thread moves the metadata -- partition /// map and schema -- forward. + /// + /// We want the attribute cache to be isolated across transactions, updated within + /// `InProgress` writes, and updated in the `Conn` on commit. To achieve this we + /// store the cache itself in an `Arc` inside `SQLiteAttributeCache`, so that `.get_mut()` + /// gives us copy-on-write semantics. + /// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully + /// replaced on commit. metadata: Mutex, // TODO: maintain set of change listeners or handles to transaction report queues. #298. // TODO: maintain cache of query plans that could be shared across threads and invalidated when // the schema changes. #315. - - attribute_cache: RwLock, } /// A convenience wrapper around a single SQLite connection and a Conn. 
This is suitable @@ -194,7 +202,8 @@ pub struct InProgress<'a, 'c> { generation: u64, partition_map: PartitionMap, schema: Schema, - cache: RwLockWriteGuard<'a, SQLiteAttributeCache>, + + cache: InProgressSQLiteAttributeCache, use_caching: bool, } @@ -235,7 +244,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> { where T: Into> { if self.use_caching { - let known = Known::new(&self.schema, Some(&*self.cache)); + let known = Known::new(&self.schema, Some(&self.cache)); q_once(&*(self.transaction), known, query, @@ -251,7 +260,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> { fn q_prepare(&self, query: &str, inputs: T) -> PreparedResult where T: Into> { - let known = Known::new(&self.schema, Some(&*self.cache)); + let known = Known::new(&self.schema, Some(&self.cache)); q_prepare(&*(self.transaction), known, query, @@ -261,7 +270,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> { fn q_explain(&self, query: &str, inputs: T) -> Result where T: Into> { - let known = Known::new(&self.schema, Some(&*self.cache)); + let known = Known::new(&self.schema, Some(&self.cache)); q_explain(&*(self.transaction), known, query, @@ -270,13 +279,13 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> { fn lookup_values_for_attribute(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result> where E: Into { - let known = Known::new(&self.schema, Some(&*self.cache)); + let known = Known::new(&self.schema, Some(&self.cache)); lookup_values_for_attribute(&*(self.transaction), known, entity, attribute) } fn lookup_value_for_attribute(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result> where E: Into { - let known = Known::new(&self.schema, Some(&*self.cache)); + let known = Known::new(&self.schema, Some(&self.cache)); lookup_value_for_attribute(&*(self.transaction), known, entity, attribute) } } @@ -357,12 +366,14 @@ impl<'a, 'c> InProgress<'a, 'c> { } pub fn transact_terms(&mut self, terms: I, tempid_set: InternSet) -> Result where I: IntoIterator { - let (report, 
next_partition_map, next_schema) = transact_terms(&self.transaction, - self.partition_map.clone(), - &self.schema, - &self.schema, - terms, - tempid_set)?; + let (report, next_partition_map, next_schema, _watcher) = + transact_terms(&self.transaction, + self.partition_map.clone(), + &self.schema, + &self.schema, + self.cache.transact_watcher(), + terms, + tempid_set)?; self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -379,7 +390,13 @@ impl<'a, 'c> InProgress<'a, 'c> { // `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using // `Default::default` in those situations to extract the partition map, and so there // would still be some cost. - let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?; + let (report, next_partition_map, next_schema, _watcher) = + transact(&self.transaction, + self.partition_map.clone(), + &self.schema, + &self.schema, + self.cache.transact_watcher(), + entities)?; self.partition_map = next_partition_map; if let Some(schema) = next_schema { self.schema = schema; @@ -423,6 +440,9 @@ impl<'a, 'c> InProgress<'a, 'c> { metadata.generation += 1; metadata.partition_map = self.partition_map; + // Update the conn's cache if we made any changes. 
+ self.cache.commit_to(&mut metadata.attribute_cache); + if self.schema != *(metadata.schema) { metadata.schema = Arc::new(self.schema); @@ -433,6 +453,29 @@ impl<'a, 'c> InProgress<'a, 'c> { Ok(()) } + + pub fn cache(&mut self, + attribute: &NamespacedKeyword, + cache_direction: CacheDirection, + cache_action: CacheAction) -> Result<()> { + let attribute_entid: Entid = self.schema + .attribute_for_ident(&attribute) + .ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into(); + + match cache_action { + CacheAction::Register => { + match cache_direction { + CacheDirection::Both => self.cache.register(&self.schema, &self.transaction, attribute_entid), + CacheDirection::Forward => self.cache.register_forward(&self.schema, &self.transaction, attribute_entid), + CacheDirection::Reverse => self.cache.register_reverse(&self.schema, &self.transaction, attribute_entid), + }.map_err(|e| e.into()) + }, + CacheAction::Deregister => { + self.cache.unregister(attribute_entid); + Ok(()) + }, + } + } } impl Store { @@ -520,8 +563,7 @@ impl Conn { // Intentionally not public. fn new(partition_map: PartitionMap, schema: Schema) -> Conn { Conn { - metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))), - attribute_cache: Default::default() + metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())), } } @@ -559,15 +601,10 @@ impl Conn { self.metadata.lock().unwrap().schema.clone() } - pub fn attribute_cache<'s>(&'s self) -> RwLockReadGuard<'s, SQLiteAttributeCache> { - self.attribute_cache.read().unwrap() + pub fn current_cache(&self) -> SQLiteAttributeCache { + self.metadata.lock().unwrap().attribute_cache.clone() } - pub fn attribute_cache_mut<'s>(&'s self) -> RwLockWriteGuard<'s, SQLiteAttributeCache> { - self.attribute_cache.write().unwrap() - } - - /// Query the Mentat store, using the given connection and the current metadata. 
pub fn q_once(&self, sqlite: &rusqlite::Connection, @@ -577,8 +614,7 @@ impl Conn { // Doesn't clone, unlike `current_schema`. let metadata = self.metadata.lock().unwrap(); - let cache = &*self.attribute_cache.read().unwrap(); - let known = Known::new(&*metadata.schema, Some(cache)); + let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache)); q_once(sqlite, known, query, @@ -607,8 +643,7 @@ impl Conn { where T: Into> { let metadata = self.metadata.lock().unwrap(); - let cache = &*self.attribute_cache.read().unwrap(); - let known = Known::new(&*metadata.schema, Some(cache)); + let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache)); q_prepare(sqlite, known, query, @@ -622,8 +657,7 @@ impl Conn { where T: Into> { let metadata = self.metadata.lock().unwrap(); - let cache = &*self.attribute_cache.read().unwrap(); - let known = Known::new(&*metadata.schema, Some(cache)); + let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache)); q_explain(sqlite, known, query, @@ -634,9 +668,8 @@ impl Conn { sqlite: &rusqlite::Connection, entity: Entid, attribute: &edn::NamespacedKeyword) -> Result> { - let schema = &*self.current_schema(); - let cache = &*self.attribute_cache(); - let known = Known::new(schema, Some(cache)); + let metadata = self.metadata.lock().unwrap(); + let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache)); lookup_values_for_attribute(sqlite, known, entity, attribute) } @@ -644,16 +677,15 @@ impl Conn { sqlite: &rusqlite::Connection, entity: Entid, attribute: &edn::NamespacedKeyword) -> Result> { - let schema = &*self.current_schema(); - let cache = &*self.attribute_cache(); - let known = Known::new(schema, Some(cache)); + let metadata = self.metadata.lock().unwrap(); + let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache)); lookup_value_for_attribute(sqlite, known, entity, attribute) } /// Take a SQLite transaction. 
fn begin_transaction_with_behavior<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result> { let tx = sqlite.transaction_with_behavior(behavior)?; - let (current_generation, current_partition_map, current_schema) = + let (current_generation, current_partition_map, current_schema, cache_cow) = { // The mutex is taken during this block. let ref current: Metadata = *self.metadata.lock().unwrap(); @@ -661,7 +693,8 @@ impl Conn { // Expensive, but the partition map is updated after every committed transaction. current.partition_map.clone(), // Cheap. - current.schema.clone()) + current.schema.clone(), + current.attribute_cache.clone()) }; Ok(InProgress { @@ -670,7 +703,7 @@ impl Conn { generation: current_generation, partition_map: current_partition_map, schema: (*current_schema).clone(), - cache: self.attribute_cache.write().unwrap(), + cache: InProgressSQLiteAttributeCache::from_cache(cache_cow), use_caching: true, }) } @@ -717,41 +750,39 @@ impl Conn { Ok(report) } - // TODO: Figure out how to set max cache size and max result size and implement those on cache - // Question: Should those be only for lazy cache? The eager cache could perhaps grow infinitely - // and it becomes up to the client to manage memory usage by excising from cache when no longer - // needed - /// Adds or removes the values of a given attribute to an in memory cache - /// The attribute should be a namespaced string `:foo/bar`. - /// cache_action determines if the attribute should be added or removed from the cache. - /// CacheAction::Add is idempotent - each attribute is only added once and cannot be both lazy - /// and eager. + /// Adds or removes the values of a given attribute to an in-memory cache. + /// The attribute should be a namespaced string: e.g., `:foo/bar`. + /// `cache_action` determines if the attribute should be added or removed from the cache. + /// CacheAction::Add is idempotent - each attribute is only added once. 
/// CacheAction::Remove throws an error if the attribute does not currently exist in the cache. - /// CacheType::Eager fetches all the values of the attribute and caches them on add. - /// CacheType::Lazy caches values only after they have first been fetched. pub fn cache(&mut self, sqlite: &mut rusqlite::Connection, schema: &Schema, attribute: &NamespacedKeyword, cache_direction: CacheDirection, cache_action: CacheAction) -> Result<()> { - match self.current_schema().attribute_for_ident(&attribute) { - None => bail!(ErrorKind::UnknownAttribute(attribute.to_string())), - Some((_attribute, attribute_entid)) => { - let mut cache = self.attribute_cache.write().unwrap(); - match cache_action { - CacheAction::Register => { - match cache_direction { - CacheDirection::Both => cache.register(schema, sqlite, attribute_entid), - CacheDirection::Forward => cache.register_forward(schema, sqlite, attribute_entid), - CacheDirection::Reverse => cache.register_reverse(schema, sqlite, attribute_entid), - }.map_err(|e| e.into()) - }, - CacheAction::Deregister => { - cache.unregister(attribute_entid); - Ok(()) - }, - } + let mut metadata = self.metadata.lock().unwrap(); + let attribute_entid: Entid; + + // Immutable borrow of metadata. 
+ { + attribute_entid = metadata.schema + .attribute_for_ident(&attribute) + .ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into(); + } + + let cache = &mut metadata.attribute_cache; + match cache_action { + CacheAction::Register => { + match cache_direction { + CacheDirection::Both => cache.register(schema, sqlite, attribute_entid), + CacheDirection::Forward => cache.register_forward(schema, sqlite, attribute_entid), + CacheDirection::Reverse => cache.register_reverse(schema, sqlite, attribute_entid), + }.map_err(|e| e.into()) + }, + CacheAction::Deregister => { + cache.unregister(attribute_entid); + Ok(()) }, } } @@ -761,18 +792,34 @@ impl Conn { mod tests { use super::*; + extern crate time; extern crate mentat_parser_utils; + use std::collections::{ + BTreeSet, + }; + + use std::path::{ + PathBuf, + }; + use std::time::Instant; use mentat_core::{ + CachedAttributes, TypedValue, }; + use query::{ + PreparedQuery, Variable, }; - use ::QueryResults; + use ::{ + IntoResult, + QueryInputs, + QueryResults, + }; use mentat_db::USER0; @@ -1081,4 +1128,257 @@ mod tests { assert!(cached_elapsed_time < uncached_elapsed_time); } } + + #[test] + fn test_cache_usage() { + let mut sqlite = db::new_connection("").unwrap(); + let mut conn = Conn::connect(&mut sqlite).unwrap(); + + let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident").0; + let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident").0; + println!("db/ident is {}", db_ident); + println!("db/type is {}", db_type); + let query = format!("[:find ?ident . 
:where [?e {} :db/doc][?e {} ?type][?type {} ?ident]]", + db_ident, db_type, db_ident); + + println!("Query is {}", query); + + assert!(!conn.current_cache().is_attribute_cached_forward(db_ident)); + + { + let mut ip = conn.begin_transaction(&mut sqlite).expect("began"); + + let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query"); + assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string"))); + + let start = time::PreciseTime::now(); + ip.q_once(query.as_str(), None).into_scalar_result().expect("query"); + let end = time::PreciseTime::now(); + println!("Uncached took {}µs", start.to(end).num_microseconds().unwrap()); + + ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered"); + ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered"); + + assert!(ip.cache.is_attribute_cached_forward(db_ident)); + + let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query"); + assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string"))); + + let start = time::PreciseTime::now(); + ip.q_once(query.as_str(), None).into_scalar_result().expect("query"); + let end = time::PreciseTime::now(); + println!("Cached took {}µs", start.to(end).num_microseconds().unwrap()); + + // If we roll back the change, our caching operations are also rolled back. 
+ ip.rollback().expect("rolled back"); + } + + assert!(!conn.current_cache().is_attribute_cached_forward(db_ident)); + + { + let mut ip = conn.begin_transaction(&mut sqlite).expect("began"); + + let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query"); + assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string"))); + ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered"); + ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered"); + + assert!(ip.cache.is_attribute_cached_forward(db_ident)); + + ip.commit().expect("rolled back"); + } + + assert!(conn.current_cache().is_attribute_cached_forward(db_ident)); + assert!(conn.current_cache().is_attribute_cached_forward(db_type)); + } + + fn fixture_path(rest: &str) -> PathBuf { + let fixtures = Path::new("fixtures/"); + fixtures.join(Path::new(rest)) + } + + #[test] + fn test_prepared_query_with_cache() { + let mut store = Store::open("").expect("opened"); + let mut in_progress = store.begin_transaction().expect("began"); + in_progress.import(fixture_path("cities.schema")).expect("transacted schema"); + in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data"); + in_progress.cache(&kw!(:neighborhood/district), CacheDirection::Forward, CacheAction::Register).expect("cache done"); + in_progress.cache(&kw!(:district/name), CacheDirection::Forward, CacheAction::Register).expect("cache done"); + in_progress.cache(&kw!(:neighborhood/name), CacheDirection::Reverse, CacheAction::Register).expect("cache done"); + + let query = r#"[:find ?district + :in ?hood + :where + [?neighborhood :neighborhood/name ?hood] + [?neighborhood :neighborhood/district ?d] + [?d :district/name ?district]]"#; + let hood = "Beacon Hill"; + let inputs = QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood))]); + let mut prepared = in_progress.q_prepare(query, inputs) + 
.expect("prepared"); + match &prepared { + &PreparedQuery::Constant { select: ref _select } => {}, + _ => panic!(), + }; + + + let start = time::PreciseTime::now(); + let results = prepared.run(None).expect("results"); + let end = time::PreciseTime::now(); + println!("Prepared cache execution took {}µs", start.to(end).num_microseconds().unwrap()); + assert_eq!(results.into_rel().expect("result"), + vec![vec![TypedValue::typed_string("Greater Duwamish")]]); + } + + trait StoreCache { + fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option; + fn is_attribute_cached_reverse(&self, attr: Entid) -> bool; + fn is_attribute_cached_forward(&self, attr: Entid) -> bool; + } + + impl StoreCache for Store { + fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option { + let cache = self.conn.current_cache(); + cache.get_entid_for_value(attr, val) + } + + fn is_attribute_cached_forward(&self, attr: Entid) -> bool { + self.conn.current_cache().is_attribute_cached_forward(attr) + } + + fn is_attribute_cached_reverse(&self, attr: Entid) -> bool { + self.conn.current_cache().is_attribute_cached_reverse(attr) + } + } + + #[test] + fn test_cache_mutation() { + let mut store = Store::open("").expect("opened"); + + { + let mut in_progress = store.begin_transaction().expect("begun"); + in_progress.transact(r#"[ + { :db/ident :foo/bar + :db/cardinality :db.cardinality/one + :db/index true + :db/unique :db.unique/identity + :db/valueType :db.type/long }, + { :db/ident :foo/baz + :db/cardinality :db.cardinality/one + :db/valueType :db.type/boolean } + { :db/ident :foo/x + :db/cardinality :db.cardinality/many + :db/valueType :db.type/long }]"#).expect("transact"); + + // Cache one…. 
+ in_progress.cache(&kw!(:foo/bar), CacheDirection::Reverse, CacheAction::Register).expect("cache done"); + in_progress.commit().expect("commit"); + } + + let foo_bar = store.conn.current_schema().get_entid(&kw!(:foo/bar)).expect("foo/bar").0; + let foo_baz = store.conn.current_schema().get_entid(&kw!(:foo/baz)).expect("foo/baz").0; + let foo_x = store.conn.current_schema().get_entid(&kw!(:foo/x)).expect("foo/x").0; + + // … and cache the others via the store. + store.cache(&kw!(:foo/baz), CacheDirection::Both).expect("cache done"); + store.cache(&kw!(:foo/x), CacheDirection::Forward).expect("cache done"); + { + assert!(store.is_attribute_cached_reverse(foo_bar)); + assert!(store.is_attribute_cached_forward(foo_baz)); + assert!(store.is_attribute_cached_reverse(foo_baz)); + assert!(store.is_attribute_cached_forward(foo_x)); + } + + // Add some data. + { + let mut in_progress = store.begin_transaction().expect("begun"); + + { + assert!(in_progress.cache.is_attribute_cached_reverse(foo_bar)); + assert!(in_progress.cache.is_attribute_cached_forward(foo_baz)); + assert!(in_progress.cache.is_attribute_cached_reverse(foo_baz)); + assert!(in_progress.cache.is_attribute_cached_forward(foo_x)); + + assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_bar)); + assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_baz)); + assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_baz)); + assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_x)); + } + + in_progress.transact(r#"[ + {:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]} + {:foo/bar 99, :foo/baz true} + {:foo/bar -2, :foo/baz true} + ]"#).expect("transact"); + + // Data is in the cache. + let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + assert_eq!(in_progress.cache.get_value_for_entid(&in_progress.schema, foo_baz, first).expect("val"), &TypedValue::Boolean(false)); + + // All three values for :foo/x. 
+ let all_three: BTreeSet = in_progress.cache + .get_values_for_entid(&in_progress.schema, foo_x, first) + .expect("val") + .iter().cloned().collect(); + assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()); + + in_progress.commit().expect("commit"); + } + + // Data is still in the cache. + { + let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + let cache: SQLiteAttributeCache = store.conn.current_cache(); + assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false)); + + let all_three: BTreeSet = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first) + .expect("val") + .iter().cloned().collect(); + assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()); + } + + // We can remove data and the cache reflects it, immediately and after commit. + { + let mut in_progress = store.begin_transaction().expect("began"); + let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact"); + + let only_two: BTreeSet = in_progress.cache + .get_values_for_entid(&in_progress.schema, foo_x, first) + .expect("val") + .iter().cloned().collect(); + assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect()); + + // Rollback: unchanged. 
+ } + { + let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + let cache: SQLiteAttributeCache = store.conn.current_cache(); + assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false)); + + let all_three: BTreeSet = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first) + .expect("val") + .iter().cloned().collect(); + assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect()); + } + + // Try again, but this time commit. + { + let mut in_progress = store.begin_transaction().expect("began"); + let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact"); + in_progress.commit().expect("committed"); + } + { + let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id"); + let cache: SQLiteAttributeCache = store.conn.current_cache(); + assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false)); + + let only_two: BTreeSet = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first) + .expect("val") + .iter().cloned().collect(); + assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect()); + } + } } diff --git a/src/query.rs b/src/query.rs index 88a4fcf4..1c260092 100644 --- a/src/query.rs +++ b/src/query.rs @@ -52,6 +52,7 @@ use mentat_query_parser::{ }; use mentat_query_projector::{ + ConstantProjector, Projector, }; @@ -60,6 +61,7 @@ use mentat_sql::{ }; use mentat_query_translator::{ + ProjectedSelect, query_to_select, }; @@ -84,6 +86,9 @@ pub enum PreparedQuery<'sqlite> { Empty { find_spec: Rc, }, + Constant { + select: ConstantProjector, + }, Bound { statement: rusqlite::Statement<'sqlite>, args: Vec<(String, Rc)>, @@ -97,6 +102,9 @@ impl<'sqlite> PreparedQuery<'sqlite> { &mut 
PreparedQuery::Empty { ref find_spec } => { Ok(QueryOutput::empty(find_spec)) }, + &mut PreparedQuery::Constant { ref select } => { + select.project_without_rows().map_err(|e| e.into()) + }, &mut PreparedQuery::Bound { ref mut statement, ref args, ref projector } => { let rows = run_statement(statement, args)?; projector @@ -136,6 +144,10 @@ impl IntoResult for QueryExecutionResult { pub enum QueryExplanation { /// A query known in advance to be empty, and why we believe that. KnownEmpty(EmptyBecause), + + /// A query known in advance to return a constant value. + KnownConstant, + /// A query that takes actual work to execute. ExecutionPlan { /// The translated query and any bindings. @@ -316,14 +328,18 @@ fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebriz } let select = query_to_select(algebrized)?; - let SQLQuery { sql, args } = select.query.to_sql_query()?; + match select { + ProjectedSelect::Constant(constant) => constant.project_without_rows() + .map_err(|e| e.into()), + ProjectedSelect::Query { query, projector } => { + let SQLQuery { sql, args } = query.to_sql_query()?; - let mut statement = sqlite.prepare(sql.as_str())?; - let rows = run_statement(&mut statement, &args)?; + let mut statement = sqlite.prepare(sql.as_str())?; + let rows = run_statement(&mut statement, &args)?; - select.projector - .project(rows) - .map_err(|e| e.into()) + projector.project(rows).map_err(|e| e.into()) + }, + } } /// Take an EDN query string, a reference to an open SQLite connection, a Mentat schema, and an @@ -382,14 +398,23 @@ pub fn q_prepare<'sqlite, 'query, T> } let select = query_to_select(algebrized)?; - let SQLQuery { sql, args } = select.query.to_sql_query()?; - let statement = sqlite.prepare(sql.as_str())?; + match select { + ProjectedSelect::Constant(constant) => { + Ok(PreparedQuery::Constant { + select: constant, + }) + }, + ProjectedSelect::Query { query, projector } => { + let SQLQuery { sql, args } = query.to_sql_query()?; + let 
statement = sqlite.prepare(sql.as_str())?; - Ok(PreparedQuery::Bound { - statement, - args, - projector: select.projector - }) + Ok(PreparedQuery::Bound { + statement, + args, + projector: projector + }) + }, + } } pub fn q_explain<'sqlite, 'query, T> @@ -403,18 +428,23 @@ pub fn q_explain<'sqlite, 'query, T> if algebrized.is_known_empty() { return Ok(QueryExplanation::KnownEmpty(algebrized.cc.empty_because.unwrap())); } - let query = query_to_select(algebrized)?.query.to_sql_query()?; + match query_to_select(algebrized)? { + ProjectedSelect::Constant(_constant) => Ok(QueryExplanation::KnownConstant), + ProjectedSelect::Query { query, projector: _projector } => { + let query = query.to_sql_query()?; - let plan_sql = format!("EXPLAIN QUERY PLAN {}", query.sql); + let plan_sql = format!("EXPLAIN QUERY PLAN {}", query.sql); - let steps = run_sql_query(sqlite, &plan_sql, &query.args, |row| { - QueryPlanStep { - select_id: row.get(0), - order: row.get(1), - from: row.get(2), - detail: row.get(3) - } - })?; + let steps = run_sql_query(sqlite, &plan_sql, &query.args, |row| { + QueryPlanStep { + select_id: row.get(0), + order: row.get(1), + from: row.get(2), + detail: row.get(3) + } + })?; - Ok(QueryExplanation::ExecutionPlan { query, steps }) + Ok(QueryExplanation::ExecutionPlan { query, steps }) + }, + } } diff --git a/tests/cache.rs b/tests/cache.rs index ef7cca88..e2adbc62 100644 --- a/tests/cache.rs +++ b/tests/cache.rs @@ -97,7 +97,7 @@ fn test_add_to_cache() { { let cached_values = attribute_cache.value_pairs(schema, attr).expect("non-None"); assert!(!cached_values.is_empty()); - let flattened: BTreeSet = cached_values.values().cloned().collect(); + let flattened: BTreeSet = cached_values.values().cloned().filter_map(|x| x).collect(); let expected: BTreeSet = vec![TypedValue::Long(100), TypedValue::Long(200)].into_iter().collect(); assert_eq!(flattened, expected); } diff --git a/tests/query.rs b/tests/query.rs index 3ef75ace..13b73a62 100644 --- a/tests/query.rs +++ 
b/tests/query.rs @@ -32,7 +32,6 @@ use mentat_core::{ }; use mentat::{ - IntoResult, NamespacedKeyword, PlainSymbol, QueryInputs, @@ -622,38 +621,3 @@ fn test_type_reqs() { } }; } - -#[test] -fn test_cache_usage() { - let mut c = new_connection("").expect("opened connection"); - let conn = Conn::connect(&mut c).expect("connected"); - - let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident"); - let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident"); - println!("db/ident is {}", db_ident.0); - println!("db/type is {}", db_type.0); - let query = format!("[:find ?ident . :where [?e {} :db/doc][?e {} ?type][?type {} ?ident]]", - db_ident.0, db_type.0, db_ident.0); - - println!("Query is {}", query); - - let schema = conn.current_schema(); - (*conn.attribute_cache_mut()).register(&schema, &mut c, db_ident).expect("registered"); - (*conn.attribute_cache_mut()).register(&schema, &mut c, db_type).expect("registered"); - - let ident = conn.q_once(&c, query.as_str(), None).into_scalar_result().expect("query"); - assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string"))); - - let ident = conn.q_uncached(&c, query.as_str(), None).into_scalar_result().expect("query"); - assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string"))); - - let start = time::PreciseTime::now(); - conn.q_once(&c, query.as_str(), None).into_scalar_result().expect("query"); - let end = time::PreciseTime::now(); - println!("Cached took {}µs", start.to(end).num_microseconds().unwrap()); - - let start = time::PreciseTime::now(); - conn.q_uncached(&c, query.as_str(), None).into_scalar_result().expect("query"); - let end = time::PreciseTime::now(); - println!("Uncached took {}µs", start.to(end).num_microseconds().unwrap()); -} diff --git a/tools/cli/src/mentat_cli/repl.rs b/tools/cli/src/mentat_cli/repl.rs index 06fa44c2..b85e9404 100644 --- a/tools/cli/src/mentat_cli/repl.rs +++ b/tools/cli/src/mentat_cli/repl.rs @@ 
-458,6 +458,8 @@ impl Repl { match self.store.q_explain(query.as_str(), None) { Result::Err(err) => println!("{:?}.", err), + Result::Ok(QueryExplanation::KnownConstant) => + println!("Query is known constant!"), Result::Ok(QueryExplanation::KnownEmpty(empty_because)) => println!("Query is known empty: {:?}", empty_because), Result::Ok(QueryExplanation::ExecutionPlan { query, steps }) => {