diff --git a/Cargo.toml b/Cargo.toml index 7adde7de..b9029f4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,9 @@ path = "query-parser" [dependencies.mentat_query_projector] path = "query-projector" +[dependencies.mentat_query_pull] +path = "query-pull" + [dependencies.mentat_query_sql] path = "query-sql" diff --git a/core/src/lib.rs b/core/src/lib.rs index a2f54d67..6353833f 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -55,12 +55,13 @@ mod value_type_set; mod sql_types; pub use types::{ + Binding, Cloned, Entid, FromRc, KnownEntid, + StructuredMap, TypedValue, - Binding, ValueType, ValueTypeTag, ValueRc, @@ -362,6 +363,45 @@ pub mod intern_set; pub mod counter; pub mod util; +/// A helper macro to sequentially process an iterable sequence, +/// evaluating a block between each pair of items. +/// +/// This is used to simply and efficiently produce output like +/// +/// ```sql +/// 1, 2, 3 +/// ``` +/// +/// or +/// +/// ```sql +/// x = 1 AND y = 2 +/// ``` +/// +/// without producing an intermediate string sequence. +#[macro_export] +macro_rules! interpose { + ( $name: pat, $across: expr, $body: block, $inter: block ) => { + interpose_iter!($name, $across.iter(), $body, $inter) + } +} + +/// A helper to bind `name` to values in `across`, running `body` for each value, +/// and running `inter` between each value. See `interpose` for examples. +#[macro_export] +macro_rules! interpose_iter { + ( $name: pat, $across: expr, $body: block, $inter: block ) => { + let mut seq = $across; + if let Some($name) = seq.next() { + $body; + for $name in seq { + $inter; + $body; + } + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/core/src/types.rs b/core/src/types.rs index 0885866b..4f9b0b1a 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -96,24 +96,37 @@ impl FromRc for Box where T: Sized + Clone { // We do this a lot for errors. 
pub trait Cloned { fn cloned(&self) -> T; + fn to_value_rc(&self) -> ValueRc; } impl Cloned for Rc where T: Sized + Clone { fn cloned(&self) -> T { (*self.as_ref()).clone() } + + fn to_value_rc(&self) -> ValueRc { + ValueRc::from_rc(self.clone()) + } } impl Cloned for Arc where T: Sized + Clone { fn cloned(&self) -> T { (*self.as_ref()).clone() } + + fn to_value_rc(&self) -> ValueRc { + ValueRc::from_arc(self.clone()) + } } impl Cloned for Box where T: Sized + Clone { fn cloned(&self) -> T { self.as_ref().clone() } + + fn to_value_rc(&self) -> ValueRc { + ValueRc::new(self.cloned()) + } } /// @@ -284,11 +297,17 @@ pub enum Binding { } impl From for Binding where T: Into { - fn from(value: T) -> Binding { + fn from(value: T) -> Self { Binding::Scalar(value.into()) } } +impl From for Binding { + fn from(value: StructuredMap) -> Self { + Binding::Map(ValueRc::new(value)) + } +} + impl Binding { pub fn val(self) -> Option { match self { @@ -308,8 +327,31 @@ impl Binding { /// /// We entirely support the former, and partially support the latter -- you can alias /// using a different keyword only. -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct StructuredMap(IndexMap, Binding>); +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct StructuredMap(pub IndexMap, Binding>); + +impl StructuredMap { + pub fn insert(&mut self, name: N, value: B) where N: Into>, B: Into { + self.0.insert(name.into(), value.into()); + } +} + +impl From, Binding>> for StructuredMap { + fn from(src: IndexMap, Binding>) -> Self { + StructuredMap(src) + } +} + +// Mostly for testing. 
+impl From> for StructuredMap where T: Into { + fn from(value: Vec<(NamespacedKeyword, T)>) -> Self { + let mut sm = StructuredMap::default(); + for (k, v) in value.into_iter() { + sm.insert(k, v); + } + sm + } +} impl Binding { /// Returns true if the provided type is `Some` and matches this value's type, or if the diff --git a/db/Cargo.toml b/db/Cargo.toml index b791c118..0dd92f43 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -23,6 +23,9 @@ path = "../edn" [dependencies.mentat_core] path = "../core" +[dependencies.mentat_sql] +path = "../sql" + [dependencies.mentat_tx] path = "../tx" diff --git a/db/src/cache.rs b/db/src/cache.rs index 92558faf..8a8a2c40 100644 --- a/db/src/cache.rs +++ b/db/src/cache.rs @@ -77,6 +77,7 @@ use num; use rusqlite; use mentat_core::{ + Binding, CachedAttributes, Entid, HasSchema, @@ -90,6 +91,12 @@ use mentat_core::util::{ Either, }; +use mentat_sql::{ + QueryBuilder, + SQLiteQueryBuilder, + SQLQuery, +}; + use mentat_tx::entities::{ OpType, }; @@ -241,6 +248,11 @@ impl<'conn, F> Iterator for AevRows<'conn, F> where F: FnMut(&rusqlite::Row) -> // - cardinality/one doesn't need a vec // - unique/* should ideally have a bijective mapping (reverse lookup) +pub trait AttributeCache { + fn has_e(&self, e: Entid) -> bool; + fn binding_for_e(&self, e: Entid) -> Option; +} + trait RemoveFromCache { fn remove(&mut self, e: Entid, v: &TypedValue); } @@ -273,6 +285,16 @@ impl Absorb for SingleValAttributeCache { } } +impl AttributeCache for SingleValAttributeCache { + fn binding_for_e(&self, e: Entid) -> Option { + self.get(e).map(|v| v.clone().into()) + } + + fn has_e(&self, e: Entid) -> bool { + self.e_v.contains_key(&e) + } +} + impl ClearCache for SingleValAttributeCache { fn clear(&mut self) { self.e_v.clear(); @@ -332,6 +354,19 @@ impl Absorb for MultiValAttributeCache { } } +impl AttributeCache for MultiValAttributeCache { + fn binding_for_e(&self, e: Entid) -> Option { + self.e_vs.get(&e).map(|vs| { + let bindings = 
vs.iter().cloned().map(|v| v.into()).collect(); + Binding::Vec(ValueRc::new(bindings)) + }) + } + + fn has_e(&self, e: Entid) -> bool { + self.e_vs.contains_key(&e) + } +} + impl ClearCache for MultiValAttributeCache { fn clear(&mut self) { self.e_vs.clear(); @@ -861,16 +896,219 @@ impl AttributeCaches { let sql = format!("SELECT a, e, v, value_type_tag FROM {} WHERE a = ? ORDER BY a ASC, e ASC", table); let args: Vec<&rusqlite::types::ToSql> = vec![&attribute]; let mut stmt = sqlite.prepare(&sql)?; + let replacing = true; + self.repopulate_from_aevt(schema, &mut stmt, args, replacing) + } + + fn repopulate_from_aevt<'a, 's, 'c, 'v>(&'a mut self, + schema: &'s Schema, + statement: &'c mut rusqlite::Statement, + args: Vec<&'v rusqlite::types::ToSql>, + replacing: bool) -> Result<()> { let mut aev_factory = AevFactory::new(); - let rows = stmt.query_map(&args, |row| aev_factory.row_to_aev(row))?; + let rows = statement.query_map(&args, |row| aev_factory.row_to_aev(row))?; let aevs = AevRows { rows: rows, }; - self.accumulate_into_cache(None, schema, aevs.peekable(), AccumulationBehavior::Add { replacing: true })?; + self.accumulate_into_cache(None, schema, aevs.peekable(), AccumulationBehavior::Add { replacing })?; Ok(()) } } +#[derive(Clone)] +pub enum AttributeSpec { + All, + Specified { + // These are assumed to not include duplicates. + fts: Vec, + non_fts: Vec, + }, +} + +impl AttributeSpec { + pub fn all() -> AttributeSpec { + AttributeSpec::All + } + + pub fn specified(attrs: &BTreeSet, schema: &Schema) -> AttributeSpec { + let mut fts = Vec::with_capacity(attrs.len()); + let mut non_fts = Vec::with_capacity(attrs.len()); + for attr in attrs.iter() { + if let Some(a) = schema.attribute_for_entid(*attr) { + if a.fulltext { + fts.push(*attr); + } else { + non_fts.push(*attr); + } + } + } + + AttributeSpec::Specified { fts, non_fts } + } +} + +impl AttributeCaches { + /// Fetch the requested entities and attributes from the store and put them in the cache. 
+ /// + /// The caller is responsible for ensuring that `entities` is unique, and for avoiding any + /// redundant work. + /// + /// Each provided attribute will be marked as forward-cached; the caller is responsible for + /// ensuring that this cache is complete or that it is not expected to be complete. + fn populate_cache_for_entities_and_attributes<'s, 'c>(&mut self, + schema: &'s Schema, + sqlite: &'c rusqlite::Connection, + attrs: AttributeSpec, + entities: &Vec) -> Result<()> { + + // Mark the attributes as cached as we go. We do this because we're going in through the + // back door here, and the usual caching API won't have taken care of this for us. + let mut qb = SQLiteQueryBuilder::new(); + qb.push_sql("SELECT a, e, v, value_type_tag FROM "); + match attrs { + AttributeSpec::All => { + qb.push_sql("all_datoms WHERE e IN ("); + interpose!(item, entities, + { qb.push_sql(&item.to_string()) }, + { qb.push_sql(", ") }); + qb.push_sql(") ORDER BY a ASC, e ASC"); + + self.forward_cached_attributes.extend(schema.attribute_map.keys()); + }, + AttributeSpec::Specified { fts, non_fts } => { + let has_fts = !fts.is_empty(); + let has_non_fts = !non_fts.is_empty(); + + if !has_fts && !has_non_fts { + // Nothing to do. + return Ok(()); + } + + if has_non_fts { + qb.push_sql("datoms WHERE e IN ("); + interpose!(item, entities, + { qb.push_sql(&item.to_string()) }, + { qb.push_sql(", ") }); + qb.push_sql(") AND a IN ("); + interpose!(item, non_fts, + { qb.push_sql(&item.to_string()) }, + { qb.push_sql(", ") }); + qb.push_sql(")"); + + self.forward_cached_attributes.extend(non_fts.iter()); + } + + if has_fts && has_non_fts { + // Both. 
+ qb.push_sql(" UNION ALL SELECT a, e, v, value_type_tag FROM "); + } + + if has_fts { + qb.push_sql("fulltext_datoms WHERE e IN ("); + interpose!(item, entities, + { qb.push_sql(&item.to_string()) }, + { qb.push_sql(", ") }); + qb.push_sql(") AND a IN ("); + interpose!(item, fts, + { qb.push_sql(&item.to_string()) }, + { qb.push_sql(", ") }); + qb.push_sql(")"); + + self.forward_cached_attributes.extend(fts.iter()); + } + qb.push_sql(" ORDER BY a ASC, e ASC"); + }, + }; + + let SQLQuery { sql, args } = qb.finish(); + assert!(args.is_empty()); // TODO: we know there are never args, but we'd like to run this query 'properly'. + let mut stmt = sqlite.prepare(sql.as_str())?; + let replacing = false; + self.repopulate_from_aevt(schema, &mut stmt, vec![], replacing) + } + + /// Return a reference to the cache for the provided `a`, if `a` names an attribute that is + /// cached in the forward direction. If `a` doesn't name an attribute, or it's not cached at + /// all, or it's only cached in reverse (`v` to `e`, not `e` to `v`), `None` is returned. + pub fn forward_attribute_cache_for_attribute<'a, 's>(&'a self, schema: &'s Schema, a: Entid) -> Option<&'a AttributeCache> { + if !self.forward_cached_attributes.contains(&a) { + return None; + } + schema.attribute_for_entid(a) + .and_then(|attr| + if attr.multival { + self.multi_vals.get(&a).map(|v| v as &AttributeCache) + } else { + self.single_vals.get(&a).map(|v| v as &AttributeCache) + }) + } + + /// Fetch the requested entities and attributes from the store and put them in the cache. + /// The caller is responsible for ensuring that `entities` is unique. + /// Attributes for which every entity is already cached will not be processed again. + pub fn extend_cache_for_entities_and_attributes<'s, 'c>(&mut self, + schema: &'s Schema, + sqlite: &'c rusqlite::Connection, + mut attrs: AttributeSpec, + entities: &Vec) -> Result<()> { + // TODO: Exclude any entities for which every attribute is known. 
+ // TODO: initialize from an existing (complete) AttributeCache. + + // Exclude any attributes for which every entity's value is already known. + match &mut attrs { + &mut AttributeSpec::All => { + // If we're caching all attributes, there's nothing we can exclude. + }, + &mut AttributeSpec::Specified { ref mut non_fts, ref mut fts } => { + // Remove any attributes for which all entities are present in the cache (even + // as a 'miss'). + let exclude_missing = |vec: &mut Vec| { + vec.retain(|a| { + if let Some(attr) = schema.attribute_for_entid(*a) { + if !self.forward_cached_attributes.contains(a) { + // The attribute isn't cached at all. Do the work for all entities. + return true; + } + + // Return true if there are any entities missing for this attribute. + if attr.multival { + self.multi_vals + .get(&a) + .map(|cache| entities.iter().any(|e| !cache.has_e(*e))) + .unwrap_or(true) + } else { + self.single_vals + .get(&a) + .map(|cache| entities.iter().any(|e| !cache.has_e(*e))) + .unwrap_or(true) + } + } else { + // Unknown attribute. + false + } + }); + }; + exclude_missing(non_fts); + exclude_missing(fts); + }, + } + + self.populate_cache_for_entities_and_attributes(schema, sqlite, attrs, entities) + } + + /// Fetch the requested entities and attributes and put them in a new cache. + /// The caller is responsible for ensuring that `entities` is unique. 
+ pub fn make_cache_for_entities_and_attributes<'s, 'c>(schema: &'s Schema, + sqlite: &'c rusqlite::Connection, + attrs: AttributeSpec, + entities: &Vec) -> Result { + let mut cache = AttributeCaches::default(); + cache.populate_cache_for_entities_and_attributes(schema, sqlite, attrs, entities)?; + Ok(cache) + } +} + + impl CachedAttributes for AttributeCaches { fn get_values_for_entid(&self, schema: &Schema, attribute: Entid, entid: Entid) -> Option<&Vec> { self.values_pairs(schema, attribute) diff --git a/db/src/lib.rs b/db/src/lib.rs index e0994e80..b87619e4 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -23,7 +23,8 @@ extern crate tabwriter; extern crate time; #[macro_use] extern crate edn; -extern crate mentat_core; +#[macro_use] extern crate mentat_core; +extern crate mentat_sql; extern crate mentat_tx; extern crate mentat_tx_parser; @@ -60,6 +61,7 @@ pub use schema::{ AttributeBuilder, AttributeValidation, }; + pub use bootstrap::{ CORE_SCHEMA_VERSION, }; diff --git a/parser-utils/src/macros.rs b/parser-utils/src/macros.rs index 8dd9dbd2..98888a73 100644 --- a/parser-utils/src/macros.rs +++ b/parser-utils/src/macros.rs @@ -116,7 +116,7 @@ macro_rules! 
assert_parse_failure_contains { let input = edn::parse::value($input).expect("to be able to parse input as EDN"); let par = $parser(); let stream = input.atom_stream(); - let result = par.skip(eof()).parse(stream).map(|x| x.0).map_err(|e| -> ::ValueParseError { e.into() }); + let result = par.skip(eof()).parse(stream).map(|x| x.0).map_err(|e| -> $crate::ValueParseError { e.into() }); assert!(format!("{:?}", result).contains($expected), "Expected {:?} to contain {:?}", result, $expected); }} } diff --git a/query-algebrizer/src/clauses/mod.rs b/query-algebrizer/src/clauses/mod.rs index 7ee8b136..7009b505 100644 --- a/query-algebrizer/src/clauses/mod.rs +++ b/query-algebrizer/src/clauses/mod.rs @@ -40,7 +40,10 @@ use mentat_core::{ use mentat_core::counter::RcCounter; use mentat_query::{ + Element, + FindSpec, NamespacedKeyword, + Pull, Variable, WhereClause, }; @@ -334,6 +337,21 @@ impl ConjoiningClauses { } } +/// Early-stage query handling. +impl ConjoiningClauses { + pub(crate) fn derive_types_from_find_spec(&mut self, find_spec: &FindSpec) { + for spec in find_spec.columns() { + match spec { + &Element::Pull(Pull { ref var, patterns: _ }) => { + self.constrain_var_to_type(var.clone(), ValueType::Ref); + }, + _ => { + }, + } + } + } +} + /// Cloning. impl ConjoiningClauses { fn make_receptacle(&self) -> ConjoiningClauses { diff --git a/query-algebrizer/src/lib.rs b/query-algebrizer/src/lib.rs index e4e2b6e0..6414e71b 100644 --- a/query-algebrizer/src/lib.rs +++ b/query-algebrizer/src/lib.rs @@ -161,6 +161,10 @@ impl AlgebraicQuery { self.find_spec .columns() .all(|e| match e { + // Pull expressions are never fully bound. + // TODO: but the 'inside' of a pull expression certainly can be. 
+ &Element::Pull(_) => false, + &Element::Variable(ref var) | &Element::Corresponding(ref var) => self.cc.is_value_bound(var), @@ -275,6 +279,9 @@ pub fn algebrize_with_inputs(known: Known, let alias_counter = RcCounter::with_initial(counter); let mut cc = ConjoiningClauses::with_inputs_and_alias_counter(parsed.in_vars, inputs, alias_counter); + // This is so the rest of the query knows that `?x` is a ref if `(pull ?x …)` appears in `:find`. + cc.derive_types_from_find_spec(&parsed.find_spec); + // Do we have a variable limit? If so, tell the CC that the var must be numeric. if let &Limit::Variable(ref var) = &parsed.limit { cc.constrain_var_to_long(var.clone()); diff --git a/query-parser/src/parse.rs b/query-parser/src/parse.rs index 3d982a3f..4ba5aca1 100644 --- a/query-parser/src/parse.rs +++ b/query-parser/src/parse.rs @@ -18,7 +18,20 @@ use std; // To refer to std::result::Result. use std::collections::BTreeSet; -use self::combine::{eof, many, many1, optional, parser, satisfy, satisfy_map, Parser, ParseResult, Stream}; +use self::combine::{ + eof, + look_ahead, + many, + many1, + optional, + parser, + satisfy, + satisfy_map, + Parser, + ParseResult, + Stream, +}; + use self::combine::combinator::{any, choice, or, try}; use self::mentat_core::ValueType; @@ -33,6 +46,7 @@ use self::mentat_parser_utils::value_and_span::Stream as ValueStream; use self::mentat_parser_utils::value_and_span::{ Item, OfExactlyParsing, + forward_keyword, keyword_map, list, map, @@ -58,6 +72,9 @@ use self::mentat_query::{ PatternNonValuePlace, PatternValuePlace, Predicate, + Pull, + PullAttributeSpec, + PullConcreteAttribute, QueryFunction, SrcVar, TypeAnnotation, @@ -172,6 +189,8 @@ def_parser!(Query, order, Order, { }); def_matches_plain_symbol!(Query, the, "the"); +def_matches_plain_symbol!(Query, pull, "pull"); +def_matches_plain_symbol!(Query, wildcard, "*"); pub struct Where<'a>(std::marker::PhantomData<&'a ()>); @@ -278,12 +297,69 @@ def_parser!(Query, func, (QueryFunction, Vec), 
{ }); def_parser!(Query, aggregate, Aggregate, { - seq().of_exactly(Query::func()) + Query::func() .map(|(func, args)| Aggregate { func, args, }) }); +def_parser!(Query, pull_concrete_attribute, PullAttributeSpec, { + forward_keyword().map(|k| + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident( + ::std::rc::Rc::new(k.clone())))) +}); + +def_parser!(Query, pull_wildcard_attribute, PullAttributeSpec, { + Query::wildcard().map(|_| PullAttributeSpec::Wildcard) +}); + +def_parser!(Query, pull_attribute, PullAttributeSpec, { + choice([ + try(Query::pull_concrete_attribute()), + try(Query::pull_wildcard_attribute()), + // TODO: reversed keywords, entids (with aliases, presumably…). + ]) +}); + +// A wildcard can appear only once. +// If a wildcard appears, only map expressions can be present. +fn validate_attributes<'a, I>(attrs: I) -> std::result::Result<(), &'static str> + where I: IntoIterator { + let mut wildcard_seen = false; + let mut non_map_or_wildcard_seen = false; + for attr in attrs { + match attr { + &PullAttributeSpec::Wildcard => { + if wildcard_seen { + return Err("duplicate wildcard pull attribute"); + } + wildcard_seen = true; + if non_map_or_wildcard_seen { + return Err("wildcard with specified attributes"); + } + }, + // &PullAttributeSpec::LimitedAttribute(_, _) => { + &PullAttributeSpec::Attribute(_) => { + non_map_or_wildcard_seen = true; + if wildcard_seen { + return Err("wildcard with specified attributes"); + } + }, + // TODO: map form. + } + } + Ok(()) +} + +def_parser!(Query, pull_attributes, Vec, { + vector().of_exactly(many1(Query::pull_attribute())) + .and_then(|attrs: Vec| + validate_attributes(&attrs) + .and(Ok(attrs)) + .map_err(|e| combine::primitives::Error::Unexpected(e.into()))) +}); + /// A vector containing just a parenthesized filter expression. 
def_parser!(Where, pred, WhereClause, { // Accept either a nested list or a nested vector here: @@ -432,7 +508,7 @@ def_parser!(Find, variable_element, Element, { }); def_parser!(Find, corresponding_element, Element, { - seq().of_exactly(Query::the().with(Query::variable())) + Query::the().with(Query::variable()) .map(Element::Corresponding) }); @@ -440,10 +516,43 @@ def_parser!(Find, aggregate_element, Element, { Query::aggregate().map(Element::Aggregate) }); +def_parser!(Find, pull_element, Element, { + Query::pull().with(Query::variable().and(Query::pull_attributes())) + .map(|(var, attrs)| Element::Pull(Pull { var: var, patterns: attrs })) +}); + +enum ElementType { + Corresponding, + Pull, + Aggregate, +} + +def_parser!(Find, seq_elem, Element, { + let element_parser_for_type = |ty: ElementType| { + match ty { + ElementType::Corresponding => Find::corresponding_element(), + ElementType::Pull => Find::pull_element(), + ElementType::Aggregate => Find::aggregate_element(), + } + }; + + // This slightly tortured phrasing ensures that we don't consume + // when the first item in the list -- the function name -- doesn't + // match, but once we decide what the list is, we go ahead and + // commit to that branch. + seq().of_exactly( + // This comes first because otherwise (the ?x) will match as an aggregate. + look_ahead(Query::the()).map(|_| ElementType::Corresponding) + + // Similarly, we have to parse pull before general functions. 
+ .or(look_ahead(Query::pull()).map(|_| ElementType::Pull)) + .or(look_ahead(Query::func()).map(|_| ElementType::Aggregate)) + .then(element_parser_for_type)) +}); + def_parser!(Find, elem, Element, { - choice([try(Find::variable_element()), - try(Find::corresponding_element()), - try(Find::aggregate_element())]) + try(Find::variable_element()) + .or(Find::seq_elem()) }); def_parser!(Find, find_scalar, FindSpec, { @@ -982,7 +1091,7 @@ mod test { #[test] fn test_the() { - assert_edn_parses_to!(Find::corresponding_element, + assert_edn_parses_to!(Find::seq_elem, "(the ?y)", Element::Corresponding(Variable::from_valid_name("?y"))); assert_edn_parses_to!(Find::find_tuple, @@ -1017,6 +1126,11 @@ mod test { "[:find [(the ?x) ?y] :where [?x _ ?y]]", expected_query); + + // If we give a malformed pull expression, we don't fall through to aggregates. + assert_parse_failure_contains!(Find::elem, + "(pull x [])", + r#"errors: [Unexpected(Token(ValueAndSpan { inner: PlainSymbol(PlainSymbol("x")), span: Span(6, 7) })), Expected(Borrowed("variable"))]"#); } #[test] @@ -1073,4 +1187,82 @@ mod test { })); } + + #[test] + fn test_pull() { + assert_edn_parses_to!(Query::pull_attribute, + "*", + PullAttributeSpec::Wildcard); + assert_edn_parses_to!(Query::pull_attributes, + "[*]", + vec![PullAttributeSpec::Wildcard]); + assert_edn_parses_to!(Find::elem, + "(pull ?v [*])", + Element::Pull(Pull { + var: Variable::from_valid_name("?v"), + patterns: vec![PullAttributeSpec::Wildcard], + })); + + let foo_bar = ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "bar")); + let foo_baz = ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "baz")); + assert_edn_parses_to!(Query::pull_concrete_attribute, + ":foo/bar", + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident(foo_bar.clone()))); + assert_edn_parses_to!(Query::pull_attribute, + ":foo/bar", + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident(foo_bar.clone()))); + assert_edn_parses_to!(Find::elem, + "(pull ?v 
[:foo/bar :foo/baz])", + Element::Pull(Pull { + var: Variable::from_valid_name("?v"), + patterns: vec![ + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident(foo_bar.clone())), + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident(foo_baz.clone())), + ], + })); + assert_parse_failure_contains!(Find::elem, + "(pull ?x [* :foo/bar])", + r#"errors: [Unexpected(Borrowed("wildcard with specified attributes"))]"#); + } + + #[test] + fn test_query_with_pull() { + let q = "[:find ?x (pull ?x [:foo/bar]) :where [?x _ _]]"; + let expected_query = + FindQuery { + find_spec: FindSpec::FindRel(vec![ + Element::Variable(Variable::from_valid_name("?x")), + Element::Pull(Pull { + var: Variable::from_valid_name("?x"), + patterns: vec![ + PullAttributeSpec::Attribute( + PullConcreteAttribute::Ident( + ::std::rc::Rc::new(edn::NamespacedKeyword::new("foo", "bar")) + ) + ), + ] })]), + where_clauses: vec![ + WhereClause::Pattern(Pattern { + source: None, + entity: PatternNonValuePlace::Variable(Variable::from_valid_name("?x")), + attribute: PatternNonValuePlace::Placeholder, + value: PatternValuePlace::Placeholder, + tx: PatternNonValuePlace::Placeholder, + })], + + default_source: SrcVar::DefaultSrc, + with: Default::default(), + in_vars: Default::default(), + in_sources: Default::default(), + limit: Limit::None, + order: None, + }; + assert_edn_parses_to!(Find::query, + q, + expected_query); + } } diff --git a/query-projector/Cargo.toml b/query-projector/Cargo.toml index 95059cd5..e5e2563e 100644 --- a/query-projector/Cargo.toml +++ b/query-projector/Cargo.toml @@ -26,6 +26,9 @@ path = "../query" [dependencies.mentat_query_algebrizer] path = "../query-algebrizer" +[dependencies.mentat_query_pull] +path = "../query-pull" + # Only for tests. 
[dev-dependencies.mentat_query_parser] path = "../query-parser" diff --git a/query-projector/src/errors.rs b/query-projector/src/errors.rs index 7046284e..8db51d21 100644 --- a/query-projector/src/errors.rs +++ b/query-projector/src/errors.rs @@ -20,6 +20,8 @@ use mentat_query::{ PlainSymbol, }; +use mentat_query_pull; + use aggregates::{ SimpleAggregationOp, }; @@ -72,5 +74,6 @@ error_chain! { links { DbError(mentat_db::Error, mentat_db::ErrorKind); + PullError(mentat_query_pull::errors::Error, mentat_query_pull::errors::ErrorKind); } } diff --git a/query-projector/src/lib.rs b/query-projector/src/lib.rs index b2a279c5..94bc9d5b 100644 --- a/query-projector/src/lib.rs +++ b/query-projector/src/lib.rs @@ -17,6 +17,7 @@ extern crate mentat_core; extern crate mentat_db; // For value conversion. extern crate mentat_query; extern crate mentat_query_algebrizer; +extern crate mentat_query_pull; extern crate mentat_query_sql; extern crate mentat_sql; @@ -35,6 +36,7 @@ use rusqlite::{ use mentat_core::{ Binding, + Schema, TypedValue, ValueType, ValueTypeTag, @@ -67,6 +69,8 @@ use mentat_query_sql::{ mod aggregates; mod project; +mod projectors; +mod pull; mod relresult; pub mod errors; @@ -83,6 +87,22 @@ pub use project::{ projected_column_for_var, }; +pub use projectors::{ + ConstantProjector, + Projector, +}; + +use projectors::{ + CollProjector, + CollTwoStagePullProjector, + RelProjector, + RelTwoStagePullProjector, + ScalarProjector, + ScalarTwoStagePullProjector, + TupleProjector, + TupleTwoStagePullProjector, +}; + pub use relresult::{ RelResult, StructuredRelResult, @@ -161,7 +181,11 @@ impl QueryOutput { QueryResults::Scalar(val) }, &FindScalar(Element::Aggregate(ref _agg)) => { - // TODO + // TODO: static aggregates. + unimplemented!(); + }, + &FindScalar(Element::Pull(ref _pull)) => { + // TODO: static pull. 
unimplemented!(); }, &FindTuple(ref elements) => { @@ -174,6 +198,10 @@ impl QueryOutput { .expect("every var to have a binding") .into() }, + &Element::Pull(ref _pull) => { + // TODO: static pull. + unreachable!(); + }, &Element::Aggregate(ref _agg) => { // TODO: static computation of aggregates, then // implement the condition in `is_fully_bound`. @@ -191,6 +219,10 @@ impl QueryOutput { .into(); QueryResults::Coll(vec![val]) }, + &FindColl(Element::Pull(ref _pull)) => { + // TODO: static pull. + unimplemented!(); + }, &FindColl(Element::Aggregate(ref _agg)) => { // Does it even make sense to write // [:find [(max ?x) ...] :where [_ :foo/bar ?x]] @@ -208,6 +240,10 @@ impl QueryOutput { .expect("every var to have a binding") .into() }, + &Element::Pull(ref _pull) => { + // TODO: static pull. + unreachable!(); + }, &Element::Aggregate(ref _agg) => { // TODO: static computation of aggregates, then // implement the condition in `is_fully_bound`. @@ -336,283 +372,6 @@ impl TypedIndex { } } -pub trait Projector { - fn project<'stmt>(&self, rows: Rows<'stmt>) -> Result; - fn columns<'s>(&'s self) -> Box + 's>; -} - -/// A projector that produces a `QueryResult` containing fixed data. -/// Takes a boxed function that should return an empty result set of the desired type. 
-pub struct ConstantProjector { - spec: Rc, - results_factory: Box QueryResults>, -} - -impl ConstantProjector { - fn new(spec: Rc, results_factory: Box QueryResults>) -> ConstantProjector { - ConstantProjector { - spec: spec, - results_factory: results_factory, - } - } - - pub fn project_without_rows<'stmt>(&self) -> Result { - let results = (self.results_factory)(); - let spec = self.spec.clone(); - Ok(QueryOutput { - spec: spec, - results: results, - }) - } -} - -impl Projector for ConstantProjector { - fn project<'stmt>(&self, _: Rows<'stmt>) -> Result { - self.project_without_rows() - } - - fn columns<'s>(&'s self) -> Box + 's> { - self.spec.columns() - } -} - -struct ScalarProjector { - spec: Rc, - template: TypedIndex, -} - -impl ScalarProjector { - fn with_template(spec: Rc, template: TypedIndex) -> ScalarProjector { - ScalarProjector { - spec: spec, - template: template, - } - } - - fn combine(spec: Rc, mut elements: ProjectedElements) -> Result { - let template = elements.templates.pop().expect("Expected a single template"); - Ok(CombinedProjection { - sql_projection: elements.sql_projection, - pre_aggregate_projection: elements.pre_aggregate_projection, - datalog_projector: Box::new(ScalarProjector::with_template(spec, template)), - distinct: false, - group_by_cols: elements.group_by, - }) - } -} - -impl Projector for ScalarProjector { - fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result { - let results = - if let Some(r) = rows.next() { - let row = r?; - let binding = self.template.lookup(&row)?; - QueryResults::Scalar(Some(binding)) - } else { - QueryResults::Scalar(None) - }; - Ok(QueryOutput { - spec: self.spec.clone(), - results: results, - }) - } - - fn columns<'s>(&'s self) -> Box + 's> { - self.spec.columns() - } -} - -/// A tuple projector produces a single vector. It's the single-result version of rel. 
-struct TupleProjector { - spec: Rc, - len: usize, - templates: Vec, -} - -impl TupleProjector { - fn with_templates(spec: Rc, len: usize, templates: Vec) -> TupleProjector { - TupleProjector { - spec: spec, - len: len, - templates: templates, - } - } - - // This is exactly the same as for rel. - fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result> { - // There will be at least as many SQL columns as Datalog columns. - // gte 'cos we might be querying extra columns for ordering. - // The templates will take care of ignoring columns. - assert!(row.column_count() >= self.len as i32); - self.templates - .iter() - .map(|ti| ti.lookup(&row)) - .collect::>>() - } - - fn combine(spec: Rc, column_count: usize, elements: ProjectedElements) -> Result { - let p = TupleProjector::with_templates(spec, column_count, elements.templates); - Ok(CombinedProjection { - sql_projection: elements.sql_projection, - pre_aggregate_projection: elements.pre_aggregate_projection, - datalog_projector: Box::new(p), - distinct: false, - group_by_cols: elements.group_by, - }) - } -} - -impl Projector for TupleProjector { - fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result { - let results = - if let Some(r) = rows.next() { - let row = r?; - let bindings = self.collect_bindings(row)?; - QueryResults::Tuple(Some(bindings)) - } else { - QueryResults::Tuple(None) - }; - Ok(QueryOutput { - spec: self.spec.clone(), - results: results, - }) - } - - fn columns<'s>(&'s self) -> Box + 's> { - self.spec.columns() - } -} - -/// A rel projector produces a RelResult, which is a striding abstraction over a vector. -/// Each stride across the vector is the same size, and sourced from the same columns. -/// Each column in each stride is the result of taking one or two columns from -/// the `Row`: one for the value and optionally one for the type tag. 
-struct RelProjector { - spec: Rc, - len: usize, - templates: Vec, -} - -impl RelProjector { - fn with_templates(spec: Rc, len: usize, templates: Vec) -> RelProjector { - RelProjector { - spec: spec, - len: len, - templates: templates, - } - } - - fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec) -> Result<()> { - // There will be at least as many SQL columns as Datalog columns. - // gte 'cos we might be querying extra columns for ordering. - // The templates will take care of ignoring columns. - assert!(row.column_count() >= self.len as i32); - let mut count = 0; - for binding in self.templates - .iter() - .map(|ti| ti.lookup(&row)) { - out.push(binding?); - count += 1; - } - assert_eq!(self.len, count); - Ok(()) - } - - fn combine(spec: Rc, column_count: usize, elements: ProjectedElements) -> Result { - let p = RelProjector::with_templates(spec, column_count, elements.templates); - - // If every column yields only one value, or if this is an aggregate query - // (because by definition every column in an aggregate query is either - // aggregated or is a variable _upon which we group_), then don't bother - // with DISTINCT. - let already_distinct = elements.pre_aggregate_projection.is_some() || - p.columns().all(|e| e.is_unit()); - Ok(CombinedProjection { - sql_projection: elements.sql_projection, - pre_aggregate_projection: elements.pre_aggregate_projection, - datalog_projector: Box::new(p), - distinct: !already_distinct, - group_by_cols: elements.group_by, - }) - } -} - -impl Projector for RelProjector { - fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result { - // Allocate space for five rows to start. - // This is better than starting off by doubling the buffer a couple of times, and will - // rapidly grow to support larger query results. 
- let width = self.len; - let mut values: Vec<_> = Vec::with_capacity(5 * width); - - while let Some(r) = rows.next() { - let row = r?; - self.collect_bindings_into(row, &mut values)?; - } - Ok(QueryOutput { - spec: self.spec.clone(), - results: QueryResults::Rel(RelResult { width, values }), - }) - } - - fn columns<'s>(&'s self) -> Box + 's> { - self.spec.columns() - } -} - -/// A coll projector produces a vector of values. -/// Each value is sourced from the same column. -struct CollProjector { - spec: Rc, - template: TypedIndex, -} - -impl CollProjector { - fn with_template(spec: Rc, template: TypedIndex) -> CollProjector { - CollProjector { - spec: spec, - template: template, - } - } - - fn combine(spec: Rc, mut elements: ProjectedElements) -> Result { - let template = elements.templates.pop().expect("Expected a single template"); - let p = CollProjector::with_template(spec, template); - - // If every column yields only one value, or if this is an aggregate query - // (because by definition every column in an aggregate query is either - // aggregated or is a variable _upon which we group_), then don't bother - // with DISTINCT. 
- let already_distinct = elements.pre_aggregate_projection.is_some() || - p.columns().all(|e| e.is_unit()); - Ok(CombinedProjection { - sql_projection: elements.sql_projection, - pre_aggregate_projection: elements.pre_aggregate_projection, - datalog_projector: Box::new(p), - distinct: !already_distinct, - group_by_cols: elements.group_by, - }) - } -} - -impl Projector for CollProjector { - fn project<'stmt>(&self, mut rows: Rows<'stmt>) -> Result { - let mut out: Vec<_> = vec![]; - while let Some(r) = rows.next() { - let row = r?; - let binding = self.template.lookup(&row)?; - out.push(binding); - } - Ok(QueryOutput { - spec: self.spec.clone(), - results: QueryResults::Coll(out), - }) - } - - fn columns<'s>(&'s self) -> Box + 's> { - self.spec.columns() - } -} /// Combines the things you need to turn a query into SQL and turn its results into /// `QueryResults`: SQL-related projection information (`DISTINCT`, columns, etc.) and @@ -652,6 +411,19 @@ impl CombinedProjection { } } +trait IsPull { + fn is_pull(&self) -> bool; +} + +impl IsPull for Element { + fn is_pull(&self) -> bool { + match self { + &Element::Pull(_) => true, + _ => false, + } + } +} + /// Compute a suitable SQL projection for an algebrized query. /// This takes into account a number of things: /// - The variable list in the find spec. @@ -660,7 +432,7 @@ impl CombinedProjection { /// - The bindings established by the topmost CC. /// - The types known at algebrizing time. /// - The types extracted from the store for unknown attributes. -pub fn query_projection(query: &AlgebraicQuery) -> Result> { +pub fn query_projection(schema: &Schema, query: &AlgebraicQuery) -> Result> { use self::FindSpec::*; let spec = query.find_spec.clone(); @@ -671,6 +443,13 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result var.clone(), + + // Pull expressions can never be fully bound. + // TODO: but the interior can be, in which case we + // can handle this and simply project. 
+ &Element::Pull(_) => { + unreachable!(); + }, &Element::Aggregate(ref _agg) => { // TODO: static computation of aggregates, then // implement the condition in `is_fully_bound`. @@ -692,24 +471,42 @@ pub fn query_projection(query: &AlgebraicQuery) -> Result { let elements = project_elements(1, iter::once(element), query)?; - CollProjector::combine(spec, elements).map(|p| p.flip_distinct_for_limit(&query.limit)) + if element.is_pull() { + CollTwoStagePullProjector::combine(spec, elements) + } else { + CollProjector::combine(spec, elements) + }.map(|p| p.flip_distinct_for_limit(&query.limit)) }, FindScalar(ref element) => { let elements = project_elements(1, iter::once(element), query)?; - ScalarProjector::combine(spec, elements) + if element.is_pull() { + ScalarTwoStagePullProjector::combine(schema, spec, elements) + } else { + ScalarProjector::combine(spec, elements) + } }, FindRel(ref elements) => { + let is_pull = elements.iter().any(|e| e.is_pull()); let column_count = query.find_spec.expected_column_count(); let elements = project_elements(column_count, elements, query)?; - RelProjector::combine(spec, column_count, elements).map(|p| p.flip_distinct_for_limit(&query.limit)) + if is_pull { + RelTwoStagePullProjector::combine(spec, column_count, elements) + } else { + RelProjector::combine(spec, column_count, elements) + }.map(|p| p.flip_distinct_for_limit(&query.limit)) }, FindTuple(ref elements) => { + let is_pull = elements.iter().any(|e| e.is_pull()); let column_count = query.find_spec.expected_column_count(); let elements = project_elements(column_count, elements, query)?; - TupleProjector::combine(spec, column_count, elements) + if is_pull { + TupleTwoStagePullProjector::combine(spec, column_count, elements) + } else { + TupleProjector::combine(spec, column_count, elements) + } }, }.map(Either::Right) } diff --git a/query-projector/src/project.rs b/query-projector/src/project.rs index e63f6bee..57ec1619 100644 --- a/query-projector/src/project.rs +++ 
b/query-projector/src/project.rs @@ -28,6 +28,7 @@ use mentat_core::util::{ use mentat_query::{ Element, + Pull, Variable, }; @@ -58,7 +59,18 @@ use errors::{ Result, }; +use projectors::{ + Projector, +}; + +use pull::{ + PullIndices, + PullOperation, + PullTemplate, +}; + use super::{ + CombinedProjection, TypedIndex, }; @@ -73,9 +85,40 @@ pub(crate) struct ProjectedElements { pub sql_projection: Projection, pub pre_aggregate_projection: Option, pub templates: Vec, + + // TODO: when we have an expression like + // [:find (pull ?x [:foo/name :foo/age]) (pull ?x [:foo/friend]) …] + // it would be more efficient to combine them. + pub pulls: Vec, pub group_by: Vec, } +impl ProjectedElements { + pub(crate) fn combine(self, projector: Box, distinct: bool) -> Result { + Ok(CombinedProjection { + sql_projection: self.sql_projection, + pre_aggregate_projection: self.pre_aggregate_projection, + datalog_projector: projector, + distinct: distinct, + group_by_cols: self.group_by, + }) + } + + // We need the templates to make a projector that we can then hand to `combine`. This is the easy + // way to get it. + pub(crate) fn take_templates(&mut self) -> Vec { + let mut out = vec![]; + ::std::mem::swap(&mut out, &mut self.templates); + out + } + + pub(crate) fn take_pulls(&mut self) -> Vec { + let mut out = vec![]; + ::std::mem::swap(&mut out, &mut self.pulls); + out + } +} + fn candidate_type_column(cc: &ConjoiningClauses, var: &Variable) -> Result<(ColumnOrExpression, Name)> { cc.extracted_types .get(var) @@ -120,6 +163,7 @@ pub fn projected_column_for_var(var: &Variable, cc: &ConjoiningClauses) -> Resul Ok((ProjectedColumn(column, name), cc.known_type_set(var))) } } + /// Walk an iterator of `Element`s, collecting projector templates and columns. 
/// /// Returns a `ProjectedElements`, which combines SQL projections @@ -148,6 +192,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( let mut i: i32 = 0; let mut min_max_count: usize = 0; let mut templates = vec![]; + let mut pulls: Vec = vec![]; let mut aggregates = false; @@ -182,9 +227,11 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( }, &Element::Aggregate(_) => { }, + &Element::Pull(_) => { + }, }; - // Record variables -- (the ?x) and ?x are different in this regard, because we don't want + // Record variables -- `(the ?x)` and `?x` are different in this regard, because we don't want // to group on variables that are corresponding-projected. match e { &Element::Variable(ref var) => { @@ -195,6 +242,11 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( // so we know not to group them. corresponded_variables.insert(var.clone()); }, + &Element::Pull(Pull { ref var, patterns: _ }) => { + // We treat `pull` as an ordinary variable extraction, + // and we expand it later. + outer_variables.insert(var.clone()); + }, &Element::Aggregate(_) => { }, }; @@ -225,6 +277,35 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( outer_projection.push(Either::Left(type_name)); } }, + &Element::Pull(Pull { ref var, ref patterns }) => { + inner_variables.insert(var.clone()); + + let (projected_column, type_set) = projected_column_for_var(&var, &query.cc)?; + outer_projection.push(Either::Left(projected_column.1.clone())); + inner_projection.push(projected_column); + + if let Some(tag) = type_set.unique_type_tag() { + // We will have at least as many SQL columns as Datalog output columns. + // `i` tracks the former. The length of `templates` is the current latter. + // Projecting pull requires grabbing values, which we can do from the raw + // rows, and then populating the output, so we keep both column indices. 
+ let output_index = templates.len(); + assert!(output_index <= i as usize); + + templates.push(TypedIndex::Known(i, tag)); + pulls.push(PullTemplate { + indices: PullIndices { + sql_index: i, + output_index, + }, + op: PullOperation((*patterns).clone()), + }); + i += 1; // We used one SQL column. + } else { + // This should be impossible: (pull ?x) implies that ?x is a ref. + unreachable!(); + } + }, &Element::Aggregate(ref a) => { if let Some(simple) = a.to_simple() { aggregates = true; @@ -331,6 +412,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( sql_projection: Projection::Columns(inner_projection), pre_aggregate_projection: None, templates, + pulls, group_by: vec![], }); } @@ -434,6 +516,7 @@ pub(crate) fn project_elements<'a, I: IntoIterator>( sql_projection: Projection::Columns(outer_projection), pre_aggregate_projection: Some(Projection::Columns(inner_projection)), templates, + pulls, group_by, }) } diff --git a/query-projector/src/projectors/constant.rs b/query-projector/src/projectors/constant.rs new file mode 100644 index 00000000..f56bac0a --- /dev/null +++ b/query-projector/src/projectors/constant.rs @@ -0,0 +1,66 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::rc::Rc; + +use ::{ + Element, + FindSpec, + QueryOutput, + QueryResults, + Rows, + Schema, + rusqlite, +}; + +use ::errors::{ + Result, +}; + +use super::{ + Projector, +}; + +/// A projector that produces a `QueryResult` containing fixed data. 
+/// Takes a boxed function that should return an empty result set of the desired type. +pub struct ConstantProjector { + spec: Rc, + results_factory: Box QueryResults>, +} + +impl ConstantProjector { + pub fn new(spec: Rc, results_factory: Box QueryResults>) -> ConstantProjector { + ConstantProjector { + spec: spec, + results_factory: results_factory, + } + } + + pub fn project_without_rows<'stmt>(&self) -> Result { + let results = (self.results_factory)(); + let spec = self.spec.clone(); + Ok(QueryOutput { + spec: spec, + results: results, + }) + } +} + +// TODO: a ConstantProjector with non-constant pull expressions. + +impl Projector for ConstantProjector { + fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, _rows: Rows<'stmt>) -> Result { + self.project_without_rows() + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} diff --git a/query-projector/src/projectors/mod.rs b/query-projector/src/projectors/mod.rs new file mode 100644 index 00000000..b4e7c363 --- /dev/null +++ b/query-projector/src/projectors/mod.rs @@ -0,0 +1,46 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +use super::{ + Element, + Schema, + QueryOutput, + Rows, + rusqlite, +}; + +use super::errors::{ + Result, +}; + +pub trait Projector { + fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, rows: Rows<'stmt>) -> Result; + fn columns<'s>(&'s self) -> Box + 's>; +} + +mod constant; +mod simple; +mod pull_two_stage; + +pub use self::constant::ConstantProjector; + +pub(crate) use self::simple::{ + CollProjector, + RelProjector, + ScalarProjector, + TupleProjector, +}; + +pub(crate) use self::pull_two_stage::{ + CollTwoStagePullProjector, + RelTwoStagePullProjector, + ScalarTwoStagePullProjector, + TupleTwoStagePullProjector, +}; diff --git a/query-projector/src/projectors/pull_two_stage.rs b/query-projector/src/projectors/pull_two_stage.rs new file mode 100644 index 00000000..f7197c38 --- /dev/null +++ b/query-projector/src/projectors/pull_two_stage.rs @@ -0,0 +1,335 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +use std::rc::Rc; + +use std::iter::{ + once, +}; + +use mentat_query_pull::{ + Puller, +}; + +use mentat_core::{ + Entid, +}; + +use ::{ + Binding, + CombinedProjection, + Element, + FindSpec, + ProjectedElements, + QueryOutput, + QueryResults, + RelResult, + Row, + Rows, + Schema, + TypedIndex, + rusqlite, +}; + +use ::pull::{ + PullConsumer, + PullOperation, + PullTemplate, +}; + +use ::errors::{ + Result, +}; + +use super::{ + Projector, +}; + +pub(crate) struct ScalarTwoStagePullProjector { + spec: Rc, + puller: Puller, +} + +// TODO: almost by definition, a scalar result format doesn't need to be run in two stages. +// The only output is the pull expression, and so we can directly supply the projected entity +// to the pull SQL. +impl ScalarTwoStagePullProjector { + fn with_template(schema: &Schema, spec: Rc, pull: PullOperation) -> Result { + Ok(ScalarTwoStagePullProjector { + spec: spec, + puller: Puller::prepare(schema, pull.0.clone())?, + }) + } + + pub(crate) fn combine(schema: &Schema, spec: Rc, mut elements: ProjectedElements) -> Result { + let pull = elements.pulls.pop().expect("Expected a single pull"); + let projector = Box::new(ScalarTwoStagePullProjector::with_template(schema, spec, pull.op)?); + let distinct = false; + elements.combine(projector, distinct) + } +} + +impl Projector for ScalarTwoStagePullProjector { + fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + // Scalar is pretty straightforward -- zero or one entity, do the pull directly. + let results = + if let Some(r) = rows.next() { + let row = r?; + let entity: Entid = row.get(0); // This will always be 0 and a ref. 
+ let bindings = self.puller.pull(schema, sqlite, once(entity))?; + let m = Binding::Map(bindings.get(&entity).cloned().unwrap_or_else(Default::default)); + QueryResults::Scalar(Some(m)) + } else { + QueryResults::Scalar(None) + }; + + Ok(QueryOutput { + spec: self.spec.clone(), + results: results, + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A tuple projector produces a single vector. It's the single-result version of rel. +pub(crate) struct TupleTwoStagePullProjector { + spec: Rc, + len: usize, + templates: Vec, + pulls: Vec, +} + +impl TupleTwoStagePullProjector { + fn with_templates(spec: Rc, len: usize, templates: Vec, pulls: Vec) -> TupleTwoStagePullProjector { + TupleTwoStagePullProjector { + spec: spec, + len: len, + templates: templates, + pulls: pulls, + } + } + + // This is exactly the same as for rel. + fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result> { + // There will be at least as many SQL columns as Datalog columns. + // gte 'cos we might be querying extra columns for ordering. + // The templates will take care of ignoring columns. + assert!(row.column_count() >= self.len as i32); + self.templates + .iter() + .map(|ti| ti.lookup(&row)) + .collect::>>() + } + + pub(crate) fn combine(spec: Rc, column_count: usize, mut elements: ProjectedElements) -> Result { + let projector = Box::new(TupleTwoStagePullProjector::with_templates(spec, column_count, elements.take_templates(), elements.take_pulls())); + let distinct = false; + elements.combine(projector, distinct) + } +} + +impl Projector for TupleTwoStagePullProjector { + fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + let results = + if let Some(r) = rows.next() { + let row = r?; + + // Keeping the compiler happy. 
+ let pull_consumers: Result> = self.pulls + .iter() + .map(|op| PullConsumer::for_template(schema, op)) + .collect(); + let mut pull_consumers = pull_consumers?; + + // Collect the usual bindings and accumulate entity IDs for pull. + for mut p in pull_consumers.iter_mut() { + p.collect_entity(&row); + } + + let mut bindings = self.collect_bindings(row)?; + + // Run the pull expressions for the collected IDs. + for mut p in pull_consumers.iter_mut() { + p.pull(sqlite)?; + } + + // Expand the pull expressions back into the results vector. + for p in pull_consumers.into_iter() { + p.expand(&mut bindings); + } + + QueryResults::Tuple(Some(bindings)) + } else { + QueryResults::Tuple(None) + }; + Ok(QueryOutput { + spec: self.spec.clone(), + results: results, + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A rel projector produces a RelResult, which is a striding abstraction over a vector. +/// Each stride across the vector is the same size, and sourced from the same columns. +/// Each column in each stride is the result of taking one or two columns from +/// the `Row`: one for the value and optionally one for the type tag. +pub(crate) struct RelTwoStagePullProjector { + spec: Rc, + len: usize, + templates: Vec, + pulls: Vec, +} + +impl RelTwoStagePullProjector { + fn with_templates(spec: Rc, len: usize, templates: Vec, pulls: Vec) -> RelTwoStagePullProjector { + RelTwoStagePullProjector { + spec: spec, + len: len, + templates: templates, + pulls: pulls, + } + } + + fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec) -> Result<()> { + // There will be at least as many SQL columns as Datalog columns. + // gte 'cos we might be querying extra columns for ordering. + // The templates will take care of ignoring columns. 
+ assert!(row.column_count() >= self.len as i32); + let mut count = 0; + for binding in self.templates + .iter() + .map(|ti| ti.lookup(&row)) { + out.push(binding?); + count += 1; + } + assert_eq!(self.len, count); + Ok(()) + } + + pub(crate) fn combine(spec: Rc, column_count: usize, mut elements: ProjectedElements) -> Result { + let projector = Box::new(RelTwoStagePullProjector::with_templates(spec, column_count, elements.take_templates(), elements.take_pulls())); + + // If every column yields only one value, or if this is an aggregate query + // (because by definition every column in an aggregate query is either + // aggregated or is a variable _upon which we group_), then don't bother + // with DISTINCT. + let already_distinct = elements.pre_aggregate_projection.is_some() || + projector.columns().all(|e| e.is_unit()); + + elements.combine(projector, !already_distinct) + } +} + +impl Projector for RelTwoStagePullProjector { + fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + // Allocate space for five rows to start. + // This is better than starting off by doubling the buffer a couple of times, and will + // rapidly grow to support larger query results. + let width = self.len; + let mut values: Vec<_> = Vec::with_capacity(5 * width); + + let pull_consumers: Result> = self.pulls + .iter() + .map(|op| PullConsumer::for_template(schema, op)) + .collect(); + let mut pull_consumers = pull_consumers?; + + // Collect the usual bindings and accumulate entity IDs for pull. + while let Some(r) = rows.next() { + let row = r?; + for mut p in pull_consumers.iter_mut() { + p.collect_entity(&row); + } + self.collect_bindings_into(row, &mut values)?; + } + + // Run the pull expressions for the collected IDs. + for mut p in pull_consumers.iter_mut() { + p.pull(sqlite)?; + } + + // Expand the pull expressions back into the results vector. 
+ for bindings in values.chunks_mut(width) { + for p in pull_consumers.iter() { + p.expand(bindings); + } + }; + + Ok(QueryOutput { + spec: self.spec.clone(), + results: QueryResults::Rel(RelResult { width, values }), + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A coll projector produces a vector of values. +/// Each value is sourced from the same column. +pub(crate) struct CollTwoStagePullProjector { + spec: Rc, + pull: PullOperation, +} + +impl CollTwoStagePullProjector { + fn with_pull(spec: Rc, pull: PullOperation) -> CollTwoStagePullProjector { + CollTwoStagePullProjector { + spec: spec, + pull: pull, + } + } + + pub(crate) fn combine(spec: Rc, mut elements: ProjectedElements) -> Result { + let pull = elements.pulls.pop().expect("Expected a single pull"); + let projector = Box::new(CollTwoStagePullProjector::with_pull(spec, pull.op)); + + // If every column yields only one value, or we're grouping by the value, + // don't bother with DISTINCT. This shouldn't really apply to coll-pull. + let already_distinct = elements.pre_aggregate_projection.is_some() || + projector.columns().all(|e| e.is_unit()); + elements.combine(projector, !already_distinct) + } +} + +impl Projector for CollTwoStagePullProjector { + fn project<'stmt, 's>(&self, schema: &Schema, sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + let mut pull_consumer = PullConsumer::for_operation(schema, &self.pull)?; + + while let Some(r) = rows.next() { + let row = r?; + pull_consumer.collect_entity(&row); + } + + // Run the pull expressions for the collected IDs. + pull_consumer.pull(sqlite)?; + + // Expand the pull expressions into a results vector. 
+ let out = pull_consumer.into_coll_results(); + + Ok(QueryOutput { + spec: self.spec.clone(), + results: QueryResults::Coll(out), + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + diff --git a/query-projector/src/projectors/simple.rs b/query-projector/src/projectors/simple.rs new file mode 100644 index 00000000..337fc214 --- /dev/null +++ b/query-projector/src/projectors/simple.rs @@ -0,0 +1,253 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. 
+ +use std::rc::Rc; + +use ::{ + Binding, + CombinedProjection, + Element, + FindSpec, + ProjectedElements, + QueryOutput, + QueryResults, + RelResult, + Row, + Rows, + Schema, + TypedIndex, + rusqlite, +}; + +use ::errors::{ + Result, +}; + +use super::{ + Projector, +}; + +pub(crate) struct ScalarProjector { + spec: Rc, + template: TypedIndex, +} + +impl ScalarProjector { + fn with_template(spec: Rc, template: TypedIndex) -> ScalarProjector { + ScalarProjector { + spec: spec, + template: template, + } + } + + pub(crate) fn combine(spec: Rc, mut elements: ProjectedElements) -> Result { + let template = elements.templates.pop().expect("Expected a single template"); + let projector = Box::new(ScalarProjector::with_template(spec, template)); + let distinct = false; + elements.combine(projector, distinct) + } +} + +impl Projector for ScalarProjector { + fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + let results = + if let Some(r) = rows.next() { + let row = r?; + let binding = self.template.lookup(&row)?; + QueryResults::Scalar(Some(binding)) + } else { + QueryResults::Scalar(None) + }; + Ok(QueryOutput { + spec: self.spec.clone(), + results: results, + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A tuple projector produces a single vector. It's the single-result version of rel. +pub(crate) struct TupleProjector { + spec: Rc, + len: usize, + templates: Vec, +} + +impl TupleProjector { + fn with_templates(spec: Rc, len: usize, templates: Vec) -> TupleProjector { + TupleProjector { + spec: spec, + len: len, + templates: templates, + } + } + + // This is just like we do for `rel`, but into a vec of its own. + fn collect_bindings<'a, 'stmt>(&self, row: Row<'a, 'stmt>) -> Result> { + // There will be at least as many SQL columns as Datalog columns. + // gte 'cos we might be querying extra columns for ordering. 
+ // The templates will take care of ignoring columns. + assert!(row.column_count() >= self.len as i32); + self.templates + .iter() + .map(|ti| ti.lookup(&row)) + .collect::>>() + } + + pub(crate) fn combine(spec: Rc, column_count: usize, mut elements: ProjectedElements) -> Result { + let projector = Box::new(TupleProjector::with_templates(spec, column_count, elements.take_templates())); + let distinct = false; + elements.combine(projector, distinct) + } +} + +impl Projector for TupleProjector { + fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + let results = + if let Some(r) = rows.next() { + let row = r?; + let bindings = self.collect_bindings(row)?; + QueryResults::Tuple(Some(bindings)) + } else { + QueryResults::Tuple(None) + }; + Ok(QueryOutput { + spec: self.spec.clone(), + results: results, + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A rel projector produces a RelResult, which is a striding abstraction over a vector. +/// Each stride across the vector is the same size, and sourced from the same columns. +/// Each column in each stride is the result of taking one or two columns from +/// the `Row`: one for the value and optionally one for the type tag. +pub(crate) struct RelProjector { + spec: Rc, + len: usize, + templates: Vec, +} + +impl RelProjector { + fn with_templates(spec: Rc, len: usize, templates: Vec) -> RelProjector { + RelProjector { + spec: spec, + len: len, + templates: templates, + } + } + + fn collect_bindings_into<'a, 'stmt, 'out>(&self, row: Row<'a, 'stmt>, out: &mut Vec) -> Result<()> { + // There will be at least as many SQL columns as Datalog columns. + // gte 'cos we might be querying extra columns for ordering. + // The templates will take care of ignoring columns. 
+ assert!(row.column_count() >= self.len as i32); + let mut count = 0; + for binding in self.templates + .iter() + .map(|ti| ti.lookup(&row)) { + out.push(binding?); + count += 1; + } + assert_eq!(self.len, count); + Ok(()) + } + + pub(crate) fn combine(spec: Rc, column_count: usize, mut elements: ProjectedElements) -> Result { + let projector = Box::new(RelProjector::with_templates(spec, column_count, elements.take_templates())); + + // If every column yields only one value, or if this is an aggregate query + // (because by definition every column in an aggregate query is either + // aggregated or is a variable _upon which we group_), then don't bother + // with DISTINCT. + let already_distinct = elements.pre_aggregate_projection.is_some() || + projector.columns().all(|e| e.is_unit()); + elements.combine(projector, !already_distinct) + } +} + +impl Projector for RelProjector { + fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + // Allocate space for five rows to start. + // This is better than starting off by doubling the buffer a couple of times, and will + // rapidly grow to support larger query results. + let width = self.len; + let mut values: Vec<_> = Vec::with_capacity(5 * width); + + while let Some(r) = rows.next() { + let row = r?; + self.collect_bindings_into(row, &mut values)?; + } + + Ok(QueryOutput { + spec: self.spec.clone(), + results: QueryResults::Rel(RelResult { width, values }), + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} + +/// A coll projector produces a vector of values. +/// Each value is sourced from the same column. 
+pub(crate) struct CollProjector { + spec: Rc, + template: TypedIndex, +} + +impl CollProjector { + fn with_template(spec: Rc, template: TypedIndex) -> CollProjector { + CollProjector { + spec: spec, + template: template, + } + } + + pub(crate) fn combine(spec: Rc, mut elements: ProjectedElements) -> Result { + let template = elements.templates.pop().expect("Expected a single template"); + let projector = Box::new(CollProjector::with_template(spec, template)); + + // If every column yields only one value, or if this is an aggregate query + // (because by definition every column in an aggregate query is either + // aggregated or is a variable _upon which we group_), then don't bother + // with DISTINCT. + let already_distinct = elements.pre_aggregate_projection.is_some() || + projector.columns().all(|e| e.is_unit()); + elements.combine(projector, !already_distinct) + } +} + +impl Projector for CollProjector { + fn project<'stmt, 's>(&self, _schema: &Schema, _sqlite: &'s rusqlite::Connection, mut rows: Rows<'stmt>) -> Result { + let mut out: Vec<_> = vec![]; + while let Some(r) = rows.next() { + let row = r?; + let binding = self.template.lookup(&row)?; + out.push(binding); + } + Ok(QueryOutput { + spec: self.spec.clone(), + results: QueryResults::Coll(out), + }) + } + + fn columns<'s>(&'s self) -> Box + 's> { + self.spec.columns() + } +} diff --git a/query-projector/src/pull.rs b/query-projector/src/pull.rs index e69de29b..bca0c217 100644 --- a/query-projector/src/pull.rs +++ b/query-projector/src/pull.rs @@ -0,0 +1,121 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. 
You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use std::collections::{ + BTreeMap, + BTreeSet, +}; + +use mentat_core::{ + Binding, + Entid, + Schema, + StructuredMap, + TypedValue, + ValueRc, +}; + +use mentat_query::{ + PullAttributeSpec, +}; + +use mentat_query_pull::{ + Puller, +}; + +use errors::{ + Result, +}; + +use super::{ + Index, + rusqlite, +}; + +#[derive(Clone, Debug)] +pub(crate) struct PullOperation(pub(crate) Vec); + +#[derive(Clone, Copy, Debug)] +pub(crate) struct PullIndices { + pub(crate) sql_index: Index, // SQLite column index. + pub(crate) output_index: usize, +} + +impl PullIndices { + fn zero() -> PullIndices { + PullIndices { + sql_index: 0, + output_index: 0, + } + } +} + +#[derive(Debug)] +pub(crate) struct PullTemplate { + pub(crate) indices: PullIndices, + pub(crate) op: PullOperation, +} + +pub(crate) struct PullConsumer<'schema> { + indices: PullIndices, + schema: &'schema Schema, + puller: Puller, + entities: BTreeSet, + results: BTreeMap>, +} + +impl<'schema> PullConsumer<'schema> { + pub(crate) fn for_puller(puller: Puller, schema: &'schema Schema, indices: PullIndices) -> PullConsumer<'schema> { + PullConsumer { + indices: indices, + schema: schema, + puller: puller, + entities: Default::default(), + results: Default::default(), + } + } + + pub(crate) fn for_template(schema: &'schema Schema, template: &PullTemplate) -> Result> { + let puller = Puller::prepare(schema, template.op.0.clone())?; + Ok(PullConsumer::for_puller(puller, schema, template.indices)) + } + + pub(crate) fn for_operation(schema: &'schema Schema, operation: &PullOperation) -> Result> { + let puller = 
Puller::prepare(schema, operation.0.clone())?; + Ok(PullConsumer::for_puller(puller, schema, PullIndices::zero())) + } + + pub(crate) fn collect_entity<'a, 'stmt>(&mut self, row: &rusqlite::Row<'a, 'stmt>) -> Entid { + let entity = row.get(self.indices.sql_index); + self.entities.insert(entity); + entity + } + + pub(crate) fn pull(&mut self, sqlite: &rusqlite::Connection) -> Result<()> { + let entities: Vec = self.entities.iter().cloned().collect(); + self.results = self.puller.pull(self.schema, sqlite, entities)?; + Ok(()) + } + + pub(crate) fn expand(&self, bindings: &mut [Binding]) { + if let Binding::Scalar(TypedValue::Ref(id)) = bindings[self.indices.output_index] { + if let Some(pulled) = self.results.get(&id).cloned() { + bindings[self.indices.output_index] = Binding::Map(pulled); + } else { + bindings[self.indices.output_index] = Binding::Map(ValueRc::new(Default::default())); + } + } + } + + // TODO: do we need to include empty maps for entities that didn't match any pull? + pub(crate) fn into_coll_results(self) -> Vec { + self.results.values().cloned().map(|vrc| Binding::Map(vrc)).collect() + } +} diff --git a/query-projector/tests/aggregates.rs b/query-projector/tests/aggregates.rs index 2590d141..1a18db81 100644 --- a/query-projector/tests/aggregates.rs +++ b/query-projector/tests/aggregates.rs @@ -84,7 +84,7 @@ fn test_aggregate_unsuitable_type() { let algebrized = algebrize(Known::for_schema(&schema), parsed).expect("query algebrizes"); // … when we look at the projection list, we cannot reconcile the types. - assert!(query_projection(&algebrized).is_err()); + assert!(query_projection(&schema, &algebrized).is_err()); } #[test] @@ -100,7 +100,7 @@ fn test_the_without_max_or_min() { let algebrized = algebrize(Known::for_schema(&schema), parsed).expect("query algebrizes"); // … when we look at the projection list, we cannot reconcile the types. 
- let projection = query_projection(&algebrized); + let projection = query_projection(&schema, &algebrized); assert!(projection.is_err()); use ::mentat_query_projector::errors::{ ErrorKind, diff --git a/query-pull/Cargo.toml b/query-pull/Cargo.toml new file mode 100644 index 00000000..9b89a6af --- /dev/null +++ b/query-pull/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "mentat_query_pull" +version = "0.0.1" +workspace = ".." + +[dependencies] +error-chain = { git = "https://github.com/rnewman/error-chain", branch = "rnewman/sync" } + +[dependencies.rusqlite] +version = "0.13" +features = ["limits"] + +[dependencies.mentat_core] +path = "../core" + +[dependencies.mentat_db] +path = "../db" + +[dependencies.mentat_query] +path = "../query" + +[dependencies.mentat_query_algebrizer] +path = "../query-algebrizer" + +[dependencies.mentat_query_sql] +path = "../query-sql" + +[dependencies.mentat_sql] +path = "../sql" + +# Only for tests. +[dev-dependencies.mentat_query_parser] +path = "../query-parser" diff --git a/query-pull/src/errors.rs b/query-pull/src/errors.rs new file mode 100644 index 00000000..f0e747e6 --- /dev/null +++ b/query-pull/src/errors.rs @@ -0,0 +1,30 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +use mentat_core::{ + Entid, +}; + +error_chain! 
{ + types { + Error, ErrorKind, ResultExt, Result; + } + + errors { + UnnamedAttribute(id: Entid) { + description("unnamed attribute") + display("attribute {:?} has no name", id) + } + } + + links { + DbError(::mentat_db::Error, ::mentat_db::ErrorKind); + } +} diff --git a/query-pull/src/lib.rs b/query-pull/src/lib.rs new file mode 100644 index 00000000..a011ac51 --- /dev/null +++ b/query-pull/src/lib.rs @@ -0,0 +1,246 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +#![allow(dead_code)] + +///! A pull expression is a function. +///! +///! Its inputs are a store, a schema, and a set of bindings. +///! +///! Its output is a map whose keys are the input bindings and whose values are +///! appropriate structured values to represent the pull expression. +///! +///! For example, the pull expression: +///! +///! ```edn +///! (pull ?person [:person/name +///! :person/tattoo +///! {:person/friend [*]}])` +///! ``` +///! +///! will return values shaped like: +///! +///! ```edn +///! {:person/name "Alice" ; Single-valued attribute +///! ; Absence: Alice has no tattoos. +///! :person/friend [ ; Multi-valued attribute. +///! {:person/name "Bob" ; Nesting and wildcard. +///! :person/pet ["Harrison", "Hoppy"]}]} +///! ``` +///! +///! There will be one such value for each input binding. +///! +///! We fetch layers of a pull expression iteratively: all attributes at the same +///! 'level' can be fetched at the same time and accumulated into maps. +///! 
+///! Those maps are wrapped in `Rc` for two reasons:
+///! - They might occur multiple times when projected from a `:find` query.
+///! - They might refer to each other (consider recursion).
+///!
+///! A nested or recursive pull expression consumes values produced by earlier stages
+///! (the recursion with a smaller recursion limit and a growing 'seen' list),
+///! generating another layer of mappings.
+///!
+///! For example, you can imagine the nesting in the earlier pull expression being
+///! decomposed into two chained expressions:
+///!
+///! ```edn
+///! (pull
+///!     (pull ?person [:person/friend])
+///!     [*])
+///! ```
+
+#[macro_use]
+extern crate error_chain;
+
+extern crate rusqlite;
+
+extern crate mentat_core;
+extern crate mentat_db;
+extern crate mentat_query;
+extern crate mentat_query_algebrizer;
+extern crate mentat_query_sql;
+extern crate mentat_sql;
+
+use std::collections::{
+    BTreeMap,
+    BTreeSet,
+};
+
+use std::iter::{
+    once,
+};
+
+use mentat_core::{
+    Cloned,
+    Entid,
+    HasSchema,
+    NamespacedKeyword,
+    Schema,
+    StructuredMap,
+    ValueRc,
+};
+
+use mentat_db::cache;
+
+use mentat_query::{
+    PullAttributeSpec,
+    PullConcreteAttribute,
+};
+
+pub mod errors;
+
+use errors::{
+    ErrorKind,
+    Result,
+};
+
+type PullResults = BTreeMap<Entid, ValueRc<StructuredMap>>;
+
+pub fn pull_attributes_for_entity<A>(schema: &Schema,
+                                     db: &rusqlite::Connection,
+                                     entity: Entid,
+                                     attributes: A) -> Result<StructuredMap>
+    where A: IntoIterator<Item=Entid> {
+    let attrs = attributes.into_iter()
+                          .map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e)))
+                          .collect();
+    Puller::prepare(schema, attrs)?
+ .pull(schema, db, once(entity)) + .map(|m| m.into_iter() + .next() + .map(|(k, vs)| { + assert_eq!(k, entity); + vs.cloned() + }) + .unwrap_or_else(StructuredMap::default)) +} + +pub fn pull_attributes_for_entities(schema: &Schema, + db: &rusqlite::Connection, + entities: E, + attributes: A) -> Result + where E: IntoIterator, + A: IntoIterator { + let attrs = attributes.into_iter() + .map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e))) + .collect(); + Puller::prepare(schema, attrs)? + .pull(schema, db, entities) +} + +/// A `Puller` constructs on demand a map from a provided set of entity IDs to a set of structured maps. +pub struct Puller { + // The domain of this map is the set of attributes to fetch. + // The range is the set of aliases to use in the output. + attributes: BTreeMap>, + attribute_spec: cache::AttributeSpec, +} + +impl Puller { + pub fn prepare_simple_attributes(schema: &Schema, attributes: Vec) -> Result { + Puller::prepare(schema, + attributes.into_iter() + .map(|e| PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(e))) + .collect()) + } + + pub fn prepare(schema: &Schema, attributes: Vec) -> Result { + // TODO: eventually this entry point will handle aliasing and that kind of + // thing. For now it's just a convenience. + + let lookup_name = |i: &Entid| { + // In the unlikely event that we have an attribute with no name, we bail. 
+ schema.get_ident(*i) + .map(|ident| ValueRc::new(ident.clone())) + .ok_or_else(|| ErrorKind::UnnamedAttribute(*i)) + }; + + let mut names: BTreeMap> = Default::default(); + let mut attrs: BTreeSet = Default::default(); + for attr in attributes.iter() { + match attr { + &PullAttributeSpec::Wildcard => { + let attribute_ids = schema.attribute_map.keys(); + for id in attribute_ids { + names.insert(*id, lookup_name(id)?); + attrs.insert(*id); + } + break; + }, + &PullAttributeSpec::Attribute(PullConcreteAttribute::Ident(ref i)) => { + if let Some(entid) = schema.get_entid(i) { + names.insert(entid.into(), i.to_value_rc()); + attrs.insert(entid.into()); + } + }, + &PullAttributeSpec::Attribute(PullConcreteAttribute::Entid(ref entid)) => { + names.insert(*entid, lookup_name(entid)?); + attrs.insert(*entid); + }, + } + } + + Ok(Puller { + attributes: names, + attribute_spec: cache::AttributeSpec::specified(&attrs, schema), + }) + } + + pub fn pull(&self, + schema: &Schema, + db: &rusqlite::Connection, + entities: E) -> Result + where E: IntoIterator { + // We implement pull by: + // - Generating `AttributeCaches` for the provided attributes and entities. + // TODO: it would be nice to invert the cache as we build it, rather than have to invert it here. + // - Recursing. (TODO: we'll need AttributeCaches to not overwrite in case of recursion! And + // ideally not do excess work when some entity/attribute pairs are known.) + // - Building a structure by walking the pull expression with the caches. + // TODO: aliases. + // TODO: limits. + // TODO: fts. + + // Build a cache for these attributes and entities. + // TODO: use the store's existing cache! + let entities: Vec = entities.into_iter().collect(); + let caches = cache::AttributeCaches::make_cache_for_entities_and_attributes( + schema, + db, + self.attribute_spec.clone(), + &entities)?; + + // Now construct the appropriate result format. + // TODO: should we walk `e` then `a`, or `a` then `e`? 
Possibly the right answer + // is just to collect differently! + let mut maps = BTreeMap::new(); + for (name, cache) in self.attributes.iter().filter_map(|(a, name)| + caches.forward_attribute_cache_for_attribute(schema, *a) + .map(|cache| (name.clone(), cache))) { + + for e in entities.iter() { + if let Some(binding) = cache.binding_for_e(*e) { + let mut r = maps.entry(*e) + .or_insert(ValueRc::new(StructuredMap::default())); + + // Get into the inner map so we can accumulate a value. + // We can unwrap here because we created all of these maps… + let mut m = ValueRc::get_mut(r).unwrap(); + + m.insert(name.clone(), binding); + } + } + } + + Ok(maps) + } + +} diff --git a/query-sql/src/lib.rs b/query-sql/src/lib.rs index 1f72144f..b6d85d82 100644 --- a/query-sql/src/lib.rs +++ b/query-sql/src/lib.rs @@ -9,7 +9,7 @@ // specific language governing permissions and limitations under the License. extern crate regex; -extern crate mentat_core; +#[macro_use] extern crate mentat_core; extern crate mentat_query; extern crate mentat_query_algebrizer; extern crate mentat_sql; @@ -253,42 +253,6 @@ fn push_column(qb: &mut QueryBuilder, col: &Column) -> BuildQueryResult { //--------------------------------------------------------- // Turn that representation into SQL. - -/// A helper macro to sequentially process an iterable sequence, -/// evaluating a block between each pair of items. -/// -/// This is used to simply and efficiently produce output like -/// -/// ```sql -/// 1, 2, 3 -/// ``` -/// -/// or -/// -/// ```sql -/// x = 1 AND y = 2 -/// ``` -/// -/// without producing an intermediate string sequence. -macro_rules! interpose { - ( $name: pat, $across: expr, $body: block, $inter: block ) => { - interpose_iter!($name, $across.iter(), $body, $inter) - } -} - -macro_rules! 
interpose_iter { - ( $name: pat, $across: expr, $body: block, $inter: block ) => { - let mut seq = $across; - if let Some($name) = seq.next() { - $body; - for $name in seq { - $inter; - $body; - } - } - } -} - impl QueryFragment for ColumnOrExpression { fn push_sql(&self, out: &mut QueryBuilder) -> BuildQueryResult { use self::ColumnOrExpression::*; diff --git a/query-translator/src/translate.rs b/query-translator/src/translate.rs index 71f97418..54b8e5d1 100644 --- a/query-translator/src/translate.rs +++ b/query-translator/src/translate.rs @@ -9,6 +9,7 @@ // specific language governing permissions and limitations under the License. use mentat_core::{ + Schema, SQLTypeAffinity, SQLValueType, SQLValueTypeSet, @@ -439,10 +440,10 @@ fn re_project(mut inner: SelectQuery, projection: Projection) -> SelectQuery { /// Consume a provided `AlgebraicQuery` to yield a new /// `ProjectedSelect`. -pub fn query_to_select(query: AlgebraicQuery) -> Result { +pub fn query_to_select(schema: &Schema, query: AlgebraicQuery) -> Result { // TODO: we can't pass `query.limit` here if we aggregate during projection. // SQL-based aggregation -- `SELECT SUM(datoms00.e)` -- is fine. 
- query_projection(&query).map(|e| match e { + query_projection(schema, &query).map(|e| match e { Either::Left(constant) => ProjectedSelect::Constant(constant), Either::Right(CombinedProjection { sql_projection, diff --git a/query-translator/tests/translate.rs b/query-translator/tests/translate.rs index bc3676f6..101810f1 100644 --- a/query-translator/tests/translate.rs +++ b/query-translator/tests/translate.rs @@ -101,7 +101,7 @@ fn inner_translate_with_inputs(schema: &Schema, query: &'static str, inputs: Que let known = Known::for_schema(schema); let parsed = parse_find_string(query).expect("parse to succeed"); let algebrized = algebrize_with_inputs(known, parsed, 0, inputs).expect("algebrize to succeed"); - query_to_select(algebrized).expect("translate to succeed") + query_to_select(schema, algebrized).expect("translate to succeed") } fn translate_with_inputs(schema: &Schema, query: &'static str, inputs: QueryInputs) -> SQLQuery { @@ -249,7 +249,7 @@ fn test_bound_variable_limit_affects_types() { assert_eq!(Some(ValueType::Long), algebrized.cc.known_type(&Variable::from_valid_name("?limit"))); - let select = query_to_select(algebrized).expect("query to translate"); + let select = query_to_select(&schema, algebrized).expect("query to translate"); let SQLQuery { sql, args } = query_to_sql(select); // TODO: this query isn't actually correct -- we don't yet algebrize for variables that are @@ -340,7 +340,7 @@ fn test_unknown_ident() { assert!(algebrized.is_known_empty()); // If you insist… - let select = query_to_select(algebrized).expect("query to translate"); + let select = query_to_select(&schema, algebrized).expect("query to translate"); assert_query_is_empty(select, FindSpec::FindRel(vec![var!(?x).into()])); } diff --git a/query/src/lib.rs b/query/src/lib.rs index f8c4cdcb..76e4e115 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -486,18 +486,61 @@ impl PatternValuePlace { } } -/* -pub enum PullPattern { - Constant(Constant), - Variable(Variable), +// 
Not yet used. +// pub enum PullDefaultValue { +// EntidOrInteger(i64), +// IdentOrKeyword(Rc), +// Constant(NonIntegerConstant), +// } + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum PullConcreteAttribute { + Ident(Rc), + Entid(i64), } -pub struct Pull { - pub src: SrcVar, - pub var: Variable, - pub pattern: PullPattern, // Constant, variable, or plain variable. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum PullAttributeSpec { + Wildcard, + Attribute(PullConcreteAttribute), + // PullMapSpec(Vec<…>), + // AttributeWithOpts(PullConcreteAttribute, …), + // LimitedAttribute(PullConcreteAttribute, u64), // Limit nil => Attribute instead. + // DefaultedAttribute(PullConcreteAttribute, PullDefaultValue), +} + +impl std::fmt::Display for PullConcreteAttribute { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + &PullConcreteAttribute::Ident(ref k) => { + write!(f, "{}", k) + }, + &PullConcreteAttribute::Entid(i) => { + write!(f, "{}", i) + }, + } + } +} + +impl std::fmt::Display for PullAttributeSpec { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + &PullAttributeSpec::Wildcard => { + write!(f, "*") + }, + &PullAttributeSpec::Attribute(ref a) => { + write!(f, "{}", a) + }, + } + } +} + + +#[derive(Debug, Eq, PartialEq)] +pub struct Pull { + pub var: Variable, + pub patterns: Vec, } -*/ #[derive(Debug, Eq, PartialEq)] pub struct Aggregate { @@ -516,7 +559,7 @@ pub enum Element { /// `max` or `min` cannot yield predictable behavior, and will err during /// algebrizing. 
Corresponding(Variable), - // Pull(Pull), // TODO + Pull(Pull), } impl Element { @@ -524,6 +567,7 @@ impl Element { pub fn is_unit(&self) -> bool { match self { &Element::Variable(_) => false, + &Element::Pull(_) => false, &Element::Aggregate(_) => true, &Element::Corresponding(_) => true, } @@ -542,6 +586,13 @@ impl std::fmt::Display for Element { &Element::Variable(ref var) => { write!(f, "{}", var) }, + &Element::Pull(Pull { ref var, ref patterns }) => { + write!(f, "(pull {} [ ", var)?; + for p in patterns.iter() { + write!(f, "{} ", p)?; + } + write!(f, "])") + }, &Element::Aggregate(ref agg) => { match agg.args.len() { 0 => write!(f, "({})", agg.func), @@ -573,10 +624,7 @@ pub enum Limit { /// Examples: /// /// ```rust -/// # extern crate edn; /// # extern crate mentat_query; -/// # use std::rc::Rc; -/// # use edn::PlainSymbol; /// # use mentat_query::{Element, FindSpec, Variable}; /// /// # fn main() { diff --git a/src/conn.rs b/src/conn.rs index 6549ea45..f27d43f7 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -10,6 +10,10 @@ #![allow(dead_code)] +use std::collections::{ + BTreeMap, +}; + use std::fs::{ File, }; @@ -41,7 +45,9 @@ use mentat_core::{ KnownEntid, NamespacedKeyword, Schema, + StructuredMap, TypedValue, + ValueRc, ValueType, }; @@ -67,6 +73,11 @@ use mentat_db::{ use mentat_db::internal_types::TermWithTempIds; +use mentat_query_pull::{ + pull_attributes_for_entities, + pull_attributes_for_entity, +}; + use mentat_tx; use mentat_tx::entities::{ @@ -161,6 +172,17 @@ pub struct Store { } impl Store { + /// Open a store at the supplied path, ensuring that it includes the bootstrap schema. + pub fn open(path: &str) -> Result { + let mut connection = ::new_connection(path)?; + let conn = Conn::connect(&mut connection)?; + Ok(Store { + conn: conn, + sqlite: connection, + }) + } + + /// Returns a totally blank store with no bootstrap schema. Use `open` instead. 
pub fn open_empty(path: &str) -> Result { if !path.is_empty() { if Path::new(path).exists() { @@ -176,15 +198,6 @@ impl Store { }) } - pub fn open(path: &str) -> Result { - let mut connection = ::new_connection(path)?; - let conn = Conn::connect(&mut connection)?; - Ok(Store { - conn: conn, - sqlite: connection, - }) - } - pub fn transact(&mut self, transaction: &str) -> Result { let mut ip = self.begin_transaction()?; let report = ip.transact(transaction)?; @@ -206,6 +219,14 @@ pub trait Queryable { where E: Into; } +pub trait Pullable { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator; + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator; +} + pub trait Syncable { fn sync(&mut self, server_uri: &String, user_uuid: &String) -> Result<()>; } @@ -257,6 +278,19 @@ impl<'a, 'c> Queryable for InProgressRead<'a, 'c> { } } +impl<'a, 'c> Pullable for InProgressRead<'a, 'c> { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + self.0.pull_attributes_for_entities(entities, attributes) + } + + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator { + self.0.pull_attributes_for_entity(entity, attributes) + } +} + impl<'a, 'c> Queryable for InProgress<'a, 'c> { fn q_once(&self, query: &str, inputs: T) -> Result where T: Into> { @@ -308,6 +342,21 @@ impl<'a, 'c> Queryable for InProgress<'a, 'c> { } } +impl<'a, 'c> Pullable for InProgress<'a, 'c> { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + pull_attributes_for_entities(&self.schema, &*(self.transaction), entities, attributes) + .map_err(|e| e.into()) + } + + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator { + pull_attributes_for_entity(&self.schema, 
&*(self.transaction), entity, attributes) + .map_err(|e| e.into()) + } +} + impl<'a, 'c> HasSchema for InProgressRead<'a, 'c> { fn entid_for_type(&self, t: ValueType) -> Option { self.0.entid_for_type(t) @@ -630,6 +679,19 @@ impl Queryable for Store { } } +impl Pullable for Store { + fn pull_attributes_for_entities(&self, entities: E, attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + self.conn.pull_attributes_for_entities(&self.sqlite, entities, attributes) + } + + fn pull_attributes_for_entity(&self, entity: Entid, attributes: A) -> Result + where A: IntoIterator { + self.conn.pull_attributes_for_entity(&self.sqlite, entity, attributes) + } +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum CacheDirection { Forward, @@ -756,6 +818,29 @@ impl Conn { inputs) } + pub fn pull_attributes_for_entities(&self, + sqlite: &rusqlite::Connection, + entities: E, + attributes: A) -> Result>> + where E: IntoIterator, + A: IntoIterator { + let metadata = self.metadata.lock().unwrap(); + let schema = &*metadata.schema; + pull_attributes_for_entities(schema, sqlite, entities, attributes) + .map_err(|e| e.into()) + } + + pub fn pull_attributes_for_entity(&self, + sqlite: &rusqlite::Connection, + entity: Entid, + attributes: A) -> Result + where A: IntoIterator { + let metadata = self.metadata.lock().unwrap(); + let schema = &*metadata.schema; + pull_attributes_for_entity(schema, sqlite, entity, attributes) + .map_err(|e| e.into()) + } + pub fn lookup_values_for_attribute(&self, sqlite: &rusqlite::Connection, entity: Entid, diff --git a/src/errors.rs b/src/errors.rs index ed405559..42ea0c1c 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -26,6 +26,7 @@ use mentat_query; use mentat_query_algebrizer; use mentat_query_parser; use mentat_query_projector; +use mentat_query_pull; use mentat_query_translator; use mentat_sql; use mentat_tolstoy; @@ -48,6 +49,7 @@ error_chain! 
{ QueryError(mentat_query_algebrizer::Error, mentat_query_algebrizer::ErrorKind); // Let's not leak the term 'algebrizer'. QueryParseError(mentat_query_parser::Error, mentat_query_parser::ErrorKind); ProjectorError(mentat_query_projector::errors::Error, mentat_query_projector::errors::ErrorKind); + PullError(mentat_query_pull::errors::Error, mentat_query_pull::errors::ErrorKind); TranslatorError(mentat_query_translator::Error, mentat_query_translator::ErrorKind); SqlError(mentat_sql::Error, mentat_sql::ErrorKind); TxParseError(mentat_tx_parser::Error, mentat_tx_parser::ErrorKind); diff --git a/src/lib.rs b/src/lib.rs index d9facc6f..7bc81013 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ extern crate mentat_query; extern crate mentat_query_algebrizer; extern crate mentat_query_parser; extern crate mentat_query_projector; +extern crate mentat_query_pull; extern crate mentat_query_translator; extern crate mentat_sql; extern crate mentat_tolstoy; @@ -124,6 +125,7 @@ pub use conn::{ Conn, InProgress, Metadata, + Pullable, Queryable, Syncable, Store, diff --git a/src/query.rs b/src/query.rs index 207b5502..384ef99e 100644 --- a/src/query.rs +++ b/src/query.rs @@ -93,6 +93,8 @@ pub enum PreparedQuery<'sqlite> { }, Bound { statement: rusqlite::Statement<'sqlite>, + schema: Schema, + connection: &'sqlite rusqlite::Connection, args: Vec<(String, Rc)>, projector: Box, }, @@ -107,11 +109,10 @@ impl<'sqlite> PreparedQuery<'sqlite> { &mut PreparedQuery::Constant { ref select } => { select.project_without_rows().map_err(|e| e.into()) }, - &mut PreparedQuery::Bound { ref mut statement, ref args, ref projector } => { + &mut PreparedQuery::Bound { ref mut statement, ref schema, ref connection, ref args, ref projector } => { let rows = run_statement(statement, args)?; - projector - .project(rows) - .map_err(|e| e.into()) + projector.project(schema, connection, rows) + .map_err(|e| e.into()) } } } @@ -208,7 +209,7 @@ fn fetch_values<'sqlite> let algebrized = 
algebrize_query(known, query, None)?; - run_algebrized_query(sqlite, algebrized) + run_algebrized_query(known, sqlite, algebrized) } fn lookup_attribute(schema: &Schema, attribute: &NamespacedKeyword) -> Result { @@ -327,7 +328,10 @@ fn algebrize_query_str<'query, T> algebrize_query(known, parsed, inputs) } -fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebrized: AlgebraicQuery) -> QueryExecutionResult { +fn run_algebrized_query<'sqlite> +(known: Known, + sqlite: &'sqlite rusqlite::Connection, + algebrized: AlgebraicQuery) -> QueryExecutionResult { assert!(algebrized.unbound_variables().is_empty(), "Unbound variables should be checked by now"); if algebrized.is_known_empty() { @@ -335,17 +339,19 @@ fn run_algebrized_query<'sqlite>(sqlite: &'sqlite rusqlite::Connection, algebriz return Ok(QueryOutput::empty(&algebrized.find_spec)); } - let select = query_to_select(algebrized)?; + let select = query_to_select(known.schema, algebrized)?; match select { - ProjectedSelect::Constant(constant) => constant.project_without_rows() - .map_err(|e| e.into()), + ProjectedSelect::Constant(constant) => { + constant.project_without_rows() + .map_err(|e| e.into()) + }, ProjectedSelect::Query { query, projector } => { let SQLQuery { sql, args } = query.to_sql_query()?; let mut statement = sqlite.prepare(sql.as_str())?; let rows = run_statement(&mut statement, &args)?; - projector.project(rows).map_err(|e| e.into()) + projector.project(known.schema, sqlite, rows).map_err(|e| e.into()) }, } } @@ -365,7 +371,7 @@ pub fn q_once<'sqlite, 'query, T> where T: Into> { let algebrized = algebrize_query_str(known, query, inputs)?; - run_algebrized_query(sqlite, algebrized) + run_algebrized_query(known, sqlite, algebrized) } /// Just like `q_once`, but doesn't use any cached values. 
@@ -379,12 +385,12 @@ pub fn q_uncached<'sqlite, 'schema, 'query, T> let known = Known::for_schema(schema); let algebrized = algebrize_query_str(known, query, inputs)?; - run_algebrized_query(sqlite, algebrized) + run_algebrized_query(known, sqlite, algebrized) } -pub fn q_prepare<'sqlite, 'query, T> +pub fn q_prepare<'sqlite, 'schema, 'cache, 'query, T> (sqlite: &'sqlite rusqlite::Connection, - known: Known, + known: Known<'schema, 'cache>, query: &'query str, inputs: T) -> PreparedResult<'sqlite> where T: Into> @@ -405,7 +411,7 @@ pub fn q_prepare<'sqlite, 'query, T> }); } - let select = query_to_select(algebrized)?; + let select = query_to_select(known.schema, algebrized)?; match select { ProjectedSelect::Constant(constant) => { Ok(PreparedQuery::Constant { @@ -418,6 +424,8 @@ pub fn q_prepare<'sqlite, 'query, T> Ok(PreparedQuery::Bound { statement, + schema: known.schema.clone(), + connection: sqlite, args, projector: projector }) @@ -436,7 +444,7 @@ pub fn q_explain<'sqlite, 'query, T> if algebrized.is_known_empty() { return Ok(QueryExplanation::KnownEmpty(algebrized.cc.empty_because.unwrap())); } - match query_to_select(algebrized)? { + match query_to_select(known.schema, algebrized)? { ProjectedSelect::Constant(_constant) => Ok(QueryExplanation::KnownConstant), ProjectedSelect::Query { query, projector: _projector } => { let query = query.to_sql_query()?; diff --git a/tests/pull.rs b/tests/pull.rs new file mode 100644 index 00000000..dda56c10 --- /dev/null +++ b/tests/pull.rs @@ -0,0 +1,218 @@ +// Copyright 2018 Mozilla +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations under the License. + +#[macro_use] +extern crate mentat; +extern crate mentat_core; +extern crate mentat_query_pull; + +use std::collections::{ + BTreeMap, + BTreeSet, +}; + +use std::path::{ + Path, + PathBuf, +}; + +// TODO: re-export from Mentat. +use mentat_core::{ + Binding, + StructuredMap, + ValueRc, +}; + +use mentat::{ + Entid, + HasSchema, + IntoResult, + NamespacedKeyword, + Pullable, + Queryable, + QueryInputs, + Store, + RelResult, + TypedValue, +}; + +fn fixture_path(rest: &str) -> PathBuf { + let fixtures = Path::new("fixtures/"); + fixtures.join(Path::new(rest)) +} + +#[test] +fn test_simple_pull() { + let beacon; + let capitol; + let beacon_district; + let capitol_district; + + let mut store = Store::open("").expect("opened"); + { + let mut in_progress = store.begin_transaction().expect("began"); + in_progress.import(fixture_path("cities.schema")).expect("transacted schema"); + let report = in_progress.import(fixture_path("all_seattle.edn")).expect("transacted data"); + + // These tempid strings come out of all_seattle.edn. + beacon = *report.tempids.get(&"a17592186045471".to_string()).expect("beacon"); + beacon_district = *report.tempids.get(&"a17592186045450".to_string()).expect("beacon district"); + + capitol = *report.tempids.get(&"a17592186045439".to_string()).expect("capitol"); + capitol_district = *report.tempids.get(&"a17592186045438".to_string()).expect("capitol district"); + + in_progress.commit().expect("committed"); + } + + let schema = store.conn().current_schema(); + let district = schema.get_entid(&kw!(:neighborhood/district)).expect("district").0; + let name = schema.get_entid(&kw!(:neighborhood/name)).expect("name").0; + let attrs: BTreeSet = vec![district, name].into_iter().collect(); + + // Find Beacon Hill and Capitol Hill via query. + let query = r#"[:find [?hood ...] 
+ :where + (or [?hood :neighborhood/name "Beacon Hill"] + [?hood :neighborhood/name "Capitol Hill"])]"#; + let hoods: BTreeSet = store.q_once(query, None) + .into_coll_result() + .expect("hoods") + .into_iter() + .map(|b| { + b.val().and_then(|tv| tv.into_entid()).expect("scalar") + }) + .collect(); + + println!("Hoods: {:?}", hoods); + + // The query and the tempids agree. + assert_eq!(hoods, vec![beacon, capitol].into_iter().collect()); + + // Pull attributes of those two neighborhoods. + let pulled = store.begin_read().expect("read") + .pull_attributes_for_entities(hoods, attrs) + .expect("pulled"); + + // Here's what we expect: + let c: StructuredMap = vec![ + (kw!(:neighborhood/name), "Capitol Hill".into()), + (kw!(:neighborhood/district), TypedValue::Ref(capitol_district)), + ].into(); + let b: StructuredMap = vec![ + (kw!(:neighborhood/name), "Beacon Hill".into()), + (kw!(:neighborhood/district), TypedValue::Ref(beacon_district)), + ].into(); + + let expected: BTreeMap>; + expected = vec![(capitol, c.into()), (beacon, b.into())].into_iter().collect(); + + assert_eq!(pulled, expected); + + // Now test pull inside the query itself. 
+ let query = r#"[:find ?hood (pull ?district [:district/name :district/region]) + :where + (or [?hood :neighborhood/name "Beacon Hill"] + [?hood :neighborhood/name "Capitol Hill"]) + [?hood :neighborhood/district ?district] + :order ?hood]"#; + let results: RelResult = store.begin_read().expect("read") + .q_once(query, None) + .into_rel_result() + .expect("results"); + + let beacon_district: Vec<(NamespacedKeyword, TypedValue)> = vec![ + (kw!(:district/name), "Greater Duwamish".into()), + (kw!(:district/region), schema.get_entid(&NamespacedKeyword::new("region", "se")).unwrap().into()) + ]; + let beacon_district: StructuredMap = beacon_district.into(); + let capitol_district: Vec<(NamespacedKeyword, TypedValue)> = vec![ + (kw!(:district/name), "East".into()), + (kw!(:district/region), schema.get_entid(&NamespacedKeyword::new("region", "e")).unwrap().into()) + ]; + let capitol_district: StructuredMap = capitol_district.into(); + + let expected = RelResult { + width: 2, + values: vec![ + TypedValue::Ref(capitol).into(), capitol_district.into(), + TypedValue::Ref(beacon).into(), beacon_district.into(), + ].into(), + }; + assert_eq!(results, expected.clone()); + + // We can also prepare the query. + let reader = store.begin_read().expect("read"); + let mut prepared = reader.q_prepare(query, None).expect("prepared"); + + assert_eq!(prepared.run(None) + .into_rel_result() + .expect("results"), + expected); + + // Execute a scalar query where the body is constant. + // TODO: we shouldn't require `:where`; that makes this non-constant! + let query = r#"[:find (pull ?hood [:neighborhood/name]) . 
:in ?hood + :where [?hood :neighborhood/district _]]"#; + let result = reader.q_once(query, QueryInputs::with_value_sequence(vec![(var!(?hood), TypedValue::Ref(beacon))])) + .into_scalar_result() + .expect("success") + .expect("result"); + + let expected: StructuredMap = vec![(kw!(:neighborhood/name), TypedValue::from("Beacon Hill"))].into(); + assert_eq!(result, expected.into()); + + // Collect the names and regions of all districts. + let query = r#"[:find [(pull ?district [:district/name :district/region]) ...] + :where [_ :neighborhood/district ?district]]"#; + let results = reader.q_once(query, None) + .into_coll_result() + .expect("result"); + + let expected: Vec = vec![ + vec![(kw!(:district/name), TypedValue::from("East")), + (kw!(:district/region), TypedValue::Ref(65556))].into(), + vec![(kw!(:district/name), TypedValue::from("Southwest")), + (kw!(:district/region), TypedValue::Ref(65559))].into(), + vec![(kw!(:district/name), TypedValue::from("Downtown")), + (kw!(:district/region), TypedValue::Ref(65560))].into(), + vec![(kw!(:district/name), TypedValue::from("Greater Duwamish")), + (kw!(:district/region), TypedValue::Ref(65557))].into(), + vec![(kw!(:district/name), TypedValue::from("Ballard")), + (kw!(:district/region), TypedValue::Ref(65561))].into(), + vec![(kw!(:district/name), TypedValue::from("Northeast")), + (kw!(:district/region), TypedValue::Ref(65555))].into(), + vec![(kw!(:district/name), TypedValue::from("Southeast")), + (kw!(:district/region), TypedValue::Ref(65557))].into(), + vec![(kw!(:district/name), TypedValue::from("Northwest")), + (kw!(:district/region), TypedValue::Ref(65561))].into(), + vec![(kw!(:district/name), TypedValue::from("Central")), + (kw!(:district/region), TypedValue::Ref(65556))].into(), + vec![(kw!(:district/name), TypedValue::from("Delridge")), + (kw!(:district/region), TypedValue::Ref(65559))].into(), + vec![(kw!(:district/name), TypedValue::from("Lake Union")), + (kw!(:district/region), 
TypedValue::Ref(65560))].into(), + vec![(kw!(:district/name), TypedValue::from("Magnolia/Queen Anne")), + (kw!(:district/region), TypedValue::Ref(65560))].into(), + vec![(kw!(:district/name), TypedValue::from("North")), + (kw!(:district/region), TypedValue::Ref(65555))].into(), + ]; + + let expected: Vec = expected.into_iter().map(|m| m.into()).collect(); + assert_eq!(results, expected); + +} + +// TEST: +// - Constant query bodies in pull. +// - Values that are present in the cache (=> constant pull, too). +// - Pull for each kind of find spec. +// - That the keys in each map are ValueRc::ptr_eq. +// - Entity presence/absence when the pull expressions don't match anything. +// - Aliases. (No parser support yet.) diff --git a/tools/cli/src/mentat_cli/repl.rs b/tools/cli/src/mentat_cli/repl.rs index 5ed13c6e..119c7ebc 100644 --- a/tools/cli/src/mentat_cli/repl.rs +++ b/tools/cli/src/mentat_cli/repl.rs @@ -23,6 +23,10 @@ use time::{ PreciseTime, }; +use mentat_core::{ + StructuredMap, +}; + use mentat::{ CacheDirection, NamespacedKeyword, @@ -423,14 +427,14 @@ impl Repl { match query_output.results { QueryResults::Scalar(v) => { if let Some(val) = v { - writeln!(output, "| {}\t |", &self.binding_as_string(val))?; + writeln!(output, "| {}\t |", &self.binding_as_string(&val))?; } }, QueryResults::Tuple(vv) => { if let Some(vals) = vv { for val in vals { - write!(output, "| {}\t", self.binding_as_string(val))?; + write!(output, "| {}\t", self.binding_as_string(&val))?; } writeln!(output, "|")?; } @@ -438,14 +442,14 @@ impl Repl { QueryResults::Coll(vv) => { for val in vv { - writeln!(output, "| {}\t|", self.binding_as_string(val))?; + writeln!(output, "| {}\t|", self.binding_as_string(&val))?; } }, QueryResults::Rel(vvv) => { for vv in vvv { for v in vv { - write!(output, "| {}\t", self.binding_as_string(v))?; + write!(output, "| {}\t", self.binding_as_string(&v))?; } writeln!(output, "|")?; } @@ -512,26 +516,53 @@ impl Repl { Ok(report) } - fn binding_as_string(&self, 
value: Binding) -> String {
+    fn binding_as_string(&self, value: &Binding) -> String {
         use self::Binding::*;
         match value {
-            Scalar(v) => self.value_as_string(v),
-            Map(_) => format!("TODO"),
-            Vec(_) => format!("TODO"),
+            &Scalar(ref v) => self.value_as_string(v),
+            &Map(ref v) => self.map_as_string(v),
+            &Vec(ref v) => self.vec_as_string(v),
         }
     }
 
-    fn value_as_string(&self, value: TypedValue) -> String {
+    fn vec_as_string(&self, value: &Vec<Binding>) -> String {
+        let mut out: String = "[".to_string();
+        let vals: Vec<String> = value.iter()
+                                     .map(|v| self.binding_as_string(v))
+                                     .collect();
+
+        out.push_str(vals.join(", ").as_str());
+        out.push_str("]");
+        out
+    }
+
+    fn map_as_string(&self, value: &StructuredMap) -> String {
+        let mut out: String = "{".to_string();
+        let mut first = true;
+        for (k, v) in value.0.iter() {
+            // Comma-separate every entry after the first.
+            if !first {
+                out.push_str(", ");
+            }
+            first = false;
+            out.push_str(&k.to_string());
+            out.push_str(" ");
+            out.push_str(self.binding_as_string(v).as_str());
+        }
+        out.push_str("}");
+        out
+    }
+
+    fn value_as_string(&self, value: &TypedValue) -> String {
         use self::TypedValue::*;
         match value {
-            Boolean(b) => if b { "true".to_string() } else { "false".to_string() },
-            Double(d) => format!("{}", d),
-            Instant(i) => format!("{}", i),
-            Keyword(k) => format!("{}", k),
-            Long(l) => format!("{}", l),
-            Ref(r) => format!("{}", r),
-            String(s) => format!("{:?}", s.to_string()),
-            Uuid(u) => format!("{}", u),
+            &Boolean(b) => if b { "true".to_string() } else { "false".to_string() },
+            &Double(d) => format!("{}", d),
+            &Instant(ref i) => format!("{}", i),
+            &Keyword(ref k) => format!("{}", k),
+            &Long(l) => format!("{}", l),
+            &Ref(r) => format!("{}", r),
+            &String(ref s) => format!("{:?}", s.to_string()),
+            &Uuid(ref u) => format!("{}", u),
         }
     }
 }