Process lookup-refs in batches. Fixes #25.

This uses a common table expression and multiple SQL calls rather than a
temporary table, since transactions with huge numbers of distinct
lookup-refs are likely to be very rare.

We mark lookup-refs with `lookup-ref`, which is a little awkward because
binding `(let [[a v] lookup-ref] ...)` doesn't directly work, but avoids
some ambiguity present in Datomic and DataScript around interpreting
lookup-refs as multiple value lists.  (Which bit the tests in an earlier
version of this patch!)
This commit is contained in:
Nick Alexander 2016-09-09 15:26:13 -07:00
parent 20531c1789
commit 611d44fcce
6 changed files with 178 additions and 43 deletions

View file

@ -32,6 +32,8 @@
(def id-literal db/id-literal) (def id-literal db/id-literal)
(def lookup-ref db/lookup-ref)
(def db transact/db) (def db transact/db)
(def entid db/entid) (def entid db/entid)

View file

@ -63,6 +63,23 @@
(defn id-literal? [x] (defn id-literal? [x]
(instance? TempId x)) (instance? TempId x))
(defrecord LookupRef [a v])
(defn lookup-ref
[a v]
(if (and
(or (keyword? a)
(integer? a))
v)
(->LookupRef a v)
(raise (str "Lookup-ref with bad attribute " a " or value " v
{:error :transact/bad-lookup-ref, :a a, :v v}))))
(defn lookup-ref? [x]
"Return `x` if `x` is like [:attr value], nil otherwise."
(when (instance? LookupRef x)
x))
(defprotocol IClock (defprotocol IClock
(now (now
[clock] [clock]
@ -105,6 +122,13 @@
[db a v] [db a v]
"Search for a single matching datom using the AVET index.") "Search for a single matching datom using the AVET index.")
(<avs
[db avs]
"Search for many matching datoms using the AVET index.
Take [[a0 v0] [a1 v1] ...] and return a map {[a0 v0] e0}. If no datom [e1 a1 v1] exists, the
key [a1 v1] is not present in the returned map.")
(<apply-entities (<apply-entities
[db tx entities] [db tx entities]
"Apply entities to the store, returning sequence of datoms transacted.") "Apply entities to the store, returning sequence of datoms transacted.")
@ -634,6 +658,61 @@
<? <?
yield-datom)))) yield-datom))))
(<avs
[db avs]
{:pre [(sequential? avs)]}
(go-pair
(let [schema
(.-schema db)
values-part
"(?, ?, ?, ?)"
repeater
(memoize (fn [n] (interpose ", " (repeat n values-part))))
exec
(partial s/execute! (:sqlite-connection db))
;; Map [a v] -> searchid.
av->searchid
(into {} (map vector avs (range)))
;; Each query takes 4 variables per item. So we partition into max-sql-vars / 4.
qs
(map
(fn [chunk]
(cons
;; Query string.
(apply str "WITH t(searchid, a, v, value_type_tag) AS (VALUES "
(apply str (repeater (count chunk))) ;; TODO: join?
") SELECT t.searchid, d.e
FROM t, datoms AS d
WHERE d.index_avet IS NOT 0 AND d.a = t.a AND d.value_type_tag = t.value_type_tag AND d.v = t.v")
;; Bindings.
(mapcat (fn [[[a v] searchid]]
(let [a (entid db a)
[v tag] (ds/->SQLite schema a v)]
[searchid a v tag]))
chunk)))
(partition-all (quot max-sql-vars 4) av->searchid))
;; Map searchid -> e. There's a generic reduce that takes [pair-chan] lurking in here.
searchid->e
(loop [coll {}
qs qs]
(let [[q & qs] qs]
(if q
(let [rs (<? (s/all-rows (:sqlite-connection db) q))
coll* (into coll (map (juxt :searchid :e)) rs)]
(recur coll* qs))
coll)))
]
(util/mapvals (partial get searchid->e) av->searchid))))
(<apply-entities [db tx entities] (<apply-entities [db tx entities]
{:pre [(db? db) (sequential? entities)]} {:pre [(db? db) (sequential? entities)]}
(-<apply-entities db tx entities)) (-<apply-entities db tx entities))

View file

@ -29,6 +29,15 @@
(filter #(not (= :db/txInstant (second %)))) (filter #(not (= :db/txInstant (second %))))
(set)))) (set))))
(defn <datoms>= [db tx]
(go-pair
(->>
(s/all-rows (:sqlite-connection db) ["SELECT e, a, v, tx FROM datoms WHERE tx >= ?" tx])
(<?)
(mapv #(vector (:e %) (db/ident db (:a %)) (:v %)))
(filter #(not (= :db/txInstant (second %))))
(set))))
(defn <datoms [db] (defn <datoms [db]
(<datoms-after db 0)) (<datoms-after db 0))

View file

@ -127,7 +127,7 @@
(db/entid db v))] (db/entid db v))]
(when (and a (not (integer? a))) (when (and a (not (integer? a)))
(raise "Unknown attribute " a (raise "Unknown attribute " a
{:form orig :attribute a})) {:form orig :attribute a :entity orig}))
[op e a v])) [op e a v]))
(defrecord Transaction [db tempids entities]) (defrecord Transaction [db tempids entities])
@ -250,49 +250,40 @@
;; Extract the current txInstant for the report. ;; Extract the current txInstant for the report.
(->> (update-txInstant db*))))) (->> (update-txInstant db*)))))
(defn- lookup-ref? [x]
"Return `x` if `x` is like [:attr value], false otherwise."
(and (sequential? x)
(= (count x) 2)
(or (keyword? (first x))
(integer? (first x)))
x))
(defn <resolve-lookup-refs [db report] (defn <resolve-lookup-refs [db report]
{:pre [(db/db? db) (report? report)]} {:pre [(db/db? db) (report? report)]}
(let [entities (:entities report)] (let [unique-identity? (memoize (partial ds/unique-identity? (db/schema db)))
;; TODO: meta. groups (group-by (partial keep db/lookup-ref?) (:entities report))
entities (get groups (lazy-seq)) ;; No lookup-refs? Pass through.
to-resolve (dissoc groups (lazy-seq)) ;; The ones with lookup-refs.
avs (set (map (juxt :a :v) (apply concat (keys to-resolve))))
->av (fn [r] ;; Conditional (juxt :a :v) that passes through nil.
(when r [(:a r) (:v r)]))]
(go-pair (go-pair
(if (empty? entities) (let [av->e (<? (db/<avs db avs))
report resolve1 (fn [field]
(assoc-in (if-let [[a v] (->av (db/lookup-ref? field))]
report [:entities] (if-not (unique-identity? (db/entid db a))
;; We can't use `for` because go-pair doesn't traverse function boundaries. (raise "Lookup-ref found with non-unique-identity attribute " a " and value " v
;; Apologies for the tortured nested loop. {:error :transact/lookup-ref-with-non-unique-identity-attribute
(loop [[op & entity] (first entities) :a a
next (rest entities) :v v})
acc []] (or
(if (nil? op) (get av->e [a v])
acc (raise "No entity found for lookup-ref with attribute " a " and value " v
(recur (first next) {:error :transact/lookup-ref-not-found
(rest next) :a a
(conj acc :v v})))
(loop [field (first entity) field))
rem (rest entity) resolve (fn [entity]
acc [op]] (mapv resolve1 entity))]
(if (nil? field) (assoc
acc report
(recur (first rem) :entities
(rest rem) (concat
(conj acc entities
(if-let [[a v] (lookup-ref? field)] (map resolve (apply concat (vals to-resolve)))))))))
(or
;; The lookup might fail! If so, throw.
(:e (<? (db/<av db a v)))
(raise "No entity found with attr " a " and val " v "."
{:a a :v v}))
field))))))))))))))
(declare <resolve-id-literals) (declare <resolve-id-literals)

