Rework <apply-entities to be 40% faster and not blow the stack in CLJS.
* Batch up datoms into a smaller number of queries, improving transact speed by about 50%. * Restore transacting FTS attributes. * Implement retraction of freetext datoms.
This commit is contained in:
parent
63342e344f
commit
bda67ac8e8
3 changed files with 357 additions and 162 deletions
|
@ -37,6 +37,9 @@
|
||||||
(uncaughtException [_ thread ex]
|
(uncaughtException [_ thread ex]
|
||||||
(println ex "Uncaught exception on" (.getName thread))))))
|
(println ex "Uncaught exception on" (.getName thread))))))
|
||||||
|
|
||||||
|
(def max-sql-vars 999) ;; TODO: generalize.
|
||||||
|
|
||||||
|
|
||||||
;; ----------------------------------------------------------------------------
|
;; ----------------------------------------------------------------------------
|
||||||
;; define data-readers to be made available to EDN readers. in CLJS
|
;; define data-readers to be made available to EDN readers. in CLJS
|
||||||
;; they're magically available. in CLJ, data_readers.clj may or may
|
;; they're magically available. in CLJ, data_readers.clj may or may
|
||||||
|
@ -96,7 +99,7 @@
|
||||||
|
|
||||||
(<av
|
(<av
|
||||||
[db a v]
|
[db a v]
|
||||||
"Search for datoms using the AVET index.")
|
"Search for a single matching datom using the AVET index.")
|
||||||
|
|
||||||
(<apply-entities
|
(<apply-entities
|
||||||
[db tx entities]
|
[db tx entities]
|
||||||
|
@ -164,6 +167,324 @@
|
||||||
:table-alias source/gensym-table-alias
|
:table-alias source/gensym-table-alias
|
||||||
:make-constraints nil}))
|
:make-constraints nil}))
|
||||||
|
|
||||||
|
(defn- retractions->queries [retractions tx fulltext? ->SQLite]
|
||||||
|
(let
|
||||||
|
[f-q
|
||||||
|
"WITH vv AS (SELECT rowid FROM fulltext_values WHERE text = ?)
|
||||||
|
INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag)
|
||||||
|
VALUES (?, ?, (SELECT rowid FROM vv), ?, 0, ?, (SELECT rowid FROM vv), ?)"
|
||||||
|
|
||||||
|
non-f-q
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag)
|
||||||
|
VALUES (?, ?, ?, ?, 0, ?, ?, ?)"]
|
||||||
|
(map
|
||||||
|
(fn [[_ e a v]]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
(if (fulltext? a)
|
||||||
|
[f-q
|
||||||
|
v e a tx tag tag]
|
||||||
|
[non-f-q
|
||||||
|
e a v tx tag v tag])))
|
||||||
|
retractions)))
|
||||||
|
|
||||||
|
(defn- non-fts-many->queries [ops tx ->SQLite indexing? ref? unique?]
|
||||||
|
(let [q "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES "
|
||||||
|
|
||||||
|
values-part
|
||||||
|
;; e0, a0, v0, tx0, added0, value_type_tag0
|
||||||
|
;; index_avet0, index_vaet0, index_fulltext0,
|
||||||
|
;; unique_value0, sv, svalue_type_tag
|
||||||
|
"(?, ?, ?, ?, 1, ?, ?, ?, 0, ?, ?, ?)"
|
||||||
|
|
||||||
|
repeater (memoize (fn [n] (interpose ", " (repeat n values-part))))]
|
||||||
|
|
||||||
|
;; This query takes ten variables per item. So we partition into max-sql-vars / 10.
|
||||||
|
(map
|
||||||
|
(fn [chunk]
|
||||||
|
(cons
|
||||||
|
;; Query string.
|
||||||
|
(apply str q (repeater (count chunk)))
|
||||||
|
|
||||||
|
;; Bindings.
|
||||||
|
(mapcat (fn [[_ e a v]]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
[e a v tx tag
|
||||||
|
(indexing? a) ; index_avet
|
||||||
|
(ref? a) ; index_vaet
|
||||||
|
(unique? a) ; unique_value
|
||||||
|
v tag]))
|
||||||
|
chunk)))
|
||||||
|
|
||||||
|
(partition-all (quot max-sql-vars 10) ops))))
|
||||||
|
|
||||||
|
(defn- non-fts-one->queries [ops tx ->SQLite indexing? ref? unique?]
|
||||||
|
(let [first-values-part
|
||||||
|
;; TODO: order value and tag closer together.
|
||||||
|
;; flags0
|
||||||
|
;; sv, svalue_type_tag
|
||||||
|
"(?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?)"
|
||||||
|
first-repeater (memoize (fn [n] (interpose ", " (repeat n first-values-part))))
|
||||||
|
|
||||||
|
second-values-part
|
||||||
|
"(?, ?, ?, ?, ?, ?)"
|
||||||
|
second-repeater (memoize (fn [n] (interpose ", " (repeat n second-values-part))))
|
||||||
|
]
|
||||||
|
|
||||||
|
;; :db.cardinality/one takes two queries.
|
||||||
|
(mapcat
|
||||||
|
(fn [chunk]
|
||||||
|
[(cons
|
||||||
|
(apply
|
||||||
|
str
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES "
|
||||||
|
(first-repeater (count chunk)))
|
||||||
|
(mapcat (fn [[_ e a v]]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
[e a v tx 1 tag
|
||||||
|
(indexing? a) ; index_avet
|
||||||
|
(ref? a) ; index_vaet
|
||||||
|
(unique? a) ; unique_value
|
||||||
|
v tag]))
|
||||||
|
chunk))
|
||||||
|
|
||||||
|
(cons
|
||||||
|
(apply
|
||||||
|
str
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0) VALUES "
|
||||||
|
(second-repeater (count chunk)))
|
||||||
|
(mapcat (fn [[_ e a v]]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
[e a v tx 0 tag]))
|
||||||
|
chunk))])
|
||||||
|
(partition-all (quot max-sql-vars 11) ops))))
|
||||||
|
|
||||||
|
;;; An FTS insertion happens in two parts.
|
||||||
|
;;; Firstly, we ensure that the fulltext value is present in the store.
|
||||||
|
;;; This is effectively an INSERT OR IGNORE… but FTS tables don't support
|
||||||
|
;;; uniqueness constraints. So we do it through a trigger on a view.
|
||||||
|
;;; When we insert the value, we pass with it a searchid. We'll use this
|
||||||
|
;;; later when inserting the datom.
|
||||||
|
;;; Secondly, we insert a row just like for non-FTS. The only difference
|
||||||
|
;;; is that the value is the rowid into the fulltext_values table.
|
||||||
|
(defn- fts-many->queries [ops tx ->SQLite indexing? ref? unique?]
|
||||||
|
;; TODO: operations with the same text value should be
|
||||||
|
;; coordinated here!
|
||||||
|
;; It'll work fine without so long as queries are executed
|
||||||
|
;; in order and not combined, but even so it's inefficient.
|
||||||
|
(conj
|
||||||
|
(mapcat
|
||||||
|
(fn [[_ e a v] searchid]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
;; First query: ensure the value exists.
|
||||||
|
[["INSERT INTO fulltext_values_view (text, searchid) VALUES (?, ?)"
|
||||||
|
v searchid]
|
||||||
|
|
||||||
|
;; Second query: tx_lookup.
|
||||||
|
[(str
|
||||||
|
"WITH vv(rowid) AS (SELECT rowid FROM fulltext_values WHERE searchid = ?) "
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES "
|
||||||
|
"(?, ?, (SELECT rowid FROM vv), ?, 1, ?, ?, ?, 1, ?, (SELECT rowid FROM vv), ?)")
|
||||||
|
searchid
|
||||||
|
e a tx tag
|
||||||
|
(indexing? a) ; index_avet
|
||||||
|
(ref? a) ; index_vaet
|
||||||
|
(unique? a) ; unique_value
|
||||||
|
tag]]))
|
||||||
|
ops
|
||||||
|
(range 2000 999999999))
|
||||||
|
["UPDATE fulltext_values SET searchid = NULL WHERE searchid IS NOT NULL"]))
|
||||||
|
|
||||||
|
(defn fts-one->queries [ops tx ->SQLite indexing? ref? unique?]
|
||||||
|
(conj
|
||||||
|
(mapcat
|
||||||
|
(fn [[_ e a v] searchid]
|
||||||
|
(let [[v tag] (->SQLite a v)]
|
||||||
|
;; First query: ensure the value exists.
|
||||||
|
[["INSERT INTO fulltext_values_view (text, searchid) VALUES (?, ?)"
|
||||||
|
v searchid]
|
||||||
|
|
||||||
|
;; Second and third queries: tx_lookup.
|
||||||
|
[(str
|
||||||
|
"WITH vv(rowid) AS (SELECT rowid FROM fulltext_values WHERE searchid = ?) "
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES "
|
||||||
|
"(?, ?, (SELECT rowid FROM vv), ?, 1, ?, ?, ?, 1, ?, (SELECT rowid FROM vv), ?)")
|
||||||
|
searchid
|
||||||
|
e a tx tag
|
||||||
|
(indexing? a) ; index_avet
|
||||||
|
(ref? a) ; index_vaet
|
||||||
|
(unique? a) ; unique_value
|
||||||
|
tag]
|
||||||
|
|
||||||
|
[(str
|
||||||
|
"INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0) VALUES "
|
||||||
|
"(?, ?, (SELECT rowid FROM fulltext_values WHERE searchid = ?), ?, 0, ?)")
|
||||||
|
e a searchid tx tag]]))
|
||||||
|
ops
|
||||||
|
(range 3000 999999999))
|
||||||
|
["UPDATE fulltext_values SET searchid = NULL WHERE searchid IS NOT NULL"]))
|
||||||
|
|
||||||
|
(defn- -run-queries [conn queries exception-message]
|
||||||
|
(go-pair
|
||||||
|
(try
|
||||||
|
(doseq [q queries]
|
||||||
|
(<? (s/execute! conn q)))
|
||||||
|
(catch #?(:clj java.sql.SQLException :cljs js/Error) e
|
||||||
|
(throw (ex-info exception-message {} e))))))
|
||||||
|
|
||||||
|
(defn- -preamble-drop [conn]
|
||||||
|
(let [preamble-drop-index ["DROP INDEX IF EXISTS id_tx_lookup_added"]
|
||||||
|
preamble-delete-tx-lookup ["DELETE FROM tx_lookup"]]
|
||||||
|
(go-pair
|
||||||
|
(p :preamble
|
||||||
|
(doseq [q [preamble-drop-index preamble-delete-tx-lookup]]
|
||||||
|
(<? (s/execute! conn q)))))))
|
||||||
|
|
||||||
|
(defn- -after-drop [conn]
|
||||||
|
(go-pair
|
||||||
|
(doseq [q [;; The lookup table takes space on disk, so we purge it aggressively.
|
||||||
|
["DROP INDEX IF EXISTS id_tx_lookup_added"]
|
||||||
|
["DELETE FROM tx_lookup"]]]
|
||||||
|
(<? (s/execute! conn q)))))
|
||||||
|
|
||||||
|
(defn- -build-transaction [conn tx]
|
||||||
|
(let [build-indices ["CREATE INDEX IF NOT EXISTS idx_tx_lookup_added ON tx_lookup (added0)"]
|
||||||
|
|
||||||
|
;; First is fast, only one table walk: lookup by exact eav.
|
||||||
|
;; Second is slower, but still only one table walk: lookup old value by ea.
|
||||||
|
insert-into-tx-lookup
|
||||||
|
["INSERT INTO tx_lookup
|
||||||
|
SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag
|
||||||
|
FROM tx_lookup AS t
|
||||||
|
LEFT JOIN datoms AS d
|
||||||
|
ON t.e0 = d.e AND
|
||||||
|
t.a0 = d.a AND
|
||||||
|
t.sv = d.v AND
|
||||||
|
t.svalue_type_tag = d.value_type_tag AND
|
||||||
|
t.sv IS NOT NULL
|
||||||
|
|
||||||
|
UNION ALL
|
||||||
|
SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag
|
||||||
|
FROM tx_lookup AS t,
|
||||||
|
datoms AS d
|
||||||
|
WHERE t.sv IS NULL AND
|
||||||
|
t.e0 = d.e AND
|
||||||
|
t.a0 = d.a"]
|
||||||
|
|
||||||
|
t-datoms-not-already-present
|
||||||
|
["INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||||
|
SELECT e0, a0, v0, ?, 1, value_type_tag0
|
||||||
|
FROM tx_lookup
|
||||||
|
WHERE added0 IS 3 AND e IS NULL" tx] ;; TODO: get rid of magic value 3.
|
||||||
|
|
||||||
|
t-retract-datoms-carefully
|
||||||
|
["INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
||||||
|
SELECT e, a, v, ?, 0, value_type_tag
|
||||||
|
FROM tx_lookup
|
||||||
|
WHERE added0 IS 2 AND ((sv IS NOT NULL) OR (sv IS NULL AND v0 IS NOT v)) AND v IS NOT NULL" tx] ;; TODO: get rid of magic value 2.
|
||||||
|
]
|
||||||
|
(go-pair
|
||||||
|
(doseq [q [build-indices insert-into-tx-lookup
|
||||||
|
t-datoms-not-already-present
|
||||||
|
t-retract-datoms-carefully]]
|
||||||
|
(<? (s/execute! conn q))))))
|
||||||
|
|
||||||
|
(defn- -build-datoms [conn tx]
|
||||||
|
(let [d-datoms-not-already-present
|
||||||
|
["INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value)
|
||||||
|
SELECT e0, a0, v0, ?, value_type_tag0,
|
||||||
|
index_avet0, index_vaet0, index_fulltext0, unique_value0
|
||||||
|
FROM tx_lookup
|
||||||
|
WHERE added0 IS 3 AND e IS NULL" tx] ;; TODO: get rid of magic value 3.
|
||||||
|
|
||||||
|
;; TODO: retract fulltext datoms correctly.
|
||||||
|
d-retract-datoms-carefully
|
||||||
|
["WITH ids AS (SELECT l.rid FROM tx_lookup AS l WHERE l.added0 IS 2 AND ((l.sv IS NOT NULL) OR (l.sv IS NULL AND l.v0 IS NOT l.v)))
|
||||||
|
DELETE FROM datoms WHERE rowid IN ids" ;; TODO: get rid of magic value 2.
|
||||||
|
]]
|
||||||
|
(-run-queries conn [d-datoms-not-already-present d-retract-datoms-carefully]
|
||||||
|
"Transaction violates unique constraint")))
|
||||||
|
|
||||||
|
(defn- -<apply-entities [db tx entities]
|
||||||
|
(let [schema (.-schema db)
|
||||||
|
->SQLite (partial ds/->SQLite schema)
|
||||||
|
fulltext? (memoize (partial ds/fulltext? schema))
|
||||||
|
many? (memoize (fn [a] (ds/multival? schema a)))
|
||||||
|
indexing? (memoize (fn [a] (ds/indexing? schema a)))
|
||||||
|
ref? (memoize (fn [a] (ds/ref? schema a)))
|
||||||
|
unique? (memoize (fn [a] (ds/unique? schema a)))
|
||||||
|
conn (:sqlite-connection db)
|
||||||
|
|
||||||
|
;; Collect all the queries we need to run.
|
||||||
|
queries (atom [])
|
||||||
|
operations (group-by first entities)]
|
||||||
|
|
||||||
|
(when-not (clojure.set/subset? (keys operations) #{:db/retract :db/add})
|
||||||
|
(raise (str "Unknown operations " (keys operations))
|
||||||
|
{:error :transact/syntax, :operations (dissoc operations :db/retract :db/add)}))
|
||||||
|
|
||||||
|
;; We can turn all non-FTS operations into simple SQL queries that we run serially.
|
||||||
|
;; FTS queries require us to get a rowid from the FTS table and use that for
|
||||||
|
;; insertion, so we need another pass.
|
||||||
|
;; We can't just freely use `go-pair` here, because this function is so complicated
|
||||||
|
;; that ClojureScript blows the stack trying to compile it.
|
||||||
|
|
||||||
|
(when-let [retractions (:db/retract operations)]
|
||||||
|
(swap!
|
||||||
|
queries concat (retractions->queries retractions tx fulltext? ->SQLite)))
|
||||||
|
|
||||||
|
;; We want to partition our additions into four groups according to two
|
||||||
|
;; characteristics: whether they require writing to the FTS value table,
|
||||||
|
;; and whether the attribute has a 'many' cardinality constraint. Each of
|
||||||
|
;; these four requires different queries.
|
||||||
|
(let [additions
|
||||||
|
(group-by (fn [[op e a v]]
|
||||||
|
(if (fulltext? a)
|
||||||
|
(if (many? a)
|
||||||
|
:fts-many
|
||||||
|
:fts-one)
|
||||||
|
(if (many? a)
|
||||||
|
:non-fts-many
|
||||||
|
:non-fts-one)))
|
||||||
|
(:db/add operations))
|
||||||
|
transforms
|
||||||
|
{:fts-one fts-one->queries
|
||||||
|
:fts-many fts-many->queries
|
||||||
|
:non-fts-one non-fts-one->queries
|
||||||
|
:non-fts-many non-fts-many->queries}]
|
||||||
|
|
||||||
|
(doseq [[key ops] additions]
|
||||||
|
(when-let [transform (key transforms)]
|
||||||
|
(swap!
|
||||||
|
queries concat
|
||||||
|
(transform ops tx ->SQLite indexing? ref? unique?)))))
|
||||||
|
|
||||||
|
;; Now run each query.
|
||||||
|
;; This code is a little tortured to avoid blowing the compiler stack in cljs.
|
||||||
|
|
||||||
|
(go-pair
|
||||||
|
(<? (-preamble-drop conn))
|
||||||
|
|
||||||
|
(p :run-insert-queries
|
||||||
|
(<? (-run-queries conn @queries "Transaction violates cardinality constraint")))
|
||||||
|
|
||||||
|
;; Follow up by building indices, then do the work.
|
||||||
|
(p :build-and-transaction
|
||||||
|
(<? (-build-transaction conn tx)))
|
||||||
|
|
||||||
|
(p :update-materialized-datoms
|
||||||
|
(<? (-build-datoms conn tx)))
|
||||||
|
|
||||||
|
(<? (-after-drop conn))
|
||||||
|
|
||||||
|
;; Return the written transaction.
|
||||||
|
(p :select-tx-data
|
||||||
|
(mapv (partial row->Datom schema)
|
||||||
|
(<?
|
||||||
|
(s/all-rows
|
||||||
|
(:sqlite-connection db)
|
||||||
|
;; We index on tx, so the following is fast.
|
||||||
|
["SELECT * FROM transactions WHERE tx = ?" tx])))))))
|
||||||
|
|
||||||
(defrecord DB [sqlite-connection schema ident-map]
|
(defrecord DB [sqlite-connection schema ident-map]
|
||||||
;; ident-map maps between keyword idents and integer entids. The set of idents and entids is
|
;; ident-map maps between keyword idents and integer entids. The set of idents and entids is
|
||||||
;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also
|
;; disjoint, so we represent both directions of the mapping in the same map for simplicity. Also
|
||||||
|
@ -202,18 +523,23 @@
|
||||||
(not= 0))))
|
(not= 0))))
|
||||||
|
|
||||||
(<av [db a v]
|
(<av [db a v]
|
||||||
(let [[v tag] (ds/->SQLite schema a v)]
|
(let [schema (.-schema db) ;; TODO: understand why (schema db) fails.
|
||||||
|
a (entid db a)
|
||||||
|
[v tag] (ds/->SQLite schema a v)
|
||||||
|
yield-datom
|
||||||
|
(fn [rows]
|
||||||
|
(when-let [row (first rows)]
|
||||||
|
(row->Datom schema row)))]
|
||||||
(go-pair
|
(go-pair
|
||||||
(->>
|
(->>
|
||||||
{:select [:e :a :v :tx [1 :added]] ;; TODO: generalize columns.
|
;; TODO: generalize columns.
|
||||||
:from [:all_datoms]
|
["SELECT e, a, v, tx, 1 AS added FROM all_datoms
|
||||||
:where [:and [:= :index_avet 1] [:= :a a] [:= :value_type_tag tag] [:= :v v]]}
|
WHERE index_avet = 1 AND a = ? AND value_type_tag = ? AND v = ?
|
||||||
(s/format) ;; TODO: format these statements only once.
|
LIMIT 1" a tag v]
|
||||||
|
|
||||||
(s/all-rows (:sqlite-connection db))
|
(s/all-rows (:sqlite-connection db))
|
||||||
(<?)
|
<?
|
||||||
|
yield-datom))))
|
||||||
(mapv (partial row->Datom (.-schema db))))))) ;; TODO: understand why (schema db) fails.
|
|
||||||
|
|
||||||
(<next-eid [db tempid]
|
(<next-eid [db tempid]
|
||||||
{:pre [(id-literal? tempid)]}
|
{:pre [(id-literal? tempid)]}
|
||||||
|
@ -228,161 +554,14 @@
|
||||||
{:error :db/bad-part
|
{:error :db/bad-part
|
||||||
:part (:part tempid)}))
|
:part (:part tempid)}))
|
||||||
|
|
||||||
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
|
(p :next-eid-body
|
||||||
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part])))))))
|
(<? (exec ["UPDATE parts SET idx = idx + 1 WHERE part = ?" part]))
|
||||||
|
(:eid (first (<? (s/all-rows (:sqlite-connection db) ["SELECT (start + idx) AS eid FROM parts WHERE part = ?" part]))))))))
|
||||||
|
|
||||||
(<apply-entities [db tx entities]
|
(<apply-entities [db tx entities]
|
||||||
{:pre [(db? db) (sequential? entities)]}
|
{:pre [(db? db) (sequential? entities)]}
|
||||||
(go-pair
|
(-<apply-entities db tx entities))
|
||||||
(let [schema (.-schema db)
|
|
||||||
many? (memoize (fn [a] (ds/multival? schema a)))
|
|
||||||
indexing? (memoize (fn [a] (ds/indexing? schema a)))
|
|
||||||
ref? (memoize (fn [a] (ds/ref? schema a)))
|
|
||||||
unique? (memoize (fn [a] (ds/unique? schema a)))
|
|
||||||
<exec (partial s/execute! (:sqlite-connection db))]
|
|
||||||
(p :delete-tx-lookup-before
|
|
||||||
(<? (<exec ["DROP INDEX IF EXISTS id_tx_lookup_added"]))
|
|
||||||
(<? (<exec ["DELETE FROM tx_lookup"])))
|
|
||||||
|
|
||||||
(p :insertions
|
|
||||||
(try
|
|
||||||
(doseq [entity entities]
|
|
||||||
(let [[op e a v] entity
|
|
||||||
[v tag] (ds/->SQLite schema a v)
|
|
||||||
fulltext? (ds/fulltext? schema a)]
|
|
||||||
(cond
|
|
||||||
(= op :db/add)
|
|
||||||
(let [v (if fulltext?
|
|
||||||
(<? (<insert-fulltext-value db v))
|
|
||||||
v)]
|
|
||||||
(if (many? a)
|
|
||||||
;; :db.cardinality/many
|
|
||||||
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES ("
|
|
||||||
"?, ?, ?, ?, 1, ?, " ; e0, a0, v0, tx0, added0, value_type_tag0
|
|
||||||
"?, ?, ?, ?, " ; flags0
|
|
||||||
"?, ?" ; sv, svalue_type_tag
|
|
||||||
")")
|
|
||||||
e a v tx tag
|
|
||||||
(indexing? a) ; index_avet
|
|
||||||
(ref? a) ; index_vaet
|
|
||||||
fulltext? ; index_fulltext
|
|
||||||
(unique? a) ; unique_value
|
|
||||||
v tag
|
|
||||||
]))
|
|
||||||
;; :db.cardinality/one
|
|
||||||
(do
|
|
||||||
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES ("
|
|
||||||
"?, ?, ?, ?, ?, ?, " ; TODO: order value and tag closer together.
|
|
||||||
"?, ?, ?, ?, " ; flags0
|
|
||||||
"?, ?" ; sv, svalue_type_tag
|
|
||||||
")")
|
|
||||||
e a v tx 1 tag
|
|
||||||
(indexing? a) ; index_avet
|
|
||||||
(ref? a) ; index_vaet
|
|
||||||
fulltext? ; index_fulltext
|
|
||||||
(unique? a) ; unique_value
|
|
||||||
v tag
|
|
||||||
]))
|
|
||||||
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) VALUES ("
|
|
||||||
"?, ?, ?, ?, ?, ?, "
|
|
||||||
"?, ?" ; sv, svalue_type_tag
|
|
||||||
")")
|
|
||||||
e a v tx 0 tag
|
|
||||||
nil nil ; Search values.
|
|
||||||
])))))
|
|
||||||
|
|
||||||
(= op :db/retract)
|
|
||||||
(<? (<exec [(str "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) VALUES ("
|
|
||||||
"?, ?, ?, ?, ?, ?, "
|
|
||||||
"?, ?" ; sv, svalue_type_tag
|
|
||||||
")")
|
|
||||||
e a v tx 0 tag
|
|
||||||
v tag
|
|
||||||
]))
|
|
||||||
|
|
||||||
true
|
|
||||||
(raise "Unknown operation at " entity ", expected :db/add, :db/retract"
|
|
||||||
{:error :transact/syntax, :operation op, :tx-data entity}))))
|
|
||||||
|
|
||||||
(catch java.sql.SQLException e
|
|
||||||
(throw (ex-info "Transaction violates cardinality constraint" {} e))))) ;; TODO: say more about the conflicting datoms.
|
|
||||||
|
|
||||||
(p :create-tx-lookup-indices
|
|
||||||
(<? (<exec ["CREATE INDEX IF NOT EXISTS idx_tx_lookup_added ON tx_lookup (added0)"])))
|
|
||||||
|
|
||||||
(p :join
|
|
||||||
(<?
|
|
||||||
(<exec
|
|
||||||
;; First is fast, only one table walk: lookup by exact eav.
|
|
||||||
;; Second is slower, but still only one table walk: lookup old value by ea.
|
|
||||||
["INSERT INTO tx_lookup
|
|
||||||
|
|
||||||
SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag
|
|
||||||
FROM tx_lookup AS t
|
|
||||||
LEFT JOIN datoms AS d
|
|
||||||
ON t.e0 = d.e AND
|
|
||||||
t.a0 = d.a AND
|
|
||||||
t.sv = d.v AND
|
|
||||||
t.svalue_type_tag = d.value_type_tag AND
|
|
||||||
t.sv IS NOT NULL
|
|
||||||
|
|
||||||
UNION ALL
|
|
||||||
SELECT t.e0, t.a0, t.v0, t.tx0, t.added0 + 2, t.value_type_tag0, t.index_avet0, t.index_vaet0, t.index_fulltext0, t.unique_value0, t.sv, t.svalue_type_tag, d.rowid, d.e, d.a, d.v, d.tx, d.value_type_tag
|
|
||||||
FROM tx_lookup AS t,
|
|
||||||
datoms AS d
|
|
||||||
WHERE t.sv IS NULL AND
|
|
||||||
t.e0 = d.e AND
|
|
||||||
t.a0 = d.a
|
|
||||||
"])))
|
|
||||||
|
|
||||||
(p :insert-transaction
|
|
||||||
(p :insert-transaction-added
|
|
||||||
;; Add datoms that aren't already present.
|
|
||||||
(<? (<exec ["INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
|
||||||
SELECT e0, a0, v0, ?, 1, value_type_tag0
|
|
||||||
FROM tx_lookup
|
|
||||||
WHERE added0 IS 3 AND e IS NULL" tx]))) ;; TODO: get rid of magic value 3.
|
|
||||||
|
|
||||||
(p :insert-transaction-retracted
|
|
||||||
;; Retract datoms carefully, either when they're matched exactly or when the existing value doesn't match the new value.
|
|
||||||
(<? (<exec ["INSERT INTO transactions (e, a, v, tx, added, value_type_tag)
|
|
||||||
SELECT e, a, v, ?, 0, value_type_tag
|
|
||||||
FROM tx_lookup
|
|
||||||
WHERE added0 IS 2 AND ((sv IS NOT NULL) OR (sv IS NULL AND v0 IS NOT v)) AND v IS NOT NULL" tx])))) ;; TODO: get rid of magic value 2.
|
|
||||||
|
|
||||||
(try
|
|
||||||
(p :update-datoms-materialized-view
|
|
||||||
(p :insert-datoms-added
|
|
||||||
;; Add datoms that aren't already present.
|
|
||||||
(<? (<exec ["INSERT INTO datoms (e, a, v, tx, value_type_tag, index_avet, index_vaet, index_fulltext, unique_value)
|
|
||||||
SELECT e0, a0, v0, ?, value_type_tag0,
|
|
||||||
index_avet0, index_vaet0, index_fulltext0, unique_value0
|
|
||||||
FROM tx_lookup
|
|
||||||
WHERE added0 IS 3 AND e IS NULL" tx])) ;; TODO: get rid of magic value 3.)
|
|
||||||
|
|
||||||
;; TODO: retract fulltext datoms correctly.
|
|
||||||
(p :delete-datoms-retracted
|
|
||||||
(<? (<exec
|
|
||||||
["WITH ids AS (SELECT l.rid FROM tx_lookup AS l WHERE l.added0 IS 2 AND ((l.sv IS NOT NULL) OR (l.sv IS NULL AND l.v0 IS NOT l.v)))
|
|
||||||
DELETE FROM datoms WHERE rowid IN ids" ;; TODO: get rid of magic value 2.
|
|
||||||
])))))
|
|
||||||
|
|
||||||
(catch java.sql.SQLException e
|
|
||||||
(throw (ex-info "Transaction violates unique constraint" {} e)))) ;; TODO: say more about the conflicting datoms.
|
|
||||||
|
|
||||||
;; The lookup table takes space on disk, so we purge it aggressively.
|
|
||||||
(p :delete-tx-lookup-after
|
|
||||||
(<? (<exec ["DROP INDEX IF EXISTS id_tx_lookup_added"]))
|
|
||||||
(<? (<exec ["DELETE FROM tx_lookup"])))
|
|
||||||
|
|
||||||
;; The transaction has been written -- read it back. (We index on tx, so the following is fast.)
|
|
||||||
(let [tx-data (p :select-tx-data
|
|
||||||
(->>
|
|
||||||
(s/all-rows (:sqlite-connection db) ["SELECT * FROM transactions WHERE tx = ?" tx])
|
|
||||||
(<?)
|
|
||||||
|
|
||||||
(mapv (partial row->Datom schema))))]
|
|
||||||
tx-data))))
|
|
||||||
|
|
||||||
(<apply-db-ident-assertions [db added-idents merge]
|
(<apply-db-ident-assertions [db added-idents merge]
|
||||||
(go-pair
|
(go-pair
|
||||||
|
|
|
@ -74,7 +74,23 @@
|
||||||
;; By default we use Unicode-aware tokenizing (particularly for case folding), but preserve
|
;; By default we use Unicode-aware tokenizing (particularly for case folding), but preserve
|
||||||
;; diacritics.
|
;; diacritics.
|
||||||
"CREATE VIRTUAL TABLE fulltext_values
|
"CREATE VIRTUAL TABLE fulltext_values
|
||||||
USING FTS4 (text NOT NULL, tokenize=unicode61 \"remove_diacritics=0\")"
|
USING FTS4 (text NOT NULL, searchid INT, tokenize=unicode61 \"remove_diacritics=0\")"
|
||||||
|
|
||||||
|
;; This combination of view and triggers allows you to transparently
|
||||||
|
;; update-or-insert into FTS. Just INSERT INTO fulltext_values_view (text, searchid).
|
||||||
|
"CREATE VIEW fulltext_values_view AS SELECT * FROM fulltext_values"
|
||||||
|
"CREATE TRIGGER replace_fulltext_searchid
|
||||||
|
INSTEAD OF INSERT ON fulltext_values_view
|
||||||
|
WHEN EXISTS (SELECT 1 FROM fulltext_values WHERE text = new.text)
|
||||||
|
BEGIN
|
||||||
|
UPDATE fulltext_values SET searchid = new.searchid WHERE text = new.text;
|
||||||
|
END"
|
||||||
|
"CREATE TRIGGER insert_fulltext_searchid
|
||||||
|
INSTEAD OF INSERT ON fulltext_values_view
|
||||||
|
WHEN NOT EXISTS (SELECT 1 FROM fulltext_values WHERE text = new.text)
|
||||||
|
BEGIN
|
||||||
|
INSERT INTO fulltext_values (text, searchid) VALUES (new.text, new.searchid);
|
||||||
|
END"
|
||||||
|
|
||||||
;; A view transparently interpolating fulltext indexed values into the datom structure.
|
;; A view transparently interpolating fulltext indexed values into the datom structure.
|
||||||
"CREATE VIEW fulltext_datoms AS
|
"CREATE VIEW fulltext_datoms AS
|
||||||
|
|
|
@ -295,7 +295,7 @@
|
||||||
(and (id-literal? e)
|
(and (id-literal? e)
|
||||||
(ds/unique-identity? (db/schema db) a)
|
(ds/unique-identity? (db/schema db) a)
|
||||||
(not-any? id-literal? [a v]))
|
(not-any? id-literal? [a v]))
|
||||||
(let [upserted-eid (:e (first (<? (db/<av db a v))))
|
(let [upserted-eid (:e (<? (db/<av db a v)))
|
||||||
allocated-eid (get-in report [:tempids e])]
|
allocated-eid (get-in report [:tempids e])]
|
||||||
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
|
(if (and upserted-eid allocated-eid (not= upserted-eid allocated-eid))
|
||||||
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
|
(<? (<retry-with-tempid db initial-report initial-entities e upserted-eid)) ;; TODO: not initial report, just the sorted entities here.
|
||||||
|
|
Loading…
Reference in a new issue