Cache partition map and update materialized partition view once. Fixes #47.

This caches a partition map per DB, which is helpful because it exposes
what the point in time DB partition state is, but is unhelpful because
the partition state can advance underneath the DB cache.  This is
generally true of the approach -- this can happen to the ident/entid
maps, and the datoms themselves -- so we'll roll with it for now.

This reduces the number of SQL UPDATE operations from linear in the
number of id-literals used to constant in the number of known
partitions.
This commit is contained in:
Nick Alexander 2016-08-31 16:04:46 -07:00
parent d419554361
commit d92016166a
5 changed files with 127 additions and 70 deletions

View file

@ -88,6 +88,10 @@
[db eid] [db eid]
"Returns the keyword associated with an id, or the key itself if passed.") "Returns the keyword associated with an id, or the key itself if passed.")
(part-map
[db]
"Return the partition map of this database, like {:db.part/user {:start 0x100 :idx 0x101}, ...}.")
(in-transaction! (in-transaction!
[db chan-fn] [db chan-fn]
"Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil, "Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil,
@ -113,10 +117,9 @@
[db fragment merge] [db fragment merge]
"Apply added schema fragment to the store, using `merge` as a `merge-with` function.") "Apply added schema fragment to the store, using `merge` as a `merge-with` function.")
(<next-eid (<apply-db-part-map
[db id-literal] [db part-map]
"Return a unique integer for the given id-literal, accounting for the literal's partition. The "Apply updated partition map."))
returned integer should never be returned again."))
(defn db? [x] (defn db? [x]
(and (satisfies? IDB x) (and (satisfies? IDB x)
@ -495,14 +498,17 @@
;; We index on tx, so the following is fast. ;; We index on tx, so the following is fast.
["SELECT * FROM transactions WHERE tx = ?" tx]))))))) ["SELECT * FROM transactions WHERE tx = ?" tx])))))))
(defrecord DB [sqlite-connection schema ident-map] (defrecord DB [sqlite-connection schema ident-map part-map]
;; ident-map maps between keyword idents and integer entids. The set of idents and entids is ;; ident-map maps between keyword idents and integer entids. The set of idents and entids is
;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also ;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also
;; for simplicity, we assume that an entid has at most one associated ident, and vice-versa. See ;; for simplicity, we assume that an entid has at most one associated ident, and vice-versa. See
;; http://docs.datomic.com/identity.html#idents. ;; http://docs.datomic.com/identity.html#idents.
;;
;; The partition-map part-map looks like {:db.part/user {:start 0x100 :idx 0x101}, ...}. It maps
;; between keyword ident part names and integer ranges, where start is the beginning of the
;; range (for future use to help identify which partition entids lie in, and idx is the current
;; maximum entid in the partition.
;; TODO: cache parts. parts looks like {:db.part/db {:start 0 :current 10}}. It maps between
;; keyword ident part names and integer ranges.
IDB IDB
(query-context [db] (context/make-context (datoms-source db))) (query-context [db] (context/make-context (datoms-source db)))
@ -518,6 +524,9 @@
(get (.-ident-map db) eid eid) (get (.-ident-map db) eid eid)
eid)) eid))
(part-map [db]
(:part-map db))
(in-transaction! [db chan-fn] (in-transaction! [db chan-fn]
(s/in-transaction! (s/in-transaction!
(:sqlite-connection db) chan-fn)) (:sqlite-connection db) chan-fn))
@ -551,27 +560,27 @@
<? <?
yield-datom)))) yield-datom))))
(<next-eid [db tempid]
{:pre [(id-literal? tempid)]}
{:post [ds/entid?]}
(go-pair
;; TODO: keep all of these eid allocations in the transaction report and apply them at the end
;; of the transaction.
(let [exec (partial s/execute! (:sqlite-connection db))
part (entid db (:part tempid))]
(when-not (ds/entid? part) ;; TODO: cache parts materialized view.
(raise "Cannot allocate entid for id-literal " tempid " because part " (:part tempid) " is not known"
{:error :db/bad-part
:part (:part tempid)}))
(p :next-eid-body
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part]))))))))
(<apply-entities [db tx entities] (<apply-entities [db tx entities]
{:pre [(db? db) (sequential? entities)]} {:pre [(db? db) (sequential? entities)]}
(-<apply-entities db tx entities)) (-<apply-entities db tx entities))
(<apply-db-part-map [db part-map]
(go-pair
(let [exec (partial s/execute! (:sqlite-connection db))]
(let [pairs (mapcat (fn [[part {:keys [start idx]}]]
(when-not (= idx (get-in db [:part-map part :idx]))
[(sqlite-schema/->SQLite part) idx]))
part-map)]
;; TODO: chunk into 999/2 sections, for safety.
(when-not (empty? pairs)
(<?
(exec
(cons (apply str "UPDATE parts SET idx = CASE"
(concat
(repeat (count pairs) " WHEN part = ? THEN ?")
[" ELSE idx END"]))
pairs))))))
(assoc db :part-map part-map)))
(<apply-db-ident-assertions [db added-idents merge] (<apply-db-ident-assertions [db added-idents merge]
(go-pair (go-pair
@ -612,9 +621,11 @@
(defn with-ident [db ident entid] (defn with-ident [db ident entid]
(update db :ident-map #(assoc % ident entid, entid ident))) (update db :ident-map #(assoc % ident entid, entid ident)))
(defn db [sqlite-connection idents schema] (defn db [sqlite-connection idents parts schema]
{:pre [(map? idents) {:pre [(map? idents)
(every? keyword? (keys idents)) (every? keyword? (keys idents))
(map? parts)
(every? keyword? (keys parts))
(map? schema) (map? schema)
(every? keyword? (keys schema))]} (every? keyword? (keys schema))]}
(let [entid-schema (ds/schema (into {} (map (fn [[k v]] [(k idents) v]) schema))) ;; TODO: fail if ident missing. (let [entid-schema (ds/schema (into {} (map (fn [[k v]] [(k idents) v]) schema))) ;; TODO: fail if ident missing.
@ -622,9 +633,9 @@
(map->DB (map->DB
{:sqlite-connection sqlite-connection {:sqlite-connection sqlite-connection
:ident-map ident-map :ident-map ident-map
:part-map parts
:symbolic-schema schema :symbolic-schema schema
:schema entid-schema :schema entid-schema
;; TODO :parts
}))) })))
;; TODO: factor this into the overall design. ;; TODO: factor this into the overall design.

View file

@ -34,6 +34,17 @@
(s/all-rows sqlite-connection)))] (s/all-rows sqlite-connection)))]
(into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:ident row)) (:entid row)])) rows)))) (into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:ident row)) (:entid row)])) rows))))
(defn <parts [sqlite-connection]
"Read the parts map materialized view from the given SQLite store.
Returns a map (keyword part) -> {:start integer :idx integer}, like {:db.part/user {start: 0x100 idx: 0x101}}."
(go-pair
(let [rows (<? (->>
{:select [:part :start :idx] :from [:parts]}
(s/format)
(s/all-rows sqlite-connection)))]
(into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:part row)) (select-keys row [:start :idx])])) rows))))
(defn <symbolic-schema [sqlite-connection] (defn <symbolic-schema [sqlite-connection]
"Read the schema map materialized view from the given SQLite store. "Read the schema map materialized view from the given SQLite store.
Returns a map (keyword ident) -> (map (keyword attribute -> keyword value)), like Returns a map (keyword ident) -> (map (keyword attribute -> keyword value)), like
@ -72,7 +83,7 @@
(when-not (= sqlite-schema/current-version (<? (sqlite-schema/<ensure-current-version sqlite-connection))) (when-not (= sqlite-schema/current-version (<? (sqlite-schema/<ensure-current-version sqlite-connection)))
(raise "Could not ensure current SQLite schema version.")) (raise "Could not ensure current SQLite schema version."))
(let [db (db/db sqlite-connection bootstrap/idents bootstrap/symbolic-schema) (let [db (db/db sqlite-connection bootstrap/idents bootstrap/parts bootstrap/symbolic-schema)
bootstrapped? (<? (db/<bootstrapped? db))] bootstrapped? (<? (db/<bootstrapped? db))]
(when-not bootstrapped? (when-not bootstrapped?
;; We need to bootstrap the DB. ;; We need to bootstrap the DB.
@ -84,16 +95,19 @@
(raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old (raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old
{:error :schema/alter-schema :old old :new new}) {:error :schema/alter-schema :old old :new new})
new))] new))]
(do (let [exec (partial s/execute! (:sqlite-connection db))
(let [exec (partial s/execute! (:sqlite-connection db))] part->vector (fn [[part {:keys [start idx]}]]
(println "part->vector" part start idx)
[(sqlite-schema/->SQLite part) start idx])]
;; TODO: allow inserting new parts. ;; TODO: allow inserting new parts.
;; TODO: think more carefully about allocating new parts and bitmasking part ranges. ;; TODO: think more carefully about allocating new parts and bitmasking part ranges.
;; TODO: install these using bootstrap assertions. It's tricky because the part ranges are implicit.
;; TODO: chunk into 999/3 sections, for safety.
(<? (exec (<? (exec
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/db) 0x0 (inc (apply max (vals bootstrap/idents)))])) (cons (str "INSERT INTO parts VALUES "
(<? (exec (apply str (interpose ", " (repeat (count bootstrap/parts) "(?, ?, ?)"))))
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/user) 0x10000 0])) (mapcat part->vector bootstrap/parts)))))
(<? (exec
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/tx) 0x10000000 0])))
(-> db (-> db
;; We use <with-internal rather than <transact! to apply the bootstrap transaction ;; We use <with-internal rather than <transact! to apply the bootstrap transaction
;; data but to not follow the regular schema application process. We can't apply the ;; data but to not follow the regular schema application process. We can't apply the
@ -103,21 +117,26 @@
;; datoms to the store. It's feasible but awkward.) After bootstrapping, we read ;; datoms to the store. It's feasible but awkward.) After bootstrapping, we read
;; back the idents and schema, just like when we re-open. ;; back the idents and schema, just like when we re-open.
(transact/<with-internal (bootstrap/tx-data) fail-alter-ident fail-alter-attr) (transact/<with-internal (bootstrap/tx-data) fail-alter-ident fail-alter-attr)
(<?))))) (<?))))
;; We just bootstrapped, or we are returning to an already bootstrapped DB. ;; We just bootstrapped, or we are returning to an already bootstrapped DB.
(let [idents (<? (<idents sqlite-connection)) (let [idents (<? (<idents sqlite-connection))
parts (<? (<parts sqlite-connection))
symbolic-schema (<? (<symbolic-schema sqlite-connection))] symbolic-schema (<? (<symbolic-schema sqlite-connection))]
(when-not bootstrapped? (when-not bootstrapped?
;; TODO: parts.
(when (not (= idents bootstrap/idents)) (when (not (= idents bootstrap/idents))
(raise "After bootstrapping database, expected new materialized idents and old bootstrapped idents to be identical" (raise "After bootstrapping database, expected new materialized idents and old bootstrapped idents to be identical"
{:error :bootstrap/bad-idents, {:error :bootstrap/bad-idents,
:new idents :old bootstrap/idents :new idents :old bootstrap/idents
})) }))
(when (not (= (dissoc parts :db.part/tx) (dissoc bootstrap/parts :db.part/tx))) ;; TODO: work around tx allocation.
(raise "After bootstrapping database, expected new materialized parts and old bootstrapped parts to be identical (outside of db.part/tx)"
{:error :bootstrap/bad-parts,
:new (dissoc parts :db.part/tx) :old (dissoc bootstrap/parts :db.part/tx)
}))
(when (not (= symbolic-schema bootstrap/symbolic-schema)) (when (not (= symbolic-schema bootstrap/symbolic-schema))
(raise "After bootstrapping database, expected new materialized symbolic schema and old bootstrapped symbolic schema to be identical" (raise "After bootstrapping database, expected new materialized symbolic schema and old bootstrapped symbolic schema to be identical"
{:error :bootstrap/bad-symbolic-schema, {:error :bootstrap/bad-symbolic-schema,
:new symbolic-schema :old bootstrap/symbolic-schema :new symbolic-schema :old bootstrap/symbolic-schema
}))) })))
(db/db sqlite-connection idents symbolic-schema))))) (db/db sqlite-connection idents parts symbolic-schema)))))

