Refactoring: split up the projector crate. No other code changes.

This commit is contained in:
Richard Newman 2018-03-30 12:19:02 -07:00
parent 39f1d61175
commit 909b2a8be5
8 changed files with 714 additions and 585 deletions

View file

@ -0,0 +1,216 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use mentat_core::{
ValueType,
ValueTypeSet,
};
use mentat_query::{
Aggregate,
QueryFunction,
Variable,
};
use mentat_query_algebrizer::{
ColumnName,
ConjoiningClauses,
VariableColumn,
};
use mentat_query_sql::{
ColumnOrExpression,
Expression,
Name,
ProjectedColumn,
};
use errors::{
ErrorKind,
Result,
};
/// The aggregate operations that can be applied to a single variable and
/// translated directly into a SQL aggregate function (see `to_sql`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SimpleAggregationOp {
/// Arithmetic mean. Only applicable to numeric types.
Avg,
/// Number of results. Applicable to any non-empty set of types.
Count,
/// Largest value. Applicable to a single ordered type, or to a mix of numeric types.
Max,
/// Smallest value. Applicable to a single ordered type, or to a mix of numeric types.
Min,
/// Total. Only applicable to numeric types.
Sum,
}
impl SimpleAggregationOp {
pub(crate) fn to_sql(&self) -> &'static str {
use self::SimpleAggregationOp::*;
match self {
&Avg => "avg",
&Count => "count",
&Max => "max",
&Min => "min",
&Sum => "sum",
}
}
fn for_function(function: &QueryFunction) -> Option<SimpleAggregationOp> {
match function.0.plain_name() {
"avg" => Some(SimpleAggregationOp::Avg),
"count" => Some(SimpleAggregationOp::Count),
"max" => Some(SimpleAggregationOp::Max),
"min" => Some(SimpleAggregationOp::Min),
"sum" => Some(SimpleAggregationOp::Sum),
_ => None,
}
}
/// With knowledge of the types to which a variable might be bound,
/// return a `Result` to determine whether this aggregation is suitable.
/// For example, it's valid to take the `Avg` of `{Double, Long}`, invalid
/// to take `Sum` of `{Instant}`, valid to take (lexicographic) `Max` of `{String}`,
/// but invalid to take `Max` of `{Uuid, String}`.
///
/// The returned type is the type of the result of the aggregation.
pub(crate) fn is_applicable_to_types(&self, possibilities: ValueTypeSet) -> Result<ValueType> {
use self::SimpleAggregationOp::*;
if possibilities.is_empty() {
bail!(ErrorKind::CannotProjectImpossibleBinding(*self))
}
match self {
// One can always count results.
&Count => Ok(ValueType::Long),
// Only numeric types can be averaged or summed.
&Avg => {
if possibilities.is_only_numeric() {
// The mean of a set of numeric values will always, for our purposes, be a double.
Ok(ValueType::Double)
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
},
&Sum => {
if possibilities.is_only_numeric() {
if possibilities.contains(ValueType::Double) {
Ok(ValueType::Double)
} else {
// TODO: BigInt.
Ok(ValueType::Long)
}
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
},
&Max | &Min => {
if possibilities.is_unit() {
use ValueType::*;
let the_type = possibilities.exemplar().expect("a type");
match the_type {
// These types are numerically ordered.
Double | Long | Instant => Ok(the_type),
// Boolean: false < true.
Boolean => Ok(the_type),
// String: lexicographic order.
String => Ok(the_type),
// These types are unordered.
Keyword | Ref | Uuid => {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
},
}
} else {
// It cannot be empty -- we checked.
// The only types that are valid to compare cross-type are numbers.
if possibilities.is_only_numeric() {
// Note that if the max/min is a Long, it will be returned as a Double!
if possibilities.contains(ValueType::Double) {
Ok(ValueType::Double)
} else {
// TODO: BigInt.
Ok(ValueType::Long)
}
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
}
},
}
}
}
pub(crate) struct SimpleAggregate {
pub op: SimpleAggregationOp,
pub var: Variable,
}
impl SimpleAggregate {
pub(crate) fn column_name(&self) -> Name {
format!("({} {})", self.op.to_sql(), self.var.name())
}
pub(crate) fn use_static_value(&self) -> bool {
use self::SimpleAggregationOp::*;
match self.op {
Avg | Max | Min => true,
Count | Sum => false,
}
}
}
/// Conversion of a parsed aggregate into a `SimpleAggregate`, when possible.
pub(crate) trait SimpleAggregation {
    /// Returns the simple form of this aggregate, or `None` if it has no simple form.
    fn to_simple(&self) -> Option<SimpleAggregate>;
}

impl SimpleAggregation for Aggregate {
    /// An aggregate is simple when it has exactly one argument, that argument is a
    /// variable, and its function names a `SimpleAggregationOp`.
    fn to_simple(&self) -> Option<SimpleAggregate> {
        if self.args.len() != 1 {
            return None;
        }

        match self.args[0].as_variable() {
            None => None,
            Some(var) => {
                SimpleAggregationOp::for_function(&self.func)
                    .map(|op| SimpleAggregate { op, var: var.clone() })
            },
        }
    }
}
/// Builds the SQL projection for a simple aggregate over a single variable.
///
/// Returns two values:
/// - The `ProjectedColumn` to use in the query, named by `SimpleAggregate::column_name`.
///   Its expression will always refer to other variables by name; never to a datoms column.
/// - The known type of the aggregated value.
///
/// Fails if the operation cannot be applied to the variable's known types.
pub(crate) fn projected_column_for_simple_aggregate(simple: &SimpleAggregate, cc: &ConjoiningClauses) -> Result<(ProjectedColumn, ValueType)> {
    let return_type = simple.op.is_applicable_to_types(cc.known_type_set(&simple.var))?;

    // Work out what the aggregate function is applied to.
    let arg = match cc.bound_value(&simple.var) {
        Some(value) => {
            // Oh, we already know the value!
            if simple.use_static_value() {
                // We can statically compute the aggregate result for some operators -- not
                // count or sum, but avg/max/min are OK -- so no SQL function call is needed.
                let column = ColumnOrExpression::Value(value);
                return Ok((ProjectedColumn(column, simple.column_name()), return_type));
            }
            ColumnOrExpression::Value(value)
        },
        None => {
            // The common case: the values are bound during execution.
            ColumnOrExpression::ExistingColumn(VariableColumn::Variable(simple.var.clone()).column_name())
        },
    };

    let aggregate = Expression::Unary {
        sql_op: simple.op.to_sql(),
        arg: arg,
    };
    let column = ColumnOrExpression::Expression(Box::new(aggregate), return_type);
    Ok((ProjectedColumn(column, simple.column_name()), return_type))
}

View file

@ -0,0 +1,72 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use rusqlite;
use mentat_core::{
ValueTypeSet,
};
use mentat_db;
use mentat_query::{
PlainSymbol,
};
use aggregates::{
SimpleAggregationOp,
};
// Declares the crate's `Error`, `ErrorKind`, `ResultExt` and `Result` types.
error_chain! {
types {
Error, ErrorKind, ResultExt, Result;
}
errors {
/// We're just not done yet. Message that the feature is recognized but not yet
/// implemented.
NotYetImplemented(t: String) {
description("not yet implemented")
display("not yet implemented: {}", t)
}
/// The variable given to an aggregate operation has an empty set of possible types.
CannotProjectImpossibleBinding(op: SimpleAggregationOp) {
description("no possible types for variable in projection list")
display("no possible types for value provided to {:?}", op)
}
/// The aggregate operation is not meaningful for the set of types the variable may take.
CannotApplyAggregateOperationToTypes(op: SimpleAggregationOp, types: ValueTypeSet) {
description("cannot apply projection operation to types")
display("cannot apply projection operation {:?} to types {:?}", op, types)
}
/// A variable to be projected has no binding to project from.
UnboundVariable(var: PlainSymbol) {
description("cannot project unbound variable")
display("cannot project unbound variable {:?}", var)
}
/// No type information could be found for a variable.
NoTypeAvailableForVariable(var: PlainSymbol) {
description("cannot find type for variable")
display("cannot find type for variable {:?}", var)
}
/// Query results had a different shape (`actual`) from the one requested (`expected`).
UnexpectedResultsType(actual: &'static str, expected: &'static str) {
description("unexpected query results type")
display("expected {}, got {}", expected, actual)
}
/// More than one min/max aggregate was combined with corresponding-value elements.
AmbiguousAggregates(min_max_count: usize, corresponding_count: usize) {
description("ambiguous aggregates")
display("min/max expressions: {} (max 1), corresponding: {}", min_max_count, corresponding_count)
}
}
// Errors from crates that do not use error_chain.
foreign_links {
Rusqlite(rusqlite::Error);
}
// Errors from crates that do use error_chain.
links {
DbError(mentat_db::Error, mentat_db::ErrorKind);
}
}

View file

@ -28,21 +28,14 @@ use std::iter;
use std::rc::Rc;
use indexmap::{
IndexSet,
};
use rusqlite::{
Row,
Rows,
};
use mentat_core::{
SQLValueType,
SQLValueTypeSet,
TypedValue,
ValueType,
ValueTypeSet,
ValueTypeTag,
};
@ -55,79 +48,43 @@ use mentat_db::{
};
use mentat_query::{
Aggregate,
Element,
FindSpec,
Limit,
PlainSymbol,
QueryFunction,
Variable,
};
use mentat_query_algebrizer::{
AlgebraicQuery,
ColumnName,
ConjoiningClauses,
QualifiedAlias,
VariableBindings,
VariableColumn,
};
use mentat_query_sql::{
ColumnOrExpression,
Expression,
GroupBy,
Name,
Projection,
ProjectedColumn,
};
error_chain! {
types {
Error, ErrorKind, ResultExt, Result;
}
mod aggregates;
mod project;
pub mod errors;
errors {
/// We're just not done yet. Message that the feature is recognized but not yet
/// implemented.
NotYetImplemented(t: String) {
description("not yet implemented")
display("not yet implemented: {}", t)
}
CannotProjectImpossibleBinding(op: SimpleAggregationOp) {
description("no possible types for variable in projection list")
display("no possible types for value provided to {:?}", op)
}
CannotApplyAggregateOperationToTypes(op: SimpleAggregationOp, types: ValueTypeSet) {
description("cannot apply projection operation to types")
display("cannot apply projection operation {:?} to types {:?}", op, types)
}
UnboundVariable(var: PlainSymbol) {
description("cannot project unbound variable")
display("cannot project unbound variable {:?}", var)
}
NoTypeAvailableForVariable(var: PlainSymbol) {
description("cannot find type for variable")
display("cannot find type for variable {:?}", var)
}
UnexpectedResultsType(actual: &'static str, expected: &'static str) {
description("unexpected query results type")
display("expected {}, got {}", expected, actual)
}
AmbiguousAggregates(min_max_count: usize, corresponding_count: usize) {
description("ambiguous aggregates")
display("min/max expressions: {} (max 1), corresponding: {}", min_max_count, corresponding_count)
}
}
pub use aggregates::{
SimpleAggregationOp,
};
foreign_links {
Rusqlite(rusqlite::Error);
}
use project::{
ProjectedElements,
project_elements,
};
links {
DbError(mentat_db::Error, mentat_db::ErrorKind);
}
}
pub use project::{
projected_column_for_var,
};
use errors::{
ErrorKind,
Result,
};
#[derive(Debug, PartialEq, Eq)]
pub struct QueryOutput {
@ -353,525 +310,6 @@ impl TypedIndex {
}
}
/// Returns the first column to which the CC binds `var`.
///
/// Fails with `UnboundVariable` if the variable has no column bindings.
fn cc_column(cc: &ConjoiningClauses, var: &Variable) -> Result<QualifiedAlias> {
    let first_binding = cc.column_bindings
                          .get(var)
                          .and_then(|columns| columns.get(0));

    match first_binding {
        Some(alias) => Ok(alias.clone()),
        None => Err(ErrorKind::UnboundVariable(var.name()).into()),
    }
}
/// Returns the SQL column bound to `var`, together with the name under which the
/// variable's value is projected.
///
/// Every variable should be bound by the top-level CC to at least one column in the
/// query. If that constraint is violated, an `UnboundVariable` error is returned.
fn candidate_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> {
    let alias = cc_column(cc, var)?;
    let name = VariableColumn::Variable(var.clone()).column_name();
    Ok((ColumnOrExpression::Column(alias), name))
}
/// Returns the SQL column from which the type tag of `var` is extracted, together with
/// the name under which that type tag is projected.
///
/// Fails with `UnboundVariable` if the CC extracts no type for the variable.
fn candidate_type_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> {
    match cc.extracted_types.get(var) {
        Some(alias) => {
            let alias = alias.clone();
            let type_name = VariableColumn::VariableTypeTag(var.clone()).column_name();
            Ok((ColumnOrExpression::Column(alias), type_name))
        },
        None => Err(ErrorKind::UnboundVariable(var.name()).into()),
    }
}
/// Return the projected column -- that is, a value or SQL column and an associated name -- for a
/// given variable, along with the set of types the variable may take.
///
/// A variable with a known value is projected as that value, and its type set contains only
/// the value's type. Otherwise the variable is projected from the column the CC binds it to;
/// an error is returned if there is no such column.
///
/// Callers are expected to determine whether to project a type tag as an additional SQL column.
pub fn projected_column_for_var(var: &Variable, cc: &ConjoiningClauses) -> Result<(ProjectedColumn, ValueTypeSet)> {
    match cc.bound_value(&var) {
        Some(value) => {
            // If we already know the value, then our lives are easy.
            let types = ValueTypeSet::of_one(value.value_type());
            let name = VariableColumn::Variable(var.clone()).column_name();
            let column = ColumnOrExpression::Value(value.clone());
            Ok((ProjectedColumn(column, name), types))
        },
        None => {
            // If we don't, then the CC *must* have bound the variable.
            let (column, name) = candidate_column(cc, var)?;
            let projected = ProjectedColumn(column, name);
            Ok((projected, cc.known_type_set(var)))
        },
    }
}
/// Builds the SQL projection for a simple aggregate over a single variable.
///
/// Returns two values:
/// - The `ProjectedColumn` to use in the query, named by `SimpleAggregate::column_name`.
///   Its expression will always refer to other variables by name; never to a datoms column.
/// - The known type of the aggregated value.
///
/// Fails if the operation cannot be applied to the variable's known types.
fn projected_column_for_simple_aggregate(simple: &SimpleAggregate, cc: &ConjoiningClauses) -> Result<(ProjectedColumn, ValueType)> {
    let return_type = simple.op.is_applicable_to_types(cc.known_type_set(&simple.var))?;

    // Work out what the aggregate function is applied to.
    let arg = match cc.bound_value(&simple.var) {
        Some(value) => {
            // Oh, we already know the value!
            if simple.use_static_value() {
                // We can statically compute the aggregate result for some operators -- not
                // count or sum, but avg/max/min are OK -- so no SQL function call is needed.
                let column = ColumnOrExpression::Value(value);
                return Ok((ProjectedColumn(column, simple.column_name()), return_type));
            }
            ColumnOrExpression::Value(value)
        },
        None => {
            // The common case: the values are bound during execution.
            ColumnOrExpression::ExistingColumn(VariableColumn::Variable(simple.var.clone()).column_name())
        },
    };

    let aggregate = Expression::Unary {
        sql_op: simple.op.to_sql(),
        arg: arg,
    };
    let column = ColumnOrExpression::Expression(Box::new(aggregate), return_type);
    Ok((ProjectedColumn(column, simple.column_name()), return_type))
}
/// The aggregate operations that can be applied to a single variable and
/// translated directly into a SQL aggregate function (see `to_sql`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SimpleAggregationOp {
/// Arithmetic mean. Only applicable to numeric types.
Avg,
/// Number of results. Applicable to any non-empty set of types.
Count,
/// Largest value. Applicable to a single ordered type, or to a mix of numeric types.
Max,
/// Smallest value. Applicable to a single ordered type, or to a mix of numeric types.
Min,
/// Total. Only applicable to numeric types.
Sum,
}
impl SimpleAggregationOp {
fn to_sql(&self) -> &'static str {
use SimpleAggregationOp::*;
match self {
&Avg => "avg",
&Count => "count",
&Max => "max",
&Min => "min",
&Sum => "sum",
}
}
fn for_function(function: &QueryFunction) -> Option<SimpleAggregationOp> {
match function.0.plain_name() {
"avg" => Some(SimpleAggregationOp::Avg),
"count" => Some(SimpleAggregationOp::Count),
"max" => Some(SimpleAggregationOp::Max),
"min" => Some(SimpleAggregationOp::Min),
"sum" => Some(SimpleAggregationOp::Sum),
_ => None,
}
}
/// With knowledge of the types to which a variable might be bound,
/// return a `Result` to determine whether this aggregation is suitable.
/// For example, it's valid to take the `Avg` of `{Double, Long}`, invalid
/// to take `Sum` of `{Instant}`, valid to take (lexicographic) `Max` of `{String}`,
/// but invalid to take `Max` of `{Uuid, String}`.
///
/// The returned type is the type of the result of the aggregation.
fn is_applicable_to_types(&self, possibilities: ValueTypeSet) -> Result<ValueType> {
use SimpleAggregationOp::*;
if possibilities.is_empty() {
bail!(ErrorKind::CannotProjectImpossibleBinding(*self))
}
match self {
// One can always count results.
&Count => Ok(ValueType::Long),
// Only numeric types can be averaged or summed.
&Avg => {
if possibilities.is_only_numeric() {
// The mean of a set of numeric values will always, for our purposes, be a double.
Ok(ValueType::Double)
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
},
&Sum => {
if possibilities.is_only_numeric() {
if possibilities.contains(ValueType::Double) {
Ok(ValueType::Double)
} else {
// TODO: BigInt.
Ok(ValueType::Long)
}
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
},
&Max | &Min => {
if possibilities.is_unit() {
use ValueType::*;
let the_type = possibilities.exemplar().expect("a type");
match the_type {
// These types are numerically ordered.
Double | Long | Instant => Ok(the_type),
// Boolean: false < true.
Boolean => Ok(the_type),
// String: lexicographic order.
String => Ok(the_type),
// These types are unordered.
Keyword | Ref | Uuid => {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
},
}
} else {
// It cannot be empty -- we checked.
// The only types that are valid to compare cross-type are numbers.
if possibilities.is_only_numeric() {
// Note that if the max/min is a Long, it will be returned as a Double!
if possibilities.contains(ValueType::Double) {
Ok(ValueType::Double)
} else {
// TODO: BigInt.
Ok(ValueType::Long)
}
} else {
bail!(ErrorKind::CannotApplyAggregateOperationToTypes(*self, possibilities))
}
}
},
}
}
}
struct SimpleAggregate {
op: SimpleAggregationOp,
var: Variable,
}
impl SimpleAggregate {
fn column_name(&self) -> Name {
format!("({} {})", self.op.to_sql(), self.var.name())
}
fn use_static_value(&self) -> bool {
use SimpleAggregationOp::*;
match self.op {
Avg | Max | Min => true,
Count | Sum => false,
}
}
}
/// Conversion of a parsed aggregate into a `SimpleAggregate`, when possible.
trait SimpleAggregation {
    /// Returns the simple form of this aggregate, or `None` if it has no simple form.
    fn to_simple(&self) -> Option<SimpleAggregate>;
}

impl SimpleAggregation for Aggregate {
    /// An aggregate is simple when it has exactly one argument, that argument is a
    /// variable, and its function names a `SimpleAggregationOp`.
    fn to_simple(&self) -> Option<SimpleAggregate> {
        if self.args.len() != 1 {
            return None;
        }

        match self.args[0].as_variable() {
            None => None,
            Some(var) => {
                SimpleAggregationOp::for_function(&self.func)
                    .map(|op| SimpleAggregate { op, var: var.clone() })
            },
        }
    }
}
/// An internal temporary struct to pass between the projection 'walk' and the
/// resultant projector.
/// Projection accumulates four things:
/// - Two SQL projection lists. We need two because aggregate queries are nested
/// in order to apply DISTINCT to values prior to aggregation.
/// - A collection of templates for the projector to use to extract values.
/// - A list of columns to use for grouping. Grouping is a property of the projection!
struct ProjectedElements {
// The projection of the outermost query. When there are no aggregates this is the
// only projection.
sql_projection: Projection,
// The projection of the nested inner query; `None` when there are no aggregates.
pre_aggregate_projection: Option<Projection>,
// One entry per projected element, recording which SQL column(s) hold its value.
templates: Vec<TypedIndex>,
// Columns to group by; empty when there are no aggregates.
group_by: Vec<GroupBy>,
}
/// Walk an iterator of `Element`s, collecting projector templates and columns.
///
/// Returns a `ProjectedElements`, which combines SQL projections
/// and a `Vec` of `TypedIndex` 'keys' to use when looking up values.
///
/// Callers must ensure that every `Element` is distinct -- a query like
///
/// ```edn
/// [:find ?x ?x :where [?x _ _]]
/// ```
///
/// should fail to parse. See #358.
///
/// `count` is only used to pre-size the projection vectors. `query` supplies the
/// conjoining clauses (bindings and known types), the `named_projection` variables
/// and the `:with` variables.
///
/// Fails if a projected variable has no binding, if an aggregate is not simple or
/// cannot be applied to its argument's types, or if more than one min/max aggregate
/// is combined with `Corresponding` elements.
fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
count: usize,
elements: I,
query: &AlgebraicQuery) -> Result<ProjectedElements> {
// Give a little padding for type tags.
let mut inner_projection = Vec::with_capacity(count + 2);
// Everything in the outer query will _either_ be an aggregate operation
// _or_ a reference to a name projected from the inner.
// We'll expand them later.
let mut outer_projection: Vec<Either<Name, ProjectedColumn>> = Vec::with_capacity(count + 2);
// Index of the next SQL result column to be referenced by a template.
let mut i: i32 = 0;
// Counts of min/max aggregates and of `Corresponding` elements, checked after the walk.
let mut min_max_count: usize = 0;
let mut corresponding_count: usize = 0;
let mut templates = vec![];
// Set once any aggregate is seen; decides whether a nested query and GROUP BY are produced.
let mut aggregates = false;
// Any variable that appears intact in the :find clause, not inside an aggregate expression.
// "Query variables not in aggregate expressions will group the results and appear intact
// in the result."
// We use an ordered set here so that we group in the correct order.
let mut outer_variables = IndexSet::new();
// Any variable that we are projecting from the inner query.
let mut inner_variables = BTreeSet::new();
for e in elements {
if let &Element::Corresponding(_) = e {
corresponding_count += 1;
}
match e {
// Each time we come across a variable, we push a SQL column
// into the SQL projection, aliased to the name of the variable,
// and we push an annotated index into the projector.
&Element::Variable(ref var) |
&Element::Corresponding(ref var) => {
if outer_variables.contains(var) {
eprintln!("Warning: duplicate variable {} in query.", var);
}
// TODO: it's an error to have `[:find ?x (the ?x) …]`.
outer_variables.insert(var.clone());
inner_variables.insert(var.clone());
let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?;
outer_projection.push(Either::Left(projected_column.1.clone()));
inner_projection.push(projected_column);
if let Some(tag) = type_set.unique_type_tag() {
templates.push(TypedIndex::Known(i, tag));
i += 1; // We used one SQL column.
} else {
templates.push(TypedIndex::Unknown(i, i + 1));
i += 2; // We used two SQL columns.
// Also project the type from the SQL query.
let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
outer_projection.push(Either::Left(type_name));
}
},
&Element::Aggregate(ref a) => {
if let Some(simple) = a.to_simple() {
aggregates = true;
use SimpleAggregationOp::*;
match simple.op {
Max | Min => {
min_max_count += 1;
},
Avg | Count | Sum => (),
}
// When we encounter a simple aggregate -- one in which the aggregation can be
// implemented in SQL, on a single variable -- we just push the SQL aggregation op.
// We must ensure the following:
// - There's a column for the var.
// - The type of the var is known to be restricted to a sensible input set
// (not necessarily a single type, but e.g., all vals must be Double or Long).
// - The type set must be appropriate for the operation. E.g., `Sum` is not a
// meaningful operation on instants.
let (projected_column, return_type) = projected_column_for_simple_aggregate(&simple, &query.cc)?;
outer_projection.push(Either::Right(projected_column));
// The aggregated variable must be projected by the inner query, once.
if !inner_variables.contains(&simple.var) {
inner_variables.insert(simple.var.clone());
let (projected_column, _type_set) = projected_column_for_var(&simple.var, &query.cc)?;
inner_projection.push(projected_column);
if query.cc.known_type_set(&simple.var).unique_type_tag().is_none() {
// Also project the type from the SQL query.
let (type_column, type_name) = candidate_type_column(&query.cc, &simple.var)?;
inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
}
}
// We might regret using the type tag here instead of the `ValueType`.
templates.push(TypedIndex::Known(i, return_type.value_type_tag()));
i += 1;
} else {
// TODO: complex aggregates.
bail!(ErrorKind::NotYetImplemented("complex aggregates".into()));
}
},
}
}
// Validate the combination of min/max aggregates and `Corresponding` elements.
match (min_max_count, corresponding_count) {
(0, 0) | (_, 0) => {},
(0, _) => {
eprintln!("Warning: used `(the ?var)` without `min` or `max`.");
},
(1, _) => {
// This is the success case!
},
(n, c) => {
bail!(ErrorKind::AmbiguousAggregates(n, c));
},
}
// Anything used in ORDER BY (which we're given in `named_projection`)
// needs to be in the SQL column list so we can refer to it by name.
//
// They don't affect projection.
//
// If a variable is of a non-fixed type, also project the type tag column, so we don't
// accidentally unify across types when considering uniqueness!
for var in query.named_projection.iter() {
if outer_variables.contains(var) {
continue;
}
// If it's a fixed value, we need do nothing further.
if query.cc.is_value_bound(&var) {
continue;
}
let already_inner = inner_variables.contains(&var);
let (column, name) = candidate_column(&query.cc, &var)?;
if !already_inner {
inner_projection.push(ProjectedColumn(column, name.clone()));
inner_variables.insert(var.clone());
}
outer_projection.push(Either::Left(name));
outer_variables.insert(var.clone());
// We don't care if a column has a single _type_, we care if it has a single type _tag_,
// because that's what we'll use if we're projecting. E.g., Long and Double.
// Single type implies single type tag, and is cheaper, so we check that first.
let types = query.cc.known_type_set(&var);
if !types.has_unique_type_tag() {
let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
if !already_inner {
inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
}
outer_projection.push(Either::Left(type_name));
}
}
if !aggregates {
// We're done -- we never need to group unless we're aggregating.
return Ok(ProjectedElements {
sql_projection: Projection::Columns(inner_projection),
pre_aggregate_projection: None,
templates,
group_by: vec![],
});
}
// OK, on to aggregates.
// We need to produce two SQL projection lists: one for an inner query and one for the outer.
//
// The inner serves these purposes:
// - Projecting variables to avoid duplicates being elided. (:with)
// - Making bindings available to the outermost query for projection, ordering, and grouping.
//
// The outer is consumed by the projector.
//
// We will also be producing:
// - A GROUP BY list to group the output of the inner query by non-aggregate variables
// so that it can be correctly aggregated.
// Turn this collection of vars into a collection of columns from the query.
// We don't allow grouping on anything but a variable bound in the query.
// We group by tag if necessary.
let mut group_by = Vec::with_capacity(outer_variables.len() + 2);
for var in outer_variables.into_iter() {
if query.cc.is_value_bound(&var) {
continue;
}
// The GROUP BY goes outside, but it needs every variable and type tag to be
// projected from inside. Collect in both directions here.
let name = VariableColumn::Variable(var.clone()).column_name();
group_by.push(GroupBy::ProjectedColumn(name));
let needs_type_projection = !query.cc.known_type_set(&var).has_unique_type_tag();
let already_inner = inner_variables.contains(&var);
if !already_inner {
let (column, name) = candidate_column(&query.cc, &var)?;
inner_projection.push(ProjectedColumn(column, name.clone()));
}
if needs_type_projection {
let type_name = VariableColumn::VariableTypeTag(var.clone()).column_name();
if !already_inner {
let type_col = query.cc
.extracted_types
.get(&var)
.cloned()
.ok_or_else(|| ErrorKind::NoTypeAvailableForVariable(var.name().clone()))?;
inner_projection.push(ProjectedColumn(ColumnOrExpression::Column(type_col), type_name.clone()));
}
group_by.push(GroupBy::ProjectedColumn(type_name));
};
}
for var in query.with.iter() {
// We never need to project a constant.
if query.cc.is_value_bound(&var) {
continue;
}
// We don't need to add inner projections for :with if they are already there.
if !inner_variables.contains(&var) {
let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?;
inner_projection.push(projected_column);
if type_set.unique_type_tag().is_none() {
// Also project the type from the SQL query.
let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
}
}
}
// At this point we know we have a double-layer projection. Collect the outer.
//
// If we have an inner and outer layer, the inner layer will name its
// variables, and the outer will re-project them.
// If we only have one layer, then the outer will do the naming.
// (We could try to not use names in the inner query, but then what would we do for
// `ground` and known values?)
// Walk the projection, switching the outer columns to use the inner names.
let outer_projection = outer_projection.into_iter().map(|c| {
match c {
Either::Left(name) => {
ProjectedColumn(ColumnOrExpression::ExistingColumn(name.clone()),
name)
},
Either::Right(pc) => pc,
}
}).collect();
Ok(ProjectedElements {
sql_projection: Projection::Columns(outer_projection),
pre_aggregate_projection: Some(Projection::Columns(inner_projection)),
templates,
group_by,
})
}
pub trait Projector {
fn project<'stmt>(&self, rows: Rows<'stmt>) -> Result<QueryOutput>;
fn columns<'s>(&'s self) -> Box<Iterator<Item=&Element> + 's>;

