From 86b5a8ea8ac2b61d3af076e42be4684d267f1cf7 Mon Sep 17 00:00:00 2001 From: Richard Newman Date: Wed, 10 Aug 2016 13:16:25 -0700 Subject: [PATCH] Rework queries [retractions tx fulltext? ->SQLite] + (let + [f-q + "WITH vv AS (SELECT rowid FROM fulltext_values WHERE text = ?) + INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) + VALUES (?, ?, (SELECT rowid FROM vv), ?, 0, ?, (SELECT rowid FROM vv), ?)" + + non-f-q + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, sv, svalue_type_tag) + VALUES (?, ?, ?, ?, 0, ?, ?, ?)"] + (map + (fn [[_ e a v]] + (let [[v tag] (->SQLite a v)] + (if (fulltext? a) + [f-q + v e a tx tag tag] + [non-f-q + e a v tx tag v tag]))) + retractions))) + +(defn- non-fts-many->queries [ops tx ->SQLite indexing? ref? unique?] + (let [q "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES " + + values-part + ;; e0, a0, v0, tx0, added0, value_type_tag0 + ;; index_avet0, index_vaet0, index_fulltext0, + ;; unique_value0, sv, svalue_type_tag + "(?, ?, ?, ?, 1, ?, ?, ?, 0, ?, ?, ?)" + + repeater (memoize (fn [n] (interpose ", " (repeat n values-part))))] + + ;; This query takes ten variables per item. So we partition into max-sql-vars / 10. + (map + (fn [chunk] + (cons + ;; Query string. + (apply str q (repeater (count chunk))) + + ;; Bindings. + (mapcat (fn [[_ e a v]] + (let [[v tag] (->SQLite a v)] + [e a v tx tag + (indexing? a) ; index_avet + (ref? a) ; index_vaet + (unique? a) ; unique_value + v tag])) + chunk))) + + (partition-all (quot max-sql-vars 10) ops)))) + +(defn- non-fts-one->queries [ops tx ->SQLite indexing? ref? unique?] + (let [first-values-part + ;; TODO: order value and tag closer together. + ;; flags0 + ;; sv, svalue_type_tag + "(?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?, ?)" + first-repeater (memoize (fn [n] (interpose ", " (repeat n first-values-part)))) + + second-values-part + "(?, ?, ?, ?, ?, ?)" + second-repeater (memoize (fn [n] (interpose ", " (repeat n second-values-part)))) + ] + + ;; :db.cardinality/one takes two queries. + (mapcat + (fn [chunk] + [(cons + (apply + str + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES " + (first-repeater (count chunk))) + (mapcat (fn [[_ e a v]] + (let [[v tag] (->SQLite a v)] + [e a v tx 1 tag + (indexing? a) ; index_avet + (ref? a) ; index_vaet + (unique? a) ; unique_value + v tag])) + chunk)) + + (cons + (apply + str + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0) VALUES " + (second-repeater (count chunk))) + (mapcat (fn [[_ e a v]] + (let [[v tag] (->SQLite a v)] + [e a v tx 0 tag])) + chunk))]) + (partition-all (quot max-sql-vars 11) ops)))) + +;;; An FTS insertion happens in two parts. +;;; Firstly, we ensure that the fulltext value is present in the store. +;;; This is effectively an INSERT OR IGNORE… but FTS tables don't support +;;; uniqueness constraints. So we do it through a trigger on a view. +;;; When we insert the value, we pass with it a searchid. We'll use this +;;; later when inserting the datom. +;;; Secondly, we insert a row just like for non-FTS. The only difference +;;; is that the value is the rowid into the fulltext_values table. +(defn- fts-many->queries [ops tx ->SQLite indexing? ref? unique?] + ;; TODO: operations with the same text value should be + ;; coordinated here! + ;; It'll work fine without so long as queries are executed + ;; in order and not combined, but even so it's inefficient. + (conj + (mapcat + (fn [[_ e a v] searchid] + (let [[v tag] (->SQLite a v)] + ;; First query: ensure the value exists. + [["INSERT INTO fulltext_values_view (text, searchid) VALUES (?, ?)" + v searchid] + + ;; Second query: tx_lookup. + [(str + "WITH vv(rowid) AS (SELECT rowid FROM fulltext_values WHERE searchid = ?) " + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES " + "(?, ?, (SELECT rowid FROM vv), ?, 1, ?, ?, ?, 1, ?, (SELECT rowid FROM vv), ?)") + searchid + e a tx tag + (indexing? a) ; index_avet + (ref? a) ; index_vaet + (unique? a) ; unique_value + tag]])) + ops + (range 2000 999999999)) + ["UPDATE fulltext_values SET searchid = NULL WHERE searchid IS NOT NULL"])) + +(defn fts-one->queries [ops tx ->SQLite indexing? ref? unique?] + (conj + (mapcat + (fn [[_ e a v] searchid] + (let [[v tag] (->SQLite a v)] + ;; First query: ensure the value exists. + [["INSERT INTO fulltext_values_view (text, searchid) VALUES (?, ?)" + v searchid] + + ;; Second and third queries: tx_lookup. + [(str + "WITH vv(rowid) AS (SELECT rowid FROM fulltext_values WHERE searchid = ?) " + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0, index_avet0, index_vaet0, index_fulltext0, unique_value0, sv, svalue_type_tag) VALUES " + "(?, ?, (SELECT rowid FROM vv), ?, 1, ?, ?, ?, 1, ?, (SELECT rowid FROM vv), ?)") + searchid + e a tx tag + (indexing? a) ; index_avet + (ref? a) ; index_vaet + (unique? a) ; unique_value + tag] + + [(str + "INSERT INTO tx_lookup (e0, a0, v0, tx0, added0, value_type_tag0) VALUES " + "(?, ?, (SELECT rowid FROM fulltext_values WHERE searchid = ?), ?, 0, ?)") + e a searchid tx tag]])) + ops + (range 3000 999999999)) + ["UPDATE fulltext_values SET searchid = NULL WHERE searchid IS NOT NULL"])) + +(defn- -run-queries [conn queries exception-message] + (go-pair + (try + (doseq [q queries] + (SQLite (partial ds/->SQLite schema) + fulltext? (memoize (partial ds/fulltext? schema)) + many? (memoize (fn [a] (ds/multival? schema a))) + indexing? (memoize (fn [a] (ds/indexing? schema a))) + ref? (memoize (fn [a] (ds/ref? schema a))) + unique? (memoize (fn [a] (ds/unique? schema a))) + conn (:sqlite-connection db) + + ;; Collect all the queries we need to run. + queries (atom []) + operations (group-by first entities)] + + (when-not (clojure.set/subset? (keys operations) #{:db/retract :db/add}) + (raise (str "Unknown operations " (keys operations)) + {:error :transact/syntax, :operations (dissoc operations :db/retract :db/add)})) + + ;; We can turn all non-FTS operations into simple SQL queries that we run serially. + ;; FTS queries require us to get a rowid from the FTS table and use that for + ;; insertion, so we need another pass. + ;; We can't just freely use `go-pair` here, because this function is so complicated + ;; that ClojureScript blows the stack trying to compile it. + + (when-let [retractions (:db/retract operations)] + (swap! + queries concat (retractions->queries retractions tx fulltext? ->SQLite))) + + ;; We want to partition our additions into four groups according to two + ;; characteristics: whether they require writing to the FTS value table, + ;; and whether the attribute has a 'many' cardinality constraint. Each of + ;; these four requires different queries. + (let [additions + (group-by (fn [[op e a v]] + (if (fulltext? a) + (if (many? a) + :fts-many + :fts-one) + (if (many? a) + :non-fts-many + :non-fts-one))) + (:db/add operations)) + transforms + {:fts-one fts-one->queries + :fts-many fts-many->queries + :non-fts-one non-fts-one->queries + :non-fts-many non-fts-many->queries}] + + (doseq [[key ops] additions] + (when-let [transform (key transforms)] + (swap! + queries concat + (transform ops tx ->SQLite indexing? ref? unique?))))) + + ;; Now run each query. + ;; This code is a little tortured to avoid blowing the compiler stack in cljs. + + (go-pair + (Datom schema) + (SQLite schema a v)] + (let [schema (.-schema db) ;; TODO: understand why (schema db) fails. + a (entid db a) + [v tag] (ds/->SQLite schema a v) + yield-datom + (fn [rows] + (when-let [row (first rows)] + (row->Datom schema row)))] (go-pair (->> - {:select [:e :a :v :tx [1 :added]] ;; TODO: generalize columns. - :from [:all_datoms] - :where [:and [:= :index_avet 1] [:= :a a] [:= :value_type_tag tag] [:= :v v]]} - (s/format) ;; TODO: format these statements only once. + ;; TODO: generalize columns. + ["SELECT e, a, v, tx, 1 AS added FROM all_datoms + WHERE index_avet = 1 AND a = ? AND value_type_tag = ? AND v = ? + LIMIT 1" a tag v] (s/all-rows (:sqlite-connection db)) - (Datom (.-schema db))))))) ;; TODO: understand why (schema db) fails. + SQLite schema a v) - fulltext? (ds/fulltext? schema a)] - (cond - (= op :db/add) - (let [v (if fulltext? - (> - (s/all-rows (:sqlite-connection db) ["SELECT * FROM transactions WHERE tx = ?" tx]) - (Datom schema))))] - tx-data)))) (