Initial work on executing queries. r=nalexander

Signed-off-by: Richard Newman <rnewman@twinql.com>
This commit is contained in:
Richard Newman 2016-07-15 09:00:49 -07:00
parent 437a80a978
commit d695554123
5 changed files with 172 additions and 23 deletions

39
src/datomish/exec.cljc Normal file
View file

@ -0,0 +1,39 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.exec
#?(:cljs
(:require-macros
[datomish.util :refer [while-let]]
[datomish.pair-chan :refer [go-pair <?]]
[cljs.core.async.macros :refer [go]]))
(:require
[datomish.sqlite :as s]
[datomish.sqlite-schema :as ss]
[datomish.query :as dq]
#?@(:clj
[[datomish.jdbc-sqlite]
[datomish.pair-chan :refer [go-pair <?]]
[datomish.util :refer [while-let]]
[clojure.core.async :refer
[go ; macro in cljs.
<! >! chan close! take!]]])
#?@(:cljs
[[datomish.promise-sqlite]
[datomish.pair-chan]
[datomish.util]
[cljs.core.async :as a :refer
[<! >! chan close! take!]]])))
(defn <?run
"Execute the provided query on the provided DB.
Returns a transduced channel of [result err] pairs.
Closes the channel when fully consumed."
[db find]
(let [context (dq/find->prepared-context (dq/parse find))
row-pair-transducer (dq/row-pair-transducer context (dq/sql-projection context))
chan (chan 50 row-pair-transducer)]
(s/<?all-rows db (dq/context->sql-string context) chan)
chan))

View file

