Implement parts: Make the DB allocate and persist entity IDs.

This implementation is inefficient because each allocated temporary ID
touches the database, but it's enough to allow to re-open DBs.
This commit is contained in:
Nick Alexander 2016-08-06 17:35:24 -07:00 committed by Richard Newman
parent 470cb7a82d
commit 3dfdea99e7
9 changed files with 134 additions and 89 deletions

View file

@ -82,16 +82,15 @@
[db eid]
"Returns the keyword associated with an id, or the key itself if passed.")
(current-tx
[db]
"TODO: document this interface.")
(in-transaction!
[db chan-fn]
"Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil,
commit the transaction; otherwise, rollback the transaction. Returns a pair-chan resolving to
the pair-chan returned by `chan-fn`.")
(<bootstrapped? [db]
"Return true if this database has no transactions yet committed.")
(<ea [db e a]
"Search for datoms using the EAVT index.")
@ -114,9 +113,10 @@
[db fragment merge]
"Apply added schema fragment to the store, using `merge` as a `merge-with` function.")
(<advance-tx
[db]
"TODO: document this interface."))
(<next-eid
[db id-literal]
"Return a unique integer for the given id-literal, accounting for the literal's partition. The
returned integer should never be returned again."))
(defn db? [x]
(and (satisfies? IDB x)
@ -146,7 +146,6 @@
]
rowid)))
(defn datoms-attribute-transform
[db x]
{:pre [(db? db)]}
@ -168,11 +167,14 @@
:table-alias source/gensym-table-alias
:make-constraints nil}))
(defrecord DB [sqlite-connection schema entids ident-map current-tx]
(defrecord DB [sqlite-connection schema ident-map]
;; ident-map maps between keyword idents and integer entids. The set of idents and entids is
;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also
;; for simplicity, we assume that an entid has at most one associated ident, and vice-versa. See
;; http://docs.datomic.com/identity.html#idents.
;; TODO: cache parts. parts looks like {:db.part/db {:start 0 :current 10}}. It maps between
;; keyword ident part names and integer ranges.
IDB
(query-context [db] (context/->Context (datoms-source db) nil nil))
@ -188,14 +190,20 @@
(get (.-ident-map db) eid eid)
eid))
(current-tx
[db]
(inc (:current-tx db)))
(in-transaction! [db chan-fn]
(s/in-transaction!
(:sqlite-connection db) chan-fn))
(<bootstrapped? [db]
(go-pair
(->
(:sqlite-connection db)
(s/all-rows ["SELECT EXISTS(SELECT 1 FROM transactions LIMIT 1) AS bootstrapped"])
(<?)
(first)
(:bootstrapped)
(not= 0))))
;; TODO: use q for searching? Have q use this for searching for a single pattern?
(<ea [db e a]
(go-pair
@ -272,15 +280,21 @@
["DELETE FROM datoms WHERE (e = ? AND a = ? AND value_type_tag = ? AND v = ?)" e a tag v])))))))
db))
(<advance-tx [db]
(<next-eid [db tempid]
{:pre [(id-literal? tempid)]}
{:post [ds/entid?]}
(go-pair
(let [exec (partial s/execute! (:sqlite-connection db))]
;; (let [ret (<? (exec
;; ;; TODO: be more clever about UPDATE OR ...?
;; ["UPDATE metadata SET current_tx = ? WHERE current_tx = ?" (inc (:current-tx db)) (:current-tx db)]))]
;; TODO: keep all of these eid allocations in the transaction report and apply them at the end
;; of the transaction.
(let [exec (partial s/execute! (:sqlite-connection db))
part (entid db (:part tempid))]
(when-not (ds/entid? part) ;; TODO: cache parts materialized view.
(raise "Cannot allocate entid for id-literal " tempid " because part " (:part tempid) " is not known"
{:error :db/bad-part
:part (:part tempid)}))
;; TODO: handle exclusion across transactions here.
(update db :current-tx inc))))
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part])))))))
(<apply-db-ident-assertions [db added-idents merge]
(go-pair
@ -321,7 +335,7 @@
(defn with-ident [db ident entid]
(update db :ident-map #(assoc % ident entid, entid ident)))
(defn db [sqlite-connection idents schema current-tx]
(defn db [sqlite-connection idents schema]
{:pre [(map? idents)
(every? keyword? (keys idents))
(map? schema)
@ -333,7 +347,8 @@
:ident-map ident-map
:symbolic-schema schema
:schema entid-schema
:current-tx current-tx})))
;; TODO :parts
})))
;; TODO: factor this into the overall design.
(defn <?run

View file

@ -23,9 +23,6 @@
(:import
[datomish.datom Datom])))
;; TODO: implement support for DB parts?
(def tx0 0x2000000)
(defn <idents [sqlite-connection]
"Read the ident map materialized view from the given SQLite store.
Returns a map (keyword ident) -> (integer entid), like {:db/ident 0}."
@ -37,14 +34,6 @@
(s/all-rows sqlite-connection)))]
(into {} (map (fn [row] [(sqlite-schema/<-SQLite :db.type/keyword (:ident row)) (:entid row)])) rows))))
(defn <current-tx [sqlite-connection]
"Find the largest tx written to the SQLite store.
Returns an integer, -1 if no transactions have been written yet."
(go-pair
(let [rows (<? (s/all-rows sqlite-connection ["SELECT COALESCE(MAX(tx), -1) AS current_tx FROM transactions"]))]
(:current_tx (first rows)))))
(defn <symbolic-schema [sqlite-connection]
"Read the schema map materialized view from the given SQLite store.
Returns a map (keyword ident) -> (map (keyword attribute -> keyword value)), like
@ -83,10 +72,9 @@
(when-not (= sqlite-schema/current-version (<? (sqlite-schema/<ensure-current-version sqlite-connection)))
(raise "Could not ensure current SQLite schema version."))
(let [current-tx (<? (<current-tx sqlite-connection))
bootstrapped (>= current-tx 0)
current-tx (max current-tx tx0)]
(when-not bootstrapped
(let [db (db/db sqlite-connection bootstrap/idents bootstrap/symbolic-schema)
bootstrapped? (<? (db/<bootstrapped? db))]
(when-not bootstrapped?
;; We need to bootstrap the DB.
(let [fail-alter-ident (fn [old new] (if-not (= old new)
(raise "Altering idents is not yet supported, got " new " altering existing ident " old
@ -96,7 +84,17 @@
(raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old
{:error :schema/alter-schema :old old :new new})
new))]
(-> (db/db sqlite-connection bootstrap/idents bootstrap/symbolic-schema current-tx)
(do
(let [exec (partial s/execute! (:sqlite-connection db))]
;; TODO: allow inserting new parts.
;; TODO: think more carefully about allocating new parts and bitmasking part ranges.
(<? (exec
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/db) 0x0 (inc (apply max (vals bootstrap/idents)))]))
(<? (exec
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/user) 0x10000 0]))
(<? (exec
["INSERT INTO parts VALUES (?, ?, ?)" (db/entid db :db.part/tx) 0x10000000 0])))
(-> db
;; We use <with-internal rather than <transact! to apply the bootstrap transaction
;; data but to not follow the regular schema application process. We can't apply the
;; schema changes, since the applied datoms would conflict with the bootstrapping
@ -105,12 +103,13 @@
;; datoms to the store. It's feasible but awkward.) After bootstrapping, we read
;; back the idents and schema, just like when we re-open.
(transact/<with-internal (bootstrap/tx-data) fail-alter-ident fail-alter-attr)
(<?))))
(<?)))))
;; We just bootstrapped, or we are returning to an already bootstrapped DB.
(let [idents (<? (<idents sqlite-connection))
symbolic-schema (<? (<symbolic-schema sqlite-connection))]
(when-not bootstrapped
(when-not bootstrapped?
;; TODO: parts.
(when (not (= idents bootstrap/idents))
(raise "After bootstrapping database, expected new materialized idents and old bootstrapped idents to be identical"
{:error :bootstrap/bad-idents,
@ -121,4 +120,4 @@
{:error :bootstrap/bad-symbolic-schema,
:new symbolic-schema :old bootstrap/symbolic-schema
})))
(db/db sqlite-connection idents symbolic-schema (inc current-tx))))))
(db/db sqlite-connection idents symbolic-schema)))))

