Update cache on write. (#566) r=emily

* Use the cache to make constant queries super fast.
* Fix translate tests to match: we no longer generate SQL for many of them!
* Accumulate additions and removals into the cache.
    * Make attribute cache clone-on-write; store it in Metadata.
    * Allow caching of fulltext attributes, interning strings.
This commit is contained in:
Richard Newman 2018-03-06 09:01:20 -08:00
parent bead9752bd
commit f42ae35b70
21 changed files with 1795 additions and 349 deletions

View file

@ -23,6 +23,8 @@ use ::{
pub trait CachedAttributes {
fn is_attribute_cached_reverse(&self, entid: Entid) -> bool;
fn is_attribute_cached_forward(&self, entid: Entid) -> bool;
fn has_cached_attributes(&self) -> bool;
fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec<TypedValue>>;
fn get_value_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&TypedValue>;
@ -30,3 +32,9 @@ pub trait CachedAttributes {
fn get_entid_for_value(&self, attribute: Entid, value: &TypedValue) -> Option<Entid>;
fn get_entids_for_value(&self, attribute: Entid, value: &TypedValue) -> Option<&BTreeSet<Entid>>;
}
pub trait UpdateableCache {
type Error;
fn update<I>(&mut self, schema: &Schema, retractions: I, assertions: I) -> Result<(), Self::Error>
where I: Iterator<Item=(Entid, Entid, TypedValue)>;
}

View file

@ -51,7 +51,10 @@ pub use edn::{
Utc,
};
pub use cache::CachedAttributes;
pub use cache::{
CachedAttributes,
UpdateableCache,
};
/// Core types defining a Mentat knowledge base.

View file

@ -7,6 +7,7 @@ workspace = ".."
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
itertools = "0.7"
lazy_static = "0.2"
num = "0.1"
ordered-float = "0.5"
time = "0.1"

File diff suppressed because it is too large Load diff

View file

@ -66,6 +66,10 @@ use types::{
};
use tx::transact;
use watcher::{
NullWatcher,
};
pub fn new_connection<T>(uri: T) -> rusqlite::Result<rusqlite::Connection> where T: AsRef<Path> {
let conn = match uri.as_ref().to_string_lossy().len() {
0 => rusqlite::Connection::open_in_memory()?,
@ -249,7 +253,8 @@ pub fn create_current_version(conn: &mut rusqlite::Connection) -> Result<DB> {
// TODO: return to transact_internal to self-manage the encompassing SQLite transaction.
let bootstrap_schema_for_mutation = Schema::default(); // The bootstrap transaction will populate this schema.
let (_report, next_partition_map, next_schema) = transact(&tx, db.partition_map, &bootstrap_schema_for_mutation, &db.schema, bootstrap::bootstrap_entities())?;
let (_report, next_partition_map, next_schema, _watcher) = transact(&tx, db.partition_map, &bootstrap_schema_for_mutation, &db.schema, NullWatcher(), bootstrap::bootstrap_entities())?;
// TODO: validate metadata mutations that aren't schema related, like additional partitions.
if let Some(next_schema) = next_schema {
if next_schema != db.schema {
@ -1218,12 +1223,12 @@ mod tests {
// We're about to write, so go straight ahead and get an IMMEDIATE transaction.
let tx = self.sqlite.transaction_with_behavior(TransactionBehavior::Immediate)?;
// Applying the transaction can fail, so we don't unwrap.
let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
let details = transact(&tx, self.partition_map.clone(), &self.schema, &self.schema, NullWatcher(), entities)?;
tx.commit()?;
details
};
let (report, next_partition_map, next_schema) = details;
let (report, next_partition_map, next_schema, _watcher) = details;
self.partition_map = next_partition_map;
if let Some(next_schema) = next_schema {
self.schema = next_schema;

View file

@ -17,6 +17,8 @@ extern crate itertools;
#[macro_use]
extern crate lazy_static;
extern crate num;
extern crate rusqlite;
extern crate tabwriter;
extern crate time;
@ -43,6 +45,7 @@ pub mod errors;
pub mod internal_types; // pub because we need them for building entities programmatically.
mod metadata;
mod schema;
mod watcher;
mod tx;
pub mod types;
mod upsert_resolution;
@ -73,6 +76,10 @@ pub use db::{
new_connection,
};
pub use watcher::{
TransactWatcher,
};
pub use tx::{
transact,
transact_terms,

View file

@ -51,6 +51,7 @@ use std::collections::{
BTreeSet,
VecDeque,
};
use std::rc::Rc;
use db;
@ -113,11 +114,16 @@ use types::{
TxReport,
ValueType,
};
use watcher::{
TransactWatcher,
};
use upsert_resolution::Generation;
/// A transaction on its way to being applied.
#[derive(Debug)]
pub struct Tx<'conn, 'a> {
pub struct Tx<'conn, 'a, W> where W: TransactWatcher {
/// The storage to apply against. In the future, this will be a Mentat connection.
store: &'conn rusqlite::Connection, // TODO: db::MentatStoring,
@ -138,6 +144,8 @@ pub struct Tx<'conn, 'a> {
/// This schema is not updated, so we just borrow it.
schema: &'a Schema,
watcher: W,
/// The transaction ID of the transaction.
tx_id: Entid,
@ -145,18 +153,20 @@ pub struct Tx<'conn, 'a> {
tx_instant: Option<DateTime<Utc>>,
}
impl<'conn, 'a> Tx<'conn, 'a> {
impl<'conn, 'a, W> Tx<'conn, 'a, W> where W: TransactWatcher {
pub fn new(
store: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
tx_id: Entid) -> Tx<'conn, 'a> {
watcher: W,
tx_id: Entid) -> Tx<'conn, 'a, W> {
Tx {
store: store,
partition_map: partition_map,
schema_for_mutation: Cow::Borrowed(schema_for_mutation),
schema: schema,
watcher: watcher,
tx_id: tx_id,
tx_instant: None,
}
@ -516,7 +526,9 @@ impl<'conn, 'a> Tx<'conn, 'a> {
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport> where I: IntoIterator<Item=Entity> {
pub fn transact_entities<I>(&mut self, entities: I) -> Result<TxReport>
where I: IntoIterator<Item=Entity>,
W: TransactWatcher {
// Pipeline stage 1: entities -> terms with tempids and lookup refs.
let (terms_with_temp_ids_and_lookup_refs, tempid_set, lookup_ref_set) = self.entities_into_terms_with_temp_ids_and_lookup_refs(entities)?;
@ -529,7 +541,9 @@ impl<'conn, 'a> Tx<'conn, 'a> {
self.transact_simple_terms(terms_with_temp_ids, tempid_set)
}
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
pub fn transact_simple_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport>
where I: IntoIterator<Item=TermWithTempIds>,
W: TransactWatcher {
// TODO: push these into an internal transaction report?
let mut tempids: BTreeMap<TempId, KnownEntid> = BTreeMap::default();
@ -654,6 +668,8 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
}
self.watcher.datom(op, e, a, &v);
let reduced = (e, a, attribute, v, added);
match (attribute.fulltext, attribute.multival) {
(false, true) => non_fts_many.push(reduced),
@ -694,6 +710,7 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
db::update_partition_map(self.store, &self.partition_map)?;
self.watcher.done(self.schema)?;
if tx_might_update_metadata {
// Extract changes to metadata from the store.
@ -723,24 +740,27 @@ impl<'conn, 'a> Tx<'conn, 'a> {
}
/// Initialize a new Tx object with a new tx id and a tx instant. Kick off the SQLite conn, too.
fn start_tx<'conn, 'a>(conn: &'conn rusqlite::Connection,
fn start_tx<'conn, 'a, W>(conn: &'conn rusqlite::Connection,
mut partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema) -> Result<Tx<'conn, 'a>> {
schema: &'a Schema,
watcher: W) -> Result<Tx<'conn, 'a, W>>
where W: TransactWatcher {
let tx_id = partition_map.allocate_entid(":db.part/tx");
conn.begin_tx_application()?;
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, tx_id))
Ok(Tx::new(conn, partition_map, schema_for_mutation, schema, watcher, tx_id))
}
fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Option<Schema>)> {
fn conclude_tx<W>(tx: Tx<W>, report: TxReport) -> Result<(TxReport, PartitionMap, Option<Schema>, W)>
where W: TransactWatcher {
// If the schema has moved on, return it.
let next_schema = match tx.schema_for_mutation {
Cow::Borrowed(_) => None,
Cow::Owned(next_schema) => Some(next_schema),
};
Ok((report, tx.partition_map, next_schema))
Ok((report, tx.partition_map, next_schema, tx.watcher))
}
/// Transact the given `entities` against the given SQLite `conn`, using the given metadata.
@ -749,28 +769,32 @@ fn conclude_tx(tx: Tx, report: TxReport) -> Result<(TxReport, PartitionMap, Opti
///
/// This approach is explained in https://github.com/mozilla/mentat/wiki/Transacting.
// TODO: move this to the transactor layer.
pub fn transact<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>)>
where I: IntoIterator<Item=Entity> {
pub fn transact<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
watcher: W,
entities: I) -> Result<(TxReport, PartitionMap, Option<Schema>, W)>
where I: IntoIterator<Item=Entity>,
W: TransactWatcher {
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?;
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?;
let report = tx.transact_entities(entities)?;
conclude_tx(tx, report)
}
/// Just like `transact`, but accepts lower-level inputs to allow bypassing the parser interface.
pub fn transact_terms<'conn, 'a, I>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
terms: I,
tempid_set: InternSet<TempId>) -> Result<(TxReport, PartitionMap, Option<Schema>)>
where I: IntoIterator<Item=TermWithTempIds> {
pub fn transact_terms<'conn, 'a, I, W>(conn: &'conn rusqlite::Connection,
partition_map: PartitionMap,
schema_for_mutation: &'a Schema,
schema: &'a Schema,
watcher: W,
terms: I,
tempid_set: InternSet<TempId>) -> Result<(TxReport, PartitionMap, Option<Schema>, W)>
where I: IntoIterator<Item=TermWithTempIds>,
W: TransactWatcher {
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema)?;
let mut tx = start_tx(conn, partition_map, schema_for_mutation, schema, watcher)?;
let report = tx.transact_simple_terms(terms, tempid_set)?;
conclude_tx(tx, report)
}

53
db/src/watcher.rs Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
// A trivial interface for extracting information from a transact as it happens.
// We have two situations in which we need to do this:
//
// - InProgress and Conn both have attribute caches. InProgress's is different from Conn's,
// because it needs to be able to roll back. These wish to see changes in a certain set of
// attributes in order to synchronously update the cache during a write.
// - When observers are registered we want to flip some flags as writes occur so that we can
// notifying them outside the transaction.
use mentat_core::{
Entid,
Schema,
TypedValue,
};
use mentat_tx::entities::{
OpType,
};
use errors::{
Result,
};
pub trait TransactWatcher {
fn datom(&mut self, op: OpType, e: Entid, a: Entid, v: &TypedValue);
/// Only return an error if you want to interrupt the transact!
/// Called with the schema _prior to_ the transact -- any attributes or
/// attribute changes transacted during this transact are not reflected in
/// the schema.
fn done(&mut self, schema: &Schema) -> Result<()>;
}
pub struct NullWatcher();
impl TransactWatcher for NullWatcher {
fn datom(&mut self, _op: OpType, _e: Entid, _a: Entid, _v: &TypedValue) {
}
fn done(&mut self, _schema: &Schema) -> Result<()> {
Ok(())
}
}

View file

@ -156,7 +156,7 @@ impl<K: Clone + Ord, V: Clone> Intersection<K> for BTreeMap<K, V> {
}
}
type VariableBindings = BTreeMap<Variable, TypedValue>;
pub type VariableBindings = BTreeMap<Variable, TypedValue>;
/// A `ConjoiningClauses` (CC) is a collection of clauses that are combined with `JOIN`.
/// The topmost form in a query is a `ConjoiningClauses`.
@ -393,6 +393,10 @@ impl ConjoiningClauses {
self.value_bindings.contains_key(var)
}
pub fn value_bindings(&self, variables: &BTreeSet<Variable>) -> VariableBindings {
self.value_bindings.with_intersected_keys(variables)
}
/// Return an interator over the variables externally bound to values.
pub fn value_bound_variables(&self) -> ::std::collections::btree_map::Keys<Variable, TypedValue> {
self.value_bindings.keys()

View file

@ -381,7 +381,6 @@ impl ConjoiningClauses {
return true;
},
Some(item) => {
println!("{} is known to be {:?}", var, item);
self.bind_value(var, item.clone());
return true;
}

View file

@ -38,6 +38,7 @@ use mentat_core::{
use mentat_core::counter::RcCounter;
use mentat_query::{
Element,
FindQuery,
FindSpec,
Limit,
@ -55,6 +56,7 @@ pub use errors::{
pub use clauses::{
QueryInputs,
VariableBindings,
};
pub use types::{
@ -140,6 +142,23 @@ impl AlgebraicQuery {
self.cc.is_known_empty()
}
/// Return true if every variable in the find spec is fully bound to a single value.
pub fn is_fully_bound(&self) -> bool {
self.find_spec
.columns()
.all(|e| match e {
&Element::Variable(ref var) => self.cc.is_value_bound(var),
})
}
/// Return true if every variable in the find spec is fully bound to a single value,
/// and evaluating the query doesn't require running SQL.
pub fn is_fully_unit_bound(&self) -> bool {
self.cc.wheres.is_empty() &&
self.is_fully_bound()
}
/// Return a set of the input variables mentioned in the `:in` clause that have not yet been
/// bound. We do this by looking at the CC.
pub fn unbound_variables(&self) -> BTreeSet<Variable> {

View file

@ -19,6 +19,10 @@ extern crate mentat_query_algebrizer;
extern crate mentat_query_sql;
extern crate mentat_sql;
use std::collections::{
BTreeSet,
};
use std::iter;
use std::rc::Rc;
@ -34,6 +38,10 @@ use mentat_core::{
ValueTypeTag,
};
use mentat_core::util::{
Either,
};
use mentat_db::{
TypedSQLValue,
};
@ -49,6 +57,7 @@ use mentat_query_algebrizer::{
AlgebraicQuery,
ColumnName,
ConjoiningClauses,
VariableBindings,
VariableColumn,
};
@ -86,7 +95,7 @@ pub struct QueryOutput {
pub results: QueryResults,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueryResults {
Scalar(Option<TypedValue>),
Tuple(Option<Vec<TypedValue>>),
@ -134,6 +143,32 @@ impl QueryOutput {
}
}
pub fn from_constants(spec: &Rc<FindSpec>, bindings: VariableBindings) -> QueryResults {
use self::FindSpec::*;
match &**spec {
&FindScalar(Element::Variable(ref var)) => {
let val = bindings.get(var).cloned();
QueryResults::Scalar(val)
},
&FindTuple(ref elements) => {
let values = elements.iter().map(|e| match e {
&Element::Variable(ref var) => bindings.get(var).cloned().expect("every var to have a binding"),
}).collect();
QueryResults::Tuple(Some(values))
},
&FindColl(Element::Variable(ref var)) => {
let val = bindings.get(var).cloned().expect("every var to have a binding");
QueryResults::Coll(vec![val])
},
&FindRel(ref elements) => {
let values = elements.iter().map(|e| match e {
&Element::Variable(ref var) => bindings.get(var).cloned().expect("every var to have a binding"),
}).collect();
QueryResults::Rel(vec![values])
},
}
}
pub fn into_scalar(self) -> Result<Option<TypedValue>> {
self.results.into_scalar()
}
@ -350,11 +385,12 @@ fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
pub trait Projector {
fn project<'stmt>(&self, rows: Rows<'stmt>) -> Result<QueryOutput>;
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's>;
}
/// A projector that produces a `QueryResult` containing fixed data.
/// Takes a boxed function that should return an empty result set of the desired type.
struct ConstantProjector {
pub struct ConstantProjector {
spec: Rc<FindSpec>,
results_factory: Box<Fn() -> QueryResults>,
}
@ -366,10 +402,8 @@ impl ConstantProjector {
results_factory: results_factory,
}
}
}
impl Projector for ConstantProjector {
fn project<'stmt>(&self, _: Rows<'stmt>) -> Result<QueryOutput> {
pub fn project_without_rows<'stmt>(&self) -> Result<QueryOutput> {
let results = (self.results_factory)();
let spec = self.spec.clone();
Ok(QueryOutput {
@ -379,6 +413,16 @@ impl Projector for ConstantProjector {
}
}
impl Projector for ConstantProjector {
fn project<'stmt>(&self, _: Rows<'stmt>) -> Result<QueryOutput> {
self.project_without_rows()
}
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
self.spec.columns()
}
}
struct ScalarProjector {
spec: Rc<FindSpec>,
template: TypedIndex,
@ -417,6 +461,10 @@ impl Projector for ScalarProjector {
results: results,
})
}
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
self.spec.columns()
}
}
/// A tuple projector produces a single vector. It's the single-result version of rel.
@ -470,6 +518,10 @@ impl Projector for TupleProjector {
results: results,
})
}
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
self.spec.columns()
}
}
/// A rel projector produces a vector of vectors.
@ -524,6 +576,10 @@ impl Projector for RelProjector {
results: QueryResults::Rel(out),
})
}
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
self.spec.columns()
}
}
/// A coll projector produces a vector of values.
@ -564,6 +620,10 @@ impl Projector for CollProjector {
results: QueryResults::Coll(out),
})
}
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
self.spec.columns()
}
}
/// Combines the two things you need to turn a query into SQL and turn its results into
@ -598,19 +658,24 @@ impl CombinedProjection {
/// - The bindings established by the topmost CC.
/// - The types known at algebrizing time.
/// - The types extracted from the store for unknown attributes.
pub fn query_projection(query: &AlgebraicQuery) -> Result<CombinedProjection> {
pub fn query_projection(query: &AlgebraicQuery) -> Result<Either<ConstantProjector, CombinedProjection>> {
use self::FindSpec::*;
let spec = query.find_spec.clone();
if query.is_known_empty() {
if query.is_fully_unit_bound() {
// Do a few gyrations to produce empty results of the right kind for the query.
let variables: BTreeSet<Variable> = spec.columns().map(|e| match e { &Element::Variable(ref var) => var.clone() }).collect();
// TODO: error handling
let results = QueryOutput::from_constants(&spec, query.cc.value_bindings(&variables));
let f = Box::new(move || {results.clone()});
Ok(Either::Left(ConstantProjector::new(spec, f)))
} else if query.is_known_empty() {
// Do a few gyrations to produce empty results of the right kind for the query.
let empty = QueryOutput::empty_factory(&spec);
let constant_projector = ConstantProjector::new(spec, empty);
Ok(CombinedProjection {
sql_projection: Projection::One,
datalog_projector: Box::new(constant_projector),
distinct: false,
})
Ok(Either::Left(ConstantProjector::new(spec, empty)))
} else {
match *query.find_spec {
FindColl(ref element) => {
@ -634,6 +699,6 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result<CombinedProjection> {
let (cols, templates) = project_elements(column_count, elements, query)?;
TupleProjector::combine(spec, column_count, cols, templates)
},
}
}.map(Either::Right)
}
}

View file

@ -24,6 +24,7 @@ pub use mentat_query_sql::{
};
pub use translate::{
ProjectedSelect,
cc_to_exists,
query_to_select,
};

View file

@ -17,7 +17,13 @@ use mentat_core::{
ValueTypeSet,
};
use mentat_query::Limit;
use mentat_core::util::{
Either,
};
use mentat_query::{
Limit,
};
use mentat_query_algebrizer::{
AlgebraicQuery,
@ -40,6 +46,7 @@ use mentat_query_algebrizer::{
use mentat_query_projector::{
CombinedProjection,
ConstantProjector,
Projector,
projected_column_for_var,
query_projection,
@ -237,9 +244,12 @@ impl ToConstraint for ColumnConstraint {
}
}
pub struct ProjectedSelect{
pub query: SelectQuery,
pub projector: Box<Projector>,
pub enum ProjectedSelect {
Constant(ConstantProjector),
Query {
query: SelectQuery,
projector: Box<Projector>,
},
}
// Nasty little hack to let us move out of indexed context.
@ -325,6 +335,17 @@ fn table_for_computed(computed: ComputedTable, alias: TableAlias) -> TableOrSubq
}
}
fn empty_query() -> SelectQuery {
SelectQuery {
distinct: false,
projection: Projection::One,
from: FromClause::Nothing,
constraints: vec![],
order: vec![],
limit: Limit::None,
}
}
/// Returns a `SelectQuery` that queries for the provided `cc`. Note that this _always_ returns a
/// query that runs SQL. The next level up the call stack can check for known-empty queries if
/// needed.
@ -380,14 +401,7 @@ fn cc_to_select_query(projection: Projection,
pub fn cc_to_exists(cc: ConjoiningClauses) -> SelectQuery {
if cc.is_known_empty() {
// In this case we can produce a very simple query that returns no results.
SelectQuery {
distinct: false,
projection: Projection::One,
from: FromClause::Nothing,
constraints: vec![],
order: vec![],
limit: Limit::None,
}
empty_query()
} else {
cc_to_select_query(Projection::One, cc, false, None, Limit::None)
}
@ -398,9 +412,14 @@ pub fn cc_to_exists(cc: ConjoiningClauses) -> SelectQuery {
pub fn query_to_select(query: AlgebraicQuery) -> Result<ProjectedSelect> {
// TODO: we can't pass `query.limit` here if we aggregate during projection.
// SQL-based aggregation -- `SELECT SUM(datoms00.e)` -- is fine.
let CombinedProjection { sql_projection, datalog_projector, distinct } = query_projection(&query)?;
Ok(ProjectedSelect {
query: cc_to_select_query(sql_projection, query.cc, distinct, query.order, query.limit),
projector: datalog_projector,
})
query_projection(&query).map(|e| match e {
Either::Left(constant) => ProjectedSelect::Constant(constant),
Either::Right(CombinedProjection { sql_projection, datalog_projector, distinct, }) => {
let q = cc_to_select_query(sql_projection, query.cc, distinct, query.order, query.limit);
ProjectedSelect::Query {
query: q,
projector: datalog_projector,
}
},
}).map_err(|e| e.into())
}

View file

@ -12,6 +12,7 @@ extern crate mentat_core;
extern crate mentat_query;
extern crate mentat_query_algebrizer;
extern crate mentat_query_parser;
extern crate mentat_query_projector;
extern crate mentat_query_translator;
extern crate mentat_sql;
@ -20,6 +21,7 @@ use std::collections::BTreeMap;
use std::rc::Rc;
use mentat_query::{
FindSpec,
NamespacedKeyword,
Variable,
};
@ -39,12 +41,27 @@ use mentat_query_algebrizer::{
algebrize,
algebrize_with_inputs,
};
use mentat_query_projector::{
ConstantProjector,
};
use mentat_query_translator::{
ProjectedSelect,
query_to_select,
};
use mentat_sql::SQLQuery;
/// Produce the appropriate `Variable` for the provided valid ?-prefixed name.
/// This lives here because we can't re-export macros:
/// https://github.com/rust-lang/rust/issues/29638.
macro_rules! var {
( ? $var:ident ) => {
$crate::Variable::from_valid_name(concat!("?", stringify!($var)))
};
}
fn associate_ident(schema: &mut Schema, i: NamespacedKeyword, e: Entid) {
schema.entid_map.insert(e, i.clone());
schema.ident_map.insert(i.clone(), e);
@ -54,18 +71,56 @@ fn add_attribute(schema: &mut Schema, e: Entid, a: Attribute) {
schema.attribute_map.insert(e, a);
}
fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery {
fn query_to_sql(query: ProjectedSelect) -> SQLQuery {
match query {
ProjectedSelect::Query { query, projector: _projector } => {
query.to_sql_query().expect("to_sql_query to succeed")
},
ProjectedSelect::Constant(constant) => {
panic!("ProjectedSelect wasn't ::Query! Got constant {:#?}", constant.project_without_rows());
},
}
}
fn query_to_constant(query: ProjectedSelect) -> ConstantProjector {
match query {
ProjectedSelect::Constant(constant) => {
constant
},
_ => panic!("ProjectedSelect wasn't ::Constant!"),
}
}
fn assert_query_is_empty(query: ProjectedSelect, expected_spec: FindSpec) {
let constant = query_to_constant(query).project_without_rows().expect("constant run");
assert_eq!(*constant.spec, expected_spec);
assert!(constant.results.is_empty());
}
fn inner_translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> ProjectedSelect {
let known = Known::for_schema(schema);
let parsed = parse_find_string(query).expect("parse to succeed");
let algebrized = algebrize_with_inputs(known, parsed, 0, inputs).expect("algebrize to succeed");
let select = query_to_select(algebrized).expect("translate to succeed");
select.query.to_sql_query().unwrap()
query_to_select(algebrized).expect("translate to succeed")
}
fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery {
query_to_sql(inner_translate_with_inputs(schema, query, inputs))
}
fn translate(schema: &Schema, query: &'static str) -> SQLQuery {
translate_with_inputs(schema, query, QueryInputs::default())
}
fn translate_with_inputs_to_constant(schema: &Schema, query: &'static str, inputs: QueryInputs) -> ConstantProjector {
query_to_constant(inner_translate_with_inputs(schema, query, inputs))
}
fn translate_to_constant(schema: &Schema, query: &'static str) -> ConstantProjector {
translate_with_inputs_to_constant(schema, query, QueryInputs::default())
}
fn prepopulated_typed_schema(foo_type: ValueType) -> Schema {
let mut schema = Schema::default();
associate_ident(&mut schema, NamespacedKeyword::new("foo", "bar"), 99);
@ -195,7 +250,7 @@ fn test_bound_variable_limit_affects_types() {
algebrized.cc.known_type(&Variable::from_valid_name("?limit")));
let select = query_to_select(algebrized).expect("query to translate");
let SQLQuery { sql, args } = select.query.to_sql_query().unwrap();
let SQLQuery { sql, args } = query_to_sql(select);
// TODO: this query isn't actually correct -- we don't yet algebrize for variables that are
// specified in `:in` but not provided at algebrizing time. But it shows what we care about
@ -286,8 +341,7 @@ fn test_unknown_ident() {
// If you insist…
let select = query_to_select(algebrized).expect("query to translate");
let sql = select.query.to_sql_query().unwrap().sql;
assert_eq!("SELECT 1 LIMIT 0", sql);
assert_query_is_empty(select, FindSpec::FindRel(vec![var!(?x).into()]));
}
#[test]
@ -678,16 +732,18 @@ fn test_ground_scalar() {
// Verify that we accept inline constants.
let query = r#"[:find ?x . :where [(ground "yyy") ?x]]"#;
let SQLQuery { sql, args } = translate(&schema, query);
assert_eq!(sql, "SELECT $v0 AS `?x` LIMIT 1");
assert_eq!(args, vec![make_arg("$v0", "yyy")]);
let constant = translate_to_constant(&schema, query);
assert_eq!(constant.project_without_rows().unwrap()
.into_scalar().unwrap(),
Some(TypedValue::typed_string("yyy")));
// Verify that we accept bound input constants.
let query = r#"[:find ?x . :in ?v :where [(ground ?v) ?x]]"#;
let inputs = QueryInputs::with_value_sequence(vec![(Variable::from_valid_name("?v"), TypedValue::String(Rc::new("aaa".into())))]);
let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs);
assert_eq!(sql, "SELECT $v0 AS `?x` LIMIT 1");
assert_eq!(args, vec![make_arg("$v0", "aaa"),]);
let constant = translate_with_inputs_to_constant(&schema, query, inputs);
assert_eq!(constant.project_without_rows().unwrap()
.into_scalar().unwrap(),
Some(TypedValue::typed_string("aaa")));
}
#[test]
@ -696,18 +752,26 @@ fn test_ground_tuple() {
// Verify that we accept inline constants.
let query = r#"[:find ?x ?y :where [(ground [1 "yyy"]) [?x ?y]]]"#;
let SQLQuery { sql, args } = translate(&schema, query);
assert_eq!(sql, "SELECT DISTINCT 1 AS `?x`, $v0 AS `?y`");
assert_eq!(args, vec![make_arg("$v0", "yyy")]);
let constant = translate_to_constant(&schema, query);
assert_eq!(constant.project_without_rows().unwrap()
.into_rel().unwrap(),
vec![vec![TypedValue::Long(1), TypedValue::typed_string("yyy")]]);
// Verify that we accept bound input constants.
let query = r#"[:find [?x ?y] :in ?u ?v :where [(ground [?u ?v]) [?x ?y]]]"#;
let inputs = QueryInputs::with_value_sequence(vec![(Variable::from_valid_name("?u"), TypedValue::Long(2)),
(Variable::from_valid_name("?v"), TypedValue::String(Rc::new("aaa".into()))),]);
let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs);
let constant = translate_with_inputs_to_constant(&schema, query, inputs);
assert_eq!(constant.project_without_rows().unwrap()
.into_tuple().unwrap(),
Some(vec![TypedValue::Long(2), TypedValue::typed_string("aaa")]));
// TODO: treat 2 as an input variable that could be bound late, rather than eagerly binding it.
assert_eq!(sql, "SELECT 2 AS `?x`, $v0 AS `?y` LIMIT 1");
assert_eq!(args, vec![make_arg("$v0", "aaa"),]);
// In that case the query wouldn't be constant, and would look more like:
// let SQLQuery { sql, args } = translate_with_inputs(&schema, query, inputs);
// assert_eq!(sql, "SELECT 2 AS `?x`, $v0 AS `?y` LIMIT 1");
// assert_eq!(args, vec![make_arg("$v0", "aaa"),]);
}
#[test]

View file

@ -449,6 +449,12 @@ pub enum Element {
// Pull(Pull), // TODO
}
impl From<Variable> for Element {
fn from(x: Variable) -> Element {
Element::Variable(x)
}
}
impl std::fmt::Display for Element {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {

View file

@ -25,9 +25,6 @@ use std::path::{
use std::sync::{
Arc,
Mutex,
RwLock,
RwLockReadGuard,
RwLockWriteGuard,
};
use rusqlite;
@ -50,7 +47,11 @@ use mentat_core::{
use mentat_core::intern_set::InternSet;
use mentat_db::cache::SQLiteAttributeCache;
use mentat_db::cache::{
InProgressSQLiteAttributeCache,
SQLiteAttributeCache,
};
use mentat_db::db;
use mentat_db::{
transact,
@ -101,15 +102,17 @@ pub struct Metadata {
pub generation: u64,
pub partition_map: PartitionMap,
pub schema: Arc<Schema>,
pub attribute_cache: SQLiteAttributeCache,
}
impl Metadata {
// Intentionally not public.
fn new(generation: u64, partition_map: PartitionMap, schema: Arc<Schema>) -> Metadata {
fn new(generation: u64, partition_map: PartitionMap, schema: Arc<Schema>, cache: SQLiteAttributeCache) -> Metadata {
Metadata {
generation: generation,
partition_map: partition_map,
schema: schema,
attribute_cache: cache,
}
}
}
@ -118,20 +121,25 @@ impl Metadata {
pub struct Conn {
/// `Mutex` since all reads and writes need to be exclusive. Internally, owned data for the
/// volatile parts (generation and partition map), and `Arc` for the infrequently changing parts
/// (schema) that we want to share across threads. A consuming thread may use a shared
/// (schema, cache) that we want to share across threads. A consuming thread may use a shared
/// reference after the `Conn`'s `Metadata` has moved on.
///
/// The motivating case is multiple query threads taking references to the current schema to
/// perform long-running queries while a single writer thread moves the metadata -- partition
/// map and schema -- forward.
///
/// We want the attribute cache to be isolated across transactions, updated within
/// `InProgress` writes, and updated in the `Conn` on commit. To achieve this we
/// store the cache itself in an `Arc` inside `SQLiteAttributeCache`, so that `.get_mut()`
/// gives us copy-on-write semantics.
/// We store that cached `Arc` here in a `Mutex`, so that the main copy can be carefully
/// replaced on commit.
metadata: Mutex<Metadata>,
// TODO: maintain set of change listeners or handles to transaction report queues. #298.
// TODO: maintain cache of query plans that could be shared across threads and invalidated when
// the schema changes. #315.
attribute_cache: RwLock<SQLiteAttributeCache>,
}
/// A convenience wrapper around a single SQLite connection and a Conn. This is suitable
@ -194,7 +202,8 @@ pub struct InProgress<'a, 'c> {
generation: u64,
partition_map: PartitionMap,
schema: Schema,
cache: RwLockWriteGuard<'a, SQLiteAttributeCache>,
cache: InProgressSQLiteAttributeCache,
use_caching: bool,
}
@ -235,7 +244,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
where T: Into<Option<QueryInputs>> {
if self.use_caching {
let known = Known::new(&self.schema, Some(&*self.cache));
let known = Known::new(&self.schema, Some(&self.cache));
q_once(&*(self.transaction),
known,
query,
@ -251,7 +260,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
fn q_prepare<T>(&self, query: &str, inputs: T) -> PreparedResult
where T: Into<Option<QueryInputs>> {
let known = Known::new(&self.schema, Some(&*self.cache));
let known = Known::new(&self.schema, Some(&self.cache));
q_prepare(&*(self.transaction),
known,
query,
@ -261,7 +270,7 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
fn q_explain<T>(&self, query: &str, inputs: T) -> Result<QueryExplanation>
where T: Into<Option<QueryInputs>> {
let known = Known::new(&self.schema, Some(&*self.cache));
let known = Known::new(&self.schema, Some(&self.cache));
q_explain(&*(self.transaction),
known,
query,
@ -270,13 +279,13 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
fn lookup_values_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>>
where E: Into<Entid> {
let known = Known::new(&self.schema, Some(&*self.cache));
let known = Known::new(&self.schema, Some(&self.cache));
lookup_values_for_attribute(&*(self.transaction), known, entity, attribute)
}
fn lookup_value_for_attribute<E>(&self, entity: E, attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>>
where E: Into<Entid> {
let known = Known::new(&self.schema, Some(&*self.cache));
let known = Known::new(&self.schema, Some(&self.cache));
lookup_value_for_attribute(&*(self.transaction), known, entity, attribute)
}
}
@ -357,12 +366,14 @@ impl<'a, 'c> InProgress<'a, 'c> {
}
pub fn transact_terms<I>(&mut self, terms: I, tempid_set: InternSet<TempId>) -> Result<TxReport> where I: IntoIterator<Item=TermWithTempIds> {
let (report, next_partition_map, next_schema) = transact_terms(&self.transaction,
self.partition_map.clone(),
&self.schema,
&self.schema,
terms,
tempid_set)?;
let (report, next_partition_map, next_schema, _watcher) =
transact_terms(&self.transaction,
self.partition_map.clone(),
&self.schema,
&self.schema,
self.cache.transact_watcher(),
terms,
tempid_set)?;
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -379,7 +390,13 @@ impl<'a, 'c> InProgress<'a, 'c> {
// `Metadata` on return. If we used `Cell` or other mechanisms, we'd be using
// `Default::default` in those situations to extract the partition map, and so there
// would still be some cost.
let (report, next_partition_map, next_schema) = transact(&self.transaction, self.partition_map.clone(), &self.schema, &self.schema, entities)?;
let (report, next_partition_map, next_schema, _watcher) =
transact(&self.transaction,
self.partition_map.clone(),
&self.schema,
&self.schema,
self.cache.transact_watcher(),
entities)?;
self.partition_map = next_partition_map;
if let Some(schema) = next_schema {
self.schema = schema;
@ -423,6 +440,9 @@ impl<'a, 'c> InProgress<'a, 'c> {
metadata.generation += 1;
metadata.partition_map = self.partition_map;
// Update the conn's cache if we made any changes.
self.cache.commit_to(&mut metadata.attribute_cache);
if self.schema != *(metadata.schema) {
metadata.schema = Arc::new(self.schema);
@ -433,6 +453,29 @@ impl<'a, 'c> InProgress<'a, 'c> {
Ok(())
}
pub fn cache(&mut self,
attribute: &NamespacedKeyword,
cache_direction: CacheDirection,
cache_action: CacheAction) -> Result<()> {
let attribute_entid: Entid = self.schema
.attribute_for_ident(&attribute)
.ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into();
match cache_action {
CacheAction::Register => {
match cache_direction {
CacheDirection::Both => self.cache.register(&self.schema, &self.transaction, attribute_entid),
CacheDirection::Forward => self.cache.register_forward(&self.schema, &self.transaction, attribute_entid),
CacheDirection::Reverse => self.cache.register_reverse(&self.schema, &self.transaction, attribute_entid),
}.map_err(|e| e.into())
},
CacheAction::Deregister => {
self.cache.unregister(attribute_entid);
Ok(())
},
}
}
}
impl Store {
@ -520,8 +563,7 @@ impl Conn {
// Intentionally not public.
fn new(partition_map: PartitionMap, schema: Schema) -> Conn {
Conn {
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema))),
attribute_cache: Default::default()
metadata: Mutex::new(Metadata::new(0, partition_map, Arc::new(schema), Default::default())),
}
}
@ -559,15 +601,10 @@ impl Conn {
self.metadata.lock().unwrap().schema.clone()
}
pub fn attribute_cache<'s>(&'s self) -> RwLockReadGuard<'s, SQLiteAttributeCache> {
self.attribute_cache.read().unwrap()
pub fn current_cache(&self) -> SQLiteAttributeCache {
self.metadata.lock().unwrap().attribute_cache.clone()
}
pub fn attribute_cache_mut<'s>(&'s self) -> RwLockWriteGuard<'s, SQLiteAttributeCache> {
self.attribute_cache.write().unwrap()
}
/// Query the Mentat store, using the given connection and the current metadata.
pub fn q_once<T>(&self,
sqlite: &rusqlite::Connection,
@ -577,8 +614,7 @@ impl Conn {
// Doesn't clone, unlike `current_schema`.
let metadata = self.metadata.lock().unwrap();
let cache = &*self.attribute_cache.read().unwrap();
let known = Known::new(&*metadata.schema, Some(cache));
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
q_once(sqlite,
known,
query,
@ -607,8 +643,7 @@ impl Conn {
where T: Into<Option<QueryInputs>> {
let metadata = self.metadata.lock().unwrap();
let cache = &*self.attribute_cache.read().unwrap();
let known = Known::new(&*metadata.schema, Some(cache));
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
q_prepare(sqlite,
known,
query,
@ -622,8 +657,7 @@ impl Conn {
where T: Into<Option<QueryInputs>>
{
let metadata = self.metadata.lock().unwrap();
let cache = &*self.attribute_cache.read().unwrap();
let known = Known::new(&*metadata.schema, Some(cache));
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
q_explain(sqlite,
known,
query,
@ -634,9 +668,8 @@ impl Conn {
sqlite: &rusqlite::Connection,
entity: Entid,
attribute: &edn::NamespacedKeyword) -> Result<Vec<TypedValue>> {
let schema = &*self.current_schema();
let cache = &*self.attribute_cache();
let known = Known::new(schema, Some(cache));
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
lookup_values_for_attribute(sqlite, known, entity, attribute)
}
@ -644,16 +677,15 @@ impl Conn {
sqlite: &rusqlite::Connection,
entity: Entid,
attribute: &edn::NamespacedKeyword) -> Result<Option<TypedValue>> {
let schema = &*self.current_schema();
let cache = &*self.attribute_cache();
let known = Known::new(schema, Some(cache));
let metadata = self.metadata.lock().unwrap();
let known = Known::new(&*metadata.schema, Some(&metadata.attribute_cache));
lookup_value_for_attribute(sqlite, known, entity, attribute)
}
/// Take a SQLite transaction.
fn begin_transaction_with_behavior<'m, 'conn>(&'m mut self, sqlite: &'conn mut rusqlite::Connection, behavior: TransactionBehavior) -> Result<InProgress<'m, 'conn>> {
let tx = sqlite.transaction_with_behavior(behavior)?;
let (current_generation, current_partition_map, current_schema) =
let (current_generation, current_partition_map, current_schema, cache_cow) =
{
// The mutex is taken during this block.
let ref current: Metadata = *self.metadata.lock().unwrap();
@ -661,7 +693,8 @@ impl Conn {
// Expensive, but the partition map is updated after every committed transaction.
current.partition_map.clone(),
// Cheap.
current.schema.clone())
current.schema.clone(),
current.attribute_cache.clone())
};
Ok(InProgress {
@ -670,7 +703,7 @@ impl Conn {
generation: current_generation,
partition_map: current_partition_map,
schema: (*current_schema).clone(),
cache: self.attribute_cache.write().unwrap(),
cache: InProgressSQLiteAttributeCache::from_cache(cache_cow),
use_caching: true,
})
}
@ -717,41 +750,39 @@ impl Conn {
Ok(report)
}
// TODO: Figure out how to set max cache size and max result size and implement those on cache
// Question: Should those be only for lazy cache? The eager cache could perhaps grow infinitely
// and it becomes up to the client to manage memory usage by excising from cache when no longer
// needed
/// Adds or removes the values of a given attribute to an in memory cache
/// The attribute should be a namespaced string `:foo/bar`.
/// cache_action determines if the attribute should be added or removed from the cache.
/// CacheAction::Add is idempotent - each attribute is only added once and cannot be both lazy
/// and eager.
/// Adds or removes the values of a given attribute to an in-memory cache.
/// The attribute should be a namespaced string: e.g., `:foo/bar`.
/// `cache_action` determines if the attribute should be added or removed from the cache.
/// CacheAction::Add is idempotent - each attribute is only added once.
/// CacheAction::Remove throws an error if the attribute does not currently exist in the cache.
/// CacheType::Eager fetches all the values of the attribute and caches them on add.
/// CacheType::Lazy caches values only after they have first been fetched.
pub fn cache(&mut self,
sqlite: &mut rusqlite::Connection,
schema: &Schema,
attribute: &NamespacedKeyword,
cache_direction: CacheDirection,
cache_action: CacheAction) -> Result<()> {
match self.current_schema().attribute_for_ident(&attribute) {
None => bail!(ErrorKind::UnknownAttribute(attribute.to_string())),
Some((_attribute, attribute_entid)) => {
let mut cache = self.attribute_cache.write().unwrap();
match cache_action {
CacheAction::Register => {
match cache_direction {
CacheDirection::Both => cache.register(schema, sqlite, attribute_entid),
CacheDirection::Forward => cache.register_forward(schema, sqlite, attribute_entid),
CacheDirection::Reverse => cache.register_reverse(schema, sqlite, attribute_entid),
}.map_err(|e| e.into())
},
CacheAction::Deregister => {
cache.unregister(attribute_entid);
Ok(())
},
}
let mut metadata = self.metadata.lock().unwrap();
let attribute_entid: Entid;
// Immutable borrow of metadata.
{
attribute_entid = metadata.schema
.attribute_for_ident(&attribute)
.ok_or_else(|| ErrorKind::UnknownAttribute(attribute.to_string()))?.1.into();
}
let cache = &mut metadata.attribute_cache;
match cache_action {
CacheAction::Register => {
match cache_direction {
CacheDirection::Both => cache.register(schema, sqlite, attribute_entid),
CacheDirection::Forward => cache.register_forward(schema, sqlite, attribute_entid),
CacheDirection::Reverse => cache.register_reverse(schema, sqlite, attribute_entid),
}.map_err(|e| e.into())
},
CacheAction::Deregister => {
cache.unregister(attribute_entid);
Ok(())
},
}
}
@ -761,18 +792,34 @@ impl Conn {
mod tests {
use super::*;
extern crate time;
extern crate mentat_parser_utils;
use std::collections::{
BTreeSet,
};
use std::path::{
PathBuf,
};
use std::time::Instant;
use mentat_core::{
CachedAttributes,
TypedValue,
};
use query::{
PreparedQuery,
Variable,
};
use ::QueryResults;
use ::{
IntoResult,
QueryInputs,
QueryResults,
};
use mentat_db::USER0;
@ -1081,4 +1128,257 @@ mod tests {
assert!(cached_elapsed_time < uncached_elapsed_time);
}
}
#[test]
fn test_cache_usage() {
let mut sqlite = db::new_connection("").unwrap();
let mut conn = Conn::connect(&mut sqlite).unwrap();
let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident").0;
let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident").0;
println!("db/ident is {}", db_ident);
println!("db/type is {}", db_type);
let query = format!("[:find ?ident . :where [?e {} :db/doc][?e {} ?type][?type {} ?ident]]",
db_ident, db_type, db_ident);
println!("Query is {}", query);
assert!(!conn.current_cache().is_attribute_cached_forward(db_ident));
{
let mut ip = conn.begin_transaction(&mut sqlite).expect("began");
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string")));
let start = time::PreciseTime::now();
ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Uncached took {}µs", start.to(end).num_microseconds().unwrap());
ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered");
ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered");
assert!(ip.cache.is_attribute_cached_forward(db_ident));
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string")));
let start = time::PreciseTime::now();
ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Cached took {}µs", start.to(end).num_microseconds().unwrap());
// If we roll back the change, our caching operations are also rolled back.
ip.rollback().expect("rolled back");
}
assert!(!conn.current_cache().is_attribute_cached_forward(db_ident));
{
let mut ip = conn.begin_transaction(&mut sqlite).expect("began");
let ident = ip.q_once(query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string")));
ip.cache(&kw!(:db/ident), CacheDirection::Forward, CacheAction::Register).expect("registered");
ip.cache(&kw!(:db/valueType), CacheDirection::Forward, CacheAction::Register).expect("registered");
assert!(ip.cache.is_attribute_cached_forward(db_ident));
ip.commit().expect("rolled back");
}
assert!(conn.current_cache().is_attribute_cached_forward(db_ident));
assert!(conn.current_cache().is_attribute_cached_forward(db_type));
}
fn fixture_path(rest: &str) -> PathBuf {
let fixtures = Path::new("fixtures/");
fixtures.join(Path::new(rest))
}
#[test]
fn test_prepared_query_with_cache() {
let mut store = Store::open("").expect("opened");
let mut in_progress = store.begin_transaction().expect("began");
in_progress.import(fixture_path("cities.schema")).expect("transacted schema");
in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data");
in_progress.cache(&kw!(:neighborhood/district), CacheDirection::Forward, CacheAction::Register).expect("cache done");
in_progress.cache(&kw!(:district/name), CacheDirection::Forward, CacheAction::Register).expect("cache done");
in_progress.cache(&kw!(:neighborhood/name), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
let query = r#"[:find ?district
:in ?hood
:where
[?neighborhood :neighborhood/name ?hood]
[?neighborhood :neighborhood/district ?d]
[?d :district/name ?district]]"#;
let hood = "Beacon Hill";
let inputs = QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::typed_string(hood))]);
let mut prepared = in_progress.q_prepare(query, inputs)
.expect("prepared");
match &prepared {
&PreparedQuery::Constant { select: ref _select } => {},
_ => panic!(),
};
let start = time::PreciseTime::now();
let results = prepared.run(None).expect("results");
let end = time::PreciseTime::now();
println!("Prepared cache execution took {}µs", start.to(end).num_microseconds().unwrap());
assert_eq!(results.into_rel().expect("result"),
vec![vec![TypedValue::typed_string("Greater Duwamish")]]);
}
trait StoreCache {
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid>;
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool;
fn is_attribute_cached_forward(&self, attr: Entid) -> bool;
}
impl StoreCache for Store {
fn get_entid_for_value(&self, attr: Entid, val: &TypedValue) -> Option<Entid> {
let cache = self.conn.current_cache();
cache.get_entid_for_value(attr, val)
}
fn is_attribute_cached_forward(&self, attr: Entid) -> bool {
self.conn.current_cache().is_attribute_cached_forward(attr)
}
fn is_attribute_cached_reverse(&self, attr: Entid) -> bool {
self.conn.current_cache().is_attribute_cached_reverse(attr)
}
}
#[test]
fn test_cache_mutation() {
let mut store = Store::open("").expect("opened");
{
let mut in_progress = store.begin_transaction().expect("begun");
in_progress.transact(r#"[
{ :db/ident :foo/bar
:db/cardinality :db.cardinality/one
:db/index true
:db/unique :db.unique/identity
:db/valueType :db.type/long },
{ :db/ident :foo/baz
:db/cardinality :db.cardinality/one
:db/valueType :db.type/boolean }
{ :db/ident :foo/x
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long }]"#).expect("transact");
// Cache one….
in_progress.cache(&kw!(:foo/bar), CacheDirection::Reverse, CacheAction::Register).expect("cache done");
in_progress.commit().expect("commit");
}
let foo_bar = store.conn.current_schema().get_entid(&kw!(:foo/bar)).expect("foo/bar").0;
let foo_baz = store.conn.current_schema().get_entid(&kw!(:foo/baz)).expect("foo/baz").0;
let foo_x = store.conn.current_schema().get_entid(&kw!(:foo/x)).expect("foo/x").0;
// … and cache the others via the store.
store.cache(&kw!(:foo/baz), CacheDirection::Both).expect("cache done");
store.cache(&kw!(:foo/x), CacheDirection::Forward).expect("cache done");
{
assert!(store.is_attribute_cached_reverse(foo_bar));
assert!(store.is_attribute_cached_forward(foo_baz));
assert!(store.is_attribute_cached_reverse(foo_baz));
assert!(store.is_attribute_cached_forward(foo_x));
}
// Add some data.
{
let mut in_progress = store.begin_transaction().expect("begun");
{
assert!(in_progress.cache.is_attribute_cached_reverse(foo_bar));
assert!(in_progress.cache.is_attribute_cached_forward(foo_baz));
assert!(in_progress.cache.is_attribute_cached_reverse(foo_baz));
assert!(in_progress.cache.is_attribute_cached_forward(foo_x));
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_bar));
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_baz));
assert!(in_progress.cache.overlay.is_attribute_cached_reverse(foo_baz));
assert!(in_progress.cache.overlay.is_attribute_cached_forward(foo_x));
}
in_progress.transact(r#"[
{:foo/bar 15, :foo/baz false, :foo/x [1, 2, 3]}
{:foo/bar 99, :foo/baz true}
{:foo/bar -2, :foo/baz true}
]"#).expect("transact");
// Data is in the cache.
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
assert_eq!(in_progress.cache.get_value_for_entid(&in_progress.schema, foo_baz, first).expect("val"), &TypedValue::Boolean(false));
// All three values for :foo/x.
let all_three: BTreeSet<TypedValue> = in_progress.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
.expect("val")
.iter().cloned().collect();
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
in_progress.commit().expect("commit");
}
// Data is still in the cache.
{
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter().cloned().collect();
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
}
// We can remove data and the cache reflects it, immediately and after commit.
{
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
let only_two: BTreeSet<TypedValue> = in_progress.cache
.get_values_for_entid(&in_progress.schema, foo_x, first)
.expect("val")
.iter().cloned().collect();
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
// Rollback: unchanged.
}
{
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let all_three: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter().cloned().collect();
assert_eq!(all_three, vec![1, 2, 3].into_iter().map(TypedValue::Long).collect());
}
// Try again, but this time commit.
{
let mut in_progress = store.begin_transaction().expect("began");
let first = in_progress.cache.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
in_progress.transact(format!("[[:db/retract {} :foo/x 2]]", first).as_str()).expect("transact");
in_progress.commit().expect("committed");
}
{
let first = store.get_entid_for_value(foo_bar, &TypedValue::Long(15)).expect("id");
let cache: SQLiteAttributeCache = store.conn.current_cache();
assert_eq!(cache.get_value_for_entid(&store.conn.current_schema(), foo_baz, first).expect("val"), &TypedValue::Boolean(false));
let only_two: BTreeSet<TypedValue> = cache.get_values_for_entid(&store.conn.current_schema(), foo_x, first)
.expect("val")
.iter().cloned().collect();
assert_eq!(only_two, vec![1, 3].into_iter().map(TypedValue::Long).collect());
}
}
}

View file

@ -52,6 +52,7 @@ use mentat_query_parser::{
};
use mentat_query_projector::{
ConstantProjector,
Projector,
};
@ -60,6 +61,7 @@ use mentat_sql::{
};
use mentat_query_translator::{
ProjectedSelect,
query_to_select,
};
@ -84,6 +86,9 @@ pub enum PreparedQuery<'sqlite> {
Empty {
find_spec: Rc<FindSpec>,
},
Constant {
select: ConstantProjector,
},
Bound {
statement: rusqlite::Statement<'sqlite>,
args: Vec<(String, Rc<rusqlite::types::Value>)>,
@ -97,6 +102,9 @@ impl<'sqlite> PreparedQuery<'sqlite> {
&mut PreparedQuery::Empty { ref find_spec } => {
Ok(QueryOutput::empty(find_spec))
},
&mut PreparedQuery::Constant { ref select } => {
select.project_without_rows().map_err(|e| e.into())
},
&mut PreparedQuery::Bound { ref mut statement, ref args, ref projector } => {
let rows = run_statement(statement, args)?;
projector
@ -136,6 +144,10 @@ impl IntoResult for QueryExecutionResult {
pub enum QueryExplanation {
/// A query known in advance to be empty, and why we believe that.
KnownEmpty(EmptyBecause),
/// A query known in advance to return a constant value.
KnownConstant,
/// A query that takes actual work to execute.
ExecutionPlan {
/// The translated query and any bindings.
@ -316,14 +328,18 @@ fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebriz
}
let select = query_to_select(algebrized)?;
let SQLQuery { sql, args } = select.query.to_sql_query()?;
match select {
ProjectedSelect::Constant(constant) => constant.project_without_rows()
.map_err(|e| e.into()),
ProjectedSelect::Query { query, projector } => {
let SQLQuery { sql, args } = query.to_sql_query()?;
let mut statement = sqlite.prepare(sql.as_str())?;
let rows = run_statement(&mut statement, &args)?;
let mut statement = sqlite.prepare(sql.as_str())?;
let rows = run_statement(&mut statement, &args)?;
select.projector
.project(rows)
.map_err(|e| e.into())
projector.project(rows).map_err(|e| e.into())
},
}
}
/// Take an EDN query string, a reference to an open SQLite connection, a Mentat schema, and an
@ -382,14 +398,23 @@ pub fn q_prepare<'sqlite, 'query, T>
}
let select = query_to_select(algebrized)?;
let SQLQuery { sql, args } = select.query.to_sql_query()?;
let statement = sqlite.prepare(sql.as_str())?;
match select {
ProjectedSelect::Constant(constant) => {
Ok(PreparedQuery::Constant {
select: constant,
})
},
ProjectedSelect::Query { query, projector } => {
let SQLQuery { sql, args } = query.to_sql_query()?;
let statement = sqlite.prepare(sql.as_str())?;
Ok(PreparedQuery::Bound {
statement,
args,
projector: select.projector
})
Ok(PreparedQuery::Bound {
statement,
args,
projector: projector
})
},
}
}
pub fn q_explain<'sqlite, 'query, T>
@ -403,18 +428,23 @@ pub fn q_explain<'sqlite, 'query, T>
if algebrized.is_known_empty() {
return Ok(QueryExplanation::KnownEmpty(algebrized.cc.empty_because.unwrap()));
}
let query = query_to_select(algebrized)?.query.to_sql_query()?;
match query_to_select(algebrized)? {
ProjectedSelect::Constant(_constant) => Ok(QueryExplanation::KnownConstant),
ProjectedSelect::Query { query, projector: _projector } => {
let query = query.to_sql_query()?;
let plan_sql = format!("EXPLAIN QUERY PLAN {}", query.sql);
let plan_sql = format!("EXPLAIN QUERY PLAN {}", query.sql);
let steps = run_sql_query(sqlite, &plan_sql, &query.args, |row| {
QueryPlanStep {
select_id: row.get(0),
order: row.get(1),
from: row.get(2),
detail: row.get(3)
}
})?;
let steps = run_sql_query(sqlite, &plan_sql, &query.args, |row| {
QueryPlanStep {
select_id: row.get(0),
order: row.get(1),
from: row.get(2),
detail: row.get(3)
}
})?;
Ok(QueryExplanation::ExecutionPlan { query, steps })
Ok(QueryExplanation::ExecutionPlan { query, steps })
},
}
}

View file

@ -97,7 +97,7 @@ fn test_add_to_cache() {
{
let cached_values = attribute_cache.value_pairs(schema, attr).expect("non-None");
assert!(!cached_values.is_empty());
let flattened: BTreeSet<TypedValue> = cached_values.values().cloned().collect();
let flattened: BTreeSet<TypedValue> = cached_values.values().cloned().filter_map(|x| x).collect();
let expected: BTreeSet<TypedValue> = vec![TypedValue::Long(100), TypedValue::Long(200)].into_iter().collect();
assert_eq!(flattened, expected);
}

View file

@ -32,7 +32,6 @@ use mentat_core::{
};
use mentat::{
IntoResult,
NamespacedKeyword,
PlainSymbol,
QueryInputs,
@ -622,38 +621,3 @@ fn test_type_reqs() {
}
};
}
#[test]
fn test_cache_usage() {
let mut c = new_connection("").expect("opened connection");
let conn = Conn::connect(&mut c).expect("connected");
let db_ident = (*conn.current_schema()).get_entid(&kw!(:db/ident)).expect("db_ident");
let db_type = (*conn.current_schema()).get_entid(&kw!(:db/valueType)).expect("db_ident");
println!("db/ident is {}", db_ident.0);
println!("db/type is {}", db_type.0);
let query = format!("[:find ?ident . :where [?e {} :db/doc][?e {} ?type][?type {} ?ident]]",
db_ident.0, db_type.0, db_ident.0);
println!("Query is {}", query);
let schema = conn.current_schema();
(*conn.attribute_cache_mut()).register(&schema, &mut c, db_ident).expect("registered");
(*conn.attribute_cache_mut()).register(&schema, &mut c, db_type).expect("registered");
let ident = conn.q_once(&c, query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string")));
let ident = conn.q_uncached(&c, query.as_str(), None).into_scalar_result().expect("query");
assert_eq!(ident, Some(TypedValue::typed_ns_keyword("db.type", "string")));
let start = time::PreciseTime::now();
conn.q_once(&c, query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Cached took {}µs", start.to(end).num_microseconds().unwrap());
let start = time::PreciseTime::now();
conn.q_uncached(&c, query.as_str(), None).into_scalar_result().expect("query");
let end = time::PreciseTime::now();
println!("Uncached took {}µs", start.to(end).num_microseconds().unwrap());
}

View file

@ -458,6 +458,8 @@ impl Repl {
match self.store.q_explain(query.as_str(), None) {
Result::Err(err) =>
println!("{:?}.", err),
Result::Ok(QueryExplanation::KnownConstant) =>
println!("Query is known constant!"),
Result::Ok(QueryExplanation::KnownEmpty(empty_because)) =>
println!("Query is known empty: {:?}", empty_because),
Result::Ok(QueryExplanation::ExecutionPlan { query, steps }) => {