Implement simple pull expressions (#638) r=nalexander

* Refactor AttributeCache populator code for use from pull.

* Pre: add to_value_rc to Cloned.

* Pre: add From<StructuredMap> for Binding.

* Pre: clarify Store::open_empty.

* Pre: StructuredMap cleanup.

* Pre: clean up a doc test.

* Split projector crate. Pass schema to projector.

* CLI support for printing bindings.

* Add and use ConjoiningClauses::derive_types_from_find_spec.

* Define pull types.

* Implement pull on top of the attribute cache layer.

* Add pull support to the projector.

* Parse pull expressions.

* Add simple pull support to connection objects.

* Tests for pull.

* Compile with Rust 1.25.

The only choice involved in this commit is that of replacing the
anonymous lifetime '_ with a named lifetime for the cache; since we're
accepting a Known, which includes the cache in question, I think it's
clear that we expect the function to apply to any given cache
lifetime.

* Review comments.

* Bail on unnamed attribute.

* Make assert_parse_failure_contains safe to use.

* Rework query parser to report better errors for pull.

* Test for mixed wildcard and simple attribute.
This commit is contained in:
Richard Newman 2018-05-04 12:56:00 -07:00 committed by GitHub
parent 90465ae74a
commit e21156a754
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 2318 additions and 398 deletions

View file

@ -64,6 +64,9 @@ path = "query-parser"
[dependencies.mentat_query_projector]
path = "query-projector"
[dependencies.mentat_query_pull]
path = "query-pull"
[dependencies.mentat_query_sql]
path = "query-sql"

View file

@ -55,12 +55,13 @@ mod value_type_set;
mod sql_types;
pub use types::{
Binding,
Cloned,
Entid,
FromRc,
KnownEntid,
StructuredMap,
TypedValue,
Binding,
ValueType,
ValueTypeTag,
ValueRc,
@ -362,6 +363,45 @@ pub mod intern_set;
pub mod counter;
pub mod util;
/// A helper macro to sequentially process an iterable sequence,
/// evaluating a block between each pair of items.
///
/// This is used to simply and efficiently produce output like
///
/// ```sql
/// 1, 2, 3
/// ```
///
/// or
///
/// ```sql
/// x = 1 AND y = 2
/// ```
///
/// without producing an intermediate string sequence.
#[macro_export]
macro_rules! interpose {
    // `$name` binds each item of `$across`; `$body` runs once per item and
    // `$inter` runs between consecutive items (n-1 times for n items).
    // Delegates to `interpose_iter!` over `$across.iter()`.
    ( $name: pat, $across: expr, $body: block, $inter: block ) => {
        interpose_iter!($name, $across.iter(), $body, $inter)
    }
}
/// A helper to bind `name` to values in `across`, running `body` for each value,
/// and running `inter` between each value. See `interpose` for examples.
#[macro_export]
macro_rules! interpose_iter {
    ( $name: pat, $across: expr, $body: block, $inter: block ) => {
        let mut seq = $across;
        // Peel off the first item so that `$inter` only ever runs *between*
        // items — never before the first, never after the last, and not at
        // all for an empty sequence.
        if let Some($name) = seq.next() {
            $body;
            for $name in seq {
                $inter;
                $body;
            }
        }
    }
}
#[cfg(test)]
mod test {
use super::*;

View file

@ -96,24 +96,37 @@ impl<T> FromRc<T> for Box<T> where T: Sized + Clone {
// We do this a lot for errors.
/// Uniform access to the value inside a smart pointer: either as a fresh
/// owned copy, or repackaged as a `ValueRc`.
pub trait Cloned<T> {
    /// Return an owned copy of the pointed-to value.
    fn cloned(&self) -> T;

    /// Return the value as a `ValueRc`. Implementations may share the
    /// existing allocation rather than copying, where the pointer type allows.
    fn to_value_rc(&self) -> ValueRc<T>;
}
/// `Rc` converts to `ValueRc` via `from_rc`, re-wrapping the pointer rather
/// than cloning the payload.
// Note: the original spelled the bounds `impl<T: Clone> … where T: Sized + Clone`,
// duplicating `Clone` and stating the default `Sized`; one `where T: Clone` suffices.
impl<T> Cloned<T> for Rc<T> where T: Clone {
    fn cloned(&self) -> T {
        (*self.as_ref()).clone()
    }

    fn to_value_rc(&self) -> ValueRc<T> {
        ValueRc::from_rc(self.clone())
    }
}
/// `Arc` converts to `ValueRc` via `from_arc`, re-wrapping the pointer rather
/// than cloning the payload.
// Note: deduplicated the original's repeated `Clone` bound and implicit `Sized`.
impl<T> Cloned<T> for Arc<T> where T: Clone {
    fn cloned(&self) -> T {
        (*self.as_ref()).clone()
    }

    fn to_value_rc(&self) -> ValueRc<T> {
        ValueRc::from_arc(self.clone())
    }
}
/// A `Box` owns its value outright, so producing a `ValueRc` necessarily
/// copies the payload into a fresh shared allocation.
// Note: deduplicated the original's repeated `Clone` bound and implicit `Sized`.
impl<T> Cloned<T> for Box<T> where T: Clone {
    fn cloned(&self) -> T {
        self.as_ref().clone()
    }

    fn to_value_rc(&self) -> ValueRc<T> {
        ValueRc::new(self.cloned())
    }
}
///
@ -284,11 +297,17 @@ pub enum Binding {
}
impl<T> From<T> for Binding where T: Into<TypedValue> {
fn from(value: T) -> Binding {
fn from(value: T) -> Self {
Binding::Scalar(value.into())
}
}
/// Wrap a `StructuredMap` — the output of a pull expression — as a
/// `Binding::Map`, shared behind a fresh `ValueRc`.
impl From<StructuredMap> for Binding {
    fn from(value: StructuredMap) -> Self {
        Binding::Map(ValueRc::new(value))
    }
}
impl Binding {
pub fn val(self) -> Option<TypedValue> {
match self {
@ -308,8 +327,31 @@ impl Binding {
///
/// We entirely support the former, and partially support the latter -- you can alias
/// using a different keyword only.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StructuredMap(IndexMap<ValueRc<NamespacedKeyword>, Binding>);
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct StructuredMap(pub IndexMap<ValueRc<NamespacedKeyword>, Binding>);
impl StructuredMap {
    /// Insert `value` under `name`, replacing any existing entry for that
    /// keyword. Both arguments accept anything convertible into the stored
    /// key and binding types.
    pub fn insert<N, B>(&mut self, name: N, value: B) where N: Into<ValueRc<NamespacedKeyword>>, B: Into<Binding> {
        self.0.insert(name.into(), value.into());
    }
}
/// Adopt an existing keyword-to-binding map wholesale, without copying entries.
impl From<IndexMap<ValueRc<NamespacedKeyword>, Binding>> for StructuredMap {
    fn from(src: IndexMap<ValueRc<NamespacedKeyword>, Binding>) -> Self {
        StructuredMap(src)
    }
}
// Mostly for testing.
/// Build a `StructuredMap` from `(keyword, value)` pairs, inserting them in
/// order; later duplicates of a keyword overwrite earlier ones.
impl<T> From<Vec<(NamespacedKeyword, T)>> for StructuredMap where T: Into<Binding> {
    fn from(value: Vec<(NamespacedKeyword, T)>) -> Self {
        value.into_iter()
             .fold(StructuredMap::default(), |mut acc, (name, binding)| {
                 acc.insert(name, binding);
                 acc
             })
    }
}
impl Binding {
/// Returns true if the provided type is `Some` and matches this value's type, or if the

View file

@ -23,6 +23,9 @@ path = "../edn"
[dependencies.mentat_core]
path = "../core"
[dependencies.mentat_sql]
path = "../sql"
[dependencies.mentat_tx]
path = "../tx"

View file

@ -77,6 +77,7 @@ use num;
use rusqlite;
use mentat_core::{
Binding,
CachedAttributes,
Entid,
HasSchema,
@ -90,6 +91,12 @@ use mentat_core::util::{
Either,
};
use mentat_sql::{
QueryBuilder,
SQLiteQueryBuilder,
SQLQuery,
};
use mentat_tx::entities::{
OpType,
};
@ -241,6 +248,11 @@ impl<'conn, F> Iterator for AevRows<'conn, F> where F: FnMut(&rusqlite::Row) ->
// - cardinality/one doesn't need a vec
// - unique/* should ideally have a bijective mapping (reverse lookup)

/// Entity-keyed lookup over a single cached attribute: a presence test, and
/// conversion of the cached value(s) for an entity into a query `Binding`.
pub trait AttributeCache {
    /// True if this cache holds an entry (even a miss) for entity `e`.
    fn has_e(&self, e: Entid) -> bool;
    /// The cached value(s) for `e` as a `Binding`, or `None` if absent.
    fn binding_for_e(&self, e: Entid) -> Option<Binding>;
}
trait RemoveFromCache {
fn remove(&mut self, e: Entid, v: &TypedValue);
}
@ -273,6 +285,16 @@ impl Absorb for SingleValAttributeCache {
}
}
impl AttributeCache for SingleValAttributeCache {
    // A cardinality-one attribute yields at most a single scalar binding.
    fn binding_for_e(&self, e: Entid) -> Option<Binding> {
        self.get(e).map(|v| v.clone().into())
    }

    fn has_e(&self, e: Entid) -> bool {
        self.e_v.contains_key(&e)
    }
}
impl ClearCache for SingleValAttributeCache {
fn clear(&mut self) {
self.e_v.clear();
@ -332,6 +354,19 @@ impl Absorb for MultiValAttributeCache {
}
}
impl AttributeCache for MultiValAttributeCache {
    // A multi-valued attribute yields every cached value for the entity,
    // collected into a shared `Binding::Vec`.
    fn binding_for_e(&self, e: Entid) -> Option<Binding> {
        self.e_vs.get(&e).map(|vs| {
            let bindings = vs.iter().cloned().map(|v| v.into()).collect();
            Binding::Vec(ValueRc::new(bindings))
        })
    }

    fn has_e(&self, e: Entid) -> bool {
        self.e_vs.contains_key(&e)
    }
}
impl ClearCache for MultiValAttributeCache {
fn clear(&mut self) {
self.e_vs.clear();
@ -861,16 +896,219 @@ impl AttributeCaches {
let sql = format!("SELECT a, e, v, value_type_tag FROM {} WHERE a = ? ORDER BY a ASC, e ASC", table);
let args: Vec<&rusqlite::types::ToSql> = vec![&attribute];
let mut stmt = sqlite.prepare(&sql)?;
let replacing = true;
self.repopulate_from_aevt(schema, &mut stmt, args, replacing)
}
fn repopulate_from_aevt<'a, 's, 'c, 'v>(&'a mut self,
schema: &'s Schema,
statement: &'c mut rusqlite::Statement,
args: Vec<&'v rusqlite::types::ToSql>,
replacing: bool) -> Result<()> {
let mut aev_factory = AevFactory::new();
let rows = stmt.query_map(&args, |row| aev_factory.row_to_aev(row))?;
let rows = statement.query_map(&args, |row| aev_factory.row_to_aev(row))?;
let aevs = AevRows {
rows: rows,
};
self.accumulate_into_cache(None, schema, aevs.peekable(), AccumulationBehavior::Add { replacing: true })?;
self.accumulate_into_cache(None, schema, aevs.peekable(), AccumulationBehavior::Add { replacing })?;
Ok(())
}
}
/// Which attributes to fetch when populating a cache: either every attribute
/// in the schema, or an explicit set partitioned by whether the attribute is
/// fulltext-indexed (fulltext values are stored in a separate table).
#[derive(Clone)]
pub enum AttributeSpec {
    All,
    Specified {
        // These are assumed to not include duplicates.
        fts: Vec<Entid>,
        non_fts: Vec<Entid>,
    },
}
impl AttributeSpec {
    /// A spec covering every attribute in the schema.
    pub fn all() -> AttributeSpec {
        AttributeSpec::All
    }

    /// Build a spec from an explicit attribute set, splitting entids into
    /// fulltext and non-fulltext groups. Entids the schema doesn't know
    /// about are silently dropped.
    pub fn specified(attrs: &BTreeSet<Entid>, schema: &Schema) -> AttributeSpec {
        let mut fts = Vec::with_capacity(attrs.len());
        let mut non_fts = Vec::with_capacity(attrs.len());
        for &entid in attrs.iter() {
            if let Some(attribute) = schema.attribute_for_entid(entid) {
                let bucket = if attribute.fulltext { &mut fts } else { &mut non_fts };
                bucket.push(entid);
            }
        }
        AttributeSpec::Specified { fts, non_fts }
    }
}
impl AttributeCaches {
    /// Fetch the requested entities and attributes from the store and put them in the cache.
    ///
    /// The caller is responsible for ensuring that `entities` is unique, and for avoiding any
    /// redundant work.
    ///
    /// Each provided attribute will be marked as forward-cached; the caller is responsible for
    /// ensuring that this cache is complete or that it is not expected to be complete.
    fn populate_cache_for_entities_and_attributes<'s, 'c>(&mut self,
                                                          schema: &'s Schema,
                                                          sqlite: &'c rusqlite::Connection,
                                                          attrs: AttributeSpec,
                                                          entities: &Vec<Entid>) -> Result<()> {

        // Mark the attributes as cached as we go. We do this because we're going in through the
        // back door here, and the usual caching API won't have taken care of this for us.
        let mut qb = SQLiteQueryBuilder::new();
        qb.push_sql("SELECT a, e, v, value_type_tag FROM ");
        match attrs {
            AttributeSpec::All => {
                // All attributes: query `all_datoms` for just the requested entities.
                qb.push_sql("all_datoms WHERE e IN (");
                interpose!(item, entities,
                           { qb.push_sql(&item.to_string()) },
                           { qb.push_sql(", ") });
                qb.push_sql(") ORDER BY a ASC, e ASC");

                self.forward_cached_attributes.extend(schema.attribute_map.keys());
            },
            AttributeSpec::Specified { fts, non_fts } => {
                let has_fts = !fts.is_empty();
                let has_non_fts = !non_fts.is_empty();

                if !has_fts && !has_non_fts {
                    // Nothing to do.
                    return Ok(());
                }

                // Non-fulltext attributes come from `datoms`…
                if has_non_fts {
                    qb.push_sql("datoms WHERE e IN (");
                    interpose!(item, entities,
                               { qb.push_sql(&item.to_string()) },
                               { qb.push_sql(", ") });
                    qb.push_sql(") AND a IN (");
                    interpose!(item, non_fts,
                               { qb.push_sql(&item.to_string()) },
                               { qb.push_sql(", ") });
                    qb.push_sql(")");

                    self.forward_cached_attributes.extend(non_fts.iter());
                }

                if has_fts && has_non_fts {
                    // Both.
                    qb.push_sql(" UNION ALL SELECT a, e, v, value_type_tag FROM ");
                }

                // …while fulltext attributes come from `fulltext_datoms`.
                if has_fts {
                    qb.push_sql("fulltext_datoms WHERE e IN (");
                    interpose!(item, entities,
                               { qb.push_sql(&item.to_string()) },
                               { qb.push_sql(", ") });
                    qb.push_sql(") AND a IN (");
                    interpose!(item, fts,
                               { qb.push_sql(&item.to_string()) },
                               { qb.push_sql(", ") });
                    qb.push_sql(")");

                    self.forward_cached_attributes.extend(fts.iter());
                }
                qb.push_sql(" ORDER BY a ASC, e ASC");
            },
        };

        let SQLQuery { sql, args } = qb.finish();
        assert!(args.is_empty());       // TODO: we know there are never args, but we'd like to run this query 'properly'.
        let mut stmt = sqlite.prepare(sql.as_str())?;
        // `replacing = false`: we're extending the cache, not rebuilding it.
        let replacing = false;
        self.repopulate_from_aevt(schema, &mut stmt, vec![], replacing)
    }

    /// Return a reference to the cache for the provided `a`, if `a` names an attribute that is
    /// cached in the forward direction. If `a` doesn't name an attribute, or it's not cached at
    /// all, or it's only cached in reverse (`v` to `e`, not `e` to `v`), `None` is returned.
    pub fn forward_attribute_cache_for_attribute<'a, 's>(&'a self, schema: &'s Schema, a: Entid) -> Option<&'a AttributeCache> {
        if !self.forward_cached_attributes.contains(&a) {
            return None;
        }
        // Cardinality determines which concrete cache type holds the attribute.
        schema.attribute_for_entid(a)
              .and_then(|attr|
                  if attr.multival {
                      self.multi_vals.get(&a).map(|v| v as &AttributeCache)
                  } else {
                      self.single_vals.get(&a).map(|v| v as &AttributeCache)
                  })
    }

    /// Fetch the requested entities and attributes from the store and put them in the cache.
    /// The caller is responsible for ensuring that `entities` is unique.
    /// Attributes for which every entity is already cached will not be processed again.
    pub fn extend_cache_for_entities_and_attributes<'s, 'c>(&mut self,
                                                            schema: &'s Schema,
                                                            sqlite: &'c rusqlite::Connection,
                                                            mut attrs: AttributeSpec,
                                                            entities: &Vec<Entid>) -> Result<()> {
        // TODO: Exclude any entities for which every attribute is known.
        // TODO: initialize from an existing (complete) AttributeCache.

        // Exclude any attributes for which every entity's value is already known.
        match &mut attrs {
            &mut AttributeSpec::All => {
                // If we're caching all attributes, there's nothing we can exclude.
            },
            &mut AttributeSpec::Specified { ref mut non_fts, ref mut fts } => {
                // Remove any attributes for which all entities are present in the cache (even
                // as a 'miss').
                let exclude_missing = |vec: &mut Vec<Entid>| {
                    vec.retain(|a| {
                        if let Some(attr) = schema.attribute_for_entid(*a) {
                            if !self.forward_cached_attributes.contains(a) {
                                // The attribute isn't cached at all. Do the work for all entities.
                                return true;
                            }

                            // Return true if there are any entities missing for this attribute.
                            if attr.multival {
                                self.multi_vals
                                    .get(&a)
                                    .map(|cache| entities.iter().any(|e| !cache.has_e(*e)))
                                    .unwrap_or(true)
                            } else {
                                self.single_vals
                                    .get(&a)
                                    .map(|cache| entities.iter().any(|e| !cache.has_e(*e)))
                                    .unwrap_or(true)
                            }
                        } else {
                            // Unknown attribute.
                            false
                        }
                    });
                };
                exclude_missing(non_fts);
                exclude_missing(fts);
            },
        }

        self.populate_cache_for_entities_and_attributes(schema, sqlite, attrs, entities)
    }

    /// Fetch the requested entities and attributes and put them in a new cache.
    /// The caller is responsible for ensuring that `entities` is unique.
    pub fn make_cache_for_entities_and_attributes<'s, 'c>(schema: &'s Schema,
                                                          sqlite: &'c rusqlite::Connection,
                                                          attrs: AttributeSpec,
                                                          entities: &Vec<Entid>) -> Result<AttributeCaches> {
        let mut cache = AttributeCaches::default();
        cache.populate_cache_for_entities_and_attributes(schema, sqlite, attrs, entities)?;
        Ok(cache)
    }
}
impl CachedAttributes for AttributeCaches {
fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec<TypedValue>> {
self.values_pairs(schema, attribute)

View file

@ -23,7 +23,8 @@ extern crate tabwriter;
extern crate time;
#[macro_use] extern crate edn;
extern crate mentat_core;
#[macro_use] extern crate mentat_core;
extern crate mentat_sql;
extern crate mentat_tx;
extern crate mentat_tx_parser;
@ -60,6 +61,7 @@ pub use schema::{
AttributeBuilder,
AttributeValidation,
};
pub use bootstrap::{
CORE_SCHEMA_VERSION,
};

View file

@ -116,7 +116,7 @@ macro_rules! assert_parse_failure_contains {
let input = edn::parse::value($input).expect("to be able to parse input as EDN");
let par = $parser();
let stream = input.atom_stream();
let result = par.skip(eof()).parse(stream).map(|x| x.0).map_err(|e| -> ::ValueParseError { e.into() });
let result = par.skip(eof()).parse(stream).map(|x| x.0).map_err(|e| -> $crate::ValueParseError { e.into() });
assert!(format!("{:?}", result).contains($expected), "Expected {:?} to contain {:?}", result, $expected);
}}
}

View file

@ -40,7 +40,10 @@ use mentat_core::{
use mentat_core::counter::RcCounter;
use mentat_query::{
Element,
FindSpec,
NamespacedKeyword,
Pull,
Variable,
WhereClause,
};
@ -334,6 +337,21 @@ impl ConjoiningClauses {
}
}
/// Early-stage query handling.
impl ConjoiningClauses {
    /// Constrain every variable that appears in a `(pull ?var …)` find
    /// element to be a `Ref`: pull can only be applied to entities. Other
    /// find elements impose no type constraints here.
    pub(crate) fn derive_types_from_find_spec(&mut self, find_spec: &FindSpec) {
        for element in find_spec.columns() {
            if let &Element::Pull(Pull { ref var, patterns: _ }) = element {
                self.constrain_var_to_type(var.clone(), ValueType::Ref);
            }
        }
    }
}
/// Cloning.
impl ConjoiningClauses {
fn make_receptacle(&self) -> ConjoiningClauses {

View file

@ -161,6 +161,10 @@ impl AlgebraicQuery {
self.find_spec
.columns()
.all(|e| match e {
// Pull expressions are never fully bound.
// TODO: but the 'inside' of a pull expression certainly can be.
&Element::Pull(_) => false,
&Element::Variable(ref var) |
&Element::Corresponding(ref var) => self.cc.is_value_bound(var),
@ -275,6 +279,9 @@ pub fn algebrize_with_inputs(known: Known,
let alias_counter = RcCounter::with_initial(counter);
let mut cc = ConjoiningClauses::with_inputs_and_alias_counter(parsed.in_vars, inputs, alias_counter);
// This is so the rest of the query knows that `?x` is a ref if `(pull ?x …)` appears in `:find`.
cc.derive_types_from_find_spec(&parsed.find_spec);
// Do we have a variable limit? If so, tell the CC that the var must be numeric.
if let &Limit::Variable(ref var) = &parsed.limit {
cc.constrain_var_to_long(var.clone());

View file

@ -18,7 +18,20 @@ use std; // To refer to std::result::Result.
use std::collections::BTreeSet;
use self::combine::{eof, many, many1, optional, parser, satisfy, satisfy_map, Parser, ParseResult, Stream};
use self::combine::{
eof,
look_ahead,
many,
many1,
optional,
parser,
satisfy,
satisfy_map,
Parser,
ParseResult,
Stream,
};
use self::combine::combinator::{any, choice, or, try};
use self::mentat_core::ValueType;
@ -33,6 +46,7 @@ use self::mentat_parser_utils::value_and_span::Stream as ValueStream;
use self::mentat_parser_utils::value_and_span::{
Item,
OfExactlyParsing,
forward_keyword,
keyword_map,
list,
map,
@ -58,6 +72,9 @@ use self::mentat_query::{
PatternNonValuePlace,
PatternValuePlace,
Predicate,
Pull,
PullAttributeSpec,
PullConcreteAttribute,
QueryFunction,
SrcVar,
TypeAnnotation,
@ -172,6 +189,8 @@ def_parser!(Query, order, Order, {
});
def_matches_plain_symbol!(Query, the, "the");
def_matches_plain_symbol!(Query, pull, "pull");
def_matches_plain_symbol!(Query, wildcard, "*");
pub struct Where<'a>(std::marker::PhantomData<&'a ()>);
@ -278,12 +297,69 @@ def_parser!(Query, func, (QueryFunction, Vec<FnArg>), {
});
def_parser!(Query, aggregate, Aggregate, {
seq().of_exactly(Query::func())
Query::func()
.map(|(func, args)| Aggregate {
func, args,
})
});
// A forward keyword, e.g. `:foo/bar`, naming a concrete attribute to pull.
def_parser!(Query, pull_concrete_attribute, PullAttributeSpec, {
    forward_keyword().map(|k|
        PullAttributeSpec::Attribute(
            PullConcreteAttribute::Ident(
                ::std::rc::Rc::new(k.clone()))))
});
// `*` pulls every attribute of the entity.
def_parser!(Query, pull_wildcard_attribute, PullAttributeSpec, {
    Query::wildcard().map(|_| PullAttributeSpec::Wildcard)
});
// One attribute spec inside a pull pattern: a concrete keyword or a wildcard.
def_parser!(Query, pull_attribute, PullAttributeSpec, {
    choice([
        try(Query::pull_concrete_attribute()),
        try(Query::pull_wildcard_attribute()),

        // TODO: reversed keywords, entids (with aliases, presumably…).
    ])
});
// A wildcard can appear only once.
// If a wildcard appears, only map expressions can be present.
fn validate_attributes<'a, I>(attrs: I) -> std::result::Result<(), &'static str>
where I: IntoIterator<Item=&'a PullAttributeSpec> {
    let mut saw_wildcard = false;
    let mut saw_attribute = false;
    for spec in attrs {
        match spec {
            &PullAttributeSpec::Wildcard => {
                if saw_wildcard {
                    return Err("duplicate wildcard pull attribute");
                }
                if saw_attribute {
                    return Err("wildcard with specified attributes");
                }
                saw_wildcard = true;
            },
            // TODO: map forms, which may legally accompany a wildcard.
            &PullAttributeSpec::Attribute(_) => {
                if saw_wildcard {
                    return Err("wildcard with specified attributes");
                }
                saw_attribute = true;
            },
        }
    }
    Ok(())
}
// The attribute list of a pull expression: a non-empty vector of attribute
// specs, checked by `validate_attributes` (wildcard rules) before acceptance.
def_parser!(Query, pull_attributes, Vec<PullAttributeSpec>, {
    vector().of_exactly(many1(Query::pull_attribute()))
        .and_then(|attrs: Vec<PullAttributeSpec>|
            validate_attributes(&attrs)
                .and(Ok(attrs))
                .map_err(|e| combine::primitives::Error::Unexpected(e.into())))
});
/// A vector containing just a parenthesized filter expression.
def_parser!(Where, pred, WhereClause, {
// Accept either a nested list or a nested vector here:
@ -432,7 +508,7 @@ def_parser!(Find, variable_element, Element, {
});
def_parser!(Find, corresponding_element, Element, {
seq().of_exactly(Query::the().with(Query::variable()))
Query::the().with(Query::variable())
.map(Element::Corresponding)
});
@ -440,10 +516,43 @@ def_parser!(Find, aggregate_element, Element, {
Query::aggregate().map(Element::Aggregate)
});
// A `(pull ?var [attr …])` form in a find position.
def_parser!(Find, pull_element, Element, {
    Query::pull().with(Query::variable().and(Query::pull_attributes()))
        .map(|(var, attrs)| Element::Pull(Pull { var: var, patterns: attrs }))
});
/// Which parenthesized find-element form we committed to after peeking at
/// the head symbol of the list — see `seq_elem` below.
enum ElementType {
    Corresponding,
    Pull,
    Aggregate,
}
// Parse a parenthesized find element — `(the …)`, `(pull …)`, or an
// aggregate call — committing to a branch only after peeking at its head.
def_parser!(Find, seq_elem, Element, {
    let element_parser_for_type = |ty: ElementType| {
        match ty {
            ElementType::Corresponding => Find::corresponding_element(),
            ElementType::Pull => Find::pull_element(),
            ElementType::Aggregate => Find::aggregate_element(),
        }
    };

    // This slightly tortured phrasing ensures that we don't consume
    // when the first item in the list -- the function name -- doesn't
    // match, but once we decide what the list is, we go ahead and
    // commit to that branch.
    seq().of_exactly(
        // This comes first because otherwise (the ?x) will match as an aggregate.
        look_ahead(Query::the()).map(|_| ElementType::Corresponding)

        // Similarly, we have to parse pull before general functions.
        .or(look_ahead(Query::pull()).map(|_| ElementType::Pull))
        .or(look_ahead(Query::func()).map(|_| ElementType::Aggregate))
        .then(element_parser_for_type))
});
def_parser!(Find, elem, Element, {
choice([try(Find::variable_element()),
try(Find::corresponding_element()),
try(Find::aggregate_element())])
try(Find::variable_element())
.or(Find::seq_elem())
});
def_parser!(Find, find_scalar, FindSpec, {
@ -982,7 +1091,7 @@ mod test {
#[test]
fn test_the() {
assert_edn_parses_to!(Find::corresponding_element,
assert_edn_parses_to!(Find::seq_elem,
"(the ?y)",
Element::Corresponding(Variable::from_valid_name("?y")));
assert_edn_parses_to!(Find::find_tuple,
@ -1017,6 +1126,11 @@ mod test {
"[:find [(the ?x) ?y]
:where [?x _ ?y]]",
expected_query);
// If we give a malformed pull expression, we don't fall through to aggregates.
assert_parse_failure_contains!(Find::elem,
"(pull x [])",
r#"errors: [Unexpected(Token(ValueAndSpan { inner: PlainSymbol(PlainSymbol("x")), span: Span(6, 7) })), Expected(Borrowed("variable"))]"#);
}
#[test]
@ -1073,4 +1187,82 @@ mod test {
}));
}
// Pull parsing: wildcard, concrete attributes, full `(pull …)` elements, and
// rejection of a wildcard mixed with specified attributes.
#[test]
fn test_pull() {
    assert_edn_parses_to!(Query::pull_attribute,
                          "*",
                          PullAttributeSpec::Wildcard);
    assert_edn_parses_to!(Query::pull_attributes,
                          "[*]",
                          vec![PullAttributeSpec::Wildcard]);
    assert_edn_parses_to!(Find::elem,
                          "(pull ?v [*])",
                          Element::Pull(Pull {
                              var: Variable::from_valid_name("?v"),
                              patterns: vec![PullAttributeSpec::Wildcard],
                          }));
    let foo_bar = ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "bar"));
    let foo_baz = ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "baz"));
    assert_edn_parses_to!(Query::pull_concrete_attribute,
                          ":foo/bar",
                          PullAttributeSpec::Attribute(
                              PullConcreteAttribute::Ident(foo_bar.clone())));
    assert_edn_parses_to!(Query::pull_attribute,
                          ":foo/bar",
                          PullAttributeSpec::Attribute(
                              PullConcreteAttribute::Ident(foo_bar.clone())));
    assert_edn_parses_to!(Find::elem,
                          "(pull ?v [:foo/bar :foo/baz])",
                          Element::Pull(Pull {
                              var: Variable::from_valid_name("?v"),
                              patterns: vec![
                                  PullAttributeSpec::Attribute(
                                      PullConcreteAttribute::Ident(foo_bar.clone())),
                                  PullAttributeSpec::Attribute(
                                      PullConcreteAttribute::Ident(foo_baz.clone())),
                              ],
                          }));
    // Mixed wildcard and simple attribute must be rejected by validate_attributes.
    assert_parse_failure_contains!(Find::elem,
                                   "(pull ?x [* :foo/bar])",
                                   r#"errors: [Unexpected(Borrowed("wildcard with specified attributes"))]"#);
}
// A whole query mixing a plain variable and a pull element in `:find`.
#[test]
fn test_query_with_pull() {
    let q = "[:find ?x (pull ?x [:foo/bar]) :where [?x _ _]]";
    let expected_query =
        FindQuery {
            find_spec: FindSpec::FindRel(vec![
                Element::Variable(Variable::from_valid_name("?x")),
                Element::Pull(Pull {
                    var: Variable::from_valid_name("?x"),
                    patterns: vec![
                        PullAttributeSpec::Attribute(
                            PullConcreteAttribute::Ident(
                                ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "bar"))
                            )
                        ),
                    ] })]),
            where_clauses: vec![
                WhereClause::Pattern(Pattern {
                    source: None,
                    entity: PatternNonValuePlace::Variable(Variable::from_valid_name("?x")),
                    attribute: PatternNonValuePlace::Placeholder,
                    value: PatternValuePlace::Placeholder,
                    tx: PatternNonValuePlace::Placeholder,
                })],
            default_source: SrcVar::DefaultSrc,
            with: Default::default(),
            in_vars: Default::default(),
            in_sources: Default::default(),
            limit: Limit::None,
            order: None,
        };
    assert_edn_parses_to!(Find::query,
                          q,
                          expected_query);
}
}

View file

@ -26,6 +26,9 @@ path = "../query"
[dependencies.mentat_query_algebrizer]
path = "../query-algebrizer"
[dependencies.mentat_query_pull]
path = "../query-pull"
# Only for tests.
[dev-dependencies.mentat_query_parser]
path = "../query-parser"

View file

@ -20,6 +20,8 @@ use mentat_query::{
PlainSymbol,
};
use mentat_query_pull;
use aggregates::{
SimpleAggregationOp,
};
@ -72,5 +74,6 @@ error_chain! {
links {
DbError(mentat_db::Error, mentat_db::ErrorKind);
PullError(mentat_query_pull::errors::Error, mentat_query_pull::errors::ErrorKind);
}
}

View file

@ -17,6 +17,7 @@ extern crate mentat_core;
extern crate mentat_db; // For value conversion.
extern crate mentat_query;
extern crate mentat_query_algebrizer;
extern crate mentat_query_pull;
extern crate mentat_query_sql;
extern crate mentat_sql;
@ -35,6 +36,7 @@ use rusqlite::{
use mentat_core::{
Binding,
Schema,
TypedValue,
ValueType,
ValueTypeTag,
@ -67,6 +69,8 @@ use mentat_query_sql::{
mod aggregates;
mod project;
mod projectors;
mod pull;
mod relresult;
pub mod errors;
@ -83,6 +87,22 @@ pub use project::{
projected_column_for_var,
};
pub use projectors::{
ConstantProjector,
Projector,
};
use projectors::{
CollProjector,
CollTwoStagePullProjector,
RelProjector,
RelTwoStagePullProjector,
ScalarProjector,
ScalarTwoStagePullProjector,
TupleProjector,
TupleTwoStagePullProjector,
};
pub use relresult::{
RelResult,
StructuredRelResult,
@ -161,7 +181,11 @@ impl QueryOutput {
QueryResults::Scalar(val)
},
&FindScalar(Element::Aggregate(ref _agg)) => {
// TODO
// TODO: static aggregates.
unimplemented!();
},
&FindScalar(Element::Pull(ref _pull)) => {
// TODO: static pull.
unimplemented!();
},
&FindTuple(ref elements) => {
@ -174,6 +198,10 @@ impl QueryOutput {
.expect("every var to have a binding")
.into()
},
&Element::Pull(ref _pull) => {
// TODO: static pull.
unreachable!();
},
&Element::Aggregate(ref _agg) => {
// TODO: static computation of aggregates, then
// implement the condition in `is_fully_bound`.
@ -191,6 +219,10 @@ impl QueryOutput {
.into();
QueryResults::Coll(vec![val])
},
&FindColl(Element::Pull(ref _pull)) => {
// TODO: static pull.
unimplemented!();
},
&FindColl(Element::Aggregate(ref _agg)) => {
// Does it even make sense to write
// [:find [(max ?x) ...] :where [_ :foo/bar ?x]]
@ -208,6 +240,10 @@ impl QueryOutput {
.expect("every var to have a binding")
.into()
},
&Element::Pull(ref _pull) => {
// TODO: static pull.
unreachable!();
},
&Element::Aggregate(ref _agg) => {
// TODO: static computation of aggregates, then
// implement the condition in `is_fully_bound`.
@ -336,283 +372,6 @@ impl TypedIndex {
}
}
/// Turns a stream of SQL `Rows` into this query's `QueryOutput`, and reports
/// which find-spec elements it projects.
pub trait Projector {
    fn project<'stmt>(&self, rows: Rows<'stmt>) -> Result<QueryOutput>;
    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's>;
}
/// A projector that produces a `QueryResult` containing fixed data.
/// Takes a boxed function that should return an empty result set of the desired type.
pub struct ConstantProjector {
    spec: Rc<FindSpec>,
    // Invoked on every projection to manufacture the constant output.
    results_factory: Box<Fn() -> QueryResults>,
}
impl ConstantProjector {
    /// Construct a projector that ignores rows and always yields the output
    /// of `results_factory`.
    // Uses field-init shorthand for consistency with the rest of the file
    // (e.g. `RelResult { width, values }`); behavior is unchanged.
    fn new(spec: Rc<FindSpec>, results_factory: Box<Fn() -> QueryResults>) -> ConstantProjector {
        ConstantProjector { spec, results_factory }
    }

    /// Produce the constant output without consulting SQL rows at all.
    // NOTE(review): the `'stmt` lifetime parameter is unused; retained so the
    // public signature stays identical for callers.
    pub fn project_without_rows<'stmt>(&self) -> Result<QueryOutput> {
        let results = (self.results_factory)();
        let spec = self.spec.clone();
        Ok(QueryOutput { spec, results })
    }
}
impl Projector for ConstantProjector {
    // Rows are deliberately ignored: the output is fixed.
    fn project<'stmt>(&self, _: Rows<'stmt>) -> Result<QueryOutput> {
        self.project_without_rows()
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// Projects the first row (if any) into a single scalar binding via one
/// column template.
struct ScalarProjector {
    spec: Rc<FindSpec>,
    template: TypedIndex,
}
impl ScalarProjector {
    // Field-init shorthand for consistency with the rest of the file;
    // behavior is unchanged.
    fn with_template(spec: Rc<FindSpec>, template: TypedIndex) -> ScalarProjector {
        ScalarProjector { spec, template }
    }

    /// Wire this projector into a `CombinedProjection`, consuming the single
    /// column template produced during projection planning.
    fn combine(spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let template = elements.templates.pop().expect("Expected a single template");
        Ok(CombinedProjection {
            sql_projection: elements.sql_projection,
            pre_aggregate_projection: elements.pre_aggregate_projection,
            datalog_projector: Box::new(ScalarProjector::with_template(spec, template)),
            distinct: false,
            group_by_cols: elements.group_by,
        })
    }
}
impl Projector for ScalarProjector {
    // Consumes at most one row: `Scalar(Some(_))` if a row exists,
    // `Scalar(None)` otherwise; any additional rows are ignored.
    fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let results =
            if let Some(r) = rows.next() {
                let row = r?;
                let binding = self.template.lookup(&row)?;
                QueryResults::Scalar(Some(binding))
            } else {
                QueryResults::Scalar(None)
            };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A tuple projector produces a single vector. It's the single-result version of rel.
struct TupleProjector {
    spec: Rc<FindSpec>,
    // Number of Datalog columns in the tuple.
    len: usize,
    templates: Vec<TypedIndex>,
}
impl TupleProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>) -> TupleProjector {
        TupleProjector {
            spec: spec,
            len: len,
            templates: templates,
        }
    }

    // This is exactly the same as for rel.
    fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result<Vec<Binding>> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        self.templates
            .iter()
            .map(|ti| ti.lookup(&row))
            .collect::<Result<Vec<Binding>>>()
    }

    /// Wire this projector into a `CombinedProjection` built from the
    /// planner's projected elements.
    fn combine(spec: Rc<FindSpec>, column_count: usize, elements: ProjectedElements) -> Result<CombinedProjection> {
        let p = TupleProjector::with_templates(spec, column_count, elements.templates);
        Ok(CombinedProjection {
            sql_projection: elements.sql_projection,
            pre_aggregate_projection: elements.pre_aggregate_projection,
            datalog_projector: Box::new(p),
            distinct: false,
            group_by_cols: elements.group_by,
        })
    }
}
impl Projector for TupleProjector {
    // Like scalar: at most one row is consumed, but it projects to a whole
    // tuple of bindings rather than a single value.
    fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let results =
            if let Some(r) = rows.next() {
                let row = r?;
                let bindings = self.collect_bindings(row)?;
                QueryResults::Tuple(Some(bindings))
            } else {
                QueryResults::Tuple(None)
            };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A rel projector produces a RelResult, which is a striding abstraction over a vector.
/// Each stride across the vector is the same size, and sourced from the same columns.
/// Each column in each stride is the result of taking one or two columns from
/// the `Row`: one for the value and optionally one for the type tag.
struct RelProjector {
    spec: Rc<FindSpec>,
    // Stride width: the number of Datalog columns per row.
    len: usize,
    templates: Vec<TypedIndex>,
}
impl RelProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>) -> RelProjector {
        RelProjector {
            spec: spec,
            len: len,
            templates: templates,
        }
    }

    /// Append this row's bindings onto `out`, in template order.
    fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec<Binding>) -> Result<()> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        let mut count = 0;
        for binding in self.templates
                           .iter()
                           .map(|ti| ti.lookup(&row)) {
            out.push(binding?);
            count += 1;
        }
        // Every stride must be exactly `len` wide.
        assert_eq!(self.len, count);
        Ok(())
    }

    fn combine(spec: Rc<FindSpec>, column_count: usize, elements: ProjectedElements) -> Result<CombinedProjection> {
        let p = RelProjector::with_templates(spec, column_count, elements.templates);

        // If every column yields only one value, or if this is an aggregate query
        // (because by definition every column in an aggregate query is either
        // aggregated or is a variable _upon which we group_), then don't bother
        // with DISTINCT.
        let already_distinct = elements.pre_aggregate_projection.is_some() ||
                               p.columns().all(|e| e.is_unit());
        Ok(CombinedProjection {
            sql_projection: elements.sql_projection,
            pre_aggregate_projection: elements.pre_aggregate_projection,
            datalog_projector: Box::new(p),
            distinct: !already_distinct,
            group_by_cols: elements.group_by,
        })
    }
}
impl Projector for RelProjector {
    // Consumes every row, flattening bindings into one strided vector.
    fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Allocate space for five rows to start.
        // This is better than starting off by doubling the buffer a couple of times, and will
        // rapidly grow to support larger query results.
        let width = self.len;
        let mut values: Vec<_> = Vec::with_capacity(5 * width);

        while let Some(r) = rows.next() {
            let row = r?;
            self.collect_bindings_into(row, &mut values)?;
        }

        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Rel(RelResult { width, values }),
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A coll projector produces a vector of values.
/// Each value is sourced from the same column.
struct CollProjector {
    spec: Rc<FindSpec>,
    template: TypedIndex,
}

impl CollProjector {
    fn with_template(spec: Rc<FindSpec>, template: TypedIndex) -> CollProjector {
        CollProjector {
            spec: spec,
            template: template,
        }
    }

    fn combine(spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let template = elements.templates.pop().expect("Expected a single template");
        let projector = CollProjector::with_template(spec, template);

        // Skip DISTINCT when it cannot change the result set: either every
        // column yields a single value, or this is an aggregate query (in
        // which, by definition, every column is aggregated or is a variable
        // _upon which we group_).
        let trivially_distinct = elements.pre_aggregate_projection.is_some() ||
                                 projector.columns().all(|e| e.is_unit());
        Ok(CombinedProjection {
            sql_projection: elements.sql_projection,
            pre_aggregate_projection: elements.pre_aggregate_projection,
            datalog_projector: Box::new(projector),
            distinct: !trivially_distinct,
            group_by_cols: elements.group_by,
        })
    }
}

impl Projector for CollProjector {
    fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let mut out: Vec<_> = vec![];
        while let Some(r) = rows.next() {
            out.push(self.template.lookup(&r?)?);
        }
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Coll(out),
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// Combines the things you need to turn a query into SQL and turn its results into
/// `QueryResults`: SQL-related projection information (`DISTINCT`, columns, etc.) and
@ -652,6 +411,19 @@ impl CombinedProjection {
}
}
/// A small helper to classify find-spec elements: is this a `(pull …)`?
trait IsPull {
    fn is_pull(&self) -> bool;
}

impl IsPull for Element {
    fn is_pull(&self) -> bool {
        match *self {
            Element::Pull(_) => true,
            _ => false,
        }
    }
}
/// Compute a suitable SQL projection for an algebrized query.
/// This takes into account a number of things:
/// - The variable list in the find spec.
@ -660,7 +432,7 @@ impl CombinedProjection {
/// - The bindings established by the topmost CC.
/// - The types known at algebrizing time.
/// - The types extracted from the store for unknown attributes.
pub fn query_projection(query: &AlgebraicQuery) -> Result<Either<ConstantProjector, CombinedProjection>> {
pub fn query_projection(schema: &Schema, query: &AlgebraicQuery) -> Result<Either<ConstantProjector, CombinedProjection>> {
use self::FindSpec::*;
let spec = query.find_spec.clone();
@ -671,6 +443,13 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result<Either<ConstantProject
.map(|e| match e {
&Element::Variable(ref var) |
&Element::Corresponding(ref var) => var.clone(),
// Pull expressions can never be fully bound.
// TODO: but the interior can be, in which case we
// can handle this and simply project.
&Element::Pull(_) => {
unreachable!();
},
&Element::Aggregate(ref _agg) => {
// TODO: static computation of aggregates, then
// implement the condition in `is_fully_bound`.
@ -692,24 +471,42 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result<Either<ConstantProject
match *query.find_spec {
FindColl(ref element) => {
let elements = project_elements(1, iter::once(element), query)?;
CollProjector::combine(spec, elements).map(|p| p.flip_distinct_for_limit(&query.limit))
if element.is_pull() {
CollTwoStagePullProjector::combine(spec, elements)
} else {
CollProjector::combine(spec, elements)
}.map(|p| p.flip_distinct_for_limit(&query.limit))
},
FindScalar(ref element) => {
let elements = project_elements(1, iter::once(element), query)?;
if element.is_pull() {
ScalarTwoStagePullProjector::combine(schema, spec, elements)
} else {
ScalarProjector::combine(spec, elements)
}
},
FindRel(ref elements) => {
let is_pull = elements.iter().any(|e| e.is_pull());
let column_count = query.find_spec.expected_column_count();
let elements = project_elements(column_count, elements, query)?;
RelProjector::combine(spec, column_count, elements).map(|p| p.flip_distinct_for_limit(&query.limit))
if is_pull {
RelTwoStagePullProjector::combine(spec, column_count, elements)
} else {
RelProjector::combine(spec, column_count, elements)
}.map(|p| p.flip_distinct_for_limit(&query.limit))
},
FindTuple(ref elements) => {
let is_pull = elements.iter().any(|e| e.is_pull());
let column_count = query.find_spec.expected_column_count();
let elements = project_elements(column_count, elements, query)?;
if is_pull {
TupleTwoStagePullProjector::combine(spec, column_count, elements)
} else {
TupleProjector::combine(spec, column_count, elements)
}
},
}.map(Either::Right)
}

View file

@ -28,6 +28,7 @@ use mentat_core::util::{
use mentat_query::{
Element,
Pull,
Variable,
};
@ -58,7 +59,18 @@ use errors::{
Result,
};
use projectors::{
Projector,
};
use pull::{
PullIndices,
PullOperation,
PullTemplate,
};
use super::{
CombinedProjection,
TypedIndex,
};
@ -73,9 +85,40 @@ pub(crate) struct ProjectedElements {
pub sql_projection: Projection,
pub pre_aggregate_projection: Option<Projection>,
pub templates: Vec<TypedIndex>,
// TODO: when we have an expression like
// [:find (pull ?x [:foo/name :foo/age]) (pull ?x [:foo/friend]) …]
// it would be more efficient to combine them.
pub pulls: Vec<PullTemplate>,
pub group_by: Vec<GroupBy>,
}
impl ProjectedElements {
    /// Consume `self`, pairing its SQL-side pieces with the given Datalog
    /// `projector` to produce a `CombinedProjection`.
    pub(crate) fn combine(self, projector: Box<Projector>, distinct: bool) -> Result<CombinedProjection> {
        Ok(CombinedProjection {
            sql_projection: self.sql_projection,
            pre_aggregate_projection: self.pre_aggregate_projection,
            datalog_projector: projector,
            distinct: distinct,
            group_by_cols: self.group_by,
        })
    }

    // We need the templates to make a projector that we can then hand to `combine`. This is the easy
    // way to get it.
    // `mem::replace` moves the vector out and leaves an empty one behind,
    // which is simpler than the previous swap-with-a-temporary dance.
    pub(crate) fn take_templates(&mut self) -> Vec<TypedIndex> {
        ::std::mem::replace(&mut self.templates, vec![])
    }

    /// As `take_templates`, but for the pull templates.
    pub(crate) fn take_pulls(&mut self) -> Vec<PullTemplate> {
        ::std::mem::replace(&mut self.pulls, vec![])
    }
}
fn candidate_type_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> {
cc.extracted_types
.get(var)
@ -120,6 +163,7 @@ pub fn projected_column_for_var(var: &Variable, cc: &ConjoiningClauses) -> Resul
Ok((ProjectedColumn(column, name), cc.known_type_set(var)))
}
}
/// Walk an iterator of `Element`s, collecting projector templates and columns.
///
/// Returns a `ProjectedElements`, which combines SQL projections
@ -148,6 +192,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
let mut i: i32 = 0;
let mut min_max_count: usize = 0;
let mut templates = vec![];
let mut pulls: Vec<PullTemplate> = vec![];
let mut aggregates = false;
@ -182,9 +227,11 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
},
&Element::Aggregate(_) => {
},
&Element::Pull(_) => {
},
};
// Record variables -- (the ?x) and ?x are different in this regard, because we don't want
// Record variables -- `(the ?x)` and `?x` are different in this regard, because we don't want
// to group on variables that are corresponding-projected.
match e {
&Element::Variable(ref var) => {
@ -195,6 +242,11 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
// so we know not to group them.
corresponded_variables.insert(var.clone());
},
&Element::Pull(Pull { ref var, patterns: _ }) => {
// We treat `pull` as an ordinary variable extraction,
// and we expand it later.
outer_variables.insert(var.clone());
},
&Element::Aggregate(_) => {
},
};
@ -225,6 +277,35 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
outer_projection.push(Either::Left(type_name));
}
},
&Element::Pull(Pull { ref var, ref patterns }) => {
inner_variables.insert(var.clone());
let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?;
outer_projection.push(Either::Left(projected_column.1.clone()));
inner_projection.push(projected_column);
if let Some(tag) = type_set.unique_type_tag() {
// We will have at least as many SQL columns as Datalog output columns.
// `i` tracks the former. The length of `templates` is the current latter.
// Projecting pull requires grabbing values, which we can do from the raw
// rows, and then populating the output, so we keep both column indices.
let output_index = templates.len();
assert!(output_index <= i as usize);
templates.push(TypedIndex::Known(i, tag));
pulls.push(PullTemplate {
indices: PullIndices {
sql_index: i,
output_index,
},
op: PullOperation((*patterns).clone()),
});
i += 1; // We used one SQL column.
} else {
// This should be impossible: (pull ?x) implies that ?x is a ref.
unreachable!();
}
},
&Element::Aggregate(ref a) => {
if let Some(simple) = a.to_simple() {
aggregates = true;
@ -331,6 +412,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
sql_projection: Projection::Columns(inner_projection),
pre_aggregate_projection: None,
templates,
pulls,
group_by: vec![],
});
}
@ -434,6 +516,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
sql_projection: Projection::Columns(outer_projection),
pre_aggregate_projection: Some(Projection::Columns(inner_projection)),
templates,
pulls,
group_by,
})
}

View file

@ -0,0 +1,66 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::rc::Rc;
use ::{
Element,
FindSpec,
QueryOutput,
QueryResults,
Rows,
Schema,
rusqlite,
};
use ::errors::{
Result,
};
use super::{
Projector,
};
/// A projector that produces a `QueryResult` containing fixed data.
/// Takes a boxed function that should return an empty result set of the desired type.
pub struct ConstantProjector {
    spec: Rc<FindSpec>,
    results_factory: Box<Fn() -> QueryResults>,
}

impl ConstantProjector {
    pub fn new(spec: Rc<FindSpec>, results_factory: Box<Fn() -> QueryResults>) -> ConstantProjector {
        ConstantProjector {
            spec: spec,
            results_factory: results_factory,
        }
    }

    /// Produce the fixed output directly, without consulting the database.
    pub fn project_without_rows<'stmt>(&self) -> Result<QueryOutput> {
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: (self.results_factory)(),
        })
    }
}

// TODO: a ConstantProjector with non-constant pull expressions.

impl Projector for ConstantProjector {
    fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, _rows: Rows<'stmt>) -> Result<QueryOutput> {
        // The rows, schema, and connection are irrelevant: the answer is fixed.
        self.project_without_rows()
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}

View file

@ -0,0 +1,46 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use super::{
Element,
Schema,
QueryOutput,
Rows,
rusqlite,
};
use super::errors::{
Result,
};
/// A `Projector` consumes a stream of SQL rows and produces a Datalog-shaped
/// `QueryOutput`. Implementations that need to do further reads (e.g., pull
/// expressions) are handed the schema and the SQLite connection.
pub trait Projector {
    // Turn `rows` into results. `schema` and `sqlite` allow secondary queries.
    fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, rows: Rows<'stmt>) -> Result<QueryOutput>;
    // The find-spec elements this projector emits, in output order.
    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's>;
}
mod constant;
mod simple;
mod pull_two_stage;
pub use self::constant::ConstantProjector;
pub(crate) use self::simple::{
CollProjector,
RelProjector,
ScalarProjector,
TupleProjector,
};
pub(crate) use self::pull_two_stage::{
CollTwoStagePullProjector,
RelTwoStagePullProjector,
ScalarTwoStagePullProjector,
TupleTwoStagePullProjector,
};

View file

@ -0,0 +1,335 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::rc::Rc;
use std::iter::{
once,
};
use mentat_query_pull::{
Puller,
};
use mentat_core::{
Entid,
};
use ::{
Binding,
CombinedProjection,
Element,
FindSpec,
ProjectedElements,
QueryOutput,
QueryResults,
RelResult,
Row,
Rows,
Schema,
TypedIndex,
rusqlite,
};
use ::pull::{
PullConsumer,
PullOperation,
PullTemplate,
};
use ::errors::{
Result,
};
use super::{
Projector,
};
/// Projects a scalar find spec whose single element is a pull expression.
pub(crate) struct ScalarTwoStagePullProjector {
    spec: Rc<FindSpec>,
    puller: Puller,
}

// TODO: almost by definition, a scalar result format doesn't need to be run in two stages.
// The only output is the pull expression, and so we can directly supply the projected entity
// to the pull SQL.
impl ScalarTwoStagePullProjector {
    fn with_template(schema: &Schema, spec: Rc<FindSpec>, pull: PullOperation) -> Result<ScalarTwoStagePullProjector> {
        Ok(ScalarTwoStagePullProjector {
            spec: spec,
            // `pull` is owned here, so move its patterns into the puller
            // rather than cloning them.
            puller: Puller::prepare(schema, pull.0)?,
        })
    }

    pub(crate) fn combine(schema: &Schema, spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let pull = elements.pulls.pop().expect("Expected a single pull");
        let projector = Box::new(ScalarTwoStagePullProjector::with_template(schema, spec, pull.op)?);

        // A scalar query returns at most one row, so DISTINCT buys nothing.
        let distinct = false;
        elements.combine(projector, distinct)
    }
}

impl Projector for ScalarTwoStagePullProjector {
    fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Scalar is pretty straightforward -- zero or one entity, do the pull directly.
        let results =
            if let Some(r) = rows.next() {
                let row = r?;
                let entity: Entid = row.get(0);          // This will always be 0 and a ref.
                let bindings = self.puller.pull(schema, sqlite, once(entity))?;
                // An entity that matched nothing still yields a (empty) map.
                let m = Binding::Map(bindings.get(&entity).cloned().unwrap_or_else(Default::default));
                QueryResults::Scalar(Some(m))
            } else {
                QueryResults::Scalar(None)
            };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A tuple projector produces a single vector. It's the single-result version of rel.
pub(crate) struct TupleTwoStagePullProjector {
    spec: Rc<FindSpec>,
    len: usize,
    templates: Vec<TypedIndex>,
    pulls: Vec<PullTemplate>,
}

impl TupleTwoStagePullProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>, pulls: Vec<PullTemplate>) -> TupleTwoStagePullProjector {
        TupleTwoStagePullProjector {
            spec: spec,
            len: len,
            templates: templates,
            pulls: pulls,
        }
    }

    // This is exactly the same as for rel.
    fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result<Vec<Binding>> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        self.templates
            .iter()
            .map(|ti| ti.lookup(&row))
            .collect::<Result<Vec<Binding>>>()
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, column_count: usize, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let projector = Box::new(TupleTwoStagePullProjector::with_templates(spec, column_count, elements.take_templates(), elements.take_pulls()));

        // A tuple query returns at most one row, so DISTINCT buys nothing.
        let distinct = false;
        elements.combine(projector, distinct)
    }
}

impl Projector for TupleTwoStagePullProjector {
    fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let results =
            if let Some(r) = rows.next() {
                let row = r?;

                // Keeping the compiler happy.
                let pull_consumers: Result<Vec<PullConsumer>> = self.pulls
                                                                   .iter()
                                                                   .map(|op| PullConsumer::for_template(schema, op))
                                                                   .collect();
                let mut pull_consumers = pull_consumers?;

                // Collect the usual bindings and accumulate entity IDs for pull.
                // (`iter_mut` already yields `&mut` references; no `mut` binding needed.)
                for p in pull_consumers.iter_mut() {
                    p.collect_entity(&row);
                }
                let mut bindings = self.collect_bindings(row)?;

                // Run the pull expressions for the collected IDs.
                for p in pull_consumers.iter_mut() {
                    p.pull(sqlite)?;
                }

                // Expand the pull expressions back into the results vector.
                for p in pull_consumers.into_iter() {
                    p.expand(&mut bindings);
                }

                QueryResults::Tuple(Some(bindings))
            } else {
                QueryResults::Tuple(None)
            };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A rel projector produces a RelResult, which is a striding abstraction over a vector.
/// Each stride across the vector is the same size, and sourced from the same columns.
/// Each column in each stride is the result of taking one or two columns from
/// the `Row`: one for the value and optionally one for the type tag.
pub(crate) struct RelTwoStagePullProjector {
    spec: Rc<FindSpec>,
    len: usize,
    templates: Vec<TypedIndex>,
    pulls: Vec<PullTemplate>,
}

impl RelTwoStagePullProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>, pulls: Vec<PullTemplate>) -> RelTwoStagePullProjector {
        RelTwoStagePullProjector {
            spec: spec,
            len: len,
            templates: templates,
            pulls: pulls,
        }
    }

    /// Look up each template against `row`, appending the bindings to `out`.
    fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec<Binding>) -> Result<()> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        let mut count = 0;
        for binding in self.templates
                           .iter()
                           .map(|ti| ti.lookup(&row)) {
            out.push(binding?);
            count += 1;
        }
        assert_eq!(self.len, count);
        Ok(())
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, column_count: usize, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let projector = Box::new(RelTwoStagePullProjector::with_templates(spec, column_count, elements.take_templates(), elements.take_pulls()));

        // If every column yields only one value, or if this is an aggregate query
        // (because by definition every column in an aggregate query is either
        // aggregated or is a variable _upon which we group_), then don't bother
        // with DISTINCT.
        let already_distinct = elements.pre_aggregate_projection.is_some() ||
                               projector.columns().all(|e| e.is_unit());
        elements.combine(projector, !already_distinct)
    }
}

impl Projector for RelTwoStagePullProjector {
    fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Allocate space for five rows to start.
        // This is better than starting off by doubling the buffer a couple of times, and will
        // rapidly grow to support larger query results.
        let width = self.len;
        let mut values: Vec<_> = Vec::with_capacity(5 * width);

        let pull_consumers: Result<Vec<PullConsumer>> = self.pulls
                                                            .iter()
                                                            .map(|op| PullConsumer::for_template(schema, op))
                                                            .collect();
        let mut pull_consumers = pull_consumers?;

        // Collect the usual bindings and accumulate entity IDs for pull.
        while let Some(r) = rows.next() {
            let row = r?;
            for p in pull_consumers.iter_mut() {
                p.collect_entity(&row);
            }
            self.collect_bindings_into(row, &mut values)?;
        }

        // Run the pull expressions for the collected IDs.
        for p in pull_consumers.iter_mut() {
            p.pull(sqlite)?;
        }

        // Expand the pull expressions back into the results vector.
        // (No trailing semicolon: the `for` loop is a statement, not an expression.)
        for bindings in values.chunks_mut(width) {
            for p in pull_consumers.iter() {
                p.expand(bindings);
            }
        }

        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Rel(RelResult { width, values }),
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A coll projector produces a vector of values.
/// Each value is sourced from the same column.
pub(crate) struct CollTwoStagePullProjector {
    spec: Rc<FindSpec>,
    pull: PullOperation,
}

impl CollTwoStagePullProjector {
    fn with_pull(spec: Rc<FindSpec>, pull: PullOperation) -> CollTwoStagePullProjector {
        CollTwoStagePullProjector {
            spec: spec,
            pull: pull,
        }
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let pull = elements.pulls.pop().expect("Expected a single pull");
        let projector = Box::new(CollTwoStagePullProjector::with_pull(spec, pull.op));

        // If every column yields only one value, or we're grouping by the value,
        // don't bother with DISTINCT. This shouldn't really apply to coll-pull.
        let distinct = !(elements.pre_aggregate_projection.is_some() ||
                         projector.columns().all(|e| e.is_unit()));
        elements.combine(projector, distinct)
    }
}

impl Projector for CollTwoStagePullProjector {
    fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let mut consumer = PullConsumer::for_operation(schema, &self.pull)?;

        // Stage one: accumulate the entity from each row.
        while let Some(r) = rows.next() {
            consumer.collect_entity(&r?);
        }

        // Stage two: run the pull expression over the collected IDs…
        consumer.pull(sqlite)?;

        // … and surface the pulled maps as the result collection.
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Coll(consumer.into_coll_results()),
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}

View file

@ -0,0 +1,253 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::rc::Rc;
use ::{
Binding,
CombinedProjection,
Element,
FindSpec,
ProjectedElements,
QueryOutput,
QueryResults,
RelResult,
Row,
Rows,
Schema,
TypedIndex,
rusqlite,
};
use ::errors::{
Result,
};
use super::{
Projector,
};
/// Projects a scalar find spec: at most one row, a single column.
pub(crate) struct ScalarProjector {
    spec: Rc<FindSpec>,
    template: TypedIndex,
}

impl ScalarProjector {
    fn with_template(spec: Rc<FindSpec>, template: TypedIndex) -> ScalarProjector {
        ScalarProjector {
            spec: spec,
            template: template,
        }
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let template = elements.templates.pop().expect("Expected a single template");
        let projector = Box::new(ScalarProjector::with_template(spec, template));
        // A scalar query returns at most one row, so DISTINCT buys nothing.
        elements.combine(projector, false)
    }
}

impl Projector for ScalarProjector {
    fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Take the first row, if any, and project its single column.
        let results = match rows.next() {
            Some(r) => QueryResults::Scalar(Some(self.template.lookup(&r?)?)),
            None => QueryResults::Scalar(None),
        };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A tuple projector produces a single vector. It's the single-result version of rel.
pub(crate) struct TupleProjector {
    spec: Rc<FindSpec>,
    len: usize,
    templates: Vec<TypedIndex>,
}

impl TupleProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>) -> TupleProjector {
        TupleProjector {
            spec: spec,
            len: len,
            templates: templates,
        }
    }

    // This is just like we do for `rel`, but into a vec of its own.
    fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result<Vec<Binding>> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        let mut bindings = Vec::with_capacity(self.len);
        for template in self.templates.iter() {
            bindings.push(template.lookup(&row)?);
        }
        Ok(bindings)
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, column_count: usize, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let templates = elements.take_templates();
        let projector = Box::new(TupleProjector::with_templates(spec, column_count, templates));
        // A tuple query returns at most one row, so DISTINCT buys nothing.
        elements.combine(projector, false)
    }
}

impl Projector for TupleProjector {
    fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Take the first row, if any, and project all of its columns.
        let results = match rows.next() {
            Some(r) => QueryResults::Tuple(Some(self.collect_bindings(r?)?)),
            None => QueryResults::Tuple(None),
        };
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: results,
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A rel projector produces a RelResult, which is a striding abstraction over a vector.
/// Each stride across the vector is the same size, and sourced from the same columns.
/// Each column in each stride is the result of taking one or two columns from
/// the `Row`: one for the value and optionally one for the type tag.
pub(crate) struct RelProjector {
    spec: Rc<FindSpec>,
    len: usize,
    templates: Vec<TypedIndex>,
}

impl RelProjector {
    fn with_templates(spec: Rc<FindSpec>, len: usize, templates: Vec<TypedIndex>) -> RelProjector {
        RelProjector {
            spec: spec,
            len: len,
            templates: templates,
        }
    }

    /// Look up each template against `row`, appending the bindings to `out`.
    fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec<Binding>) -> Result<()> {
        // There will be at least as many SQL columns as Datalog columns.
        // gte 'cos we might be querying extra columns for ordering.
        // The templates will take care of ignoring columns.
        assert!(row.column_count() >= self.len as i32);
        let mut pushed = 0;
        for template in self.templates.iter() {
            out.push(template.lookup(&row)?);
            pushed += 1;
        }
        assert_eq!(self.len, pushed);
        Ok(())
    }

    pub(crate) fn combine(spec: Rc<FindSpec>, column_count: usize, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let templates = elements.take_templates();
        let projector = Box::new(RelProjector::with_templates(spec, column_count, templates));

        // Skip DISTINCT when it cannot change the result set: either every
        // column yields a single value, or this is an aggregate query (in
        // which, by definition, every column is aggregated or is a variable
        // _upon which we group_).
        let trivially_distinct = elements.pre_aggregate_projection.is_some() ||
                                 projector.columns().all(|e| e.is_unit());
        elements.combine(projector, !trivially_distinct)
    }
}

impl Projector for RelProjector {
    fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        // Reserve room for a handful of rows up front: cheaper than the first
        // few buffer doublings, and the vector still grows for larger results.
        let width = self.len;
        let mut values: Vec<_> = Vec::with_capacity(5 * width);
        while let Some(r) = rows.next() {
            self.collect_bindings_into(r?, &mut values)?;
        }
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Rel(RelResult { width, values }),
        })
    }

    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}
/// A coll projector produces a vector of values.
/// Each value is sourced from the same column.
pub(crate) struct CollProjector {
    spec: Rc<FindSpec>,
    // The single (value, optional type tag) column projected from each row.
    template: TypedIndex,
}
impl CollProjector {
    fn with_template(spec: Rc<FindSpec>, template: TypedIndex) -> CollProjector {
        CollProjector {
            spec: spec,
            template: template,
        }
    }
    /// Build the `CombinedProjection` for a coll query, deciding whether
    /// the generated SQL needs `DISTINCT`.
    pub(crate) fn combine(spec: Rc<FindSpec>, mut elements: ProjectedElements) -> Result<CombinedProjection> {
        let template = elements.templates.pop().expect("Expected a single template");
        let projector = Box::new(CollProjector::with_template(spec, template));
        // If every column yields only one value, or if this is an aggregate query
        // (because by definition every column in an aggregate query is either
        // aggregated or is a variable _upon which we group_), then don't bother
        // with DISTINCT.
        let already_distinct = elements.pre_aggregate_projection.is_some() ||
            projector.columns().all(|e| e.is_unit());
        elements.combine(projector, !already_distinct)
    }
}
impl Projector for CollProjector {
    // Project every row's single column into a flat vector.
    // The schema and connection are unused: no secondary reads are needed.
    fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result<QueryOutput> {
        let mut out: Vec<_> = vec![];
        while let Some(r) = rows.next() {
            let row = r?;
            let binding = self.template.lookup(&row)?;
            out.push(binding);
        }
        Ok(QueryOutput {
            spec: self.spec.clone(),
            results: QueryResults::Coll(out),
        })
    }
    fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's> {
        self.spec.columns()
    }
}

View file

@ -0,0 +1,121 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::collections::{
BTreeMap,
BTreeSet,
};
use mentat_core::{
Binding,
Entid,
Schema,
StructuredMap,
TypedValue,
ValueRc,
};
use mentat_query::{
PullAttributeSpec,
};
use mentat_query_pull::{
Puller,
};
use errors::{
Result,
};
use super::{
Index,
rusqlite,
};
/// The list of attribute specs making up a single `(pull ?var [...])` expression.
#[derive(Clone, Debug)]
pub(crate) struct PullOperation(pub(crate) Vec<PullAttributeSpec>);
/// Where a pull expression reads its entity from and writes its map to.
#[derive(Clone, Copy, Debug)]
pub(crate) struct PullIndices {
    pub(crate) sql_index: Index, // SQLite column index.
    // Position in the projected Datalog output row.
    pub(crate) output_index: usize,
}
impl PullIndices {
    // Both indices at zero: used when there is only one column (coll pull).
    fn zero() -> PullIndices {
        PullIndices {
            sql_index: 0,
            output_index: 0,
        }
    }
}
/// A pull operation paired with the indices it reads from and writes to.
#[derive(Debug)]
pub(crate) struct PullTemplate {
    pub(crate) indices: PullIndices,
    pub(crate) op: PullOperation,
}
/// Accumulates entities from result rows, runs a pull expression over them,
/// and splices the resulting maps back into the projected bindings.
pub(crate) struct PullConsumer<'schema> {
    indices: PullIndices,
    schema: &'schema Schema,
    puller: Puller,
    // The distinct entities seen so far.
    entities: BTreeSet<Entid>,
    // Populated by `pull`: entity -> pulled map.
    results: BTreeMap<Entid, ValueRc<StructuredMap>>,
}

impl<'schema> PullConsumer<'schema> {
    pub(crate) fn for_puller(puller: Puller, schema: &'schema Schema, indices: PullIndices) -> PullConsumer<'schema> {
        PullConsumer {
            indices: indices,
            schema: schema,
            puller: puller,
            entities: Default::default(),
            results: Default::default(),
        }
    }

    pub(crate) fn for_template(schema: &'schema Schema, template: &PullTemplate) -> Result<PullConsumer<'schema>> {
        let puller = Puller::prepare(schema, template.op.0.clone())?;
        Ok(PullConsumer::for_puller(puller, schema, template.indices))
    }

    pub(crate) fn for_operation(schema: &'schema Schema, operation: &PullOperation) -> Result<PullConsumer<'schema>> {
        let puller = Puller::prepare(schema, operation.0.clone())?;
        Ok(PullConsumer::for_puller(puller, schema, PullIndices::zero()))
    }

    /// Read the entity from our SQL column in `row` and remember it for a
    /// later `pull`. Returns the entity for the caller's convenience.
    pub(crate) fn collect_entity<'a, 'stmt>(&mut self, row: &rusqlite::Row<'a, 'stmt>) -> Entid {
        let entity = row.get(self.indices.sql_index);
        self.entities.insert(entity);
        entity
    }

    /// Run the pull expression over every collected entity, caching the maps.
    pub(crate) fn pull(&mut self, sqlite: &rusqlite::Connection) -> Result<()> {
        let entities: Vec<Entid> = self.entities.iter().cloned().collect();
        self.results = self.puller.pull(self.schema, sqlite, entities)?;
        Ok(())
    }

    /// Replace the `Ref` at our output position in `bindings` with the pulled
    /// map for that entity, or an empty map if nothing was pulled for it.
    pub(crate) fn expand(&self, bindings: &mut [Binding]) {
        if let Binding::Scalar(TypedValue::Ref(id)) = bindings[self.indices.output_index] {
            if let Some(pulled) = self.results.get(&id).cloned() {
                bindings[self.indices.output_index] = Binding::Map(pulled);
            } else {
                bindings[self.indices.output_index] = Binding::Map(ValueRc::new(Default::default()));
            }
        }
    }

    // TODO: do we need to include empty maps for entities that didn't match any pull?
    pub(crate) fn into_coll_results(self) -> Vec<Binding> {
        // `Binding::Map` is a unary constructor, so pass it directly rather
        // than wrapping it in a redundant closure.
        self.results.values().cloned().map(Binding::Map).collect()
    }
}

View file

@ -84,7 +84,7 @@ fn test_aggregate_unsuitable_type() {
let algebrized = algebrize(Known::for_schema(&schema), parsed).expect("query algebrizes");
// … when we look at the projection list, we cannot reconcile the types.
assert!(query_projection(&algebrized).is_err());
assert!(query_projection(&schema, &algebrized).is_err());
}
#[test]
@ -100,7 +100,7 @@ fn test_the_without_max_or_min() {
let algebrized = algebrize(Known::for_schema(&schema), parsed).expect("query algebrizes");
// … when we look at the projection list, we cannot reconcile the types.
let projection = query_projection(&algebrized);
let projection = query_projection(&schema, &algebrized);
assert!(projection.is_err());
use ::mentat_query_projector::errors::{
ErrorKind,

33
query-pull/Cargo.toml Normal file
View file

@ -0,0 +1,33 @@
[package]
name = "mentat_query_pull"
version = "0.0.1"
workspace = ".."
[dependencies]
error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" }
[dependencies.rusqlite]
version = "0.13"
features = ["limits"]
[dependencies.mentat_core]
path = "../core"
[dependencies.mentat_db]
path = "../db"
[dependencies.mentat_query]
path = "../query"
[dependencies.mentat_query_algebrizer]
path = "../query-algebrizer"
[dependencies.mentat_query_sql]
path = "../query-sql"
[dependencies.mentat_sql]
path = "../sql"
# Only for tests.
[dev-dependencies.mentat_query_parser]
path = "../query-parser"

30
query-pull/src/errors.rs Normal file
View file

@ -0,0 +1,30 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use mentat_core::{
Entid,
};
// Error type for the pull crate, generated by `error_chain`.
error_chain! {
    types {
        Error, ErrorKind, ResultExt, Result;
    }
    errors {
        // Returned when a pulled attribute has no ident in the schema: without
        // a name we cannot construct a keyword key for the output map.
        UnnamedAttribute(id: Entid) {
            description("unnamed attribute")
            display("attribute {:?} has no name", id)
        }
    }
    links {
        DbError(::mentat_db::Error, ::mentat_db::ErrorKind);
    }
}

246
query-pull/src/lib.rs Normal file
View file

@ -0,0 +1,246 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(dead_code)]
///! A pull expression is a function.
///!
///! Its inputs are a store, a schema, and a set of bindings.
///!
///! Its output is a map whose keys are the input bindings and whose values are
///! appropriate structured values to represent the pull expression.
///!
///! For example, the pull expression:
///!
///! ```edn
///! (pull ?person [:person/name
///! :person/tattoo
///! {:person/friend [*]}])`
///! ```
///!
///! will return values shaped like:
///!
///! ```edn
///! {:person/name "Alice" ; Single-valued attribute
///! ; Absence: Alice has no tattoos.
///! :person/friend [ ; Multi-valued attribute.
///! {:person/name "Bob" ; Nesting and wildcard.
///! :person/pet ["Harrison", "Hoppy"]}]}
///! ```
///!
///! There will be one such value for each input binding.
///!
///! We fetch layers of a pull expression iteratively: all attributes at the same
///! 'level' can be fetched at the same time and accumulated into maps.
///!
///! Those maps are wrapped in `Rc` for two reasons:
///! - They might occur multiple times when projected from a `:find` query.
///! - They might refer to each other (consider recursion).
///!
///! A nested or recursive pull expression consumes values produced by earlier stages
///! (the recursion with a smaller recursion limit and a growing 'seen' list),
///! generating another layer of mappings.
///!
///! For example, you can imagine the nesting in the earlier pull expression being
///! decomposed into two chained expressions:
///!
///! ```edn
///! (pull
///! (pull ?person [:person/friend])
///!       [*]))
///! ```
#[macro_use]
extern crate error_chain;
extern crate rusqlite;
extern crate mentat_core;
extern crate mentat_db;
extern crate mentat_query;
extern crate mentat_query_algebrizer;
extern crate mentat_query_sql;
extern crate mentat_sql;
use std::collections::{
BTreeMap,
BTreeSet,
};
use std::iter::{
once,
};
use mentat_core::{
Cloned,
Entid,
HasSchema,
NamespacedKeyword,
Schema,
StructuredMap,
ValueRc,
};
use mentat_db::cache;
use mentat_query::{
PullAttributeSpec,
PullConcreteAttribute,
};
pub mod errors;
use errors::{
ErrorKind,
Result,
};
type PullResults = BTreeMap<Entid, ValueRc<StructuredMap>>;
pub fn pull_attributes_for_entity<A>(schema: &Schema,
db: &rusqlite::Connection,
entity: Entid,
attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
let attrs = attributes.into_iter()
.map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e)))
.collect();
Puller::prepare(schema, attrs)?
.pull(schema, db, once(entity))
.map(|m| m.into_iter()
.next()
.map(|(k, vs)| {
assert_eq!(k, entity);
vs.cloned()
})
.unwrap_or_else(StructuredMap::default))
}
pub fn pull_attributes_for_entities<E, A>(schema: &Schema,
db: &rusqlite::Connection,
entities: E,
attributes: A) -> Result<PullResults>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
let attrs = attributes.into_iter()
.map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e)))
.collect();
Puller::prepare(schema, attrs)?
.pull(schema, db, entities)
}
/// A `Puller` constructs on demand a map from a provided set of entity IDs to a set of structured maps.
pub struct Puller {
    // The domain of this map is the set of attributes to fetch.
    // The range is the set of aliases to use in the output.
    attributes: BTreeMap<Entid, ValueRc<NamespacedKeyword>>,

