Completely rewrite main transaction logic to be faster.
This is almost complete; it passes the test suite save for retracting fulltext datoms correctly. There's a lot to say about this approach, but I don't have time to give too many details. The broad outline is as follows. We collect datoms to add and retract in a tx_lookup table. Depending on flags ("search value" sv and "search value type tag" svalue_type_tag) we "complete" the tx_lookup table by joining matching datoms. This allows us to find datoms that are present (and should not be added as part of the transaction, or should be retracted as part of the transaction, or should be replaced as part of the transaction. We complete the tx_lookup (in place!) in two separate INSERTs to avoid a quadratic two-table walk (explain the queries to observe that both INSERTs walk the lookup table once and then use the datoms indexes to complete the matching values). We could simplify the code by using multiple lookup tables, both for the two cases of search parameters (eav vs. ea) and for the incomplete and completed rows. Right now we differentiate the former with NULL checks, and the latter by incrementing the added0 column. It performs well enough, so I haven't tried to understand the performance of separating these things. After the tx_lookup table is completed, we build the transaction from it; and update the datoms materialized view table as well. Observe the careful handling of the "search value" sv parameters to handle replacing :db.cardinality/one datoms. Finally, we read the processed transaction back to produce to the API. This is strictly to match the Datomic API; we might make allow to skip this, since many consumers will not want to stream this over the wire. Rough timings show the transactor processing a single >50k datom transaction in about 3.5s, of which less than 0.5s is spent in the expensive joins. Further, repeating the processing of the same transaction is only about 3.5s again! That's the worst possible for the joins, since every single inserted datom will already be present in the database, making the most expensive join match every row.
This commit is contained in:
parent
4a46bdd1bd
commit
badec36aaa
5 changed files with 231 additions and 214 deletions
|
@ -9,6 +9,7 @@
|
||||||
[datascript "0.15.1"]
|
[datascript "0.15.1"]
|
||||||
[honeysql "0.8.0"]
|
[honeysql "0.8.0"]
|
||||||
[com.datomic/datomic-free "0.9.5359"]
|
[com.datomic/datomic-free "0.9.5359"]
|
||||||
|
[com.taoensso/tufte "1.0.2"]
|
||||||
[jamesmacaulay/cljs-promises "0.1.0"]]
|
[jamesmacaulay/cljs-promises "0.1.0"]]
|
||||||
|
|
||||||
:cljsbuild {:builds {:release {
|
:cljsbuild {:builds {:release {
|
||||||
|
|
|
@ -14,10 +14,13 @@
|
||||||
[datomish.query.source :as source]
|
[datomish.query.source :as source]
|
||||||
[datomish.query :as query]
|
[datomish.query :as query]
|
||||||
[datomish.datom :as dd :refer [datom datom? #?@(:cljs [Datom])]]
|
[datomish.datom :as dd :refer [datom datom? #?@(:cljs [Datom])]]
|
||||||
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]]
|
[datomish.util :as util
|
||||||
|
#?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]]
|
||||||
[datomish.schema :as ds]
|
[datomish.schema :as ds]
|
||||||
[datomish.sqlite :as s]
|
[datomish.sqlite :as s]
|
||||||
[datomish.sqlite-schema :as sqlite-schema]
|
[datomish.sqlite-schema :as sqlite-schema]
|
||||||
|
[taoensso.tufte :as tufte
|
||||||
|
#?(:cljs :refer-macros :clj :refer) [defnp p profiled profile]]
|
||||||
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
|
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
|
||||||
[clojure.core.async :as a :refer [chan go <! >!]]])
|
[clojure.core.async :as a :refer [chan go <! >!]]])
|
||||||
#?@(:cljs [[datomish.pair-chan]
|
#?@(:cljs [[datomish.pair-chan]
|
||||||
|
@ -91,19 +94,13 @@
|
||||||
(<bootstrapped? [db]
|
(<bootstrapped? [db]
|
||||||
"Return true if this database has no transactions yet committed.")
|
"Return true if this database has no transactions yet committed.")
|
||||||
|
|
||||||
(<ea [db e a]
|
|
||||||
"Search for datoms using the EAVT index.")
|
|
||||||
|
|
||||||
(<eav [db e a v]
|
|
||||||
"Search for datoms using the EAVT index.")
|
|
||||||
|
|
||||||
(<av
|
(<av
|
||||||
[db a v]
|
[db a v]
|
||||||
"Search for datoms using the AVET index.")
|
"Search for datoms using the AVET index.")
|
||||||
|
|
||||||
(<apply-datoms
|
(<apply-entities
|
||||||
[db datoms]
|
[db tx entities]
|
||||||
"Apply datoms to the store.")
|
"Apply entities to the store, returning sequence of datoms transacted.")
|
||||||
|
|
||||||
(<apply-db-ident-assertions
|
(<apply-db-ident-assertions
|
||||||
[db added-idents merge]
|
[db added-idents merge]
|
||||||
|
@ -204,34 +201,6 @@
|
||||||
(:bootstrapped)
|
(:bootstrapped)
|
||||||
(not= 0))))
|
(not= 0))))
|
||||||
|
|
||||||
;; TODO: use q for searching? Have q use this for searching for a single pattern?
|
|
||||||
(<ea [db e a]
|
|
||||||
(go-pair
|
|
||||||
(->>
|
|
||||||
{:select [:e :a :v :tx [1 :added]]
|
|
||||||
:from [:all_datoms]
|
|
||||||
:where [:and [:= :e e] [:= :a a]]}
|
|
||||||
(s/format) ;; TODO: format these statements only once.
|
|
||||||
|
|
||||||
(s/all-rows (:sqlite-connection db))
|
|
||||||
(<?)
|
|
||||||
|
|
||||||
(mapv (partial row->Datom (.-schema db))))))
|
|
||||||
|
|
||||||
(<eav [db e a v]
|
|
||||||
(let [[v tag] (ds/->SQLite schema a v)]
|
|
||||||
(go-pair
|
|
||||||
(->>
|
|
||||||
{:select [:e :a :v :tx [1 :added]] ;; TODO: generalize columns.
|
|
||||||
:from [:all_datoms]
|
|
||||||
:where [:and [:= :e e] [:= :a a] [:= :value_type_tag tag] [:= :v v]]}
|
|
||||||
(s/format) ;; TODO: format these statements only once.
|
|
||||||
|
|
||||||
(s/all-rows (:sqlite-connection db))
|
|
||||||
(<?)
|
|
||||||
|
|
||||||
(mapv (partial row->Datom (.-schema db))))))) ;; TODO: understand why (schema db) fails.
|
|
||||||
|
|
||||||
(<av [db a v]
|
(<av [db a v]
|
||||||
(let [[v tag] (ds/->SQLite schema a v)]
|
(let [[v tag] (ds/->SQLite schema a v)]
|
||||||
(go-pair
|
(go-pair
|
||||||
|
@ -246,39 +215,6 @@
|
||||||
|
|
||||||
(mapv (partial row->Datom (.-schema db))))))) ;; TODO: understand why (schema db) fails.
|
(mapv (partial row->Datom (.-schema db))))))) ;; TODO: understand why (schema db) fails.
|
||||||
|
|
||||||
(<apply-datoms [db datoms]
|
|
||||||
(go-pair
|
|
||||||
(let [exec (partial s/execute! (:sqlite-connection db))
|
|
||||||
schema (.-schema db)] ;; TODO: understand why (schema db) fails.
|
|
||||||
;; TODO: batch insert, batch delete.
|
|
||||||
(doseq [datom datoms]
|
|
||||||
(let [[e a v tx added] datom
|
|
||||||
[v tag] (ds/->SQLite schema a v)
|
|
||||||
fulltext? (ds/fulltext? schema a)]
|
|
||||||
;; Append to transaction log.
|
|
||||||
(<? (exec
|
|
||||||
["INSERT INTO transactions VALUES (?, ?, ?, ?, ?, ?)" e a v tx (if added 1 0) tag]))
|
|
||||||
;; Update materialized datom view.
|
|
||||||
(if (.-added datom)
|
|
||||||
(let [v (if fulltext?
|
|
||||||
(<? (<insert-fulltext-value db v))
|
|
||||||
v)]
|
|
||||||
(<? (exec
|
|
||||||
["INSERT INTO datoms VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" e a v tx
|
|
||||||
tag ;; value_type_tag
|
|
||||||
(ds/indexing? schema a) ;; index_avet
|
|
||||||
(ds/ref? schema a) ;; index_vaet
|
|
||||||
fulltext? ;; index_fulltext
|
|
||||||
(ds/unique? schema a) ;; unique_value
|
|
||||||
])))
|
|
||||||
(if fulltext?
|
|
||||||
(<? (exec
|
|
||||||
;; TODO: in the future, purge fulltext values from the fulltext_datoms table.
|
|
||||||
["DELETE FROM datoms WHERE (e = ? AND a = ? AND value_type_tag = ? AND v IN (SELECT rowid FROM fulltext_values WHERE text = ?))" e a tag v]))
|
|
||||||
(<? (exec
|
|
||||||
["DELETE FROM datoms WHERE (e = ? AND a = ? AND value_type_tag = ? AND v = ?)" e a tag v])))))))
|
|
||||||
db))
|
|
||||||
|
|
||||||
(<next-eid [db tempid]
|
(<next-eid [db tempid]
|
||||||
{:pre [(id-literal? tempid)]}
|
{:pre [(id-literal? tempid)]}
|
||||||
{:post [ds/entid?]}
|
{:post [ds/entid?]}
|
||||||
|
@ -295,6 +231,134 @@
|
||||||
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
|
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
|
||||||
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part])))))))
|
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part])))))))
|
||||||
|
|
||||||
|
(<apply-entities [db tx entities]
|
||||||
|
{:pre [(db? db) (sequential? entities)]}
|
||||||
|
(go-pair
|
||||||
|
(let [schema (.-schema db)
|
||||||
|
many? (memoize (fn [a] (ds/multival? schema a)))
|
||||||
|
<exec (partial s/execute! (:sqlite-connection db))]
|
||||||
|
(p :delete-tx-lookup-before
|
||||||
|
(<? (<exec ["DELETE FROM tx_lookup"])))
|
||||||
|
|
||||||
|
(p :insertions
|
||||||
|
(try
|
||||||
|
(doseq [entity entities]
|
||||||
|
(let [[op e a v] entity
|
||||||
|
[v tag] (ds/->SQLite schema a v)
|
||||||
|
fulltext? (ds/fulltext? schema a)]
|
||||||
|
(cond
|
||||||
|
(= op :db/add)
|
||||||
|
(let [v (if fulltext?
|
||||||
|
(<? (<insert-fulltext-value db v))
|
||||||
|
v)]
|
||||||
|
(if (many? a)
|
||||||
|
;; :db.cardinality/many
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES ("
|
||||||
|
"?, ?, ?, ?, 1, ?, " ;; e0, a0, v0, tx0, added0, value_type_tag0
|
||||||
|
"?, ?, ?, ?, " ;; flags0
|
||||||
|
"?, ?" ;; sv, svalue_type_tag
|
||||||
|
")")
|
||||||
|
e a v tx tag
|
||||||
|
(ds/indexing? schema a) ;; index_avet
|
||||||
|
(ds/ref? schema a) ;; index_vaet
|
||||||
|
fulltext? ;; index_fulltext
|
||||||
|
(ds/unique? schema a) ;; unique_value
|
||||||
|
v tag
|
||||||
|
]))
|
||||||
|
;; :db.cardinality/one
|
||||||
|
(do
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES ("
|
||||||
|
"?, ?, ?, ?, ?, ?, " ;; TODO: order value and tag closer together.
|
||||||
|
"?, ?, ?, ?, " ;; flags0
|
||||||
|
"?, ?" ;; sv, svalue_type_tag
|
||||||
|
")")
|
||||||
|
e a v tx 1 tag
|
||||||
|
(ds/indexing? schema a) ;; index_avet
|
||||||
|
(ds/ref? schema a) ;; index_vaet
|
||||||
|
fulltext? ;; index_fulltext
|
||||||
|
(ds/unique? schema a) ;; unique_value
|
||||||
|
v tag
|
||||||
|
]))
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) VALUES ("
|
||||||
|
"?, ?, ?, ?, ?, ?, "
|
||||||
|
"?, ?" ;; sv, svalue_type_tag
|
||||||
|
")")
|
||||||
|
e a v tx 0 tag
|
||||||
|
nil nil ;; Search values.
|
||||||
|
])))))
|
||||||
|
|
||||||
|
(= op :db/retract)
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) VALUES ("
|
||||||
|
"?, ?, ?, ?, ?, ?, "
|
||||||
|
"?, ?" ;; sv, svalue_type_tag
|
||||||
|
")")
|
||||||
|
e a v tx 0 tag
|
||||||
|
v tag
|
||||||
|
]))
|
||||||
|
|
||||||
|
true
|
||||||
|
(raise "Unknown operation at " entity ", expected :db/add, :db/retract"
|
||||||
|
{:error :transact/syntax, :operation op, :tx-data entity}))))
|
||||||
|
|
||||||
|
(catch java.sql.SQLException e
|
||||||
|
(throw (ex-info "Transaction violates cardinality constraint" {} e))))) ;; TODO: say more about the conflicting datoms.
|
||||||
|
|
||||||
|
(p :join
|
||||||
|
;; Fast, only one table walk: lookup by exact eav.
|
||||||
|
(p :join-eav
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag FROM tx_lookup AS t LEFT JOIN datoms AS d ON t.e0 = d.e AND t.a0 = d.a AND t.sv = d.v AND t.svalue_type_tag = d.value_type_tag AND t.sv IS NOT NULL")])))
|
||||||
|
|
||||||
|
;; Slower, but still only one table walk: lookup old value by ea.
|
||||||
|
(p :join-ea
|
||||||
|
(<? (<exec [(str "INSERT INTO tx_lookup SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag FROM tx_lookup AS t, datoms AS d WHERE t.sv IS NULL AND t.e0 = d.e AND t.a0 = d.a")]))))
|
||||||
|
|
||||||
|
(p :insert-transaction
|
||||||
|
(p :insert-transaction-added
|
||||||
|
;; Add datoms that aren't already present.
|
||||||
|
(<? (<exec [(str "INSERT INTO transactions (e, a, v, tx, added, value_type_tag) "
|
||||||
|
"SELECT e0, a0, v0, ?, 1, value_type_tag0 "
|
||||||
|
"FROM tx_lookup "
|
||||||
|
"WHERE added0 IS 3 AND e IS NULL") tx]))) ;; TODO: get rid of magic value 3.
|
||||||
|
|
||||||
|
(p :insert-transaction-retracted
|
||||||
|
;; Retract datoms carefully, either when they're matched exactly or when the existing value doesn't match the new value.
|
||||||
|
(<? (<exec [(str "INSERT INTO transactions (e, a, v, tx, added, value_type_tag) "
|
||||||
|
"SELECT e, a, v, ?, 0, value_type_tag "
|
||||||
|
"FROM tx_lookup "
|
||||||
|
"WHERE added0 IS 2 AND ((sv IS NOT NULL) OR (sv IS NULL AND v0 IS NOT v)) AND v IS NOT NULL") tx])))) ;; TODO: get rid of magic value 2.
|
||||||
|
|
||||||
|
(try
|
||||||
|
(p :update-datoms-materialized-view
|
||||||
|
(p :insert-datoms-added
|
||||||
|
;; Add datoms that aren't already present.
|
||||||
|
(<? (<exec [(str "INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value) "
|
||||||
|
"SELECT e0, a0, v0, ?, value_type_tag0, "
|
||||||
|
"index_avet0, index_vaet0, index_fulltext0, unique_value0 "
|
||||||
|
"FROM tx_lookup "
|
||||||
|
"WHERE added0 IS 3 AND e IS NULL") tx])) ;; TODO: get rid of magic value 3.)
|
||||||
|
|
||||||
|
;; TODO: retract fulltext datoms correctly.
|
||||||
|
(p :delete-datoms-retracted
|
||||||
|
(<? (<exec [(str "WITH ids AS (SELECT l.rid FROM tx_lookup AS l WHERE l.added0 IS 2 AND ((l.sv IS NOT NULL) OR (l.sv IS NULL AND l.v0 IS NOT l.v))) " ;; TODO: get rid of magic value 2.
|
||||||
|
"DELETE FROM datoms WHERE rowid IN ids"
|
||||||
|
)])))))
|
||||||
|
|
||||||
|
(catch java.sql.SQLException e
|
||||||
|
(throw (ex-info "Transaction violates unique constraint" {} e)))) ;; TODO: say more about the conflicting datoms.
|
||||||
|
|
||||||
|
;; The lookup table takes space on disk, so we purge it aggressively.
|
||||||
|
(p :delete-tx-lookup-after
|
||||||
|
(<? (<exec ["DELETE FROM tx_lookup"])))
|
||||||
|
|
||||||
|
;; The transaction has been written -- read it back. (We index on tx, so the following is fast.)
|
||||||
|
(let [tx-data (p :select-tx-data
|
||||||
|
(->>
|
||||||
|
(s/all-rows (:sqlite-connection db) ["SELECT * FROM transactions WHERE tx = ?" tx])
|
||||||
|
(<?)
|
||||||
|
|
||||||
|
(mapv (partial row->Datom schema))))]
|
||||||
|
tx-data))))
|
||||||
|
|
||||||
(<apply-db-ident-assertions [db added-idents merge]
|
(<apply-db-ident-assertions [db added-idents merge]
|
||||||
(go-pair
|
(go-pair
|
||||||
(let [exec (partial s/execute! (:sqlite-connection db))]
|
(let [exec (partial s/execute! (:sqlite-connection db))]
|
||||||
|
|
|
@ -23,8 +23,26 @@
|
||||||
index_avet TINYINT NOT NULL DEFAULT 0, index_vaet TINYINT NOT NULL DEFAULT 0,
|
index_avet TINYINT NOT NULL DEFAULT 0, index_vaet TINYINT NOT NULL DEFAULT 0,
|
||||||
index_fulltext TINYINT NOT NULL DEFAULT 0,
|
index_fulltext TINYINT NOT NULL DEFAULT 0,
|
||||||
unique_value TINYINT NOT NULL DEFAULT 0)"
|
unique_value TINYINT NOT NULL DEFAULT 0)"
|
||||||
"CREATE INDEX idx_datoms_eavt ON datoms (e, a, value_type_tag, v)"
|
"CREATE UNIQUE INDEX idx_datoms_eavt ON datoms (e, a, value_type_tag, v)"
|
||||||
"CREATE INDEX idx_datoms_aevt ON datoms (a, e, value_type_tag, v)"
|
"CREATE UNIQUE INDEX idx_datoms_aevt ON datoms (a, e, value_type_tag, v)"
|
||||||
|
|
||||||
|
;; n.b., v0/value_type_tag0 can be NULL, in which case we look up v from datoms;
|
||||||
|
;; and the datom columns are NULL into the LEFT JOIN fills them in.
|
||||||
|
;; TODO: update comment about sv.
|
||||||
|
"CREATE TABLE tx_lookup (e0 INTEGER NOT NULL, a0 SMALLINT NOT NULL, v0 BLOB NOT NULL, tx0 INTEGER NOT NULL, added0 TINYINT NOT NULL,
|
||||||
|
value_type_tag0 SMALLINT NOT NULL,
|
||||||
|
index_avet0 TINYINT, index_vaet0 TINYINT,
|
||||||
|
index_fulltext0 TINYINT,
|
||||||
|
unique_value0 TINYINT,
|
||||||
|
sv BLOB,
|
||||||
|
svalue_type_tag SMALLINT,
|
||||||
|
rid INTEGER,
|
||||||
|
e INTEGER, a SMALLINT, v BLOB, tx INTEGER, value_type_tag SMALLINT)"
|
||||||
|
|
||||||
|
"CREATE INDEX idx_tx_lookup_added ON tx_lookup (added0)"
|
||||||
|
|
||||||
|
;; Prevent overlapping transactions. TODO: drop added0?
|
||||||
|
"CREATE UNIQUE INDEX idx_tx_lookup_eavt ON tx_lookup (e0, a0, v0, added0, value_type_tag0) WHERE sv IS NOT NULL"
|
||||||
|
|
||||||
;; Opt-in index: only if a has :db/index true.
|
;; Opt-in index: only if a has :db/index true.
|
||||||
"CREATE UNIQUE INDEX idx_datoms_avet ON datoms (a, value_type_tag, v, e) WHERE index_avet IS NOT 0"
|
"CREATE UNIQUE INDEX idx_datoms_avet ON datoms (a, value_type_tag, v, e) WHERE index_avet IS NOT 0"
|
||||||
|
@ -45,7 +63,7 @@
|
||||||
"CREATE UNIQUE INDEX idx_datoms_unique_value ON datoms (a, value_type_tag, v) WHERE unique_value IS NOT 0"
|
"CREATE UNIQUE INDEX idx_datoms_unique_value ON datoms (a, value_type_tag, v) WHERE unique_value IS NOT 0"
|
||||||
|
|
||||||
"CREATE TABLE transactions (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, tx INTEGER NOT NULL, added TINYINT NOT NULL DEFAULT 1, value_type_tag SMALLINT NOT NULL)"
|
"CREATE TABLE transactions (e INTEGER NOT NULL, a SMALLINT NOT NULL, v BLOB NOT NULL, tx INTEGER NOT NULL, added TINYINT NOT NULL DEFAULT 1, value_type_tag SMALLINT NOT NULL)"
|
||||||
"CREATE INDEX idx_transactions_tx ON transactions (tx)"
|
"CREATE INDEX idx_transactions_tx ON transactions (tx, added)"
|
||||||
|
|
||||||
;; Fulltext indexing.
|
;; Fulltext indexing.
|
||||||
;; A fulltext indexed value v is an integer rowid referencing fulltext_values.
|
;; A fulltext indexed value v is an integer rowid referencing fulltext_values.
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
[datomish.query.source :as source]
|
[datomish.query.source :as source]
|
||||||
[datomish.query :as query]
|
[datomish.query :as query]
|
||||||
[datomish.db :as db :refer [id-literal id-literal?]]
|
[datomish.db :as db :refer [id-literal id-literal?]]
|
||||||
|
[datomish.db.debug :as debug]
|
||||||
[datomish.datom :as dd :refer [datom datom? #?@(:cljs [Datom])]]
|
[datomish.datom :as dd :refer [datom datom? #?@(:cljs [Datom])]]
|
||||||
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]]
|
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]]
|
||||||
[datomish.schema :as ds]
|
[datomish.schema :as ds]
|
||||||
|
@ -21,6 +22,8 @@
|
||||||
[datomish.sqlite-schema :as sqlite-schema]
|
[datomish.sqlite-schema :as sqlite-schema]
|
||||||
[datomish.transact.bootstrap :as bootstrap]
|
[datomish.transact.bootstrap :as bootstrap]
|
||||||
[datomish.transact.explode :as explode]
|
[datomish.transact.explode :as explode]
|
||||||
|
[taoensso.tufte :as tufte
|
||||||
|
#?(:cljs :refer-macros :clj :refer) [defnp p profiled profile]]
|
||||||
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
|
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
|
||||||
[clojure.core.async :as a :refer [chan go <! >!]]])
|
[clojure.core.async :as a :refer [chan go <! >!]]])
|
||||||
#?@(:cljs [[datomish.pair-chan]
|
#?@(:cljs [[datomish.pair-chan]
|
||||||
|
@ -334,124 +337,35 @@
|
||||||
(ds/ensure-valid-value schema a v)))
|
(ds/ensure-valid-value schema a v)))
|
||||||
report))
|
report))
|
||||||
|
|
||||||
(defn- <ensure-unique-constraints
|
|
||||||
"Throw unless all datoms in :tx-data obey the uniqueness constraints."
|
|
||||||
|
|
||||||
[db report]
|
|
||||||
{:pre [(db/db? db) (report? report)]}
|
|
||||||
|
|
||||||
;; TODO: consider accumulating errors to show more meaningful error reports.
|
|
||||||
;; TODO: constrain entities; constrain attributes.
|
|
||||||
|
|
||||||
(go-pair
|
|
||||||
;; TODO: comment on applying datoms that violate uniqueness.
|
|
||||||
(let [schema (db/schema db)
|
|
||||||
unique-datoms (transient {})] ;; map (nil, a, v)|(e, a, nil)|(e, a, v) -> datom.
|
|
||||||
(doseq [[e a v tx added :as datom] (:tx-data report)]
|
|
||||||
|
|
||||||
(when added
|
|
||||||
;; Check for violated :db/unique constraint between datom and existing store.
|
|
||||||
(when (ds/unique? schema a)
|
|
||||||
(when-let [found (first (<? (db/<av db a v)))]
|
|
||||||
(raise "Cannot add " datom " because of unique constraint: " found
|
|
||||||
{:error :transact/unique
|
|
||||||
:attribute a ;; TODO: map attribute back to ident.
|
|
||||||
:entity datom})))
|
|
||||||
|
|
||||||
;; Check for violated :db/unique constraint between datoms.
|
|
||||||
(when (ds/unique? schema a)
|
|
||||||
(let [key [nil a v]]
|
|
||||||
(when-let [other (get unique-datoms key)]
|
|
||||||
(raise "Cannot add " datom " and " other " because together they violate unique constraint"
|
|
||||||
{:error :transact/unique
|
|
||||||
:attribute a ;; TODO: map attribute back to ident.
|
|
||||||
:entity datom}))
|
|
||||||
(assoc! unique-datoms key datom)))
|
|
||||||
|
|
||||||
;; Check for violated :db/cardinality :db.cardinality/one constraint between datoms.
|
|
||||||
(when-not (ds/multival? schema a)
|
|
||||||
(let [key [e a nil]]
|
|
||||||
(when-let [other (get unique-datoms key)]
|
|
||||||
(raise "Cannot add " datom " and " other " because together they violate cardinality constraint"
|
|
||||||
{:error :transact/unique
|
|
||||||
:entity datom}))
|
|
||||||
(assoc! unique-datoms key datom)))
|
|
||||||
|
|
||||||
;; Check for duplicated datoms. Datomic doesn't allow overlapping writes, and we don't
|
|
||||||
;; want to guarantee order, so we don't either.
|
|
||||||
(let [key [e a v]]
|
|
||||||
(when-let [other (get unique-datoms key)]
|
|
||||||
(raise "Cannot add duplicate " datom
|
|
||||||
{:error :transact/unique
|
|
||||||
:entity datom}))
|
|
||||||
(assoc! unique-datoms key datom)))))
|
|
||||||
report))
|
|
||||||
|
|
||||||
(defn <entities->tx-data [db report]
|
|
||||||
{:pre [(db/db? db) (report? report)]}
|
|
||||||
(go-pair
|
|
||||||
(let [initial-report report
|
|
||||||
{tx :tx} report
|
|
||||||
schema (db/schema db)]
|
|
||||||
(loop [report initial-report
|
|
||||||
es (:entities initial-report)]
|
|
||||||
(let [[[op e a v :as entity] & entities] es]
|
|
||||||
(cond
|
|
||||||
(nil? entity)
|
|
||||||
report
|
|
||||||
|
|
||||||
(= op :db/add)
|
|
||||||
(if (ds/multival? schema a)
|
|
||||||
(if (empty? (<? (db/<eav db e a v)))
|
|
||||||
(recur (transact-report report (datom e a v tx true)) entities)
|
|
||||||
(recur report entities))
|
|
||||||
(if-let [^Datom old-datom (first (<? (db/<ea db e a)))]
|
|
||||||
(if (= (.-v old-datom) v)
|
|
||||||
(recur report entities)
|
|
||||||
(recur (-> report
|
|
||||||
(transact-report (datom e a (.-v old-datom) tx false))
|
|
||||||
(transact-report (datom e a v tx true)))
|
|
||||||
entities))
|
|
||||||
(recur (transact-report report (datom e a v tx true)) entities)))
|
|
||||||
|
|
||||||
(= op :db/retract)
|
|
||||||
(if (first (<? (db/<eav db e a v)))
|
|
||||||
(recur (transact-report report (datom e a v tx false)) entities)
|
|
||||||
(recur report entities))
|
|
||||||
|
|
||||||
true
|
|
||||||
(raise "Unknown operation at " entity ", expected :db/add, :db/retract"
|
|
||||||
{:error :transact/syntax, :operation op, :tx-data entity})))))))
|
|
||||||
|
|
||||||
(defn <transact-tx-data
|
(defn <transact-tx-data
|
||||||
[db report]
|
[db report]
|
||||||
{:pre [(db/db? db) (report? report)]}
|
{:pre [(db/db? db) (report? report)]}
|
||||||
|
|
||||||
(go-pair
|
(let [<apply-entities (fn [db report]
|
||||||
(->>
|
(go-pair
|
||||||
report
|
(let [tx-data (<? (db/<apply-entities db (:tx report) (:entities report)))]
|
||||||
(preprocess db)
|
(assoc report :tx-data tx-data))))]
|
||||||
|
(go-pair
|
||||||
|
(->>
|
||||||
|
report
|
||||||
|
(preprocess db)
|
||||||
|
|
||||||
(<resolve-lookup-refs db)
|
(<resolve-lookup-refs db)
|
||||||
(<?)
|
(<?)
|
||||||
|
(p :resolve-lookup-refs)
|
||||||
|
|
||||||
(<resolve-id-literals db)
|
(<resolve-id-literals db)
|
||||||
(<?)
|
(<?)
|
||||||
|
(p :resolve-id-literals)
|
||||||
|
|
||||||
(<ensure-schema-constraints db)
|
(<ensure-schema-constraints db)
|
||||||
(<?)
|
(<?)
|
||||||
|
(p :ensure-schema-constraints)
|
||||||
|
|
||||||
(<entities->tx-data db)
|
(<apply-entities db)
|
||||||
(<?)
|
(<?)
|
||||||
|
(p :apply-entities)
|
||||||
(<ensure-unique-constraints db)
|
))))
|
||||||
(<?))))
|
|
||||||
|
|
||||||
;; Normalize as [op int|id-literal int|id-literal value|id-literal]. ;; TODO: mention lookup-refs.
|
|
||||||
|
|
||||||
;; Replace lookup-refs with entids where possible.
|
|
||||||
|
|
||||||
;; Upsert or allocate id-literals.
|
|
||||||
|
|
||||||
(defn- is-ident? [db [_ a & _]]
|
(defn- is-ident? [db [_ a & _]]
|
||||||
(= a (db/entid db :db/ident)))
|
(= a (db/entid db :db/ident)))
|
||||||
|
@ -529,21 +443,25 @@
|
||||||
|
|
||||||
(<transact-tx-data db)
|
(<transact-tx-data db)
|
||||||
(<?)
|
(<?)
|
||||||
|
(p :transact-tx-data)
|
||||||
|
|
||||||
(collect-db-ident-assertions db)
|
(collect-db-ident-assertions db)
|
||||||
|
(p :collect-db-ident-assertions)
|
||||||
|
|
||||||
(collect-db-install-assertions db))
|
(collect-db-install-assertions db)
|
||||||
db-after (->
|
(p :collect-db-install-assertions))
|
||||||
db
|
|
||||||
|
|
||||||
(db/<apply-datoms (:tx-data report))
|
db-after (->
|
||||||
(<?)
|
db
|
||||||
|
|
||||||
(db/<apply-db-ident-assertions (:added-idents report) merge-ident)
|
(db/<apply-db-ident-assertions (:added-idents report) merge-ident)
|
||||||
(<?)
|
(<?)
|
||||||
|
(->> (p :apply-db-ident-assertions))
|
||||||
|
|
||||||
(db/<apply-db-install-assertions (:added-attributes report) merge-attr)
|
(db/<apply-db-install-assertions (:added-attributes report) merge-attr)
|
||||||
(<?))]
|
(<?)
|
||||||
|
(->> (p :apply-db-install-assertions)))
|
||||||
|
]
|
||||||
(-> report
|
(-> report
|
||||||
(assoc-in [:db-after] db-after)))))
|
(assoc-in [:db-after] db-after)))))
|
||||||
|
|
||||||
|
|
|
@ -412,32 +412,48 @@
|
||||||
tx0 (:tx (<? (d/<transact! conn schema)))]
|
tx0 (:tx (<? (d/<transact! conn schema)))]
|
||||||
(try
|
(try
|
||||||
(testing "Can add fulltext indexed datoms"
|
(testing "Can add fulltext indexed datoms"
|
||||||
(let [r (<? (d/<transact! conn [[:db/add 101 :test/fulltext "test this"]]))]
|
(let [{tx1 :tx txInstant1 :txInstant} (<? (d/<transact! conn [[:db/add 101 :test/fulltext "test this"]]))]
|
||||||
(is (= (<? (<fulltext-values (d/db conn)))
|
(is (= (<? (<fulltext-values (d/db conn)))
|
||||||
[[1 "test this"]]))
|
[[1 "test this"]]))
|
||||||
(is (= (<? (<datoms-after (d/db conn) tx0))
|
(is (= (<? (<datoms-after (d/db conn) tx0))
|
||||||
#{[101 :test/fulltext 1]})) ;; Values are raw; 1 is the rowid into fulltext_values.
|
#{[101 :test/fulltext 1]})) ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
))
|
(is (= (<? (<transactions-after (d/db conn) tx0))
|
||||||
|
[[101 :test/fulltext 1 tx1 1] ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
|
[tx1 :db/txInstant txInstant1 tx1 1]]))
|
||||||
|
|
||||||
(testing "Can replace fulltext indexed datoms"
|
(testing "Can replace fulltext indexed datoms"
|
||||||
(let [r (<? (d/<transact! conn [[:db/add 101 :test/fulltext "alternate thing"]]))]
|
(let [{tx2 :tx txInstant2 :txInstant} (<? (d/<transact! conn [[:db/add 101 :test/fulltext "alternate thing"]]))]
|
||||||
(is (= (<? (<fulltext-values (d/db conn)))
|
(is (= (<? (<fulltext-values (d/db conn)))
|
||||||
[[1 "test this"]
|
[[1 "test this"]
|
||||||
[2 "alternate thing"]]))
|
[2 "alternate thing"]]))
|
||||||
(is (= (<? (<datoms-after (d/db conn) tx0))
|
(is (= (<? (<datoms-after (d/db conn) tx0))
|
||||||
#{[101 :test/fulltext 2]})) ;; Values are raw; 2 is the rowid into fulltext_values.
|
#{[101 :test/fulltext 2]})) ;; Values are raw; 2 is the rowid into fulltext_values.
|
||||||
))
|
(is (= (<? (<transactions-after (d/db conn) tx0))
|
||||||
|
[[101 :test/fulltext 1 tx1 1] ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
|
[tx1 :db/txInstant txInstant1 tx1 1]
|
||||||
|
[101 :test/fulltext 1 tx2 0] ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
|
[101 :test/fulltext 2 tx2 1] ;; Values are raw; 2 is the rowid into fulltext_values.
|
||||||
|
[tx2 :db/txInstant txInstant2 tx2 1]]))
|
||||||
|
|
||||||
(testing "Can upsert keyed by fulltext indexed datoms"
|
(testing "Can upsert keyed by fulltext indexed datoms"
|
||||||
(let [r (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user) :test/fulltext "alternate thing" :test/other "other"}]))]
|
(let [{tx3 :tx txInstant3 :txInstant} (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user) :test/fulltext "alternate thing" :test/other "other"}]))]
|
||||||
(is (= (<? (<fulltext-values (d/db conn)))
|
(is (= (<? (<fulltext-values (d/db conn)))
|
||||||
[[1 "test this"]
|
[[1 "test this"]
|
||||||
[2 "alternate thing"]
|
[2 "alternate thing"]
|
||||||
[3 "other"]]))
|
[3 "other"]]))
|
||||||
(is (= (<? (<datoms-after (d/db conn) tx0))
|
(is (= (<? (<datoms-after (d/db conn) tx0))
|
||||||
#{[101 :test/fulltext 2] ;; Values are raw; 2, 3 are the rowids into fulltext_values.
|
#{[101 :test/fulltext 2] ;; Values are raw; 2, 3 are the rowids into fulltext_values.
|
||||||
[101 :test/other 3]}))
|
[101 :test/other 3]}))
|
||||||
))
|
(is (= (<? (<transactions-after (d/db conn) tx0))
|
||||||
|
[[101 :test/fulltext 1 tx1 1] ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
|
[tx1 :db/txInstant txInstant1 tx1 1]
|
||||||
|
[101 :test/fulltext 1 tx2 0] ;; Values are raw; 1 is the rowid into fulltext_values.
|
||||||
|
[101 :test/fulltext 2 tx2 1] ;; Values are raw; 2 is the rowid into fulltext_values.
|
||||||
|
[tx2 :db/txInstant txInstant2 tx2 1]
|
||||||
|
[101 :test/other 3 tx3 1] ;; Values are raw; 3 is the rowid into fulltext_values.
|
||||||
|
[tx3 :db/txInstant txInstant3 tx3 1]]))
|
||||||
|
|
||||||
|
))))))
|
||||||
|
|
||||||
(testing "Can re-use fulltext indexed datoms"
|
(testing "Can re-use fulltext indexed datoms"
|
||||||
(let [r (<? (d/<transact! conn [[:db/add 102 :test/other "test this"]]))]
|
(let [r (<? (d/<transact! conn [[:db/add 102 :test/other "test this"]]))]
|
||||||
|
@ -647,7 +663,7 @@
|
||||||
|
|
||||||
(testing "can't upsert a :db.unique/value field"
|
(testing "can't upsert a :db.unique/value field"
|
||||||
(is (thrown-with-msg?
|
(is (thrown-with-msg?
|
||||||
ExceptionInfo #"because of unique constraint"
|
ExceptionInfo #"unique constraint"
|
||||||
(<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :test/x 12345 :test/y 99999}]))))))
|
(<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :test/x 12345 :test/y 99999}]))))))
|
||||||
|
|
||||||
(finally
|
(finally
|
||||||
|
|
Loading…
Reference in a new issue