View file

@ -47,6 +47,7 @@
(explode-entity-a-v db entity v straight-a eid) (explode-entity-a-v db entity v straight-a eid)
(and (map? v) (and (map? v)
(not (db/lookup-ref? v))
(not (db/id-literal? v))) (not (db/id-literal? v)))
;; Another entity is given as a nested map. ;; Another entity is given as a nested map.
(if (ds/ref? (db/schema db) straight-a*) (if (ds/ref? (db/schema db) straight-a*)

View file

@ -10,7 +10,7 @@
[cljs.core.async.macros :as a :refer [go]])) [cljs.core.async.macros :as a :refer [go]]))
(:require (:require
[datomish.api :as d] [datomish.api :as d]
[datomish.db.debug :refer [<datoms-after <transactions-after <shallow-entity <fulltext-values]] [datomish.db.debug :refer [<datoms-after <datoms>= <transactions-after <shallow-entity <fulltext-values]]
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise cond-let]] [datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise cond-let]]
[datomish.schema :as ds] [datomish.schema :as ds]
[datomish.simple-schema] [datomish.simple-schema]
@ -846,8 +846,61 @@
:db.install/_attribute :db.part/db}]] :db.install/_attribute :db.part/db}]]
(testing "Simple schemas are expanded." (testing "Simple schemas are expanded."
(is (= (map #(dissoc %1 :db/id) (datomish.simple-schema/simple-schema->schema in)) (is (= (map #(dissoc %1 :db/id) (datomish.simple-schema/simple-schema->schema in))
expected))))) expected)))))
(deftest-db test-lookup-refs conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
{tx1 :tx} (<? (d/<transact! conn [[:db/add 1 :name "Ivan"]
[:db/add 2 :name "Phil"]
[:db/add 3 :name "Petr"]]))]
(testing "Looks up entity refs"
(let [{tx :tx} (<? (d/<transact! conn [[:db/add (d/lookup-ref :name "Ivan") :aka "Devil"]
[:db/add (d/lookup-ref :name "Phil") :email "@1"]]))]
(is (= #{[1 :name "Ivan"]
[2 :name "Phil"]
[3 :name "Petr"]
[1 :aka "Devil"]
[2 :email "@1"]}
(<? (<datoms>= (d/db conn) tx1))))))
(testing "Looks up value refs"
(let [{tx :tx} (<? (d/<transact! conn [[:db/add 1 :friends (d/lookup-ref :name "Petr")]
[:db/add 3 :friends (d/lookup-ref :name "Ivan")]]))]
(is (= #{[1 :friends 3]
[3 :friends 1]}
(<? (<datoms>= (d/db conn) tx))))))
(testing "Looks up entity refs in maps"
(let [{tx :tx} (<? (d/<transact! conn [{:db/id (d/lookup-ref :name "Phil") :friends 1}]))]
(is (= #{[2 :friends 1]}
(<? (<datoms>= (d/db conn) tx))))))
(testing "Looks up value refs in maps"
(let [{tx :tx} (<? (d/<transact! conn [{:db/id 2 :friends (d/lookup-ref :name "Petr")}]))]
(is (= #{[2 :friends 3]}
(<? (<datoms>= (d/db conn) tx))))))
(testing "Looks up value refs in sequences in maps"
(let [{tx :tx} (<? (d/<transact! conn [{:db/id 1 :friends [(d/lookup-ref :name "Ivan") (d/lookup-ref :name "Phil")]}]))]
(is (= #{[1 :friends 1]
[1 :friends 2]}
(<? (<datoms>= (d/db conn) tx))))))
(testing "Fails for missing entities"
(is (thrown-with-msg?
ExceptionInfo #"No entity found for lookup-ref"
(<? (d/<transact! conn [[:db/add (d/lookup-ref :name "Mysterioso") :aka "The Magician"]]))))
(is (thrown-with-msg?
ExceptionInfo #"No entity found for lookup-ref"
(<? (d/<transact! conn [[:db/add 1 :friends (d/lookup-ref :name "Mysterioso")]])))))
(testing "Fails for non-identity attributes"
(is (thrown-with-msg?
ExceptionInfo #"Lookup-ref found with non-unique-identity attribute"
(<? (d/<transact! conn [[:db/add (d/lookup-ref :aka "The Magician") :email "@2"]]))))
(is (thrown-with-msg?
ExceptionInfo #"Lookup-ref found with non-unique-identity attribute"
(<? (d/<transact! conn [[:db/add 1 :friends (d/lookup-ref :aka "The Magician")]])))))))
#_ (time (t/run-tests)) #_ (time (t/run-tests))