    // The concrete attribute set to cache, precomputed once at `prepare` time.
    attribute_spec: cache::AttributeSpec,
}
impl Puller {
    /// Prepare a puller that fetches exactly the given attributes (by entid),
    /// with no wildcard, aliasing, or nesting.
    pub fn prepare_simple_attributes(schema: &Schema, attributes: Vec<Entid>) -> Result<Puller> {
        Puller::prepare(schema,
                        attributes.into_iter()
                                  .map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e)))
                                  .collect())
    }

    /// Resolve a list of attribute specs against `schema`, producing a
    /// `Puller` that knows which attributes to cache and which keyword to use
    /// as the output key for each.
    ///
    /// Fails with `ErrorKind::UnnamedAttribute` if a requested attribute has
    /// no ident in the schema.
    pub fn prepare(schema: &Schema, attributes: Vec<PullAttributeSpec>) -> Result<Puller> {
        // TODO: eventually this entry point will handle aliasing and that kind of
        // thing. For now it's just a convenience.
        let lookup_name = |i: &Entid| {
            // In the unlikely event that we have an attribute with no name, we bail.
            schema.get_ident(*i)
                  .map(|ident| ValueRc::new(ident.clone()))
                  .ok_or_else(|| ErrorKind::UnnamedAttribute(*i))
        };

        let mut names: BTreeMap<Entid, ValueRc<NamespacedKeyword>> = Default::default();
        let mut attrs: BTreeSet<Entid> = Default::default();
        for attr in attributes.iter() {
            match attr {
                &PullAttributeSpec::Wildcard => {
                    // A wildcard subsumes every attribute in the schema, so
                    // any remaining specs are redundant.
                    let attribute_ids = schema.attribute_map.keys();
                    for id in attribute_ids {
                        names.insert(*id, lookup_name(id)?);
                        attrs.insert(*id);
                    }
                    break;
                },
                &PullAttributeSpec::Attribute(PullConcreteAttribute::Ident(ref i)) => {
                    // An attribute named by ident uses the caller-supplied
                    // keyword as the output key. Unknown idents are skipped.
                    if let Some(entid) = schema.get_entid(i) {
                        names.insert(entid.into(), i.to_value_rc());
                        attrs.insert(entid.into());
                    }
                },
                &PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(ref entid)) => {
                    names.insert(*entid, lookup_name(entid)?);
                    attrs.insert(*entid);
                },
            }
        }

        Ok(Puller {
            attributes: names,
            attribute_spec: cache::AttributeSpec::specified(&attrs, schema),
        })
    }

    /// Fetch the prepared attributes for each of `entities`, returning a map
    /// from entity to its accumulated `StructuredMap`. Entities that matched
    /// no attributes are absent from the result.
    pub fn pull<E>(&self,
                   schema: &Schema,
                   db: &rusqlite::Connection,
                   entities: E) -> Result<PullResults>
        where E: IntoIterator<Item=Entid> {
        // We implement pull by:
        // - Generating `AttributeCaches` for the provided attributes and entities.
        //   TODO: it would be nice to invert the cache as we build it, rather than have to invert it here.
        // - Recursing. (TODO: we'll need AttributeCaches to not overwrite in case of recursion! And
        //   ideally not do excess work when some entity/attribute pairs are known.)
        // - Building a structure by walking the pull expression with the caches.
        // TODO: aliases.
        // TODO: limits.
        // TODO: fts.

        // Build a cache for these attributes and entities.
        // TODO: use the store's existing cache!
        let entities: Vec<Entid> = entities.into_iter().collect();
        let caches = cache::AttributeCaches::make_cache_for_entities_and_attributes(
            schema,
            db,
            self.attribute_spec.clone(),
            &entities)?;

        // Now construct the appropriate result format.
        // TODO: should we walk `e` then `a`, or `a` then `e`? Possibly the right answer
        // is just to collect differently!
        let mut maps = BTreeMap::new();
        for (name, cache) in self.attributes.iter().filter_map(|(a, name)|
            caches.forward_attribute_cache_for_attribute(schema, *a)
                  .map(|cache| (name.clone(), cache))) {
            for e in entities.iter() {
                if let Some(binding) = cache.binding_for_e(*e) {
                    // `or_insert_with`, not `or_insert`: don't allocate a
                    // fresh empty map every time an existing entry is hit.
                    let r = maps.entry(*e)
                                .or_insert_with(|| ValueRc::new(StructuredMap::default()));
                    // Get into the inner map so we can accumulate a value.
                    // We can unwrap here because we created all of these maps
                    // and they are not yet shared.
                    let m = ValueRc::get_mut(r).unwrap();
                    m.insert(name.clone(), binding);
                }
            }
        }

        Ok(maps)
    }
}