View file

@ -0,0 +1,403 @@
// Copyright 2018 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::collections::{
BTreeSet,
};
use indexmap::{
IndexSet,
};
use mentat_core::{
SQLValueType,
SQLValueTypeSet,
ValueTypeSet,
};
use mentat_core::util::{
Either,
};
use mentat_query::{
Element,
Variable,
};
use mentat_query_algebrizer::{
AlgebraicQuery,
ColumnName,
ConjoiningClauses,
QualifiedAlias,
VariableColumn,
};
use mentat_query_sql::{
ColumnOrExpression,
GroupBy,
Name,
Projection,
ProjectedColumn,
};
use aggregates::{
SimpleAggregation,
projected_column_for_simple_aggregate,
};
use errors::{
ErrorKind,
Result,
};
use super::{
TypedIndex,
};
/// An internal temporary struct to pass between the projection 'walk' and the
/// resultant projector.
/// Projection accumulates four things:
/// - Two SQL projection lists. We need two because aggregate queries are nested
/// in order to apply DISTINCT to values prior to aggregation.
/// - A collection of templates for the projector to use to extract values.
/// - A list of columns to use for grouping. Grouping is a property of the projection!
pub(crate) struct ProjectedElements {
// The projection of the outermost query. When there are no aggregates this is the
// only projection.
pub sql_projection: Projection,
// The projection of the nested inner query; `None` when there are no aggregates.
pub pre_aggregate_projection: Option<Projection>,
// One entry per projected element, recording which SQL column(s) hold its value.
pub templates: Vec<TypedIndex>,
// Columns to group by; empty when there are no aggregates.
pub group_by: Vec<GroupBy>,
}
/// Returns the SQL column from which the type tag of `var` is extracted, together with
/// the name under which that type tag is projected.
///
/// Fails with `UnboundVariable` if the CC extracts no type for the variable.
fn candidate_type_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> {
    match cc.extracted_types.get(var) {
        Some(alias) => {
            let alias = alias.clone();
            let type_name = VariableColumn::VariableTypeTag(var.clone()).column_name();
            Ok((ColumnOrExpression::Column(alias), type_name))
        },
        None => Err(ErrorKind::UnboundVariable(var.name()).into()),
    }
}
/// Returns the first column to which the CC binds `var`.
///
/// Fails with `UnboundVariable` if the variable has no column bindings.
fn cc_column(cc: &ConjoiningClauses, var: &Variable) -> Result<QualifiedAlias> {
    let first_binding = cc.column_bindings
                          .get(var)
                          .and_then(|columns| columns.get(0));

    match first_binding {
        Some(alias) => Ok(alias.clone()),
        None => Err(ErrorKind::UnboundVariable(var.name()).into()),
    }
}
/// Returns the SQL column bound to `var`, together with the name under which the
/// variable's value is projected.
///
/// Every variable should be bound by the top-level CC to at least one column in the
/// query. If that constraint is violated, an `UnboundVariable` error is returned.
fn candidate_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> {
    let alias = cc_column(cc, var)?;
    let name = VariableColumn::Variable(var.clone()).column_name();
    Ok((ColumnOrExpression::Column(alias), name))
}
/// Return the projected column -- that is, a value or SQL column and an associated name -- for a
/// given variable, along with the set of types the variable may take.
///
/// A variable with a known value is projected as that value, and its type set contains only
/// the value's type. Otherwise the variable is projected from the column the CC binds it to;
/// an error is returned if there is no such column.
///
/// Callers are expected to determine whether to project a type tag as an additional SQL column.
pub fn projected_column_for_var(var: &Variable, cc: &ConjoiningClauses) -> Result<(ProjectedColumn, ValueTypeSet)> {
    match cc.bound_value(&var) {
        Some(value) => {
            // If we already know the value, then our lives are easy.
            let types = ValueTypeSet::of_one(value.value_type());
            let name = VariableColumn::Variable(var.clone()).column_name();
            let column = ColumnOrExpression::Value(value.clone());
            Ok((ProjectedColumn(column, name), types))
        },
        None => {
            // If we don't, then the CC *must* have bound the variable.
            let (column, name) = candidate_column(cc, var)?;
            let projected = ProjectedColumn(column, name);
            Ok((projected, cc.known_type_set(var)))
        },
    }
}
/// Walk an iterator of `Element`s, collecting projector templates and columns.
///
/// Returns a `ProjectedElements`, which combines SQL projections
/// and a `Vec` of `TypedIndex` 'keys' to use when looking up values.
///
/// Callers must ensure that every `Element` is distinct -- a query like
///
/// ```edn
/// [:find ?x ?x :where [?x _ _]]
/// ```
///
/// should fail to parse. See #358.
///
/// # Arguments
///
/// - `count`: used only as a capacity hint when allocating the projection lists.
/// - `elements`: the `:find` elements to project, in order.
/// - `query`: supplies the CC used to resolve columns and types, the variables that must
///   be projected by name (`named_projection`), and the `:with` variables.
///
/// # Returns
///
/// - Without aggregates: a single projection in `sql_projection`, no
///   `pre_aggregate_projection`, and an empty `group_by`.
/// - With aggregates: the outer projection in `sql_projection`, the inner projection in
///   `pre_aggregate_projection`, and a `group_by` list naming every non-aggregated,
///   non-constant projected variable (and its type tag column where the type tag is not
///   unique).
///
/// # Errors
///
/// - `ErrorKind::NotYetImplemented` for an aggregate that `to_simple` cannot express.
/// - `ErrorKind::AmbiguousAggregates` when `(the ?var)` is combined with more than one
///   `min`/`max` aggregate.
/// - `ErrorKind::NoTypeAvailableForVariable` when a grouped variable needs a type tag
///   column but the CC has no extracted type for it.
/// - Any error propagated from the column/type lookup helpers.
///
/// Two non-fatal conditions are reported as warnings on stderr: a variable repeated in
/// the `:find` elements, and `(the ?var)` used without `min` or `max`.
pub(crate) fn project_elements<'a, I: IntoIterator<Item = &'a Element>>(
    count: usize,
    elements: I,
    query: &AlgebraicQuery) -> Result<ProjectedElements> {

    // Give a little padding for type tags.
    let mut inner_projection = Vec::with_capacity(count + 2);

    // Everything in the outer query will _either_ be an aggregate operation
    // _or_ a reference to a name projected from the inner.
    // We'll expand them later.
    let mut outer_projection: Vec<Either<Name, ProjectedColumn>> = Vec::with_capacity(count + 2);

    // Index of the next SQL column in the projection consumed by the projector.
    let mut i: i32 = 0;
    let mut min_max_count: usize = 0;
    let mut corresponding_count: usize = 0;
    let mut templates = vec![];

    let mut aggregates = false;

    // Any variable that appears intact in the :find clause, not inside an aggregate expression.
    // "Query variables not in aggregate expressions will group the results and appear intact
    // in the result."
    // We use an ordered set here so that we group in the correct order.
    let mut outer_variables = IndexSet::new();

    // Any variable that we are projecting from the inner query.
    let mut inner_variables = BTreeSet::new();

    for e in elements {
        if let &Element::Corresponding(_) = e {
            corresponding_count += 1;
        }

        match e {
            // Each time we come across a variable, we push a SQL column
            // into the SQL projection, aliased to the name of the variable,
            // and we push an annotated index into the projector.
            &Element::Variable(ref var) |
            &Element::Corresponding(ref var) => {
                if outer_variables.contains(var) {
                    eprintln!("Warning: duplicate variable {} in query.", var);
                }

                // TODO: it's an error to have `[:find ?x (the ?x) …]`.
                outer_variables.insert(var.clone());
                inner_variables.insert(var.clone());

                let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?;
                outer_projection.push(Either::Left(projected_column.1.clone()));
                inner_projection.push(projected_column);

                if let Some(tag) = type_set.unique_type_tag() {
                    templates.push(TypedIndex::Known(i, tag));
                    i += 1;     // We used one SQL column.
                } else {
                    templates.push(TypedIndex::Unknown(i, i + 1));
                    i += 2;     // We used two SQL columns.

                    // Also project the type from the SQL query.
                    let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
                    inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
                    outer_projection.push(Either::Left(type_name));
                }
            },
            &Element::Aggregate(ref a) => {
                if let Some(simple) = a.to_simple() {
                    aggregates = true;

                    use aggregates::SimpleAggregationOp::*;
                    match simple.op {
                        Max | Min => {
                            min_max_count += 1;
                        },
                        Avg | Count | Sum => (),
                    }

                    // When we encounter a simple aggregate -- one in which the aggregation can be
                    // implemented in SQL, on a single variable -- we just push the SQL aggregation op.
                    // We must ensure the following:
                    // - There's a column for the var.
                    // - The type of the var is known to be restricted to a sensible input set
                    //   (not necessarily a single type, but e.g., all vals must be Double or Long).
                    // - The type set must be appropriate for the operation. E.g., `Sum` is not a
                    //   meaningful operation on instants.

                    let (projected_column, return_type) = projected_column_for_simple_aggregate(&simple, &query.cc)?;
                    outer_projection.push(Either::Right(projected_column));

                    if !inner_variables.contains(&simple.var) {
                        inner_variables.insert(simple.var.clone());
                        let (projected_column, _type_set) = projected_column_for_var(&simple.var, &query.cc)?;
                        inner_projection.push(projected_column);
                        if query.cc.known_type_set(&simple.var).unique_type_tag().is_none() {
                            // Also project the type from the SQL query.
                            // The name is not needed again here, so it is moved rather than cloned.
                            let (type_column, type_name) = candidate_type_column(&query.cc, &simple.var)?;
                            inner_projection.push(ProjectedColumn(type_column, type_name));
                        }
                    }

                    // We might regret using the type tag here instead of the `ValueType`.
                    templates.push(TypedIndex::Known(i, return_type.value_type_tag()));
                    i += 1;
                } else {
                    // TODO: complex aggregates.
                    bail!(ErrorKind::NotYetImplemented("complex aggregates".into()));
                }
            },
        }
    }

    match (min_max_count, corresponding_count) {
        // No `(the ?var)` at all: nothing to check, however many min/max there are.
        (_, 0) => {},
        (0, _) => {
            eprintln!("Warning: used `(the ?var)` without `min` or `max`.");
        },
        (1, _) => {
            // This is the success case!
        },
        (n, c) => {
            bail!(ErrorKind::AmbiguousAggregates(n, c));
        },
    }

    // Anything used in ORDER BY (which we're given in `named_projection`)
    // needs to be in the SQL column list so we can refer to it by name.
    //
    // They don't affect projection.
    //
    // If a variable is of a non-fixed type, also project the type tag column, so we don't
    // accidentally unify across types when considering uniqueness!
    for var in query.named_projection.iter() {
        if outer_variables.contains(var) {
            continue;
        }

        // If it's a fixed value, we need do nothing further.
        if query.cc.is_value_bound(&var) {
            continue;
        }

        let already_inner = inner_variables.contains(&var);
        let (column, name) = candidate_column(&query.cc, &var)?;
        if !already_inner {
            inner_projection.push(ProjectedColumn(column, name.clone()));
            inner_variables.insert(var.clone());
        }

        outer_projection.push(Either::Left(name));
        outer_variables.insert(var.clone());

        // We don't care if a column has a single _type_, we care if it has a single type _tag_,
        // because that's what we'll use if we're projecting. E.g., Long and Double.
        // Single type implies single type tag, and is cheaper, so we check that first.
        let types = query.cc.known_type_set(&var);
        if !types.has_unique_type_tag() {
            let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
            if !already_inner {
                inner_projection.push(ProjectedColumn(type_column, type_name.clone()));
            }

            outer_projection.push(Either::Left(type_name));
        }
    }

    if !aggregates {
        // We're done -- we never need to group unless we're aggregating.
        return Ok(ProjectedElements {
                      sql_projection: Projection::Columns(inner_projection),
                      pre_aggregate_projection: None,
                      templates,
                      group_by: vec![],
                  });
    }

    // OK, on to aggregates.
    // We need to produce two SQL projection lists: one for an inner query and one for the outer.
    //
    // The inner serves these purposes:
    // - Projecting variables to avoid duplicates being elided. (:with)
    // - Making bindings available to the outermost query for projection, ordering, and grouping.
    //
    // The outer is consumed by the projector.
    //
    // We will also be producing:
    // - A GROUP BY list to group the output of the inner query by non-aggregate variables
    //   so that it can be correctly aggregated.

    // Turn this collection of vars into a collection of columns from the query.
    // We don't allow grouping on anything but a variable bound in the query.
    // We group by tag if necessary.
    let mut group_by = Vec::with_capacity(outer_variables.len() + 2);
    for var in outer_variables.into_iter() {
        if query.cc.is_value_bound(&var) {
            continue;
        }

        // The GROUP BY goes outside, but it needs every variable and type tag to be
        // projected from inside. Collect in both directions here.
        let name = VariableColumn::Variable(var.clone()).column_name();
        group_by.push(GroupBy::ProjectedColumn(name));

        let needs_type_projection = !query.cc.known_type_set(&var).has_unique_type_tag();

        let already_inner = inner_variables.contains(&var);
        if !already_inner {
            // The column name is only needed for this push, so it is moved rather than cloned.
            let (column, name) = candidate_column(&query.cc, &var)?;
            inner_projection.push(ProjectedColumn(column, name));
        }

        if needs_type_projection {
            let type_name = VariableColumn::VariableTypeTag(var.clone()).column_name();
            if !already_inner {
                let type_col = query.cc
                                    .extracted_types
                                    .get(&var)
                                    .cloned()
                                    .ok_or_else(|| ErrorKind::NoTypeAvailableForVariable(var.name().clone()))?;
                inner_projection.push(ProjectedColumn(ColumnOrExpression::Column(type_col), type_name.clone()));
            }
            group_by.push(GroupBy::ProjectedColumn(type_name));
        }
    }

    for var in query.with.iter() {
        // We never need to project a constant.
        if query.cc.is_value_bound(&var) {
            continue;
        }

        // We don't need to add inner projections for :with if they are already there.
        if !inner_variables.contains(&var) {
            let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?;
            inner_projection.push(projected_column);

            if type_set.unique_type_tag().is_none() {
                // Also project the type from the SQL query.
                // The name is not needed again here, so it is moved rather than cloned.
                let (type_column, type_name) = candidate_type_column(&query.cc, &var)?;
                inner_projection.push(ProjectedColumn(type_column, type_name));
            }
        }
    }

    // At this point we know we have a double-layer projection. Collect the outer.
    //
    // If we have an inner and outer layer, the inner layer will name its
    // variables, and the outer will re-project them.
    // If we only have one layer, then the outer will do the naming.
    // (We could try to not use names in the inner query, but then what would we do for
    // `ground` and known values?)
    // Walk the projection, switching the outer columns to use the inner names.

    let outer_projection = outer_projection.into_iter().map(|c| {
        match c {
            Either::Left(name) => {
                ProjectedColumn(ColumnOrExpression::ExistingColumn(name.clone()),
                                name)
            },
            Either::Right(pc) => pc,
        }
    }).collect();

    Ok(ProjectedElements {
        sql_projection: Projection::Columns(outer_projection),
        pre_aggregate_projection: Some(Projection::Columns(inner_projection)),
        templates,
        group_by,
    })
}

