Create generalized in-memory cache for attributes (#525)
* Nit: Alphabetical ordering of imports * Create Cache and provide functions for calling it * Get tests working. Move to using NamespacedKeyword over KnownEntid in function signature * Add is_cached check to caching tests * Move lazy and add/remove boolean flags to enums * Move function definitions into generic trait and implement trait for AttributeCache * Remove lazy cache and generalize cache * Update tests * Eager cache becomes simple key value store. AttributeMap handles attribute storing specifics * Update tests to test presence of correct values in cache * Move EagerCache, AttributeValueProvider and ValueProvider into mentat_db * Add test for get_for_entid * Add test for lookup attribute * Make caches cloneable. Add value_for alongside values_for * Use cache in attribute lookups * Split test for values and value and add cardinality * address review feedback r=rnewman
This commit is contained in:
parent
d848d954cf
commit
715d434945
6 changed files with 496 additions and 25 deletions
99
db/src/cache.rs
Normal file
99
db/src/cache.rs
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
use std::cmp::Ord;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
use rusqlite;
|
||||||
|
|
||||||
|
use errors::{
|
||||||
|
Result
|
||||||
|
};
|
||||||
|
use db::{
|
||||||
|
TypedSQLValue,
|
||||||
|
};
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
TypedValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub type CacheMap<K, V> = BTreeMap<K, V>;
|
||||||
|
|
||||||
|
pub trait ValueProvider<K, V>: Clone {
|
||||||
|
fn fetch_values<'sqlite>(&mut self, sqlite: &'sqlite rusqlite::Connection) -> Result<CacheMap<K, V>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait Cacheable {
|
||||||
|
type Key;
|
||||||
|
type Value;
|
||||||
|
|
||||||
|
fn cache_values<'sqlite>(&mut self, sqlite: &'sqlite rusqlite::Connection) -> Result<()>;
|
||||||
|
fn get(&self, key: &Self::Key) -> Option<&Self::Value>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct EagerCache<K, V, VP> where K: Ord, VP: ValueProvider<K, V> {
|
||||||
|
pub cache: CacheMap<K, V>,
|
||||||
|
value_provider: VP,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K, V, VP> EagerCache<K, V, VP> where K: Ord, VP: ValueProvider<K, V> {
|
||||||
|
pub fn new(value_provider: VP) -> Self {
|
||||||
|
EagerCache {
|
||||||
|
cache: CacheMap::new(),
|
||||||
|
value_provider: value_provider,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K, V, VP> Cacheable for EagerCache<K, V, VP>
|
||||||
|
where K: Ord + Clone + Debug + ::std::hash::Hash,
|
||||||
|
V: Clone,
|
||||||
|
VP: ValueProvider<K, V> {
|
||||||
|
type Key = K;
|
||||||
|
type Value = V;
|
||||||
|
|
||||||
|
fn cache_values<'sqlite>(&mut self, sqlite: &'sqlite rusqlite::Connection) -> Result<()> {
|
||||||
|
// fetch results and add to cache
|
||||||
|
self.cache = self.value_provider.fetch_values(sqlite)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get(&self, key: &Self::Key) -> Option<&Self::Value> {
|
||||||
|
self.cache.get(&key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AttributeValueProvider {
|
||||||
|
pub attribute: Entid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ValueProvider<Entid, Vec<TypedValue>> for AttributeValueProvider {
|
||||||
|
fn fetch_values<'sqlite>(&mut self, sqlite: &'sqlite rusqlite::Connection) -> Result<CacheMap<Entid, Vec<TypedValue>>> {
|
||||||
|
let sql = "SELECT e, v, value_type_tag FROM datoms WHERE a = ? ORDER BY e ASC";
|
||||||
|
let mut stmt = sqlite.prepare(sql)?;
|
||||||
|
let value_iter = stmt.query_map(&[&self.attribute], |row| {
|
||||||
|
let entid: Entid = row.get(0);
|
||||||
|
let value_type_tag: i32 = row.get(2);
|
||||||
|
let value = TypedValue::from_sql_value_pair(row.get(1), value_type_tag).map(|x| x).unwrap();
|
||||||
|
(entid, value)
|
||||||
|
}).map_err(|e| e.into());
|
||||||
|
value_iter.map(|v| {
|
||||||
|
v.fold(CacheMap::new(), |mut map, row| {
|
||||||
|
let _ = row.map(|r| {
|
||||||
|
map.entry(r.0).or_insert(vec![]).push(r.1);
|
||||||
|
});
|
||||||
|
map
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -33,18 +33,19 @@ use itertools::Itertools;
|
||||||
|
|
||||||
pub use errors::{Error, ErrorKind, ResultExt, Result};
|
pub use errors::{Error, ErrorKind, ResultExt, Result};
|
||||||
|
|
||||||
|
mod add_retract_alter_set;
|
||||||
|
pub mod cache;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
mod bootstrap;
|
mod bootstrap;
|
||||||
pub mod debug;
|
pub mod debug;
|
||||||
mod add_retract_alter_set;
|
|
||||||
pub mod entids;
|
pub mod entids;
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
|
pub mod internal_types; // pub because we need them for building entities programmatically.
|
||||||
mod metadata;
|
mod metadata;
|
||||||
mod schema;
|
mod schema;
|
||||||
pub mod types;
|
|
||||||
pub mod internal_types; // pub because we need them for building entities programmatically.
|
|
||||||
mod upsert_resolution;
|
|
||||||
mod tx;
|
mod tx;
|
||||||
|
pub mod types;
|
||||||
|
mod upsert_resolution;
|
||||||
|
|
||||||
// Export these for reference from tests. cfg(test) should work, but doesn't.
|
// Export these for reference from tests. cfg(test) should work, but doesn't.
|
||||||
// #[cfg(test)]
|
// #[cfg(test)]
|
||||||
|
@ -57,11 +58,12 @@ pub use schema::{
|
||||||
AttributeBuilder,
|
AttributeBuilder,
|
||||||
AttributeValidation,
|
AttributeValidation,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use bootstrap::{
|
pub use bootstrap::{
|
||||||
CORE_SCHEMA_VERSION,
|
CORE_SCHEMA_VERSION,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use edn::symbols;
|
||||||
|
|
||||||
pub use entids::{
|
pub use entids::{
|
||||||
DB_SCHEMA_CORE,
|
DB_SCHEMA_CORE,
|
||||||
};
|
};
|
||||||
|
@ -82,8 +84,6 @@ pub use types::{
|
||||||
TxReport,
|
TxReport,
|
||||||
};
|
};
|
||||||
|
|
||||||
use edn::symbols;
|
|
||||||
|
|
||||||
pub fn to_namespaced_keyword(s: &str) -> Result<symbols::NamespacedKeyword> {
|
pub fn to_namespaced_keyword(s: &str) -> Result<symbols::NamespacedKeyword> {
|
||||||
let splits = [':', '/'];
|
let splits = [':', '/'];
|
||||||
let mut i = s.split(&splits[..]);
|
let mut i = s.split(&splits[..]);
|
||||||
|
|
221
src/cache.rs
Normal file
221
src/cache.rs
Normal file
|
@ -0,0 +1,221 @@
|
||||||
|
// Copyright 2016 Mozilla
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
|
||||||
|
// this file except in compliance with the License. You may obtain a copy of the
|
||||||
|
// License at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
// Unless required by applicable law or agreed to in writing, software distributed
|
||||||
|
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
|
||||||
|
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
// specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use rusqlite;
|
||||||
|
|
||||||
|
use mentat_core::{
|
||||||
|
Entid,
|
||||||
|
TypedValue,
|
||||||
|
};
|
||||||
|
|
||||||
|
use mentat_db::cache::{
|
||||||
|
AttributeValueProvider,
|
||||||
|
Cacheable,
|
||||||
|
EagerCache,
|
||||||
|
CacheMap,
|
||||||
|
};
|
||||||
|
|
||||||
|
use errors::{
|
||||||
|
Result,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub enum CacheAction {
|
||||||
|
Register,
|
||||||
|
Deregister,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AttributeCacher {
|
||||||
|
a_e_vs_cache: BTreeMap<Entid, EagerCache<Entid, Vec<TypedValue>, AttributeValueProvider>>, // values keyed by attribute
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AttributeCacher {
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
AttributeCacher {
|
||||||
|
a_e_vs_cache: BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_attribute<'sqlite>(&mut self, sqlite: &'sqlite rusqlite::Connection, attribute: Entid) -> Result<()> {
|
||||||
|
let value_provider = AttributeValueProvider{ attribute: attribute };
|
||||||
|
let mut cacher = EagerCache::new(value_provider);
|
||||||
|
cacher.cache_values(sqlite)?;
|
||||||
|
self.a_e_vs_cache.insert(attribute, cacher);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deregister_attribute(&mut self, attribute: &Entid) -> Option<CacheMap<Entid, Vec<TypedValue>>> {
|
||||||
|
self.a_e_vs_cache.remove(&attribute).map(|m| m.cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, attribute: &Entid) -> Option<&CacheMap<Entid, Vec<TypedValue>>> {
|
||||||
|
self.a_e_vs_cache.get( &attribute ).map(|m| &m.cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_values_for_entid(&self, attribute: &Entid, entid: &Entid) -> Option<&Vec<TypedValue>> {
|
||||||
|
self.a_e_vs_cache.get(&attribute).and_then(|c| c.get(&entid))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_value_for_entid(&self, attribute: &Entid, entid: &Entid) -> Option<&TypedValue> {
|
||||||
|
self.get_values_for_entid(attribute, entid).and_then(|c| c.first())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use mentat_core::{
|
||||||
|
HasSchema,
|
||||||
|
KnownEntid,
|
||||||
|
};
|
||||||
|
use mentat_db::db;
|
||||||
|
use mentat_db::types::TypedValue;
|
||||||
|
|
||||||
|
use conn::Conn;
|
||||||
|
|
||||||
|
fn populate_db() -> (Conn, rusqlite::Connection) {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
let _report = conn.transact(&mut sqlite, r#"[
|
||||||
|
{ :db/ident :foo/bar
|
||||||
|
:db/valueType :db.type/long
|
||||||
|
:db/cardinality :db.cardinality/one },
|
||||||
|
{ :db/ident :foo/baz
|
||||||
|
:db/valueType :db.type/boolean
|
||||||
|
:db/cardinality :db.cardinality/one },
|
||||||
|
{ :db/ident :foo/bap
|
||||||
|
:db/valueType :db.type/string
|
||||||
|
:db/cardinality :db.cardinality/many}]"#).expect("transaction expected to succeed");
|
||||||
|
let _report = conn.transact(&mut sqlite, r#"[
|
||||||
|
{ :foo/bar 100
|
||||||
|
:foo/baz false
|
||||||
|
:foo/bap ["one","two","buckle my shoe"] },
|
||||||
|
{ :foo/bar 200
|
||||||
|
:foo/baz true
|
||||||
|
:foo/bap ["three", "four", "knock at my door"] }]"#).expect("transaction expected to succeed");
|
||||||
|
(conn, sqlite)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_values_present_for_attribute(attribute_cache: &mut AttributeCacher, attribute: &KnownEntid, values: Vec<Vec<TypedValue>>) {
|
||||||
|
let cached_values: Vec<Vec<TypedValue>> = attribute_cache.get(&attribute.0)
|
||||||
|
.expect("Expected cached values")
|
||||||
|
.values()
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
assert_eq!(cached_values, values);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_to_cache() {
|
||||||
|
let (conn, sqlite) = populate_db();
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
let kw = kw!(:foo/bar);
|
||||||
|
let entid = schema.get_entid(&kw).expect("Expected entid for attribute");
|
||||||
|
attribute_cache.register_attribute(&sqlite, entid.0.clone() ).expect("No errors on add to cache");
|
||||||
|
assert_values_present_for_attribute(&mut attribute_cache, &entid, vec![vec![TypedValue::Long(100)], vec![TypedValue::Long(200)]]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_attribute_already_in_cache() {
|
||||||
|
let (conn, mut sqlite) = populate_db();
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
|
||||||
|
let kw = kw!(:foo/bar);
|
||||||
|
let entid = schema.get_entid(&kw).expect("Expected entid for attribute");
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, entid.0.clone()).expect("No errors on add to cache");
|
||||||
|
assert_values_present_for_attribute(&mut attribute_cache, &entid, vec![vec![TypedValue::Long(100)], vec![TypedValue::Long(200)]]);
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, entid.0.clone()).expect("No errors on add to cache");
|
||||||
|
assert_values_present_for_attribute(&mut attribute_cache, &entid, vec![vec![TypedValue::Long(100)], vec![TypedValue::Long(200)]]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_remove_from_cache() {
|
||||||
|
let (conn, mut sqlite) = populate_db();
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
|
||||||
|
let kwr = kw!(:foo/bar);
|
||||||
|
let entidr = schema.get_entid(&kwr).expect("Expected entid for attribute");
|
||||||
|
let kwz = kw!(:foo/baz);
|
||||||
|
let entidz = schema.get_entid(&kwz).expect("Expected entid for attribute");
|
||||||
|
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, entidr.0.clone()).expect("No errors on add to cache");
|
||||||
|
assert_values_present_for_attribute(&mut attribute_cache, &entidr, vec![vec![TypedValue::Long(100)], vec![TypedValue::Long(200)]]);
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, entidz.0.clone()).expect("No errors on add to cache");
|
||||||
|
assert_values_present_for_attribute(&mut attribute_cache, &entidz, vec![vec![TypedValue::Boolean(false)], vec![TypedValue::Boolean(true)]]);
|
||||||
|
|
||||||
|
// test that we can remove an item from cache
|
||||||
|
attribute_cache.deregister_attribute(&entidz.0).expect("No errors on remove from cache");
|
||||||
|
assert_eq!(attribute_cache.get(&entidz.0), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_remove_attribute_not_in_cache() {
|
||||||
|
let (conn, _sqlite) = populate_db();
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
let kw = kw!(:foo/baz);
|
||||||
|
let entid = schema.get_entid(&kw).expect("Expected entid for attribute").0;
|
||||||
|
assert_eq!(None, attribute_cache.deregister_attribute(&entid));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_fetch_attribute_value_for_entid() {
|
||||||
|
let (conn, mut sqlite) = populate_db();
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
|
||||||
|
let entities = conn.q_once(&sqlite, r#"[:find ?e . :where [?e :foo/bar 100]]"#, None).expect("Expected query to work").into_scalar().expect("expected scalar results");
|
||||||
|
let entid = match entities {
|
||||||
|
Some(TypedValue::Ref(entid)) => entid,
|
||||||
|
x => panic!("expected Some(Ref), got {:?}", x),
|
||||||
|
};
|
||||||
|
|
||||||
|
let kwr = kw!(:foo/bar);
|
||||||
|
let attr_entid = schema.get_entid(&kwr).expect("Expected entid for attribute").0;
|
||||||
|
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, attr_entid.clone()).expect("No errors on add to cache");
|
||||||
|
let val = attribute_cache.get_value_for_entid(&attr_entid, &entid).expect("Expected value");
|
||||||
|
assert_eq!(*val, TypedValue::Long(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_fetch_attribute_values_for_entid() {
|
||||||
|
let (conn, mut sqlite) = populate_db();
|
||||||
|
let schema = conn.current_schema();
|
||||||
|
|
||||||
|
let entities = conn.q_once(&sqlite, r#"[:find ?e . :where [?e :foo/bar 100]]"#, None).expect("Expected query to work").into_scalar().expect("expected scalar results");
|
||||||
|
let entid = match entities {
|
||||||
|
Some(TypedValue::Ref(entid)) => entid,
|
||||||
|
x => panic!("expected Some(Ref), got {:?}", x),
|
||||||
|
};
|
||||||
|
|
||||||
|
let kwp = kw!(:foo/bap);
|
||||||
|
let attr_entid = schema.get_entid(&kwp).expect("Expected entid for attribute").0;
|
||||||
|
|
||||||
|
let mut attribute_cache = AttributeCacher::new();
|
||||||
|
|
||||||
|
attribute_cache.register_attribute(&mut sqlite, attr_entid.clone()).expect("No errors on add to cache");
|
||||||
|
let val = attribute_cache.get_values_for_entid(&attr_entid, &entid).expect("Expected value");
|
||||||
|
assert_eq!(*val, vec![TypedValue::String(Rc::new("buckle my shoe".to_string())), TypedValue::String(Rc::new("one".to_string())), TypedValue::String(Rc::new("two".to_string()))]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
151
src/conn.rs
151
src/conn.rs
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
|
||||||
use rusqlite;
|
use rusqlite;
|
||||||
use rusqlite::{
|
use rusqlite::{
|
||||||
|
@ -48,7 +48,20 @@ use mentat_tx::entities::TempId;
|
||||||
|
|
||||||
use mentat_tx_parser;
|
use mentat_tx_parser;
|
||||||
|
|
||||||
|
use cache::{
|
||||||
|
AttributeCacher,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub use cache::{
|
||||||
|
CacheAction,
|
||||||
|
};
|
||||||
|
|
||||||
|
use entity_builder::{
|
||||||
|
InProgressBuilder,
|
||||||
|
};
|
||||||
|
|
||||||
use errors::*;
|
use errors::*;
|
||||||
|
|
||||||
use query::{
|
use query::{
|
||||||
lookup_value_for_attribute,
|
lookup_value_for_attribute,
|
||||||
lookup_values_for_attribute,
|
lookup_values_for_attribute,
|
||||||
|
@ -59,10 +72,6 @@ use query::{
|
||||||
QueryOutput,
|
QueryOutput,
|
||||||
};
|
};
|
||||||
|
|
||||||
use entity_builder::{
|
|
||||||
InProgressBuilder,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Connection metadata required to query from, or apply transactions to, a Mentat store.
|
/// Connection metadata required to query from, or apply transactions to, a Mentat store.
|
||||||
///
|
///
|
||||||
/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently
|
/// Owned data for the volatile parts (generation and partition map), and `Arc` for the infrequently
|
||||||
|
@ -102,6 +111,8 @@ pub struct Conn {
|
||||||
|
|
||||||
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
|
||||||
// the schema changes. #315.
|
// the schema changes. #315.
|
||||||
|
|
||||||
|
attribute_cache: RwLock<AttributeCacher>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
|
||||||
|
@ -143,6 +154,7 @@ pub struct InProgress<'a, 'c> {
|
||||||
generation: u64,
|
generation: u64,
|
||||||
partition_map: PartitionMap,
|
partition_map: PartitionMap,
|
||||||
schema: Schema,
|
schema: Schema,
|
||||||
|
cache: RwLockWriteGuard<'a, AttributeCacher>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
/// Represents an in-progress set of reads to the store. Just like `InProgress`,
|
||||||
|
@ -191,12 +203,14 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
|
||||||
|
|
||||||
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>>
|
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>>
|
||||||
where E: Into<Entid> {
|
where E: Into<Entid> {
|
||||||
lookup_values_for_attribute(&*(self.transaction), &self.schema, entity, attribute)
|
let cc = &*self.cache;
|
||||||
|
lookup_values_for_attribute(&*(self.transaction), &self.schema, cc, entity, attribute)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>>
|
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>>
|
||||||
where E: Into<Entid> {
|
where E: Into<Entid> {
|
||||||
lookup_value_for_attribute(&*(self.transaction), &self.schema, entity, attribute)
|
let cc = &*self.cache;
|
||||||
|
lookup_value_for_attribute(&*(self.transaction), &self.schema, cc, entity, attribute)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,7 +399,8 @@ impl Conn {
|
||||||
// Intentionally not public.
|
// Intentionally not public.
|
||||||
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
|
||||||
Conn {
|
Conn {
|
||||||
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema)))
|
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))),
|
||||||
|
attribute_cache: RwLock::new(AttributeCacher::new())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -412,6 +427,10 @@ impl Conn {
|
||||||
self.metadata.lock().unwrap().schema.clone()
|
self.metadata.lock().unwrap().schema.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn attribute_cache<'s>(&'s self) -> RwLockReadGuard<'s, AttributeCacher> {
|
||||||
|
self.attribute_cache.read().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
/// Query the Mentat store, using the given connection and the current metadata.
|
/// Query the Mentat store, using the given connection and the current metadata.
|
||||||
pub fn q_once<T>(&self,
|
pub fn q_once<T>(&self,
|
||||||
sqlite: &rusqlite::Connection,
|
sqlite: &rusqlite::Connection,
|
||||||
|
@ -439,14 +458,16 @@ impl Conn {
|
||||||
sqlite: &rusqlite::Connection,
|
sqlite: &rusqlite::Connection,
|
||||||
entity: Entid,
|
entity: Entid,
|
||||||
attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>> {
|
attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>> {
|
||||||
lookup_values_for_attribute(sqlite, &*self.current_schema(), entity, attribute)
|
let cc: &AttributeCacher = &*self.attribute_cache();
|
||||||
|
lookup_values_for_attribute(sqlite, &*self.current_schema(), cc, entity, attribute)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn lookup_value_for_attribute(&self,
|
pub fn lookup_value_for_attribute(&self,
|
||||||
sqlite: &rusqlite::Connection,
|
sqlite: &rusqlite::Connection,
|
||||||
entity: Entid,
|
entity: Entid,
|
||||||
attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>> {
|
attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>> {
|
||||||
lookup_value_for_attribute(sqlite, &*self.current_schema(), entity, attribute)
|
let cc: &AttributeCacher = &*self.attribute_cache();
|
||||||
|
lookup_value_for_attribute(sqlite, &*self.current_schema(), cc, entity, attribute)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take a SQLite transaction.
|
/// Take a SQLite transaction.
|
||||||
|
@ -469,6 +490,7 @@ impl Conn {
|
||||||
generation: current_generation,
|
generation: current_generation,
|
||||||
partition_map: current_partition_map,
|
partition_map: current_partition_map,
|
||||||
schema: (*current_schema).clone(),
|
schema: (*current_schema).clone(),
|
||||||
|
cache: self.attribute_cache.write().unwrap(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -505,6 +527,34 @@ impl Conn {
|
||||||
|
|
||||||
Ok(report)
|
Ok(report)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Figure out how to set max cache size and max result size and implement those on cache
|
||||||
|
// Question: Should those be only for lazy cache? The eager cache could perhaps grow infinitely
|
||||||
|
// and it becomes up to the client to manage memory usage by excising from cache when no longer
|
||||||
|
// needed
|
||||||
|
/// Adds or removes the values of a given attribute to an in memory cache
|
||||||
|
/// The attribute should be a namespaced string `:foo/bar`.
|
||||||
|
/// cache_action determines if the attribute should be added or removed from the cache.
|
||||||
|
/// CacheAction::Add is idempotent - each attribute is only added once and cannot be both lazy
|
||||||
|
/// and eager.
|
||||||
|
/// CacheAction::Remove throws an error if the attribute does not currently exist in the cache.
|
||||||
|
/// CacheType::Eager fetches all the values of the attribute and caches them on add.
|
||||||
|
/// CacheType::Lazy caches values only after they have first been fetched.
|
||||||
|
pub fn cache(&mut self,
|
||||||
|
sqlite: &mut rusqlite::Connection,
|
||||||
|
attribute: &NamespacedKeyword,
|
||||||
|
cache_action: CacheAction) -> Result<()> {
|
||||||
|
// fetch the attribute for the given name
|
||||||
|
let schema = self.current_schema();
|
||||||
|
|
||||||
|
let mut cache = self.attribute_cache.write().unwrap();
|
||||||
|
let attribute_entid = schema.get_entid(&attribute).ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?;
|
||||||
|
match cache_action {
|
||||||
|
CacheAction::Register => { cache.register_attribute(sqlite, attribute_entid.0)?; },
|
||||||
|
CacheAction::Deregister => { cache.deregister_attribute(&attribute_entid.0); },
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -512,6 +562,9 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
extern crate mentat_parser_utils;
|
extern crate mentat_parser_utils;
|
||||||
|
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use mentat_core::{
|
use mentat_core::{
|
||||||
TypedValue,
|
TypedValue,
|
||||||
};
|
};
|
||||||
|
@ -708,4 +761,82 @@ mod tests {
|
||||||
x => panic!("expected EDN parse error, got {:?}", x),
|
x => panic!("expected EDN parse error, got {:?}", x),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_add_to_cache_failure_no_attribute() {
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
let _report = conn.transact(&mut sqlite, r#"[
|
||||||
|
{ :db/ident :foo/bar
|
||||||
|
:db/valueType :db.type/long },
|
||||||
|
{ :db/ident :foo/baz
|
||||||
|
:db/valueType :db.type/boolean }]"#).unwrap();
|
||||||
|
|
||||||
|
let kw = kw!(:foo/bat);
|
||||||
|
let res = conn.cache(&mut sqlite,&kw, CacheAction::Register);
|
||||||
|
match res.unwrap_err() {
|
||||||
|
Error(ErrorKind::UnknownAttribute(msg), _) => assert_eq!(msg, ":foo/bat"),
|
||||||
|
x => panic!("expected UnknownAttribute error, got {:?}", x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO expand tests to cover lookup_value_for_attribute comparing with and without caching
|
||||||
|
#[test]
|
||||||
|
fn test_lookup_attribute_with_caching() {
|
||||||
|
|
||||||
|
let mut sqlite = db::new_connection("").unwrap();
|
||||||
|
let mut conn = Conn::connect(&mut sqlite).unwrap();
|
||||||
|
let _report = conn.transact(&mut sqlite, r#"[
|
||||||
|
{ :db/ident :foo/bar
|
||||||
|
:db/valueType :db.type/long },
|
||||||
|
{ :db/ident :foo/baz
|
||||||
|
:db/valueType :db.type/boolean }]"#).expect("transaction expected to succeed");
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut in_progress = conn.begin_transaction(&mut sqlite).expect("transaction");
|
||||||
|
for _ in 1..100 {
|
||||||
|
let _report = in_progress.transact(r#"[
|
||||||
|
{ :foo/bar 100
|
||||||
|
:foo/baz false },
|
||||||
|
{ :foo/bar 200
|
||||||
|
:foo/baz true },
|
||||||
|
{ :foo/bar 100
|
||||||
|
:foo/baz false },
|
||||||
|
{ :foo/bar 300
|
||||||
|
:foo/baz true },
|
||||||
|
{ :foo/bar 400
|
||||||
|
:foo/baz false },
|
||||||
|
{ :foo/bar 500
|
||||||
|
:foo/baz true }]"#).expect("transaction expected to succeed");
|
||||||
|
}
|
||||||
|
in_progress.commit().expect("Committed");
|
||||||
|
}
|
||||||
|
|
||||||
|
let entities = conn.q_once(&sqlite, r#"[:find ?e . :where [?e :foo/bar 400]]"#, None).expect("Expected query to work").into_scalar().expect("expected rel results");
|
||||||
|
let first = entities.expect("expected a result");
|
||||||
|
let entid = match first {
|
||||||
|
TypedValue::Ref(entid) => entid,
|
||||||
|
x => panic!("expected Some(Ref), got {:?}", x),
|
||||||
|
};
|
||||||
|
|
||||||
|
let kw = kw!(:foo/bar);
|
||||||
|
let start = Instant::now();
|
||||||
|
let uncached_val = conn.lookup_value_for_attribute(&sqlite, entid, &kw).expect("Expected value on lookup");
|
||||||
|
let finish = Instant::now();
|
||||||
|
let uncached_elapsed_time = finish.duration_since(start);
|
||||||
|
println!("Uncached time: {:?}", uncached_elapsed_time);
|
||||||
|
|
||||||
|
conn.cache(&mut sqlite, &kw, CacheAction::Register).expect("expected caching to work");
|
||||||
|
|
||||||
|
for _ in 1..5 {
|
||||||
|
let start = Instant::now();
|
||||||
|
let cached_val = conn.lookup_value_for_attribute(&sqlite, entid, &kw).expect("Expected value on lookup");
|
||||||
|
let finish = Instant::now();
|
||||||
|
let cached_elapsed_time = finish.duration_since(start);
|
||||||
|
assert_eq!(cached_val, uncached_val);
|
||||||
|
|
||||||
|
println!("Cached time: {:?}", cached_elapsed_time);
|
||||||
|
assert!(cached_elapsed_time < uncached_elapsed_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,6 +73,7 @@ macro_rules! kw {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod cache;
|
||||||
pub mod errors;
|
pub mod errors;
|
||||||
pub mod ident;
|
pub mod ident;
|
||||||
pub mod vocabulary;
|
pub mod vocabulary;
|
||||||
|
|
35
src/query.rs
35
src/query.rs
|
@ -69,6 +69,10 @@ use errors::{
|
||||||
Result,
|
Result,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use cache::{
|
||||||
|
AttributeCacher,
|
||||||
|
};
|
||||||
|
|
||||||
pub type QueryExecutionResult = Result<QueryOutput>;
|
pub type QueryExecutionResult = Result<QueryOutput>;
|
||||||
|
|
||||||
pub trait IntoResult {
|
pub trait IntoResult {
|
||||||
|
@ -170,44 +174,59 @@ fn lookup_attribute(schema: &Schema, attribute: &NamespacedKeyword) -> Result<Kn
|
||||||
/// If the attribute is multi-valued, an arbitrary value is returned.
|
/// If the attribute is multi-valued, an arbitrary value is returned.
|
||||||
/// If no value is present for that entity, `None` is returned.
|
/// If no value is present for that entity, `None` is returned.
|
||||||
/// If `attribute` isn't an attribute, `None` is returned.
|
/// If `attribute` isn't an attribute, `None` is returned.
|
||||||
pub fn lookup_value<'sqlite, 'schema, E, A>
|
pub fn lookup_value<'sqlite, 'schema, 'cache, E, A>
|
||||||
(sqlite: &'sqlite rusqlite::Connection,
|
(sqlite: &'sqlite rusqlite::Connection,
|
||||||
schema: &'schema Schema,
|
schema: &'schema Schema,
|
||||||
|
cache: &'cache AttributeCacher,
|
||||||
entity: E,
|
entity: E,
|
||||||
attribute: A) -> Result<Option<TypedValue>>
|
attribute: A) -> Result<Option<TypedValue>>
|
||||||
where E: Into<Entid>, A: Into<Entid> {
|
where E: Into<Entid>, A: Into<Entid> {
|
||||||
fetch_values(sqlite, schema, entity.into(), attribute.into(), true).into_scalar_result()
|
let entid = entity.into();
|
||||||
|
let attrid = attribute.into();
|
||||||
|
let cached = cache.get_value_for_entid(&attrid, &entid).cloned();
|
||||||
|
if cached.is_some() {
|
||||||
|
return Ok(cached);
|
||||||
|
}
|
||||||
|
fetch_values(sqlite, schema, entid, attrid, true).into_scalar_result()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn lookup_values<'sqlite, 'schema, E, A>
|
pub fn lookup_values<'sqlite, 'schema, 'cache, E, A>
|
||||||
(sqlite: &'sqlite rusqlite::Connection,
|
(sqlite: &'sqlite rusqlite::Connection,
|
||||||
schema: &'schema Schema,
|
schema: &'schema Schema,
|
||||||
|
cache: &'cache AttributeCacher,
|
||||||
entity: E,
|
entity: E,
|
||||||
attribute: A) -> Result<Vec<TypedValue>>
|
attribute: A) -> Result<Vec<TypedValue>>
|
||||||
where E: Into<Entid>, A: Into<Entid> {
|
where E: Into<Entid>, A: Into<Entid> {
|
||||||
fetch_values(sqlite, schema, entity.into(), attribute.into(), false).into_coll_result()
|
let entid = entity.into();
|
||||||
|
let attrid = attribute.into();
|
||||||
|
if let Some(cached) = cache.get_values_for_entid(&attrid, &entid).cloned() {
|
||||||
|
return Ok(cached);
|
||||||
|
}
|
||||||
|
fetch_values(sqlite, schema, entid, attrid, false).into_coll_result()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a single value for the provided entity and attribute.
|
/// Return a single value for the provided entity and attribute.
|
||||||
/// If the attribute is multi-valued, an arbitrary value is returned.
|
/// If the attribute is multi-valued, an arbitrary value is returned.
|
||||||
/// If no value is present for that entity, `None` is returned.
|
/// If no value is present for that entity, `None` is returned.
|
||||||
/// If `attribute` doesn't name an attribute, an error is returned.
|
/// If `attribute` doesn't name an attribute, an error is returned.
|
||||||
pub fn lookup_value_for_attribute<'sqlite, 'schema, 'attribute, E>
|
pub fn lookup_value_for_attribute<'sqlite, 'schema, 'cache, 'attribute, E>
|
||||||
(sqlite: &'sqlite rusqlite::Connection,
|
(sqlite: &'sqlite rusqlite::Connection,
|
||||||
schema: &'schema Schema,
|
schema: &'schema Schema,
|
||||||
|
cache: &'cache AttributeCacher,
|
||||||
entity: E,
|
entity: E,
|
||||||
attribute: &'attribute NamespacedKeyword) -> Result<Option<TypedValue>>
|
attribute: &'attribute NamespacedKeyword) -> Result<Option<TypedValue>>
|
||||||
where E: Into<Entid> {
|
where E: Into<Entid> {
|
||||||
lookup_value(sqlite, schema, entity.into(), lookup_attribute(schema, attribute)?)
|
lookup_value(sqlite, schema, cache, entity.into(), lookup_attribute(schema, attribute)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn lookup_values_for_attribute<'sqlite, 'schema, 'attribute, E>
|
pub fn lookup_values_for_attribute<'sqlite, 'schema, 'cache, 'attribute, E>
|
||||||
(sqlite: &'sqlite rusqlite::Connection,
|
(sqlite: &'sqlite rusqlite::Connection,
|
||||||
schema: &'schema Schema,
|
schema: &'schema Schema,
|
||||||
|
cache: &'cache AttributeCacher,
|
||||||
entity: E,
|
entity: E,
|
||||||
attribute: &'attribute NamespacedKeyword) -> Result<Vec<TypedValue>>
|
attribute: &'attribute NamespacedKeyword) -> Result<Vec<TypedValue>>
|
||||||
where E: Into<Entid> {
|
where E: Into<Entid> {
|
||||||
lookup_values(sqlite, schema, entity.into(), lookup_attribute(schema, attribute)?)
|
lookup_values(sqlite, schema, cache, entity.into(), lookup_attribute(schema, attribute)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_statement<'sqlite, 'stmt, 'bound>
|
fn run_statement<'sqlite, 'stmt, 'bound>
|
||||||
|
|
Loading…
Reference in a new issue