View file

@ -9,7 +9,7 @@
// specific language governing permissions and limitations under the License.
extern crate regex;
extern crate mentat_core;
#[macro_use] extern crate mentat_core;
extern crate mentat_query;
extern crate mentat_query_algebrizer;
extern crate mentat_sql;
@ -253,42 +253,6 @@ fn push_column(qb: &mut QueryBuilder, col: &Column) -> BuildQueryResult {
//---------------------------------------------------------
// Turn that representation into SQL.
/// A helper macro to sequentially process an iterable sequence,
/// evaluating a block between each pair of items.
///
/// This is used to simply and efficiently produce output like
///
/// ```sql
/// 1, 2, 3
/// ```
///
/// or
///
/// ```sql
/// x = 1 AND y = 2
/// ```
///
/// without producing an intermediate string sequence.
macro_rules! interpose {
( $name: pat, $across: expr, $body: block, $inter: block ) => {
interpose_iter!($name, $across.iter(), $body, $inter)
}
}
macro_rules! interpose_iter {
( $name: pat, $across: expr, $body: block, $inter: block ) => {
let mut seq = $across;
if let Some($name) = seq.next() {
$body;
for $name in seq {
$inter;
$body;
}
}
}
}
impl QueryFragment for ColumnOrExpression {
fn push_sql(&self, out: &mut QueryBuilder) -> BuildQueryResult {
use self::ColumnOrExpression::*;

View file

@ -9,6 +9,7 @@
// specific language governing permissions and limitations under the License.
use mentat_core::{
Schema,
SQLTypeAffinity,
SQLValueType,
SQLValueTypeSet,
@ -439,10 +440,10 @@ fn re_project(mut inner: SelectQuery, projection: Projection) -> SelectQuery {
/// Consume a provided `AlgebraicQuery` to yield a new
/// `ProjectedSelect`.
pub fn query_to_select(query: AlgebraicQuery) -> Result<ProjectedSelect> {
pub fn query_to_select(schema: &Schema, query: AlgebraicQuery) -> Result<ProjectedSelect> {
// TODO: we can't pass `query.limit` here if we aggregate during projection.
// SQL-based aggregation -- `SELECT SUM(datoms00.e)` -- is fine.
query_projection(&query).map(|e| match e {
query_projection(schema, &query).map(|e| match e {
Either::Left(constant) => ProjectedSelect::Constant(constant),
Either::Right(CombinedProjection {
sql_projection,

View file

@ -101,7 +101,7 @@ fn inner_translate_with_inputs(schema: &Schema, query: &'static str, inputs: Que
let known = Known::for_schema(schema);
let parsed = parse_find_string(query).expect("parse to succeed");
let algebrized = algebrize_with_inputs(known, parsed, 0, inputs).expect("algebrize to succeed");
query_to_select(algebrized).expect("translate to succeed")
query_to_select(schema, algebrized).expect("translate to succeed")
}
fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery {
@ -249,7 +249,7 @@ fn test_bound_variable_limit_affects_types() {
assert_eq!(Some(ValueType::Long),
algebrized.cc.known_type(&Variable::from_valid_name("?limit")));
let select = query_to_select(algebrized).expect("query to translate");
let select = query_to_select(&schema, algebrized).expect("query to translate");
let SQLQuery { sql, args } = query_to_sql(select);
// TODO: this query isn't actually correct -- we don't yet algebrize for variables that are
@ -340,7 +340,7 @@ fn test_unknown_ident() {
assert!(algebrized.is_known_empty());
// If you insist…
let select = query_to_select(algebrized).expect("query to translate");
let select = query_to_select(&schema, algebrized).expect("query to translate");
assert_query_is_empty(select, FindSpec::FindRel(vec![var!(?x).into()]));
}

View file

@ -486,18 +486,61 @@ impl PatternValuePlace {
}
}
/*
pub enum PullPattern {
Constant(Constant),
Variable(Variable),
// Not yet used.
// pub enum PullDefaultValue {
// EntidOrInteger(i64),
// IdentOrKeyword(Rc<NamespacedKeyword>),
// Constant(NonIntegerConstant),
// }
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PullConcreteAttribute {
Ident(Rc<NamespacedKeyword>),
Entid(i64),
}
pub struct Pull {
pub src: SrcVar,
pub var: Variable,
pub pattern: PullPattern, // Constant, variable, or plain variable.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PullAttributeSpec {
Wildcard,
Attribute(PullConcreteAttribute),
// PullMapSpec(Vec<…>),
// AttributeWithOpts(PullConcreteAttribute, …),
// LimitedAttribute(PullConcreteAttribute, u64), // Limit nil => Attribute instead.
// DefaultedAttribute(PullConcreteAttribute, PullDefaultValue),
}
impl std::fmt::Display for PullConcreteAttribute {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
&PullConcreteAttribute::Ident(ref k) => {
write!(f, "{}", k)
},
&PullConcreteAttribute::Entid(i) => {
write!(f, "{}", i)
},
}
}
}
impl std::fmt::Display for PullAttributeSpec {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
&PullAttributeSpec::Wildcard => {
write!(f, "*")
},
&PullAttributeSpec::Attribute(ref a) => {
write!(f, "{}", a)
},
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct Pull {
pub var: Variable,
pub patterns: Vec<PullAttributeSpec>,
}
*/
#[derive(Debug, Eq, PartialEq)]
pub struct Aggregate {
@ -516,7 +559,7 @@ pub enum Element {
/// `max` or `min` cannot yield predictable behavior, and will err during
/// algebrizing.
Corresponding(Variable),
// Pull(Pull), // TODO
Pull(Pull),
}
impl Element {
@ -524,6 +567,7 @@ impl Element {
pub fn is_unit(&self) -> bool {
match self {
&Element::Variable(_) => false,
&Element::Pull(_) => false,
&Element::Aggregate(_) => true,
&Element::Corresponding(_) => true,
}
@ -542,6 +586,13 @@ impl std::fmt::Display for Element {
&Element::Variable(ref var) => {
write!(f, "{}", var)
},
&Element::Pull(Pull { ref var, ref patterns }) => {
write!(f, "(pull {} [ ", var)?;
for p in patterns.iter() {
write!(f, "{} ", p)?;
}
write!(f, "])")
},
&Element::Aggregate(ref agg) => {
match agg.args.len() {
0 => write!(f, "({})", agg.func),
@ -573,10 +624,7 @@ pub enum Limit {
/// Examples:
///
/// ```rust
/// # extern crate edn;
/// # extern crate mentat_query;
/// # use std::rc::Rc;
/// # use edn::PlainSymbol;
/// # use mentat_query::{Element, FindSpec, Variable};
///
/// # fn main() {

View file

@ -10,6 +10,10 @@
#![allow(dead_code)]
use std::collections::{
BTreeMap,
};
use std::fs::{
File,
};
@ -41,7 +45,9 @@ use mentat_core::{
KnownEntid,
NamespacedKeyword,
Schema,
StructuredMap,
TypedValue,
ValueRc,
ValueType,
};
@ -67,6 +73,11 @@ use mentat_db::{
use mentat_db::internal_types::TermWithTempIds;
use mentat_query_pull::{
pull_attributes_for_entities,
pull_attributes_for_entity,
};
use mentat_tx;
use mentat_tx::entities::{
@ -161,6 +172,17 @@ pub struct Store {
}
impl Store {
/// Open a store at the supplied path, ensuring that it includes the bootstrap schema.
pub fn open(path: &str) -> Result<Store> {
let mut connection = ::new_connection(path)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn: conn,
sqlite: connection,
})
}
/// Returns a totally blank store with no bootstrap schema. Use `open` instead.
pub fn open_empty(path: &str) -> Result<Store> {
if !path.is_empty() {
if Path::new(path).exists() {
@ -176,15 +198,6 @@ impl Store {
})
}
pub fn open(path: &str) -> Result<Store> {
let mut connection = ::new_connection(path)?;
let conn = Conn::connect(&mut connection)?;
Ok(Store {
conn: conn,
sqlite: connection,
})
}
pub fn transact(&mut self, transaction: &str) -> Result<TxReport> {
let mut ip = self.begin_transaction()?;
let report = ip.transact(transaction)?;
@ -206,6 +219,14 @@ pub trait Queryable {
where E: Into<Entid>;
}
pub trait Pullable {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid>;
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid>;
}
pub trait Syncable {
fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>;
}
@ -257,6 +278,19 @@ impl<'a, 'c> Queryable for InProgressRead<'a, 'c> {
}
}
impl<'a, 'c> Pullable for InProgressRead<'a, 'c> {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
self.0.pull_attributes_for_entities(entities, attributes)
}
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
self.0.pull_attributes_for_entity(entity, attributes)
}
}
impl<'a, 'c> Queryable for InProgress<'a, 'c> {
fn q_once<T>(&self, query: &str, inputs: T) -> Result<QueryOutput>
where T: Into<Option<QueryInputs>> {
@ -308,6 +342,21 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> {
}
}
impl<'a, 'c> Pullable for InProgress<'a, 'c> {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
pull_attributes_for_entities(&self.schema, &*(self.transaction), entities, attributes)
.map_err(|e| e.into())
}
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
pull_attributes_for_entity(&self.schema, &*(self.transaction), entity, attributes)
.map_err(|e| e.into())
}
}
impl<'a, 'c> HasSchema for InProgressRead<'a, 'c> {
fn entid_for_type(&self, t: ValueType) -> Option<KnownEntid> {
self.0.entid_for_type(t)
@ -630,6 +679,19 @@ impl Queryable for Store {
}
}
impl Pullable for Store {
fn pull_attributes_for_entities<E, A>(&self, entities: E, attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
self.conn.pull_attributes_for_entities(&self.sqlite, entities, attributes)
}
fn pull_attributes_for_entity<A>(&self, entity: Entid, attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
self.conn.pull_attributes_for_entity(&self.sqlite, entity, attributes)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CacheDirection {
Forward,
@ -756,6 +818,29 @@ impl Conn {
inputs)
}
pub fn pull_attributes_for_entities<E, A>(&self,
sqlite: &rusqlite::Connection,
entities: E,
attributes: A) -> Result<BTreeMap<Entid, ValueRc<StructuredMap>>>
where E: IntoIterator<Item=Entid>,
A: IntoIterator<Item=Entid> {
let metadata = self.metadata.lock().unwrap();
let schema = &*metadata.schema;
pull_attributes_for_entities(schema, sqlite, entities, attributes)
.map_err(|e| e.into())
}
pub fn pull_attributes_for_entity<A>(&self,
sqlite: &rusqlite::Connection,
entity: Entid,
attributes: A) -> Result<StructuredMap>
where A: IntoIterator<Item=Entid> {
let metadata = self.metadata.lock().unwrap();
let schema = &*metadata.schema;
pull_attributes_for_entity(schema, sqlite, entity, attributes)
.map_err(|e| e.into())
}
pub fn lookup_values_for_attribute(&self,
sqlite: &rusqlite::Connection,
entity: Entid,

View file

@ -26,6 +26,7 @@ use mentat_query;
use mentat_query_algebrizer;
use mentat_query_parser;
use mentat_query_projector;
use mentat_query_pull;
use mentat_query_translator;
use mentat_sql;
use mentat_tolstoy;
@ -48,6 +49,7 @@ error_chain! {
QueryError(mentat_query_algebrizer::Error, mentat_query_algebrizer::ErrorKind); // Let's not leak the term 'algebrizer'.
QueryParseError(mentat_query_parser::Error, mentat_query_parser::ErrorKind);
ProjectorError(mentat_query_projector::errors::Error, mentat_query_projector::errors::ErrorKind);
PullError(mentat_query_pull::errors::Error, mentat_query_pull::errors::ErrorKind);
TranslatorError(mentat_query_translator::Error, mentat_query_translator::ErrorKind);
SqlError(mentat_sql::Error, mentat_sql::ErrorKind);
TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind);

View file

@ -27,6 +27,7 @@ extern crate mentat_query;
extern crate mentat_query_algebrizer;
extern crate mentat_query_parser;
extern crate mentat_query_projector;
extern crate mentat_query_pull;
extern crate mentat_query_translator;
extern crate mentat_sql;
extern crate mentat_tolstoy;
@ -124,6 +125,7 @@ pub use conn::{
Conn,
InProgress,
Metadata,
Pullable,
Queryable,
Syncable,
Store,

View file

@ -93,6 +93,8 @@ pub enum PreparedQuery<'sqlite> {
},
Bound {
statement: rusqlite::Statement<'sqlite>,
schema: Schema,
connection: &'sqlite rusqlite::Connection,
args: Vec<(String, Rc<rusqlite::types::Value>)>,
projector: Box<Projector>,
},
@ -107,10 +109,9 @@ impl<'sqlite> PreparedQuery<'sqlite> {
&mut PreparedQuery::Constant { ref select } => {
select.project_without_rows().map_err(|e| e.into())
},
&mut PreparedQuery::Bound { ref mut statement, ref args, ref projector } => {
&mut PreparedQuery::Bound { ref mut statement, ref schema, ref connection, ref args, ref projector } => {
let rows = run_statement(statement, args)?;
projector
.project(rows)
projector.project(schema, connection, rows)
.map_err(|e| e.into())
}
}
@ -208,7 +209,7 @@ fn fetch_values<'sqlite>
let algebrized = algebrize_query(known, query, None)?;
run_algebrized_query(sqlite, algebrized)
run_algebrized_query(known, sqlite, algebrized)
}
fn lookup_attribute(schema: &Schema, attribute: &NamespacedKeyword) -> Result<KnownEntid> {
@ -327,7 +328,10 @@ fn algebrize_query_str<'query, T>
algebrize_query(known, parsed, inputs)
}
fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebrized: AlgebraicQuery) -> QueryExecutionResult {
fn run_algebrized_query<'sqlite>
(known: Known,
sqlite: &'sqlite rusqlite::Connection,
algebrized: AlgebraicQuery) -> QueryExecutionResult {
assert!(algebrized.unbound_variables().is_empty(),
"Unbound variables should be checked by now");
if algebrized.is_known_empty() {
@ -335,17 +339,19 @@ fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebriz
return Ok(QueryOutput::empty(&algebrized.find_spec));
}
let select = query_to_select(algebrized)?;
let select = query_to_select(known.schema, algebrized)?;
match select {
ProjectedSelect::Constant(constant) => constant.project_without_rows()
.map_err(|e| e.into()),
ProjectedSelect::Constant(constant) => {
constant.project_without_rows()
.map_err(|e| e.into())
},
ProjectedSelect::Query { query, projector } => {
let SQLQuery { sql, args } = query.to_sql_query()?;
let mut statement = sqlite.prepare(sql.as_str())?;
let rows = run_statement(&mut statement, &args)?;
projector.project(rows).map_err(|e| e.into())
projector.project(known.schema, sqlite, rows).map_err(|e| e.into())
},
}
}
@ -365,7 +371,7 @@ pub fn q_once<'sqlite, 'query, T>
where T: Into<Option<QueryInputs>>
{
let algebrized = algebrize_query_str(known, query, inputs)?;
run_algebrized_query(sqlite, algebrized)
run_algebrized_query(known, sqlite, algebrized)
}
/// Just like `q_once`, but doesn't use any cached values.
@ -379,12 +385,12 @@ pub fn q_uncached<'sqlite, 'schema, 'query, T>
let known = Known::for_schema(schema);
let algebrized = algebrize_query_str(known, query, inputs)?;
run_algebrized_query(sqlite, algebrized)
run_algebrized_query(known, sqlite, algebrized)
}
pub fn q_prepare<'sqlite, 'query, T>
pub fn q_prepare<'sqlite, 'schema, 'cache, 'query, T>
(sqlite: &'sqlite rusqlite::Connection,
known: Known,
known: Known<'schema, 'cache>,
query: &'query str,
inputs: T) -> PreparedResult<'sqlite>
where T: Into<Option<QueryInputs>>
@ -405,7 +411,7 @@ pub fn q_prepare<'sqlite, 'query, T>
});
}
let select = query_to_select(algebrized)?;
let select = query_to_select(known.schema, algebrized)?;
match select {
ProjectedSelect::Constant(constant) => {
Ok(PreparedQuery::Constant {
@ -418,6 +424,8 @@ pub fn q_prepare<'sqlite, 'query, T>
Ok(PreparedQuery::Bound {
statement,
schema: known.schema.clone(),
connection: sqlite,
args,
projector: projector
})
@ -436,7 +444,7 @@ pub fn q_explain<'sqlite, 'query, T>
if algebrized.is_known_empty() {
return Ok(QueryExplanation::KnownEmpty(algebrized.cc.empty_because.unwrap()));
}
match query_to_select(algebrized)? {
match query_to_select(known.schema, algebrized)? {
ProjectedSelect::Constant(_constant) => Ok(QueryExplanation::KnownConstant),
ProjectedSelect::Query { query, projector: _projector } => {
let query = query.to_sql_query()?;

218
tests/pull.rs Normal file
View file

@ -0,0 +1,218 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#[macro_use]
extern crate mentat;
extern crate mentat_core;
extern crate mentat_query_pull;
use std::collections::{
BTreeMap,
BTreeSet,
};
use std::path::{
Path,
PathBuf,
};
// TODO: re-export from Mentat.
use mentat_core::{
Binding,
StructuredMap,
ValueRc,
};
use mentat::{
Entid,
HasSchema,
IntoResult,
NamespacedKeyword,
Pullable,
Queryable,
QueryInputs,
Store,
RelResult,
TypedValue,
};
/// Resolve `rest` relative to the test `fixtures/` directory.
fn fixture_path(rest: &str) -> PathBuf {
    Path::new("fixtures/").join(rest)
}
// End-to-end exercise of pull: loads the Seattle fixtures, then checks
// (1) the standalone `pull_attributes_for_entities` API, (2) pull expressions
// embedded in rel/scalar/coll find specs, and (3) prepared queries with pull.
#[test]
fn test_simple_pull() {
    let beacon;
    let capitol;
    let beacon_district;
    let capitol_district;
    let mut store = Store::open("").expect("opened");
    {
        let mut in_progress = store.begin_transaction().expect("began");
        in_progress.import(fixture_path("cities.schema")).expect("transacted schema");
        let report = in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data");

        // These tempid strings come out of all_seattle.edn.
        beacon = *report.tempids.get(&"a17592186045471".to_string()).expect("beacon");
        beacon_district = *report.tempids.get(&"a17592186045450".to_string()).expect("beacon district");
        capitol = *report.tempids.get(&"a17592186045439".to_string()).expect("capitol");
        capitol_district = *report.tempids.get(&"a17592186045438".to_string()).expect("capitol district");

        in_progress.commit().expect("committed");
    }

    let schema = store.conn().current_schema();
    let district = schema.get_entid(&kw!(:neighborhood/district)).expect("district").0;
    let name = schema.get_entid(&kw!(:neighborhood/name)).expect("name").0;
    let attrs: BTreeSet<Entid> = vec![district, name].into_iter().collect();

    // Find Beacon Hill and Capitol Hill via query.
    let query = r#"[:find [?hood ...]
                    :where
                    (or [?hood :neighborhood/name "Beacon Hill"]
                        [?hood :neighborhood/name "Capitol Hill"])]"#;
    let hoods: BTreeSet<Entid> = store.q_once(query, None)
                                      .into_coll_result()
                                      .expect("hoods")
                                      .into_iter()
                                      .map(|b| {
                                          b.val().and_then(|tv| tv.into_entid()).expect("scalar")
                                      })
                                      .collect();

    println!("Hoods: {:?}", hoods);

    // The query and the tempids agree.
    assert_eq!(hoods, vec![beacon, capitol].into_iter().collect());

    // Pull attributes of those two neighborhoods via the standalone API.
    let pulled = store.begin_read().expect("read")
                      .pull_attributes_for_entities(hoods, attrs)
                      .expect("pulled");

    // Here's what we expect:
    let c: StructuredMap = vec![
        (kw!(:neighborhood/name), "Capitol Hill".into()),
        (kw!(:neighborhood/district), TypedValue::Ref(capitol_district)),
    ].into();
    let b: StructuredMap = vec![
        (kw!(:neighborhood/name), "Beacon Hill".into()),
        (kw!(:neighborhood/district), TypedValue::Ref(beacon_district)),
    ].into();

    let expected: BTreeMap<Entid, ValueRc<StructuredMap>>;
    expected = vec![(capitol, c.into()), (beacon, b.into())].into_iter().collect();

    assert_eq!(pulled, expected);

    // Now test pull inside the query itself (rel find spec).
    let query = r#"[:find ?hood (pull ?district [:district/name :district/region])
                    :where
                    (or [?hood :neighborhood/name "Beacon Hill"]
                        [?hood :neighborhood/name "Capitol Hill"])
                    [?hood :neighborhood/district ?district]
                    :order ?hood]"#;
    let results: RelResult<Binding> = store.begin_read().expect("read")
                                           .q_once(query, None)
                                           .into_rel_result()
                                           .expect("results");

    let beacon_district: Vec<(NamespacedKeyword, TypedValue)> = vec![
        (kw!(:district/name), "Greater Duwamish".into()),
        (kw!(:district/region), schema.get_entid(&NamespacedKeyword::new("region", "se")).unwrap().into())
    ];
    let beacon_district: StructuredMap = beacon_district.into();
    let capitol_district: Vec<(NamespacedKeyword, TypedValue)> = vec![
        (kw!(:district/name), "East".into()),
        (kw!(:district/region), schema.get_entid(&NamespacedKeyword::new("region", "e")).unwrap().into())
    ];
    let capitol_district: StructuredMap = capitol_district.into();

    let expected = RelResult {
        width: 2,
        values: vec![
            TypedValue::Ref(capitol).into(), capitol_district.into(),
            TypedValue::Ref(beacon).into(), beacon_district.into(),
        ].into(),
    };
    assert_eq!(results, expected.clone());

    // We can also prepare the query.
    let reader = store.begin_read().expect("read");
    let mut prepared = reader.q_prepare(query, None).expect("prepared");
    assert_eq!(prepared.run(None)
                       .into_rel_result()
                       .expect("results"),
               expected);

    // Execute a scalar query where the body is constant.
    // TODO: we shouldn't require `:where`; that makes this non-constant!
    let query = r#"[:find (pull ?hood [:neighborhood/name]) . :in ?hood
                    :where [?hood :neighborhood/district _]]"#;
    let result = reader.q_once(query, QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::Ref(beacon))]))
                       .into_scalar_result()
                       .expect("success")
                       .expect("result");
    let expected: StructuredMap = vec![(kw!(:neighborhood/name), TypedValue::from("Beacon Hill"))].into();
    assert_eq!(result, expected.into());

    // Collect the names and regions of all districts (coll find spec).
    // NOTE(review): the Ref entids below are fixture-dependent — they assume a
    // fresh store whose bootstrap assigns these region entids deterministically.
    let query = r#"[:find [(pull ?district [:district/name :district/region]) ...]
                    :where [_ :neighborhood/district ?district]]"#;
    let results = reader.q_once(query, None)
                        .into_coll_result()
                        .expect("result");
    let expected: Vec<StructuredMap> = vec![
        vec![(kw!(:district/name), TypedValue::from("East")),
             (kw!(:district/region), TypedValue::Ref(65556))].into(),
        vec![(kw!(:district/name), TypedValue::from("Southwest")),
             (kw!(:district/region), TypedValue::Ref(65559))].into(),
        vec![(kw!(:district/name), TypedValue::from("Downtown")),
             (kw!(:district/region), TypedValue::Ref(65560))].into(),
        vec![(kw!(:district/name), TypedValue::from("Greater Duwamish")),
             (kw!(:district/region), TypedValue::Ref(65557))].into(),
        vec![(kw!(:district/name), TypedValue::from("Ballard")),
             (kw!(:district/region), TypedValue::Ref(65561))].into(),
        vec![(kw!(:district/name), TypedValue::from("Northeast")),
             (kw!(:district/region), TypedValue::Ref(65555))].into(),
        vec![(kw!(:district/name), TypedValue::from("Southeast")),
             (kw!(:district/region), TypedValue::Ref(65557))].into(),
        vec![(kw!(:district/name), TypedValue::from("Northwest")),
             (kw!(:district/region), TypedValue::Ref(65561))].into(),
        vec![(kw!(:district/name), TypedValue::from("Central")),
             (kw!(:district/region), TypedValue::Ref(65556))].into(),
        vec![(kw!(:district/name), TypedValue::from("Delridge")),
             (kw!(:district/region), TypedValue::Ref(65559))].into(),
        vec![(kw!(:district/name), TypedValue::from("Lake Union")),
             (kw!(:district/region), TypedValue::Ref(65560))].into(),
        vec![(kw!(:district/name), TypedValue::from("Magnolia/Queen Anne")),
             (kw!(:district/region), TypedValue::Ref(65560))].into(),
        vec![(kw!(:district/name), TypedValue::from("North")),
             (kw!(:district/region), TypedValue::Ref(65555))].into(),
    ];
    let expected: Vec<Binding> = expected.into_iter().map(|m| m.into()).collect();
    assert_eq!(results, expected);
}
// TEST:
// - Constant query bodies in pull.
// - Values that are present in the cache (=> constant pull, too).
// - Pull for each kind of find spec.
// - That the keys in each map are ValueRc::ptr_eq.
// - Entity presence/absence when the pull expressions don't match anything.
// - Aliases. (No parser support yet.)

View file

@ -23,6 +23,10 @@ use time::{
PreciseTime,
};
use mentat_core::{
StructuredMap,
};
use mentat::{
CacheDirection,
NamespacedKeyword,
@ -423,14 +427,14 @@ impl Repl {
match query_output.results {
QueryResults::Scalar(v) => {
if let Some(val) = v {
writeln!(output, "| {}\t |", &self.binding_as_string(val))?;
writeln!(output, "| {}\t |", &self.binding_as_string(&val))?;
}
},
QueryResults::Tuple(vv) => {
if let Some(vals) = vv {
for val in vals {
write!(output, "| {}\t", self.binding_as_string(val))?;
write!(output, "| {}\t", self.binding_as_string(&val))?;
}
writeln!(output, "|")?;
}
@ -438,14 +442,14 @@ impl Repl {
QueryResults::Coll(vv) => {
for val in vv {
writeln!(output, "| {}\t|", self.binding_as_string(val))?;
writeln!(output, "| {}\t|", self.binding_as_string(&val))?;
}
},
QueryResults::Rel(vvv) => {
for vv in vvv {
for v in vv {
write!(output, "| {}\t", self.binding_as_string(v))?;
write!(output, "| {}\t", self.binding_as_string(&v))?;
}
writeln!(output, "|")?;
}
@ -512,26 +516,53 @@ impl Repl {
Ok(report)
}
fn binding_as_string(&self, value: Binding) -> String {
fn binding_as_string(&self, value: &Binding) -> String {
use self::Binding::*;
match value {
Scalar(v) => self.value_as_string(v),
Map(_) => format!("TODO"),
Vec(_) => format!("TODO"),
&Scalar(ref v) => self.value_as_string(v),
&Map(ref v) => self.map_as_string(v),
&Vec(ref v) => self.vec_as_string(v),
}
}
fn value_as_string(&self, value: TypedValue) -> String {
/// Render a vector of bindings as `[v1, v2, …]` for CLI display.
///
/// Each element is formatted recursively via `binding_as_string`, so
/// nested maps and vectors print in their own bracketed form.
fn vec_as_string(&self, value: &Vec<Binding>) -> String {
    let rendered: Vec<String> = value.iter()
                                     .map(|element| self.binding_as_string(element))
                                     .collect();
    format!("[{}]", rendered.join(", "))
}
/// Render a `StructuredMap` (the result of a pull expression) as
/// `{:key value, :key value}` for CLI display.
///
/// Keys are printed via their `Display` impl; values are formatted
/// recursively via `binding_as_string`.
fn map_as_string(&self, value: &StructuredMap) -> String {
    let mut out: String = "{".to_string();
    let mut first = true;
    for (k, v) in value.0.iter() {
        if !first {
            // Separate every entry after the first with ", ".
            out.push_str(", ");
        }
        // BUG FIX: the original set `first = true` inside the branch
        // above, so `first` never became false and the separator was
        // never emitted between entries. Clear it unconditionally here.
        first = false;
        out.push_str(&k.to_string());
        out.push_str(" ");
        out.push_str(self.binding_as_string(v).as_str());
    }
    out.push_str("}");
    out
}
fn value_as_string(&self, value: &TypedValue) -> String {
use self::TypedValue::*;
match value {
Boolean(b) => if b { "true".to_string() } else { "false".to_string() },
Double(d) => format!("{}", d),
Instant(i) => format!("{}", i),
Keyword(k) => format!("{}", k),
Long(l) => format!("{}", l),
Ref(r) => format!("{}", r),
String(s) => format!("{:?}", s.to_string()),
Uuid(u) => format!("{}", u),
&Boolean(b) => if b { "true".to_string() } else { "false".to_string() },
&Double(d) => format!("{}", d),
&Instant(ref i) => format!("{}", i),
&Keyword(ref k) => format!("{}", k),
&Long(l) => format!("{}", l),
&Ref(r) => format!("{}", r),
&String(ref s) => format!("{:?}", s.to_string()),
&Uuid(ref u) => format!("{}", u),
}
}
}