View file

View file

@ -38,6 +38,6 @@ error_chain! {
}
links {
ProjectorError(mentat_query_projector::Error, mentat_query_projector::ErrorKind);
ProjectorError(mentat_query_projector::errors::Error, mentat_query_projector::errors::ErrorKind);
}
}

View file

@ -47,7 +47,7 @@ error_chain! {
DbError(mentat_db::Error, mentat_db::ErrorKind);
QueryError(mentat_query_algebrizer::Error, mentat_query_algebrizer::ErrorKind); // Let's not leak the term 'algebrizer'.
QueryParseError(mentat_query_parser::Error, mentat_query_parser::ErrorKind);
ProjectorError(mentat_query_projector::Error, mentat_query_projector::ErrorKind);
ProjectorError(mentat_query_projector::errors::Error, mentat_query_projector::errors::ErrorKind);
TranslatorError(mentat_query_translator::Error, mentat_query_translator::ErrorKind);
SqlError(mentat_sql::Error, mentat_sql::ErrorKind);
TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind);

View file

@ -584,7 +584,7 @@ fn test_aggregates_type_handling() {
Error(
ErrorKind::TranslatorError(
::mentat_query_translator::ErrorKind::ProjectorError(
::mentat_query_projector::ErrorKind::CannotApplyAggregateOperationToTypes(
::mentat_query_projector::errors::ErrorKind::CannotApplyAggregateOperationToTypes(
SimpleAggregationOp::Sum,
types
),
@ -605,7 +605,7 @@ fn test_aggregates_type_handling() {
Error(
ErrorKind::TranslatorError(
::mentat_query_translator::ErrorKind::ProjectorError(
::mentat_query_projector::ErrorKind::CannotApplyAggregateOperationToTypes(
::mentat_query_projector::errors::ErrorKind::CannotApplyAggregateOperationToTypes(
SimpleAggregationOp::Sum,
types
),
@ -1173,7 +1173,7 @@ fn test_aggregation_implicit_grouping() {
Error(
ErrorKind::TranslatorError(
::mentat_query_translator::ErrorKind::ProjectorError(
::mentat_query_projector::ErrorKind::AmbiguousAggregates(mmc, cc)
::mentat_query_projector::errors::ErrorKind::AmbiguousAggregates(mmc, cc)
)
), _)) => {
assert_eq!(mmc, 2);