Implement aggregation. Fixes #39.

This commit is contained in:
Richard Newman 2016-08-23 20:48:15 -07:00
parent 1e04425287
commit 38cd30a895
6 changed files with 315 additions and 66 deletions

View file

@ -495,7 +495,7 @@
;; TODO: cache parts. parts looks like {:db.part/db {:start 0 :current 10}}. It maps between ;; TODO: cache parts. parts looks like {:db.part/db {:start 0 :current 10}}. It maps between
;; keyword ident part names and integer ranges. ;; keyword ident part names and integer ranges.
IDB IDB
(query-context [db] (context/->Context (datoms-source db) nil nil)) (query-context [db] (context/make-context (datoms-source db)))
(schema [db] (.-schema db)) (schema [db] (.-schema db))

View file

@ -40,14 +40,26 @@
(def sql-quoting-style :ansi) (def sql-quoting-style :ansi)
(defn context->sql-clause [context] (defn context->sql-clause [context]
(merge (let [inner
{:select (projection/sql-projection context) (merge
{:select (projection/sql-projection-for-relation context)
;; Always SELECT DISTINCT, because Datalog is set-based. ;; Always SELECT DISTINCT, because Datalog is set-based.
;; TODO: determine from schema analysis whether we can avoid ;; TODO: determine from schema analysis whether we can avoid
;; the need to do this. ;; the need to do this.
:modifiers [:distinct]} :modifiers [:distinct]}
(clauses/cc->partial-subquery (:cc context)))) (clauses/cc->partial-subquery (:cc context)))]
(if (:has-aggregates? context)
(merge
(when-not (empty? (:group-by-vars context))
;; We shouldn't need to account for types here, until we account for
;; `:or` clauses that bind from different attributes.
{:group-by (map util/var->sql-var (:group-by-vars context))})
{:select (projection/sql-projection-for-aggregation context :preag)
:modifiers [:distinct]
:from [:preag]
:with {:preag inner}})
inner)))
(defn context->sql-string [context args] (defn context->sql-string [context args]
(-> (->
@ -56,8 +68,9 @@
(sql/format args :quoting sql-quoting-style))) (sql/format args :quoting sql-quoting-style)))
(defn- validate-with [with] (defn- validate-with [with]
(when-not (nil? with) (when-not (or (nil? with)
(raise-str "`with` not supported."))) (every? #(instance? Variable %1) with))
(raise "Complex :with not supported." {:with with})))
(defn- validate-in [in] (defn- validate-in [in]
(when (nil? in) (when (nil? in)
@ -92,9 +105,13 @@
(validate-with with) (validate-with with)
(validate-in in) (validate-in in)
(let [external-bindings (in->bindings in) (let [external-bindings (in->bindings in)
known-types {}] elements (:elements find)
known-types {}
group-by-vars (projection/extract-group-by-vars elements with)]
(assoc context (assoc context
:elements (:elements find) :elements elements
:group-by-vars group-by-vars
:has-aggregates? (not (nil? group-by-vars))
:cc (clauses/patterns->cc (:default-source context) where known-types external-bindings))))) :cc (clauses/patterns->cc (:default-source context) where known-types external-bindings)))))
(defn find->sql-clause (defn find->sql-clause
@ -122,7 +139,7 @@
#_ #_
(datomish.query/find->sql-string (datomish.query/find->sql-string
(datomish.query.context/->Context (datomish.query.source/datoms-source nil) nil nil) (datomish.query.context/make-context (datomish.query.source/datoms-source nil))
(datomish.query/parse (datomish.query/parse
'[:find ?timestampMicros ?page :in $ ?latest :where '[:find ?timestampMicros ?page :in $ ?latest :where
[?page :page/starred true ?t] [?page :page/starred true ?t]
@ -132,7 +149,7 @@
#_ #_
(datomish.query/find->sql-string (datomish.query/find->sql-string
(datomish.query.context/->Context (datomish.query.source/datoms-source nil) nil nil) (datomish.query.context/make-context (datomish.query.source/datoms-source nil))
(datomish.query/parse (datomish.query/parse
'[:find ?page :in $ ?latest :where '[:find ?page :in $ ?latest :where
[?page :page/url "http://example.com/"] [?page :page/url "http://example.com/"]

View file

@ -2,8 +2,18 @@
;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; A context, very simply, holds on to a default source. Eventually ;; A context, very simply, holds on to a default source and some knowledge
;; it'll also do projection and similar transforms. ;; needed for aggregation.
(ns datomish.query.context) (ns datomish.query.context)
(defrecord Context [default-source elements cc]) (defrecord Context
[
default-source
elements ; The :find list itself.
has-aggregates?
group-by-vars ; A list of variables from :find and :with, used to generate GROUP BY.
cc ; The main conjoining clause.
])
(defn make-context [source]
(->Context source nil false nil nil))

View file

@ -4,20 +4,114 @@
(ns datomish.query.projection (ns datomish.query.projection
(:require (:require
[honeysql.core :as sql]
[datomish.query.source :as source] [datomish.query.source :as source]
[datomish.sqlite-schema :as ss] [datomish.sqlite-schema :as ss]
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise-str cond-let]] [datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]]
[datascript.parser :as dp [datascript.parser :as dp
#?@(:cljs [:refer [Pattern DefaultSrc Variable Constant Placeholder]])] #?@(:cljs [:refer [Aggregate Pattern DefaultSrc Variable Constant Placeholder PlainSymbol]])]
) )
#?(:clj (:import [datascript.parser Pattern DefaultSrc Variable Constant Placeholder])) #?(:clj (:import [datascript.parser Aggregate Pattern DefaultSrc Variable Constant Placeholder PlainSymbol]))
) )
(defn lookup-variable [cc variable] (defn lookup-variable [cc variable]
(or (-> cc :bindings variable first) (or (-> cc :bindings variable first)
(raise-str "Couldn't find variable " variable))) (raise-str "Couldn't find variable " variable)))
(defn sql-projection (def aggregate-functions
{:avg :avg
:count :count
:max :max
:min :min
:sum :total
})
(defn- aggregate-symbols->projected-var [fn-symbol var-symbol]
(keyword (str "_" (name fn-symbol) "_" (subs (name var-symbol) 1))))
(defn- aggregate->projected-var [elem]
(aggregate-symbols->projected-var (:symbol (:fn elem))
(:symbol (first (:args elem)))))
(defn simple-aggregate?
"If `elem` is a simple aggregate -- symbolic function, one var arg --
return the variable symbol."
[elem]
(when (instance? Aggregate elem)
(let [{:keys [fn args]} elem]
(when (and (instance? PlainSymbol fn)
(= 1 (count args)))
(let [arg (first args)]
(when (instance? Variable arg)
(:symbol arg)))))))
(defn- aggregate->var [elem]
(when (instance? Aggregate elem)
(when-not (simple-aggregate? elem)
(raise-str "Only know how to handle simple aggregates."))
(:symbol (first (:args elem)))))
(defn- variable->var [elem]
(when (instance? Variable elem)
(:symbol elem)))
(defn- aggregate->projection [elem context lookup-fn]
(when (instance? Aggregate elem)
(when-not (simple-aggregate? elem)
(raise-str "Only know how to handle simple aggregates."))
(let [var-symbol (:symbol (first (:args elem)))
fn-symbol (:symbol (:fn elem))
lookup-var (lookup-fn var-symbol)
aggregate-fn (get aggregate-functions (keyword fn-symbol))]
(when-not aggregate-fn
(raise-str "Unknown aggregate function " fn-symbol))
(let [funcall-var (util/aggregate->sql-var aggregate-fn lookup-var)
project-as (aggregate-symbols->projected-var fn-symbol var-symbol)]
[[funcall-var project-as]]))))
(defn- type-projection
"Produce a projection pair by looking up `var` in the provided
`extracted-types`."
[extracted-types var]
(when-let [t (get extracted-types var)]
[t (util/var->sql-type-var var)]))
(defn- aggregate-type-projection
"Produce a passthrough projection pair for a type field
in an inner query."
[inner var]
(let [type-var (util/var->sql-type-var var)]
[(sql/qualify inner type-var) type-var]))
(defn- symbol->projection
"Given a variable symbol, produce a projection pair.
`lookup-fn` will be used to find a column. For a non-aggregate query,
this will typically be a lookup into the CC's bindings. For an
aggregate query it'll be a qualification of the same var into the
subquery.
`known-types` is a type map to decide whether to project a type tag.
`type-projection-fn` is like `lookup-fn` but for type tag columns."
[var lookup-fn known-types type-projection-fn]
(let [lookup-var (lookup-fn var)
projected-var (util/var->sql-var var)
var-projection [lookup-var projected-var]]
;; If the type of a variable isn't explicitly known, we also select
;; its type column so we can transform it.
(if-let [type-proj (when (not (contains? known-types var))
(type-projection-fn var))]
[var-projection type-proj]
[var-projection])))
(defn- variable->projection [elem lookup-fn known-types type-projection-fn]
(when (instance? Variable elem)
(symbol->projection (:symbol elem) lookup-fn known-types type-projection-fn)))
(defn sql-projection-for-relation
"Take a `find` clause's `:elements` list and turn it into a SQL "Take a `find` clause's `:elements` list and turn it into a SQL
projection clause, suitable for passing as a `:select` clause to projection clause, suitable for passing as a `:select` clause to
honeysql. honeysql.
@ -34,68 +128,114 @@
[[:datoms12.e :foo] [:datoms13.e :bar]] [[:datoms12.e :foo] [:datoms13.e :bar]]
Note that we also look at `:group-by-vars`, because we need to
alias columns and apply `DISTINCT` to those columns in order to
aggregate correctly.
This function unpacks aggregate operations, instead selecting the var.
@param context A Context, containing elements. @param context A Context, containing elements.
@return a sequence of pairs." @return a sequence of pairs."
[context] [context]
(let [elements (:elements context) (let [{:keys [group-by-vars elements cc]} context
cc (:cc context) {:keys [known-types extracted-types]} cc]
known-types (:known-types cc)
extracted-types (:extracted-types cc)]
(when-not (every? #(instance? Variable %1) elements) ;; The primary projections from the :find list.
(raise-str "Unable to :find non-variables.")) ;; Note that deduplication will be necessary, because we unpack aggregates.
(let [projected-vars
(map (fn [elem]
(or (aggregate->var elem)
(variable->var elem)
(raise "Only able to :find variables or aggregates."
{:elem elem})))
elements)
;; If the type of a variable isn't explicitly known, we also select ;; If we have any GROUP BY requirements from :with, that aren't already
;; its type column so we can transform it. ;; included in the above, project them now.
additional-vars
(clojure.set/difference
(set group-by-vars)
(set projected-vars))
full-var-list
(distinct (concat projected-vars additional-vars))
type-proj-fn
(partial type-projection extracted-types)
lookup-fn
(partial lookup-variable cc)]
(mapcat (fn [var]
(symbol->projection var lookup-fn known-types type-proj-fn))
full-var-list))))
(defn sql-projection-for-aggregation
"Project an element list that contains aggregates. This expects a subquery
aliased to `inner-table` which itself will project each var with the
correct name."
[context inner-table]
(let [{:keys [group-by-vars elements cc]} context
{:keys [known-types extracted-types]} cc
lookup-fn (fn [var]
(sql/qualify inner-table (util/var->sql-var var)))
type-proj-fn (partial aggregate-type-projection inner-table)]
(mapcat (fn [elem] (mapcat (fn [elem]
(let [var (:symbol elem) (or (variable->projection elem lookup-fn known-types type-proj-fn)
lookup-var (lookup-variable cc var) (aggregate->projection elem context lookup-fn)
projected-var (util/var->sql-var var) (raise "Only able to :find variables or aggregates."
var-projection [lookup-var projected-var]] {:elem elem})))
(if (or (contains? known-types var) elements)))
(not (contains? extracted-types var)))
[var-projection]
[var-projection [(get extracted-types var)
(util/var->sql-type-var var)]])))
elements)))
(defn make-projectors-for-columns [elements known-types extracted-types] (defn make-projectors-for-columns [elements known-types extracted-types]
{:pre [(map? extracted-types) {:pre [(map? extracted-types)
(map? known-types)]} (map? known-types)]}
(map (fn [elem] (letfn [(variable->projector [elem known-types extracted-types tag-decoder]
(let [var (:symbol elem) (when (instance? Variable elem)
projected-var (util/var->sql-var var) (let [var (:symbol elem)
tag-decoder (memoize projected-var (util/var->sql-var var)]
(fn [tag]
(partial ss/<-tagged-SQLite tag)))]
(if-let [type (get known-types var)] (if-let [type (get known-types var)]
;; We know the type! We already know how to decode it. ;; We know the type! We already know how to decode it.
;; TODO: most of these tags don't actually require calling through to <-tagged-SQLite. ;; TODO: most of these tags don't actually require calling through to <-tagged-SQLite.
;; TODO: optimize this without making it horrible. ;; TODO: optimize this without making it horrible.
(let [decoder (tag-decoder (ss/->tag type))] (let [decoder (tag-decoder (ss/->tag type))]
(fn [row] (fn [row]
(decoder (get row projected-var)))) (decoder (get row projected-var))))
;; We don't know the type. Find the type projection column ;; We don't know the type. Find the type projection column
;; and use it to decode the value. ;; and use it to decode the value.
(if (contains? extracted-types var) (if (contains? extracted-types var)
(let [type-column (util/var->sql-type-var var)] (let [type-column (util/var->sql-type-var var)]
(fn [row] (fn [row]
(ss/<-tagged-SQLite (ss/<-tagged-SQLite
(get row type-column) (get row type-column)
(get row projected-var)))) (get row projected-var))))
;; We didn't extract a type and we don't know it in advance. ;; We didn't extract a type and we don't know it in advance.
;; Just pass through; the :col will look itself up in the row. ;; Just pass through; the :col will look itself up in the row.
projected-var)))) projected-var)))))
elements))
;; For now we assume numerics and that everything will shake out in the wash.
(aggregate->projector [elem]
(when (instance? Aggregate elem)
(let [var (aggregate->projected-var elem)]
(fn [row]
(get row var)))))]
(let [tag-decoder (memoize
(fn [tag]
(partial ss/<-tagged-SQLite tag)))]
(map (fn [elem]
(or (variable->projector elem known-types extracted-types tag-decoder)
(aggregate->projector elem)))
elements))))
(defn row-pair-transducer [context] (defn row-pair-transducer [context]
(let [{:keys [elements cc]} context (let [{:keys [elements cc]} context
{:keys [source known-types extracted-types]} cc {:keys [source known-types extracted-types]} cc
;; We know the projection will fail above if these aren't simple variables. ;; We know the projection will fail above if these aren't simple variables or aggregates.
projectors projectors
(make-projectors-for-columns elements known-types extracted-types)] (make-projectors-for-columns elements known-types extracted-types)]
@ -104,3 +244,29 @@
(if err (if err
[row err] [row err]
[(map (fn [projector] (projector row)) projectors) nil]))))) [(map (fn [projector] (projector row)) projectors) nil])))))
(defn extract-group-by-vars
"Take inputs to :find and, if any aggregates exist in `elements`,
return the variable names upon which we should GROUP BY."
[elements with]
(when (some #(instance? Aggregate %1) elements)
(loop [ignore #{}
group-by (map :symbol with)
e elements]
(if-let [element (first e)]
(if-let [aggregated-var (simple-aggregate? element)]
(recur (conj ignore aggregated-var)
group-by
(rest e))
(if (instance? Variable element)
(let [var (:symbol element)]
(recur ignore
(if (contains? ignore var)
group-by
(conj group-by var))
(rest e)))
(raise-str "Unknown element." {:element element})))
;; Done. Remove any later vars we saw.
(remove ignore group-by)))))

View file

@ -46,6 +46,11 @@
(keyword (subs (name x) 1)) (keyword (subs (name x) 1))
(throw (ex-info (str x " is not a Datalog var.") {})))) (throw (ex-info (str x " is not a Datalog var.") {}))))
(defn aggregate->sql-var
"Turns (:max 'column) into :%max.column."
[fn-kw x]
(keyword (str "%" (name fn-kw) "." (name x))))
(defn concat-in (defn concat-in
{:static true} {:static true}
[m [k & ks] vs] [m [k & ks] vs]

View file

@ -91,6 +91,24 @@
:db/valueType :db.type/boolean :db/valueType :db.type/boolean
:db/cardinality :db.cardinality/one}]) :db/cardinality :db.cardinality/one}])
(def aggregate-schema
[{:db/id (d/id-literal :db.part/user)
:db.install/_attribute :db.part/db
:db/ident :page/url
:db/valueType :db.type/string
:db/unique :db.unique/identity
:db/cardinality :db.cardinality/one}
{:db/id (d/id-literal :db.part/user)
:db.install/_attribute :db.part/db
:db/ident :foo/points
:db/valueType :db.type/long
:db/cardinality :db.cardinality/many}
{:db/id (d/id-literal :db.part/user)
:db.install/_attribute :db.part/db
:db/ident :foo/visitedAt
:db/valueType :db.type/instant
:db/cardinality :db.cardinality/many}])
(def schema-with-page (def schema-with-page
(concat (concat
simple-schema simple-schema
@ -101,7 +119,7 @@
:table-alias (comp (make-predictable-gensym) name))) :table-alias (comp (make-predictable-gensym) name)))
(defn conn->context [conn] (defn conn->context [conn]
(context/->Context (mock-source (d/db conn)) nil nil)) (context/make-context (mock-source (d/db conn))))
(defn- expand [find conn] (defn- expand [find conn]
(let [context (conn->context conn) (let [context (conn->context conn)
@ -443,3 +461,36 @@
'[:find ?page ?thing :in $ :where '[:find ?page ?thing :in $ :where
[?page _ ?thing]] [?page _ ?thing]]
conn))))) conn)))))
(deftest-db test-aggregates conn
(let [attrs (<? (<initialize-with-schema conn aggregate-schema))
context
(populate '[:find ?date (max ?v)
:with ?e
:in $ ?then
:where
[?e :foo/visitedAt ?date]
[(> ?date ?then)]
[?e :foo/points ?v]] conn)]
(is (= (:group-by-vars context)
['?date '?e]))
(is (= {:select '([:preag.date :date]
[:%max.preag.v :_max_v])
:modifiers [:distinct]
:group-by '(:date :e),
:with {:preag
{:select '([:datoms0.v :date]
[:datoms1.v :v]
[:datoms0.e :e]), ; Because we need to group on it.
:modifiers [:distinct],
:from '([:datoms datoms0] [:datoms datoms1]),
:where (list
:and
[:= :datoms0.a (:foo/visitedAt attrs)]
(list :> :datoms0.v (sql/param :then))
[:= :datoms1.a (:foo/points attrs)]
[:= :datoms0.e :datoms1.e])}}
:from [:preag]}
(query/context->sql-clause context)))))