Cache partition map and update materialized partition view once. Fixes #47. r=rnewman
This commit is contained in:
commit
e48f58f5f4
5 changed files with 127 additions and 70 deletions
|
@ -88,6 +88,10 @@
|
|||
[db eid]
|
||||
"Returns the keyword associated with an id, or the key itself if passed.")
|
||||
|
||||
(part-map
|
||||
[db]
|
||||
"Return the partition map of this database, like {:db.part/user {:start 0x100 :idx 0x101}, ...}.")
|
||||
|
||||
(in-transaction!
|
||||
[db chan-fn]
|
||||
"Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil,
|
||||
|
@ -113,10 +117,9 @@
|
|||
[db fragment merge]
|
||||
"Apply added schema fragment to the store, using `merge` as a `merge-with` function.")
|
||||
|
||||
(<next-eid
|
||||
[db id-literal]
|
||||
"Return a unique integer for the given id-literal, accounting for the literal's partition. The
|
||||
returned integer should never be returned again."))
|
||||
(<apply-db-part-map
|
||||
[db part-map]
|
||||
"Apply updated partition map."))
|
||||
|
||||
(defn db? [x]
|
||||
(and (satisfies? IDB x)
|
||||
|
@ -495,14 +498,17 @@
|
|||
;; We index on tx, so the following is fast.
|
||||
["SELECT * FROM transactions WHERE tx = ?" tx])))))))
|
||||
|
||||
(defrecord DB [sqlite-connection schema ident-map]
|
||||
(defrecord DB [sqlite-connection schema ident-map part-map]
|
||||
;; ident-map maps between keyword idents and integer entids. The set of idents and entids is
|
||||
;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also
|
||||
;; for simplicity, we assume that an entid has at most one associated ident, and vice-versa. See
|
||||
;; http://docs.datomic.com/identity.html#idents.
|
||||
;;
|
||||
;; The partition-map part-map looks like {:db.part/user {:start 0x100 :idx 0x101}, ...}. It maps
|
||||
;; between keyword ident part names and integer ranges, where start is the beginning of the
|
||||
;; range (for future use to help identify which partition entids lie in, and idx is the current
|
||||
;; maximum entid in the partition.
|
||||
|
||||
;; TODO: cache parts. parts looks like {:db.part/db {:start 0 :current 10}}. It maps between
|
||||
;; keyword ident part names and integer ranges.
|
||||
IDB
|
||||
(query-context [db] (context/make-context (datoms-source db)))
|
||||
|
||||
|
@ -518,6 +524,9 @@
|
|||
(get (.-ident-map db) eid eid)
|
||||
eid))
|
||||
|
||||
(part-map [db]
|
||||
(:part-map db))
|
||||
|
||||
(in-transaction! [db chan-fn]
|
||||
(s/in-transaction!
|
||||
(:sqlite-connection db) chan-fn))
|
||||
|
@ -551,27 +560,27 @@
|
|||
<?
|
||||
yield-datom))))
|
||||
|
||||
(<next-eid [db tempid]
|
||||
{:pre [(id-literal? tempid)]}
|
||||
{:post [ds/entid?]}
|
||||
(go-pair
|
||||
;; TODO: keep all of these eid allocations in the transaction report and apply them at the end
|
||||
;; of the transaction.
|
||||
(let [exec (partial s/execute! (:sqlite-connection db))
|
||||
part (entid db (:part tempid))]
|
||||
(when-not (ds/entid? part) ;; TODO: cache parts materialized view.
|
||||
(raise "Cannot allocate entid for id-literal " tempid " because part " (:part tempid) " is not known"
|
||||
{:error :db/bad-part
|
||||
:part (:part tempid)}))
|
||||
|
||||
(p :next-eid-body
|
||||
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
|
||||
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part]))))))))
|
||||
|
||||
(<apply-entities [db tx entities]
|
||||
{:pre [(db? db) (sequential? entities)]}
|
||||
(-<apply-entities db tx entities))
|
||||
|
||||
(<apply-db-part-map [db part-map]
|
||||
(go-pair
|
||||
(let [exec (partial s/execute! (:sqlite-connection db))]
|
||||
(let [pairs (mapcat (fn [[part {:keys [start idx]}]]
|
||||
(when-not (= idx (get-in db [:part-map part :idx]))
|
||||
[(sqlite-schema/->SQLite part) idx]))
|
||||
part-map)]
|
||||
;; TODO: chunk into 999/2 sections, for safety.
|
||||
(when-not (empty? pairs)
|
||||
(<?
|
||||
(exec
|
||||
(cons (apply str "UPDATE parts SET idx = CASE"
|
||||
(concat
|
||||
(repeat (count pairs) " WHEN part = ? THEN ?")
|
||||
[" ELSE idx END"]))
|
||||
pairs))))))
|
||||
(assoc db :part-map part-map)))
|
||||
|
||||
(<apply-db-ident-assertions [db added-idents merge]
|
||||
(go-pair
|
||||
|
@ -612,9 +621,11 @@
|
|||
(defn with-ident [db ident entid]
|
||||
(update db :ident-map #(assoc % ident entid, entid ident)))
|
||||
|
||||
(defn db [sqlite-connection idents schema]
|
||||
(defn db [sqlite-connection idents parts schema]
|
||||
{:pre [(map? idents)
|
||||
(every? keyword? (keys idents))
|
||||
(map? parts)
|
||||
(every? keyword? (keys parts))
|
||||
(map? schema)
|
||||
(every? keyword? (keys schema))]}
|
||||
(let [entid-schema (ds/schema (into {} (map (fn [[k v]] [(k idents) v]) schema))) ;; TODO: fail if ident missing.
|
||||
|
@ -622,9 +633,9 @@
|
|||
(map->DB
|
||||
{:sqlite-connection sqlite-connection
|
||||
:ident-map ident-map
|
||||
:part-map parts
|
||||
:symbolic-schema schema
|
||||
:schema entid-schema
|
||||
;; TODO :parts
|
||||
})))
|
||||
|
||||
;; TODO: factor this into the overall design.
|
||||
|
|
|
@ -34,6 +34,17 @@
|
|||
(s/all-rows sqlite-connection)))]
|
||||
(into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:ident row)) (:entid row)])) rows))))
|
||||
|
||||
(defn <parts [sqlite-connection]
|
||||
"Read the parts map materialized view from the given SQLite store.
|
||||
Returns a map (keyword part) -> {:start integer :idx integer}, like {:db.part/user {start: 0x100 idx: 0x101}}."
|
||||
|
||||
(go-pair
|
||||
(let [rows (<? (->>
|
||||
{:select [:part :start :idx] :from [:parts]}
|
||||
(s/format)
|
||||
(s/all-rows sqlite-connection)))]
|
||||
(into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:part row)) (select-keys row [:start :idx])])) rows))))
|
||||
|
||||
(defn <symbolic-schema [sqlite-connection]
|
||||
"Read the schema map materialized view from the given SQLite store.
|
||||
Returns a map (keyword ident) -> (map (keyword attribute -> keyword value)), like
|
||||
|
@ -72,7 +83,7 @@
|
|||
(when-not (= sqlite-schema/current-version (<? (sqlite-schema/<ensure-current-version sqlite-connection)))
|
||||
(raise "Could not ensure current SQLite schema version."))
|
||||
|
||||
(let [db (db/db sqlite-connection bootstrap/idents bootstrap/symbolic-schema)
|
||||
(let [db (db/db sqlite-connection bootstrap/idents bootstrap/parts bootstrap/symbolic-schema)
|
||||
bootstrapped? (<? (db/<bootstrapped? db))]
|
||||
(when-not bootstrapped?
|
||||
;; We need to bootstrap the DB.
|
||||
|
@ -84,16 +95,19 @@
|
|||
(raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old
|
||||
{:error :schema/alter-schema :old old :new new})
|
||||
new))]
|
||||
(do
|
||||
(let [exec (partial s/execute! (:sqlite-connection db))]
|
||||
(let [exec (partial s/execute! (:sqlite-connection db))
|
||||
part->vector (fn [[part {:keys [start idx]}]]
|
||||
(println "part->vector" part start idx)
|
||||
[(sqlite-schema/->SQLite part) start idx])]
|
||||
;; TODO: allow inserting new parts.
|
||||
;; TODO: think more carefully about allocating new parts and bitmasking part ranges.
|
||||
;; TODO: install these using bootstrap assertions. It's tricky because the part ranges are implicit.
|
||||
;; TODO: chunk into 999/3 sections, for safety.
|
||||
(<? (exec
|
||||
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/db) 0x0 (inc (apply max (vals bootstrap/idents)))]))
|
||||
(<? (exec
|
||||
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/user) 0x10000 0]))
|
||||
(<? (exec
|
||||
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/tx) 0x10000000 0])))
|
||||
(cons (str "INSERT INTO parts VALUES "
|
||||
(apply str (interpose ", " (repeat (count bootstrap/parts) "(?, ?, ?)"))))
|
||||
(mapcat part->vector bootstrap/parts)))))
|
||||
|
||||
(-> db
|
||||
;; We use <with-internal rather than <transact! to apply the bootstrap transaction
|
||||
;; data but to not follow the regular schema application process. We can't apply the
|
||||
|
@ -103,21 +117,26 @@
|
|||
;; datoms to the store. It's feasible but awkward.) After bootstrapping, we read
|
||||
;; back the idents and schema, just like when we re-open.
|
||||
(transact/<with-internal (bootstrap/tx-data) fail-alter-ident fail-alter-attr)
|
||||
(<?)))))
|
||||
(<?))))
|
||||
|
||||
;; We just bootstrapped, or we are returning to an already bootstrapped DB.
|
||||
(let [idents (<? (<idents sqlite-connection))
|
||||
parts (<? (<parts sqlite-connection))
|
||||
symbolic-schema (<? (<symbolic-schema sqlite-connection))]
|
||||
(when-not bootstrapped?
|
||||
;; TODO: parts.
|
||||
(when (not (= idents bootstrap/idents))
|
||||
(raise "After bootstrapping database, expected new materialized idents and old bootstrapped idents to be identical"
|
||||
{:error :bootstrap/bad-idents,
|
||||
:new idents :old bootstrap/idents
|
||||
}))
|
||||
(when (not (= (dissoc parts :db.part/tx) (dissoc bootstrap/parts :db.part/tx))) ;; TODO: work around tx allocation.
|
||||
(raise "After bootstrapping database, expected new materialized parts and old bootstrapped parts to be identical (outside of db.part/tx)"
|
||||
{:error :bootstrap/bad-parts,
|
||||
:new (dissoc parts :db.part/tx) :old (dissoc bootstrap/parts :db.part/tx)
|
||||
}))
|
||||
(when (not (= symbolic-schema bootstrap/symbolic-schema))
|
||||
(raise "After bootstrapping database, expected new materialized symbolic schema and old bootstrapped symbolic schema to be identical"
|
||||
{:error :bootstrap/bad-symbolic-schema,
|
||||
:new symbolic-schema :old bootstrap/symbolic-schema
|
||||
})))
|
||||
(db/db sqlite-connection idents symbolic-schema)))))
|
||||
(db/db sqlite-connection idents parts symbolic-schema)))))
|
||||
|
|
|
@ -112,7 +112,7 @@
|
|||
;; TODO: allow arbitrary schema values (true/false) and tag the resulting values.
|
||||
"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))"
|
||||
"CREATE INDEX idx_schema_unique ON schema (ident, attr, value)"
|
||||
"CREATE TABLE parts (part INTEGER NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"
|
||||
"CREATE TABLE parts (part TEXT NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"
|
||||
])
|
||||
|
||||
(defn <create-current-version
|
||||
|
|
|
@ -64,6 +64,7 @@
|
|||
entities ;; The set of entities (like [:db/add e a v tx]) processed.
|
||||
tx-data ;; The set of datoms applied to the database, like (Datom. e a v tx added).
|
||||
tempids ;; The map from id-literal -> numeric entid.
|
||||
part-map ;; Map {:db.part/user {:start 0x10000 :idx 0x10000}, ...}.
|
||||
added-parts ;; The set of parts added during the transaction via :db.part/db :db.install/part.
|
||||
added-idents ;; The map of idents -> entid added during the transaction, via e :db/ident ident.
|
||||
added-attributes ;; The map of schema attributes (ident -> schema fragment) added during the transaction, via :db.part/db :db.install/attribute.
|
||||
|
@ -72,11 +73,19 @@
|
|||
(defn- report? [x]
|
||||
(and (instance? TxReport x)))
|
||||
|
||||
(defonce -eid (atom (- 0x200 1)))
|
||||
|
||||
;; TODO: better here.
|
||||
(defn- next-eid [db]
|
||||
(swap! -eid inc))
|
||||
(defn- -next-eid! [part-map-atom tempid]
|
||||
"Advance {:db.part/user {:start 0x10 :idx 0x11}, ...} to {:db.part/user {:start 0x10 :idx 0x12}, ...} and return 0x12."
|
||||
{:pre [(id-literal? tempid)]}
|
||||
(let [part (:part tempid)
|
||||
next (fn [part-map]
|
||||
(let [idx (get-in part-map [part :idx])]
|
||||
(when-not idx
|
||||
(raise "Cannot allocate entid for id-literal " tempid " because part " part " is not known"
|
||||
{:error :db/bad-part
|
||||
:parts (sorted-set (keys part-map))
|
||||
:part part}))
|
||||
(update-in part-map [part :idx] inc)))]
|
||||
(get-in (swap! part-map-atom next) [part :idx])))
|
||||
|
||||
(defn- allocate-eid
|
||||
[report id-literal eid]
|
||||
|
@ -327,22 +336,22 @@
|
|||
allocated-eid (get-in report [:tempids e])]
|
||||
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
|
||||
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
|
||||
(let [eid (or upserted-eid allocated-eid (<? (db/<next-eid db e)))]
|
||||
(let [eid (or upserted-eid allocated-eid (-next-eid! (:part-map-atom report) e))]
|
||||
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))))
|
||||
|
||||
;; Start allocating and retrying. We try with e last, so as to eventually upsert it.
|
||||
(id-literal? v)
|
||||
;; We can't fail with unbound literals here, since we could have multiple.
|
||||
(let [eid (or (get-in report [:tempids v]) (<? (db/<next-eid db e)))]
|
||||
(let [eid (or (get-in report [:tempids v]) (-next-eid! (:part-map-atom report) e))]
|
||||
(recur (allocate-eid report v eid) (cons [op e a eid] entities)))
|
||||
|
||||
(id-literal? a)
|
||||
;; TODO: should we even allow id-literal attributes? Datomic fails in some cases here.
|
||||
(let [eid (or (get-in report [:tempids a]) (<? (db/<next-eid db e)))]
|
||||
(let [eid (or (get-in report [:tempids a]) (-next-eid! (:part-map-atom report) e))]
|
||||
(recur (allocate-eid report a eid) (cons [op e eid v] entities)))
|
||||
|
||||
(id-literal? e)
|
||||
(let [eid (or (get-in report [:tempids e]) (<? (db/<next-eid db e)))]
|
||||
(let [eid (or (get-in report [:tempids e]) (-next-eid! (:part-map-atom report) e))]
|
||||
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))
|
||||
|
||||
true
|
||||
|
@ -453,7 +462,13 @@
|
|||
;; TODO: expose this in a more appropriate way.
|
||||
(defn <with-internal [db tx-data merge-ident merge-attr]
|
||||
(go-pair
|
||||
(let [report (->>
|
||||
(let [part-map-atom
|
||||
(atom (db/part-map db))
|
||||
|
||||
tx
|
||||
(-next-eid! part-map-atom (id-literal :db.part/tx))
|
||||
|
||||
report (->>
|
||||
(map->TxReport
|
||||
{:db-before db
|
||||
:db-after db
|
||||
|
@ -461,7 +476,8 @@
|
|||
;; transaction ID and transaction timestamp directly from the report; Datomic
|
||||
;; makes this surprisingly difficult: one needs a :db.part/tx temporary and an
|
||||
;; explicit upsert of that temporary.
|
||||
:tx (<? (db/<next-eid db (id-literal :db.part/tx)))
|
||||
:part-map-atom part-map-atom
|
||||
:tx tx
|
||||
:txInstant (db/now db)
|
||||
:entities tx-data
|
||||
:tx-data []
|
||||
|
@ -484,6 +500,10 @@
|
|||
db-after (->
|
||||
db
|
||||
|
||||
(db/<apply-db-part-map @(:part-map-atom report))
|
||||
(<?)
|
||||
(->> (p :apply-db-part-changes))
|
||||
|
||||
(db/<apply-db-ident-assertions (:added-idents report) merge-ident)
|
||||
(<?)
|
||||
(->> (p :apply-db-ident-assertions))
|
||||
|
|
|
@ -76,9 +76,16 @@
|
|||
:db/doc 35
|
||||
})
|
||||
|
||||
(def parts
|
||||
{:db.part/db {:start 0 :idx (inc (apply max (vals idents)))}
|
||||
:db.part/user {:start 0x10000 :idx 0x10000}
|
||||
:db.part/tx {:start 0x10000000 :idx 0x10000000}
|
||||
})
|
||||
|
||||
(defn tx-data []
|
||||
(concat
|
||||
(map (fn [[ident entid]] [:db/add entid :db/ident ident]) idents)
|
||||
;; TODO: install partitions as well, like (map (fn [[ident entid]] [:db/add :db.part/db :db.install/partition ident])).
|
||||
(map (fn [[ident attrs]] (assoc attrs :db/id ident)) symbolic-schema)
|
||||
(map (fn [[ident attrs]] [:db/add :db.part/db :db.install/attribute (get idents ident)]) symbolic-schema) ;; TODO: fail if nil.
|
||||
))
|
||||
|
|
Loading…
Reference in a new issue