View file

@ -112,7 +112,7 @@
;; TODO: allow arbitrary schema values (true/false) and tag the resulting values. ;; TODO: allow arbitrary schema values (true/false) and tag the resulting values.
"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))" "CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))"
"CREATE INDEX idx_schema_unique ON schema (ident, attr, value)" "CREATE INDEX idx_schema_unique ON schema (ident, attr, value)"
"CREATE TABLE parts (part INTEGER NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)" "CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"
]) ])
(defn <create-current-version (defn <create-current-version

View file

@ -64,6 +64,7 @@
entities ;; The set of entities (like [:db/add e a v tx]) processed. entities ;; The set of entities (like [:db/add e a v tx]) processed.
tx-data ;; The set of datoms applied to the database, like (Datom. e a v tx added). tx-data ;; The set of datoms applied to the database, like (Datom. e a v tx added).
tempids ;; The map from id-literal -> numeric entid. tempids ;; The map from id-literal -> numeric entid.
part-map ;; Map {:db.part/user {:start 0x10000 :idx 0x10000}, ...}.
added-parts ;; The set of parts added during the transaction via :db.part/db :db.install/part. added-parts ;; The set of parts added during the transaction via :db.part/db :db.install/part.
added-idents ;; The map of idents -> entid added during the transaction, via e :db/ident ident. added-idents ;; The map of idents -> entid added during the transaction, via e :db/ident ident.
added-attributes ;; The map of schema attributes (ident -> schema fragment) added during the transaction, via :db.part/db :db.install/attribute. added-attributes ;; The map of schema attributes (ident -> schema fragment) added during the transaction, via :db.part/db :db.install/attribute.
@ -72,11 +73,19 @@
(defn- report? [x] (defn- report? [x]
(and (instance? TxReport x))) (and (instance? TxReport x)))
(defonce -eid (atom (- 0x200 1))) (defn- -next-eid! [part-map-atom tempid]
"Advance {:db.part/user {:start 0x10 :idx 0x11}, ...} to {:db.part/user {:start 0x10 :idx 0x12}, ...} and return 0x12."
;; TODO: better here. {:pre [(id-literal? tempid)]}
(defn- next-eid [db] (let [part (:part tempid)
(swap! -eid inc)) next (fn [part-map]
(let [idx (get-in part-map [part :idx])]
(when-not idx
(raise "Cannot allocate entid for id-literal " tempid " because part " part " is not known"
{:error :db/bad-part
:parts (sorted-set (keys part-map))
:part part}))
(update-in part-map [part :idx] inc)))]
(get-in (swap! part-map-atom next) [part :idx])))
(defn- allocate-eid (defn- allocate-eid
[report id-literal eid] [report id-literal eid]
@ -327,22 +336,22 @@
allocated-eid (get-in report [:tempids e])] allocated-eid (get-in report [:tempids e])]
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid)) (if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here. (<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
(let [eid (or upserted-eid allocated-eid (<? (db/<next-eid db e)))] (let [eid (or upserted-eid allocated-eid (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities))))) (recur (allocate-eid report e eid) (cons [op eid a v] entities)))))
;; Start allocating and retrying. We try with e last, so as to eventually upsert it. ;; Start allocating and retrying. We try with e last, so as to eventually upsert it.
(id-literal? v) (id-literal? v)
;; We can't fail with unbound literals here, since we could have multiple. ;; We can't fail with unbound literals here, since we could have multiple.
(let [eid (or (get-in report [:tempids v]) (<? (db/<next-eid db e)))] (let [eid (or (get-in report [:tempids v]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report v eid) (cons [op e a eid] entities))) (recur (allocate-eid report v eid) (cons [op e a eid] entities)))
(id-literal? a) (id-literal? a)
;; TODO: should we even allow id-literal attributes? Datomic fails in some cases here. ;; TODO: should we even allow id-literal attributes? Datomic fails in some cases here.
(let [eid (or (get-in report [:tempids a]) (<? (db/<next-eid db e)))] (let [eid (or (get-in report [:tempids a]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report a eid) (cons [op e eid v] entities))) (recur (allocate-eid report a eid) (cons [op e eid v] entities)))
(id-literal? e) (id-literal? e)
(let [eid (or (get-in report [:tempids e]) (<? (db/<next-eid db e)))] (let [eid (or (get-in report [:tempids e]) (-next-eid! (:part-map-atom report) e))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities))) (recur (allocate-eid report e eid) (cons [op eid a v] entities)))
true true
@ -453,7 +462,13 @@
;; TODO: expose this in a more appropriate way. ;; TODO: expose this in a more appropriate way.
(defn <with-internal [db tx-data merge-ident merge-attr] (defn <with-internal [db tx-data merge-ident merge-attr]
(go-pair (go-pair
(let [report (->> (let [part-map-atom
(atom (db/part-map db))
tx
(-next-eid! part-map-atom (id-literal :db.part/tx))
report (->>
(map->TxReport (map->TxReport
{:db-before db {:db-before db
:db-after db :db-after db
@ -461,7 +476,8 @@
;; transaction ID and transaction timestamp directly from the report; Datomic ;; transaction ID and transaction timestamp directly from the report; Datomic
;; makes this surprisingly difficult: one needs a :db.part/tx temporary and an ;; makes this surprisingly difficult: one needs a :db.part/tx temporary and an
;; explicit upsert of that temporary. ;; explicit upsert of that temporary.
:tx (<? (db/<next-eid db (id-literal :db.part/tx))) :part-map-atom part-map-atom
:tx tx
:txInstant (db/now db) :txInstant (db/now db)
:entities tx-data :entities tx-data
:tx-data [] :tx-data []
@ -484,6 +500,10 @@
db-after (-> db-after (->
db db
(db/<apply-db-part-map @(:part-map-atom report))
(<?)
(->> (p :apply-db-part-changes))
(db/<apply-db-ident-assertions (:added-idents report) merge-ident) (db/<apply-db-ident-assertions (:added-idents report) merge-ident)
(<?) (<?)
(->> (p :apply-db-ident-assertions)) (->> (p :apply-db-ident-assertions))

View file

@ -76,9 +76,16 @@
:db/doc 35 :db/doc 35
}) })
(def parts
{:db.part/db {:start 0 :idx (inc (apply max (vals idents)))}
:db.part/user {:start 0x10000 :idx 0x10000}
:db.part/tx {:start 0x10000000 :idx 0x10000000}
})
(defn tx-data [] (defn tx-data []
(concat (concat
(map (fn [[ident entid]] [:db/add entid :db/ident ident]) idents) (map (fn [[ident entid]] [:db/add entid :db/ident ident]) idents)
;; TODO: install partitions as well, like (map (fn [[ident entid]] [:db/add :db.part/db :db.install/partition ident])).
(map (fn [[ident attrs]] (assoc attrs :db/id ident)) symbolic-schema) (map (fn [[ident attrs]] (assoc attrs :db/id ident)) symbolic-schema)
(map (fn [[ident attrs]] [:db/add :db.part/db :db.install/attribute (get idents ident)]) symbolic-schema) ;; TODO: fail if nil. (map (fn [[ident attrs]] [:db/add :db.part/db :db.install/attribute (get idents ident)]) symbolic-schema) ;; TODO: fail if nil.
)) ))