Rewrite resolve-id-literals to use bulk <avs. (#88)

The metaphor we use is that of "evolution", where each "evolutionary
step" contains a number of different "generations".  Entities in the
process of being resolved are increasingly "evolved" into simpler
generations, until no further evolution is possible.
This commit is contained in:
Nick Alexander 2016-10-12 11:51:17 -07:00
parent 1c83287fcf
commit 39c909ec32
2 changed files with 223 additions and 72 deletions

View file

@ -121,10 +121,6 @@
(<bootstrapped? [db] (<bootstrapped? [db]
"Return true if this database has no transactions yet committed.") "Return true if this database has no transactions yet committed.")
(<av
[db a v]
"Search for a single matching datom using the AVET index.")
(<avs (<avs
[db avs] [db avs]
"Search for many matching datoms using the AVET index. "Search for many matching datoms using the AVET index.
@ -642,25 +638,6 @@
(:bootstrapped) (:bootstrapped)
(not= 0)))) (not= 0))))
(<av [db a v]
(let [schema (.-schema db) ;; TODO: understand why (schema db) fails.
a (entid db a)
[v tag] (ds/->SQLite schema a v)
yield-datom
(fn [rows]
(when-let [row (first rows)]
(row->Datom schema row)))]
(go-pair
(->>
;; TODO: generalize columns.
["SELECT e, a, v, tx, 1 AS added FROM all_datoms
WHERE index_avet = 1 AND a = ? AND value_type_tag = ? AND v = ?
LIMIT 1" a tag v]
(s/all-rows (:sqlite-connection db))
<?
yield-datom))))
(<avs (<avs
[db avs] [db avs]
{:pre [(sequential? avs)]} {:pre [(sequential? avs)]}

View file

@ -335,6 +335,221 @@
(defn- transact-entity [report entity] (defn- transact-entity [report entity]
(update-in report [:entities] conj entity)) (update-in report [:entities] conj entity))
(defn id-literal-generation [unique-identity? entity]
"Group entities possibly containing id-literals into 'generations'.
Entities are grouped into one of the following generations:
:upserts-ev - 'complex upserts' that look like [:db/add -1 a -2] where a is :db.unique/identity;
:upserts-e - 'simple upserts' that look like [:db/add -1 a v] where a is :db.unique/identity;
:allocations-{e,v,ev} - things like [:db/add -1 b v], [:db/add e b -2], or [:db/add -3 b -4] where
b is *not* :db.unique/identity, or like [:db/add e a -5] where a is :db.unique/identity;
:entities - not :db/add, or no id-literals."
{:pre [(sequential? entity)]}
(let [[op e a v] entity
e? (id-literal? e)
a? (id-literal? a)
v? (id-literal? v)]
(cond
(not= (first entity) :db/add) ;; TODO: verify no id-literals appear.
:entities
a?
(raise "id-literal attributes are not yet supported: " entity
{:error :transact/no-id-literal-attributes
:entity entity })
e?
(if (unique-identity? a)
(if v?
:upserts-ev
:upserts-e)
(if v?
:allocations-ev
:allocations-e))
v?
:allocations-v
true
:entities)))
(defn <resolve-upserts-e [db upserts-e]
"Given a sequence of :upserts-e, query the database to try to map them to existing entities.
Returns a map of id-literals to integer entids, with keys only those id-literals that mapped to
existing entities."
(go-pair
(if (seq upserts-e)
(let [->id-av (fn [[op id-literal a v]] [id-literal [a v]])
;; Like {[id-literal [[:a1 :v1] [:a2 :v2] ...]] ...}.
id->avs (util/group-by-kv ->id-av upserts-e)
;; Like [[:a1 :v1] [:a2 v2] ...].
avs (apply concat (vals id->avs))
;; Like {[[:a1 :v1] e1] ...}.
av->e (<? (db/<avs db avs))
avs->es (fn [avs] (set (keep (partial get av->e) avs)))
id->es (util/mapvals avs->es id->avs)]
(into {}
;; nil is dropped.
(map (fn [[id es]]
(when-let [e (first es)]
(when (second es)
(raise "Conflicting upsert: " id " resolves"
" to more than one entid " es
{:error :transact/upsert :tempid id :entids es}))
[id e])))
id->es)))))
(defn evolve-upserts-e [id->e upserts-e]
(let [evolve1
(fn [[op id-e a v]]
(let [e* (get id->e id-e)]
(if e*
[:upserted [op e* a v]]
[:allocations-e [op id-e a v]])))]
(util/group-by-kv evolve1 upserts-e)))
(defn evolve-upserts-ev [id->e upserts-ev]
"Given a map id->e of id-literals to integer entids, evolve the given entities. Returns a map
whose keys are generations and whose values are vectors of entities in those generations."
(let [evolve1
(fn [[op id-e a id-v]]
(let [e* (get id->e id-e)
v* (get id->e id-v)]
(if e*
(if v*
[:resolved [op e* a v*]]
[:allocations-v [op e* a id-v]])
(if v*
[:upserts-e [op id-e a v*]]
[:upserts-ev [op id-e a id-v]]))))]
(util/group-by-kv evolve1 upserts-ev)))
(defn evolve-allocations-e [id->e allocations-e]
"Given a map id->e of id-literals to integer entids, evolve the given entities. Returns a map
whose keys are generations and whose values are vectors of entities in those generations."
(let [evolve1
(fn [[op id-e a v]]
(let [e* (get id->e id-e)]
(if e*
[:resolved [op e* a v]]
[:allocations-e [op id-e a v]])))]
(util/group-by-kv evolve1 allocations-e)))
(defn evolve-allocations-v [id->e allocations-v]
"Given a map id->e of id-literals to integer entids, evolve the given entities. Returns a map
whose keys are generations and whose values are vectors of entities in those generations."
(let [evolve1
(fn [[op e a id-v]]
(let [v* (get id->e id-v)]
(if v*
[:resolved [op e a v*]]
[:allocations-v [op e a id-v]])))]
(util/group-by-kv evolve1 allocations-v)))
(defn evolve-allocations-ev [id->e allocations-ev]
"Given a map id->e of id-literals to integer entids, evolve the entities in allocations-ev. Returns a
map whose keys are generations and whose values are vectors of entities in those generations."
(let [evolve1
(fn [[op id-e a id-v]]
(let [e* (get id->e id-e)
v* (get id->e id-v)]
(if e*
(if v*
[:resolved [op e* a v*]]
[:allocations-v [op e* a id-v]])
(if v*
[:allocations-e [op id-e a v*]]
[:allocations-ev [op id-e a id-v]]))))]
(util/group-by-kv evolve1 allocations-ev)))
(defn <evolve [db evolution]
"Evolve a map of generations {:upserts-e [...], :upserts-ev [...], ...} as much as possible.
The algorithm is as follows.
First, resolve :upserts-e against the database. Some [a v] -> e will upsert; some will not.
Some :upserts-e evolve to become actual :upserts (they upserted!); any other :upserts-e evolve to
become :allocations-e (they did not upsert, and will not upsert this transaction).
Using the newly upserted id-literals, some :upserts-ev evolve to become :resolved;
some :upserts-ev evolve to become :upserts-e; and some :upserts-ev remain :upserts-ev.
Likewise, some :allocations-ev evolve to become :allocations-e, :allocations-v, or :resolved; some
:allocations-e evolve to become :resolved; and some :allocations-v evolve to become :resolved.
If we have *new* :upserts-e (i.e., some :upserts-ev become :upserts-e), then we may be able to
make more progress. We recurse, trying to resolve these new :upserts-e.
Eventually we will have no :upserts-e. At this point, :upserts-ev become :allocations-ev, and now
we have :entities, :upserted, :resolved, and various :allocations-*.
As a future optimization, :upserts do not need to be inserted; they upserted, so they already
exist in the DB. (We still need to verify uniqueness and ensure no overlapping can occur.)
Similary, :allocations-* do not need to be checked for existence, so they can be written to the DB
faster."
(go-pair
(let [{:keys [upserted resolved upserts-ev upserts-e allocations-ev allocations-e allocations-v entities]} evolution]
(if-not upserts-e
;; No more progress to be made. Any upserts-ev must just be allocations.
{:allocations-ev (concat upserts-ev allocations-ev)
:allocations-e allocations-e
:allocations-v allocations-v
:upserted upserted
:resolved resolved
:entities entities}
;; Progress can be made. Try to evolve further.
(let [id->e (<? (<resolve-upserts-e db upserts-e))]
(merge-with
concat
{:tempids id->e ;; TODO: ensure we handle conflicting upserts across generations correctly here.
:upserted upserted
:resolved resolved
:entities entities}
(evolve-upserts-ev id->e upserts-ev)
(evolve-upserts-e id->e upserts-e)
(evolve-allocations-ev id->e allocations-ev)
(evolve-allocations-e id->e allocations-e)
(evolve-allocations-v id->e allocations-v)))))))
;; TODO: do this in one step, rather than iterating.
(defn allocate [report evolution]
"Given a maximally evolved map of generations, allocate entids for all id-literals that did not
get upserted."
(let [{:keys [tempids upserted resolved allocations-ev allocations-e allocations-v entities]} evolution
initial-report (assoc report :tempids tempids)]
(loop [report
(assoc initial-report
;; TODO: drop :upserted, they already exist in the DB; and don't search for
;; :allocations-*, they definitely don't already exist in the DB.
:entities (concat upserted resolved entities))
es
(concat allocations-ev allocations-e allocations-v)]
(let [[[op e a v :as entity] & entities] es]
(cond
(nil? entity)
report
(id-literal? e)
(let [eid (or (get-in report [:tempids e]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))
(id-literal? v)
(let [eid (or (get-in report [:tempids v]) (-next-eid! (:part-map-atom report) v))]
(recur (allocate-eid report v eid) (cons [op e a eid] entities)))
true
(recur (transact-entity report entity) entities)
)))))
(defn <resolve-id-literals (defn <resolve-id-literals
"Upsert uniquely identified literals when possible and allocate new entids for all other id literals. "Upsert uniquely identified literals when possible and allocate new entids for all other id literals.
@ -358,57 +573,16 @@
{:pre [(db/db? db) (report? report)]} {:pre [(db/db? db) (report? report)]}
(go-pair (go-pair
(let [keyfn (fn [[op e a v]] (let [schema (db/schema db)
(if (and (id-literal? e) unique-identity? (memoize (partial ds/unique-identity? schema))
(not-any? id-literal? [a v]))
(- 5)
(- (count (filter id-literal? [e a v])))))
initial-report (assoc report :entities []) ;; TODO.
initial-entities (sort-by keyfn (:entities report))]
(loop [report initial-report
es initial-entities]
(let [[[op e a v :as entity] & entities] es]
(cond
(nil? entity)
report
(and (not= op :db/add) generations
(or (id-literal? e) (group-by (partial id-literal-generation unique-identity?) (:entities report))
(id-literal? a)
(id-literal? v)))
(raise "id-literals are resolved for :db/add only"
{:error :transact/syntax
:op entity })
;; Upsert! evolution
(and (id-literal? e) (<? (<evolve db generations))
(ds/unique-identity? (db/schema db) a) ]
(not-any? id-literal? [a v])) (allocate report evolution))))
(let [upserted-eid (:e (<? (db/<av db a v)))
allocated-eid (get-in report [:tempids e])]
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
(let [eid (or upserted-eid allocated-eid (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))))
;; Start allocating and retrying. We try with e last, so as to eventually upsert it.
(id-literal? v)
;; We can't fail with unbound literals here, since we could have multiple.
(let [eid (or (get-in report [:tempids v]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report v eid) (cons [op e a eid] entities)))
(id-literal? a)
;; TODO: should we even allow id-literal attributes? Datomic fails in some cases here.
(let [eid (or (get-in report [:tempids a]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report a eid) (cons [op e eid v] entities)))
(id-literal? e)
(let [eid (or (get-in report [:tempids e]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))
true
(recur (transact-entity report entity) entities)
))))))
(defn- transact-report [report datom] (defn- transact-report [report datom]
(update-in report [:tx-data] conj datom)) (update-in report [:tx-data] conj datom))