@ -0,0 +1,69 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.exec-repl
#?(:cljs
(:require-macros
[datomish.util :refer [while-let]]
[datomish.pair-chan :refer [go-pair <?]]
[cljs.core.async.macros :refer [go]]))
(:require
[datomish.exec :as exec]
[datomish.sqlite :as s]
[datomish.sqlite-schema :as ss]
[datomish.query :as dq]
#?@(:clj
[[datomish.jdbc-sqlite]
[datomish.pair-chan :refer [go-pair <?]]
[datomish.util :refer [while-let]]
[clojure.core.async]])
#?@(:cljs
[[datomish.promise-sqlite]
[datomish.pair-chan]
[datomish.util]])))
#?(:clj
(defn pair-channel->lazy-seq
"Returns a blocking lazy sequence of items taken from the provided channel."
[channel]
(lazy-seq
(when-let [v (clojure.core.async/<!! channel)]
(if (second v)
(cons v nil)
(cons v (channel->lazy-seq channel)))))))
#?(:clj
(defn run-to-pair-seq
"Given an open database, returns a lazy sequence of results.
When fully consumed, underlying resources will be released."
[db find]
(pair-channel->lazy-seq (exec/<?run db find))))
#_(defn xxopen []
(datomish.pair-chan/go-pair
(let [d (datomish.pair-chan/<? (s/<sqlite-connection "/tmp/foo.sqlite"))]
(clojure.core.async/<!! (ss/<ensure-current-version d))
(def db d))))
;; With an open DB…
#_(datomish.exec/run-to-pair-seq
db
'[:find ?page :in $ :where [?page :page/starred true ?t]])
;; In a Clojure REPL with no open DB…
#_(clojure.core.async/<!!
(datomish.exec-repl/<open-and-run-to-seq-promise
"/tmp/foo.sqlite"
'[:find ?page :in $ :where [?page :page/starred true ?t]]))
#_(defn test-cljs []
(datomish.pair-chan/go-pair
(let [d (datomish.pair-chan/<? (s/<sqlite-connection "/tmp/foo.sqlite"))]
(cljs.core.async/<! (ss/<ensure-current-version d))
(let [chan (exec/<?run d
'[:find ?page :in $ :where [?page :page/starred true ?t]])]
(println (datomish.pair-chan/<? chan))
(println (datomish.pair-chan/<? chan))
(println (datomish.pair-chan/<? chan))))))

View file

@ -16,11 +16,34 @@
[then else] [then else]
(if (cljs-env? &env) then else)) (if (cljs-env? &env) then else))
(defmacro go-safely [[chan chan-form] & body]
"Evaluate `body` forms in a `go` block. Binds `chan-form` to `chan`.
`chan-form` must evaluate to an error-channel.
If `body` throws, the exception is propagated into `chan` and `chan` is closed.
Returns `chan`."
`(if-cljs
(let [~chan ~chan-form]
(cljs.core.async.macros/go
(try
(do ~@body)
(catch js/Error ex#
(cljs.core.async/>! ~chan [nil ex#]))))
~chan)
(let [~chan ~chan-form]
(clojure.core.async/go
(try
(do ~@body)
(catch Exception ex#
(clojure.core.async/>! ~chan [nil ex#]))))
~chan)))
;; It's a huge pain to declare cross-environment macros. This is awful, but making the namespace a ;; It's a huge pain to declare cross-environment macros. This is awful, but making the namespace a
;; parameter appears to be *even worse*. Note also that `go` is not in a consistent namespace... ;; parameter appears to be *even worse*. Note also that `go` is not in a consistent namespace...
(defmacro go-pair [& body] (defmacro go-pair [& body]
"Evaluate `body` forms in a `go` block. Catch errors and return a "Evaluate `body` forms in a `go` block to yield a result.
pair chan (a promise channel resolving to `[result error]`)." Catch errors during evaluation.
Return a promise channel that yields a pair: the result (or nil), and any
error thrown (or nil)."
`(if-cljs `(if-cljs
(let [pc-chan# (cljs.core.async/promise-chan)] (let [pc-chan# (cljs.core.async/promise-chan)]
(cljs.core.async.macros/go (cljs.core.async.macros/go

View file

@ -160,16 +160,14 @@
[(lookup-variable context var) (var->sql-var var)])) [(lookup-variable context var) (var->sql-var var)]))
elements))) elements)))
(defn row-transducer [context projection rf] (defn row-pair-transducer [context projection]
;; For now, we only support straight var lists, so ;; For now, we only support straight var lists, so
;; our transducer is trivial. ;; our transducer is trivial.
(let [columns-in-order (map second projection) (let [columns-in-order (map second projection)]
row-mapper (fn [row] (map columns-in-order row))] (map (fn [[row err]]
(fn (if err
([] (rf)) [row err]
([result] (rf result)) [(map row columns-in-order) nil])))))
([result input]
(rf result (row-mapper input))))))
(defn context->sql-clause [context] (defn context->sql-clause [context]
{:select (sql-projection context) {:select (sql-projection context)
@ -178,6 +176,12 @@
nil nil
(cons :and (:wheres context)))}) (cons :and (:wheres context)))})
(defn context->sql-string [context]
(->
context
context->sql-clause
(sql/format :quoting sql-quoting-style)))
(defn- validate-with [with] (defn- validate-with [with]
(when-not (nil? with) (when-not (nil? with)
(raise "`with` not supported."))) (raise "`with` not supported.")))
@ -206,13 +210,15 @@
;; There's some confusing use of 'where' and friends here. That's because ;; There's some confusing use of 'where' and friends here. That's because
;; the parsed Datalog includes :where, and it's also input to honeysql's ;; the parsed Datalog includes :where, and it's also input to honeysql's
;; SQL formatter. ;; SQL formatter.
(context->sql-clause (-> find find->prepared-context context->sql-clause))
(find->prepared-context find)))
(defn find->sql-string (defn find->sql-string
"Take a parsed `find` expression and turn it into SQL." "Take a parsed `find` expression and turn it into SQL."
[find] [find]
(-> find find->sql-clause (sql/format :quoting sql-quoting-style))) (->
find
find->sql-clause
(sql/format :quoting sql-quoting-style)))
(defn parse (defn parse
"Parse a Datalog query array into a structured `find` expression." "Parse a Datalog query array into a structured `find` expression."
@ -225,7 +231,7 @@
'[:find ?page :in $ :where [?page :page/starred true ?t] ]))) '[:find ?page :in $ :where [?page :page/starred true ?t] ])))
(comment (comment
(datomish.query/find->sql-string (datomish.query/find->prepared-context
(datomish.query/parse (datomish.query/parse
'[:find ?timestampMicros ?page '[:find ?timestampMicros ?page
:in $ :in $

View file

@ -5,16 +5,16 @@
(ns datomish.sqlite (ns datomish.sqlite
#?(:cljs #?(:cljs
(:require-macros (:require-macros
[datomish.pair-chan :refer [go-pair <?]] [datomish.pair-chan :refer [go-pair go-safely <?]]
[cljs.core.async.macros :refer [go]])) [cljs.core.async.macros :refer [go]]))
#?(:clj #?(:clj
(:require (:require
[datomish.pair-chan :refer [go-pair <?]] [datomish.pair-chan :refer [go-pair go-safely <?]]
[clojure.core.async :refer [go <! >!]]) [clojure.core.async :refer [go <! >! chan put! take! close!]])
:cljs :cljs
(:require (:require
[datomish.pair-chan] [datomish.pair-chan]
[cljs.core.async :as a :refer [<! >!]]))) [cljs.core.async :as a :refer [<! >! chan put! take! close!]])))
(defprotocol ISQLiteConnection (defprotocol ISQLiteConnection
(-execute! (-execute!
@ -49,11 +49,23 @@
(defn reduce-rows (defn reduce-rows
[db [sql & bindings] initial f] [db [sql & bindings] initial f]
(let [acc (atom initial)] (let [acc (atom initial)]
(go (go-pair
(let [[_ err] (<! (-each db sql bindings #(swap! acc f %)))] (<? (-each db sql bindings #(swap! acc f %)))
(if err @acc)))
[nil err]
[@acc nil]))))) (defn <?all-rows
"Takes a new channel, put!ing rows as [row err] pairs
into it as they arrive from storage. Closes the channel
when no more results exist. Consume with <?."
[db [sql & bindings :as rest] chan]
(go-safely [c chan]
(let [result (<! (-each db sql bindings (fn [row] (put! c [row nil]))))]
;; We assume that a failure will result in the promise
;; channel being rejected and no further row callbacks
;; being called.
(when (second result)
(put! result c))
(close! c))))
(defn all-rows (defn all-rows
[db [sql & bindings :as rest]] [db [sql & bindings :as rest]]