View file

@ -9,6 +9,9 @@
[datomish.sqlite-schema :as sqlite-schema]
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise]]))
(defn entid? [x]
(and (integer? x) (pos? x)))
(defprotocol ISchema
(attrs-by
[schema property]
@ -97,7 +100,7 @@
:value v}))))
(def value-type-map
{:db.type/ref { :valid? #(and (integer? %) (pos? %)) }
{:db.type/ref { :valid? entid? }
:db.type/keyword { :valid? keyword? }
:db.type/string { :valid? string? }
:db.type/boolean { :valid? #?(:clj #(instance? Boolean %) :cljs #(= js/Boolean (type %))) }

View file

@ -83,6 +83,7 @@
;; TODO: allow arbitrary schema values (true/false) and tag the resulting values.
"CREATE TABLE schema (ident TEXT NOT NULL, attr TEXT NOT NULL, value TEXT NOT NULL, FOREIGN KEY (ident) REFERENCES idents (ident))"
"CREATE INDEX idx_schema_unique ON schema (ident, attr, value)"
"CREATE TABLE parts (part INTEGER NOT NULL PRIMARY KEY, start INTEGER NOT NULL, idx INTEGER NOT NULL)"
])
(defn <create-current-version

View file

@ -56,7 +56,8 @@
(defrecord TxReport [db-before ;; The DB before the transaction.
db-after ;; The DB after the transaction.
current-tx ;; The tx ID represented by the transaction in this report.
tx ;; The tx ID represented by the transaction in this report; refer :db/tx.
txInstant ;; The timestamp instant when the the transaction was processed/committed in this report; refer :db/txInstant.
entities ;; The set of entities (like [:db/add e a v tx]) processed.
tx-data ;; The set of datoms applied to the database, like (Datom. e a v tx added).
tempids ;; The map from id-literal -> numeric entid.
@ -293,22 +294,22 @@
allocated-eid (get-in report [:tempids e])]
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
(let [eid (or upserted-eid allocated-eid (next-eid db))]
(let [eid (or upserted-eid allocated-eid (<? (db/<next-eid db e)))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))))
;; Start allocating and retrying. We try with e last, so as to eventually upsert it.
(id-literal? v)
;; We can't fail with unbound literals here, since we could have multiple.
(let [eid (or (get-in report [:tempids v]) (next-eid db))]
(let [eid (or (get-in report [:tempids v]) (<? (db/<next-eid db e)))]
(recur (allocate-eid report v eid) (cons [op e a eid] entities)))
(id-literal? a)
;; TODO: should we even allow id-literal attributes? Datomic fails in some cases here.
(let [eid (or (get-in report [:tempids a]) (next-eid db))]
(let [eid (or (get-in report [:tempids a]) (<? (db/<next-eid db e)))]
(recur (allocate-eid report a eid) (cons [op e eid v] entities)))
(id-literal? e)
(let [eid (or (get-in report [:tempids e]) (next-eid db))]
(let [eid (or (get-in report [:tempids e]) (<? (db/<next-eid db e)))]
(recur (allocate-eid report e eid) (cons [op eid a v] entities)))
true
@ -516,7 +517,7 @@
;; transaction ID and transaction timestamp directly from the report; Datomic
;; makes this surprisingly difficult: one needs a :db.part/tx temporary and an
;; explicit upsert of that temporary.
:tx (db/current-tx db)
:tx (<? (db/<next-eid db (id-literal :db.part/tx)))
:txInstant (db/now db)
:entities tx-data
:tx-data []
@ -542,9 +543,6 @@
(<?)
(db/<apply-db-install-assertions (:added-attributes report) merge-attr)
(<?)
(db/<advance-tx)
(<?))]
(-> report
(assoc-in [:db-after] db-after)))))

View file

@ -52,8 +52,8 @@
:db/noHistory 13
:db/add 14
:db/retract 15
:db.part/tx 16
:db.part/user 17
:db.part/user 16
:db.part/tx 17
:db/excise 18
:db.excise/attrs 19
:db.excise/beforeT 20

View file

@ -73,47 +73,46 @@
(<?)
(mapv #(vector (:rowid %) (:text %))))))
;; TODO: use reverse refs!
(def test-schema
[{:db/id (d/id-literal :test -1)
[{:db/id (d/id-literal :db.part/user)
:db/ident :x
:db/unique :db.unique/identity
:db/valueType :db.type/long}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -1)}
{:db/id (d/id-literal :test -2)
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :name
:db/unique :db.unique/identity
:db/valueType :db.type/string}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -2)}
{:db/id (d/id-literal :test -3)
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :y
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -3)}
{:db/id (d/id-literal :test -5)
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :aka
:db/cardinality :db.cardinality/many
:db/valueType :db.type/string}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -5)}
{:db/id (d/id-literal :test -6)
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :age
:db/valueType :db.type/long}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -6)}
{:db/id (d/id-literal :test -7)
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :email
:db/unique :db.unique/identity
:db/valueType :db.type/string}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -7)}
{:db/id (d/id-literal :test -8)
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :spouse
:db/unique :db.unique/value
:db/valueType :db.type/string}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -8)}
{:db/id (d/id-literal :test -9)
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :friends
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref}
{:db/id :db.part/db :db.install/attribute (d/id-literal :test -9)}
:db/valueType :db.type/ref
:db.install/_attribute :db.part/db}
])
(deftest-async test-add-one
@ -630,3 +629,31 @@
(finally
(<? (d/<close conn)))))))
(deftest-async test-next-eid
(with-tempfile [t (tempfile)]
(let [conn (<? (d/<connect t))
{tx0 :tx} (<? (d/<transact! conn test-schema))]
(testing "entids are increasing, tx ids are larger than user ids"
(let [r1 (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :name "Igor"}]))
r2 (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -2) :name "Oleg"}]))
e1 (get (tempids r1) -1)
e2 (get (tempids r2) -2)]
(is (< e1 (:tx r1)))
(is (< e2 (:tx r2)))
(is (< e1 e2))
(is (< (:tx r1) (:tx r2)))
;; Close and re-open same DB.
(<? (d/<close conn))
(let [conn (<? (d/<connect t))]
(try
(testing "entid counters are persisted across re-opens"
(let [r3 (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -3) :name "Petr"}]))
e3 (get (tempids r3) -3)]
(is (< e3 (:tx r3)))
(is (< e2 e3))
(is (< (:tx r2) (:tx r3)))))
(finally
(<? (d/<close conn))))))))))

View file

@ -29,7 +29,7 @@
(def Throwable js/Error))
(def test-schema
[{:db/id (d/id-literal :test -1)
[{:db/id (d/id-literal :db.part/user)
:db/ident :x
:db/unique :db.unique/identity
:db/valueType :db.type/long

View file

@ -4,6 +4,7 @@
[cljs.test :as t :refer-macros [is are deftest testing]]
datomish.promise-sqlite-test
datomish.db-test
datomish.query-test
datomish.sqlite-user-version-test
datomish.test.util
datomish.test.transforms
@ -13,6 +14,7 @@
(doo-tests
'datomish.promise-sqlite-test
'datomish.db-test
'datomish.query-test
'datomish.sqlite-user-version-test
'datomish.test.util
'datomish.test.transforms