Process <transact! in a transaction; add hook for processing :db.part/db changes.

This commit is contained in:
Nick Alexander 2016-07-25 08:07:53 -07:00 committed by Richard Newman
parent 96caadb189
commit 513f23c45c
4 changed files with 457 additions and 5 deletions

View file

@ -14,21 +14,47 @@
[datomish.source :as source]
[datomish.sqlite :as s]
[datomish.sqlite-schema :as sqlite-schema]
[datomish.util :as util :refer [raise-str]]
[datomish.util :as util :refer [raise raise-str]]
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
[clojure.core.async :as a :refer [chan go <! >!]]])
#?@(:cljs [[datomish.pair-chan]
[cljs.core.async :as a :refer [chan <! >!]]])))
(defprotocol IDB
(idents
[db]
"Return map {ident -> entid} if known idents. See http://docs.datomic.com/identity.html#idents.")
(query-context
[db])
(close
[db]
"Close this database. Returns a pair channel of [nil error]."))
(defrecord DB [sqlite-connection]
(defn db? [x]
(and (satisfies? IDB x)))
;; TODO: implement support for DB parts?
(def tx0 0x2000000)
;; TODO: write tx-meta to transaction.
(defrecord TxReport [tx-data tempids tx-meta])
;; TODO: persist max-tx and max-eid in SQLite.
(defn <allocate-tx [db]
(go-pair
(swap! (:current-tx db) inc)))
;; TODO: add fancy destructuring.
;; TODO: handle reading.
(deftype Datom [e a v tx added])
(defn datom? [x] (instance? Datom x))
(defrecord DB [sqlite-connection idents max-tx]
IDB
(idents [db] @(:idents db))
(query-context [db] (context/->Context (source/datoms-source db) nil nil))
(close [db] (s/close (.-sqlite-connection db))))
@ -36,7 +62,58 @@
(go-pair
(when-not (= sqlite-schema/current-version (<? (sqlite-schema/<ensure-current-version sqlite-connection)))
(raise-str "Could not ensure current SQLite schema version."))
(->DB sqlite-connection)))
(map->DB {:sqlite-connection sqlite-connection
:idents (atom {:db/txInstant 100 :x 101 :y 102}) ;; TODO: pre-populate idents and SQLite tables?
:current-tx (atom (dec tx0))}))) ;; TODO: get rid of dec.
#?(:clj
(defmethod print-method Datom [^Datom d, ^java.io.Writer w]
(.write w (str "#datomish/Datom "))
(binding [*out* w]
(pr [(.-e d) (.-a d) (.-v d) (.-tx d) (.-added d)]))))
;; TODO: implement schemas.
(defn multival? [db attr] false)
;; TODO: implement schemas.
(defn ref? [db attr] false)
(defn <entid [db eid]
{:pre [(db? db)]}
(go-pair
(cond
(number? eid)
eid
(keyword? eid)
;; Turn ident into entid if possible.
(get (idents db) eid eid)
(sequential? eid)
(raise "Lookup ref for entity id not yet supported, got " eid
{:error :entity-id/syntax
:entity-id eid})
:else
(raise "Expected number or lookup ref for entity id, got " eid
{:error :entity-id/syntax
:entity-id eid}))))
(defn <entid-strict [db eid]
{:pre [(db? db)]}
(go-pair
(or (<? (<entid db eid))
(raise "Nothing found for entity id " eid
{:error :entity-id/missing
:entity-id eid}))))
(defn <entid-some [db eid]
{:pre [(db? db)]}
(go-pair
(when eid
(<? (<entid-strict db eid)))))
(defn <?run
"Execute the provided query on the provided DB.
@ -46,7 +123,7 @@
(let [parsed (query/parse find)
context (-> db
query-context
(query/expand-find-into-context parsed))
(query/find-into-context parsed))
row-pair-transducer (projection/row-pair-transducer context)
sql (query/context->sql-string context args)
chan (chan 50 row-pair-transducer)]
@ -67,4 +144,3 @@
[db find args]
(a/reduce (partial reduce-error-pair conj) [[] nil]
(<?run db find args)))

243
src/datomish/transact.cljc Normal file
View file

@ -0,0 +1,243 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.transact
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[cljs.core.async.macros :refer [go]]))
#?(:clj (:import [datomish.db Datom TxReport]))
(:require
[datomish.context :as context]
[datomish.db :as db :refer [#?@(:cljs [Datom TxReport]) db?]]
[datomish.projection :as projection]
[datomish.query :as query]
[datomish.source :as source]
[datomish.sqlite :as s]
[datomish.sqlite-schema :as sqlite-schema]
[datomish.util :as util :refer [raise raise-str]]
[honeysql.core :as sql]
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
[clojure.core.async :as a :refer [chan go <! >!]]])
#?@(:cljs [[datomish.pair-chan]
[cljs.core.async :as a :refer [chan <! >!]]])))
(defn- tx-id? [e]
(= e :db/current-tx))
(defn- validate-eid [eid at]
(when-not (number? eid)
(raise "Bad entity id " eid " at " at ", expected number"
{:error :transact/syntax, :entity-id eid, :context at})))
(defn- validate-attr [attr at]
(when-not (number? attr)
(raise "Bad entity attribute " attr " at " at ", expected number"
{:error :transact/syntax, :attribute attr, :context at})))
(defn- validate-val [v at]
(when (nil? v)
(raise "Cannot store nil as a value at " at
{:error :transact/syntax, :value v, :context at})))
;; TODO: handle _?
(defn search->sql-clause [pattern]
(merge
{:select [:*] ;; e :a :v :tx] ;; TODO: generalize columns.
:from [:datoms]}
(if-not (empty? pattern)
{:where (cons :and (map #(vector := %1 %2) [:e :a :v :tx] pattern))} ;; TODO: use schema to v.
{})))
(defn <search [db pattern]
{:pre [(db/db? db)]}
(go-pair
;; TODO: find a better expression of this pattern.
(let [rows (<? (->>
(search->sql-clause pattern)
(sql/format)
(s/all-rows (:sqlite-connection db))))]
(mapv #(Datom. (:e %) (:a %) (:v %) (:tx %) true) rows))))
(defn- <transact-report [db report datom]
{:pre [(db/db? db)]}
(go-pair
(let [exec (partial s/execute! (:sqlite-connection db))
[e a v tx added] [(.-e datom) (.-a datom) (.-v datom) (.-tx datom) (.-added datom)]] ;; TODO: destructuring.
(validate-eid e [e a v tx added]) ;; TODO: track original vs. transformed?
;; Append to transaction log.
(<? (exec
["INSERT INTO transactions VALUES (?, ?, ?, ?, ?)" e a v tx added]))
;; Update materialized datom view.
(if (.-added datom)
(<? (exec
;; TODO: use schema to insert correct indexing flags.
["INSERT INTO datoms VALUES (?, ?, ?, ?, 0, 0)" e a v tx]))
(<? (exec
;; TODO: verify this is correct.
["DELETE FROM datoms WHERE (e = ? AND a = ? AND v = ?)" e a v])))
(-> report
(update-in [:tx-data] conj datom)))))
(defn- <transact-add [db report [_ e a v tx :as entity]]
{:pre [(db/db? db)]}
(go-pair
(validate-attr a entity)
(validate-val v entity)
(let [tx (or tx (:current-tx report))
e (<? (db/<entid-strict db e))
v (if (db/ref? db a) (<? (db/<entid-strict db v)) v)
datom (Datom. e a v tx true)]
(if (db/multival? db a)
;; TODO: consider adding a UNIQUE CONSTRAINT and using INSERT OR IGNORE.
(if (empty? (<? (<search db [e a v])))
(<? (<transact-report db report datom))
report)
(if-let [^Datom old-datom (first (<? (<search db [e a])))]
(if (= (.-v old-datom) v)
report
(let [ra (<? (<transact-report db report (Datom. e a (.-v old-datom) tx false)))
rb (<? (<transact-report db ra datom))]
rb)) ;; TODO: express this better.
(<? (<transact-report db report datom)))))))
(defn- <transact-retract [db report [_ e a v _ :as entity]] ;; TODO: think about retracting with tx.
{:pre [(db/db? db)]}
(go-pair
(let [tx (:current-tx report)]
(if-let [e (<? (db/<entid db e))]
(let [v (if (db/ref? db a) (<? (db/<entid-strict db v)) v)]
(validate-attr a entity)
(validate-val v entity)
(if-let [old-datom (first (<? (<search db [e a v])))]
(<? (<transact-report db report (Datom. e a v tx false)))
report))
report))))
(defn- #?@(:clj [^Boolean neg-number?]
:cljs [^boolean neg-number?])
[x]
(and (number? x) (neg? x)))
(defn <transact-tx-data
[db now initial-report initial-es]
{:pre [(db/db? db)]}
(go-pair
(when-not (or (nil? initial-es)
(sequential? initial-es))
(raise "Bad transaction data " initial-es ", expected sequential collection"
{:error :transact/syntax, :tx-data initial-es}))
(loop [report initial-report
es initial-es]
(let [[entity & entities] es
current-tx (:current-tx report)]
(cond
(nil? entity)
;; We're done! Add transaction datom to the report.
(do
;; TODO: don't special case :db/txInstant attribute.
(<? (<transact-report db report (Datom. current-tx (get (db/idents db) :db/txInstant) now current-tx true)))
(-> report
(assoc-in [:tempids :db/current-tx] current-tx)))
(map? entity)
(raise "Map entities are not yet supported, got " entity
{:error :transact/syntax
:op entity })
(sequential? entity)
(let [[op e a v] entity]
(cond
(keyword? a)
(if-let [entid (get (db/idents db) a)]
(recur report (cons [op e entid v] entities))
(raise "No entid found for ident " a
{:error :transact/syntax
:op entity}))
(= op :db.fn/call)
(raise "DataScript's transactor functions are not yet supported, got " entity
{:error :transact/syntax
:op entity })
(= op :db.fn/cas)
(raise "Datomic's compare-and-swap is not yet supported, got " entity
{:error :transact/syntax
:op entity })
(tx-id? e)
(recur report (cons [op current-tx a v] entities))
(and (db/ref? db a) (tx-id? v))
(recur report (cons [op e a current-tx] entities))
(neg-number? e)
(if (not= op :db/add)
(raise "Negative entity ids are resolved for :db/add only"
{:error :transact/syntax
:op entity })
(raise "Negative entity ids are not yet supported, got " entity
{:error :transact/syntax
:op entity }))
(and (db/ref? db a) (neg-number? v))
(raise "Negative entity ids are not yet supported, got " entity
{:error :transact/syntax
:op entity })
(= op :db/add)
(recur (<? (<transact-add db report entity)) entities)
(= op :db/retract)
(recur (<? (<transact-retract db report entity)) entities)
(= op :db.fn/retractAttribute)
(raise "DataScript's :db.fn/retractAttribute shortcut is not yet supported, got " entity
{:error :transact/syntax
:op entity })
(= op :db.fn/retractEntity)
(raise "Datomic's :db.fn/retractEntity shortcut is not yet supported, got " entity
{:error :transact/syntax
:op entity })
:else
(raise "Unknown operation at " entity ", expected :db/add, :db/retract, :db.fn/call, :db.fn/retractAttribute or :db.fn/retractEntity"
{:error :transact/syntax, :operation op, :tx-data entity})))
(db/datom? entity)
(raise "Datom entities are not yet supported, got " entity
{:error :transact/syntax
:op entity })
:else
(raise "Bad entity type at " entity ", expected map or vector"
{:error :transact/syntax, :tx-data entity})
)))))
(defn <process-db-part
"Transactions may add idents, install new partitions, and install new schema attributes. Handle
them, atomically, here."
[db report]
(go-pair
nil))
(defn <transact!
([db tx-data]
(<transact! db tx-data nil 0xdeadbeef)) ;; TODO: timestamp!
([db tx-data tx-meta now]
{:pre [(db? db)]}
(s/in-transaction!
(:sqlite-connection db)
#(go-pair
(let [current-tx (<? (db/<allocate-tx db))
report (<? (<transact-tx-data db now
(db/map->TxReport
{:current-tx current-tx
:tx-data []
:tempids {}
:tx-meta tx-meta}) tx-data))]
(<? (<process-db-part db report))
report)))))

131
test/datomish/db_test.cljc Normal file
View file

@ -0,0 +1,131 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.db-test
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[datomish.node-tempfile-macros :refer [with-tempfile]]
[cljs.core.async.macros :as a :refer [go]]))
(:require
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise cond-let]]
[datomish.sqlite :as s]
[datomish.db :as db]
[datomish.transact :as transact]
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
[tempfile.core :refer [tempfile with-tempfile]]
[datomish.test-macros :refer [deftest-async]]
[clojure.test :as t :refer [is are deftest testing]]
[clojure.core.async :refer [go <! >!]]])
#?@(:cljs [[datomish.pair-chan]
[datomish.test-macros :refer-macros [deftest-async]]
[datomish.node-tempfile :refer [tempfile]]
[cljs.test :as t :refer-macros [is are deftest testing async]]
[cljs.core.async :as a :refer [<! >!]]])))
(defn <datoms [db]
(go-pair
(->>
(<? (s/all-rows (:sqlite-connection db) ["SELECT e, a, v, tx FROM datoms ORDER BY tx ASC, e, a, v"]))
(mapv #(vector (:e %) (:a %) (:v %) (:tx %) true)))))
(defn <transactions [db]
(go-pair
(->>
(<? (s/all-rows (:sqlite-connection db) ["SELECT e, a, v, tx, added FROM transactions ORDER BY tx ASC, e, a, v, added"]))
(mapv #(vector (:e %) (:a %) (:v %) (:tx %) (:added %))))))
(deftest-async test-add-one
(with-tempfile [t (tempfile)]
(let [c (<? (s/<sqlite-connection t))
db (<? (db/<with-sqlite-connection c))]
(try
(let [now -1
txInstant (<? (db/<entid db :db/txInstant)) ;; TODO: convert entids to idents on egress.
x (<? (db/<entid db :x)) ;; TODO: convert entids to idents on egress.
report (<? (transact/<transact! db [[:db/add 0 :x "valuex"]] nil now))
current-tx (:current-tx report)]
(is (= current-tx db/tx0))
(is (= (<? (<datoms db))
[[0 x "valuex" db/tx0 true]
[db/tx0 txInstant now db/tx0 true]]))
(is (= (<? (<transactions db))
[[0 x "valuex" db/tx0 1] ;; TODO: true, not 1.
[db/tx0 txInstant now db/tx0 1]])))
(finally
(<? (db/close db)))))))
(deftest-async test-add-two
(with-tempfile [t (tempfile)]
(let [c (<? (s/<sqlite-connection t))
db (<? (db/<with-sqlite-connection c))]
(try
(let [now -1
txInstant (<? (db/<entid db :db/txInstant)) ;; TODO: convert entids to idents on egress.
x (<? (db/<entid db :x)) ;; TODO: convert entids to idents on egress.
y (<? (db/<entid db :y)) ;; TODO: convert entids to idents on egress.
report (<? (transact/<transact! db [[:db/add 0 :x "valuex"] [:db/add 1 :y "valuey"]] nil now))
current-tx (:current-tx report)]
(is (= current-tx db/tx0))
(is (= (<? (<datoms db))
[[0 x "valuex" db/tx0 true]
[1 y "valuey" db/tx0 true]
[db/tx0 txInstant now db/tx0 true]]))
(is (= (<? (<transactions db))
[[0 x "valuex" db/tx0 1] ;; TODO: true, not 1.
[1 y "valuey" db/tx0 1]
[db/tx0 txInstant now db/tx0 1]])))
(finally
(<? (db/close db)))))))
;; TODO: test multipe :add and :retract of the same datom in the same transaction.
(deftest-async test-retract
(with-tempfile [t (tempfile)]
(let [c (<? (s/<sqlite-connection t))
db (<? (db/<with-sqlite-connection c))]
(try
(let [now -1
txInstant (<? (db/<entid db :db/txInstant)) ;; TODO: convert entids to idents on egress.
x (<? (db/<entid db :x)) ;; TODO: convert entids to idents on egress.
ra (<? (transact/<transact! db [[:db/add 0 :x "valuex"]] nil now))
rb (<? (transact/<transact! db [[:db/retract 0 :x "valuex"]] nil now))
txa (:current-tx ra)
txb (:current-tx rb)]
(is (= (<? (<datoms db))
[[txa txInstant now txa true]
[txb txInstant now txb true]]))
(is (= (<? (<transactions db))
[[0 x "valuex" txa 1] ;; TODO: true, not 1.
[txa txInstant -1 txa 1]
[0 x "valuex" txb 0]
[txb txInstant -1 txb 1]])))
(finally
(<? (db/close db)))))))
(defn result=
"Query results are unordered. Do a set-wise comparison instead."
[expected actual]
(= (set expected)
(set actual)))
(deftest-async test-q
(with-tempfile [t (tempfile)]
(let [c (<? (s/<sqlite-connection t))
db (<? (db/<with-sqlite-connection c))]
(try
(let [now -1
txInstant (<? (db/<entid db :db/txInstant)) ;; TODO: convert entids to idents on egress.
x (<? (db/<entid db :x)) ;; TODO: convert entids to idents on egress.
report (<? (transact/<transact! db [[:db/add 0 :x "valuex"]] nil now))
current-tx (:current-tx report)]
(is (= current-tx db/tx0))
(is (= (<? (<datoms db))
[[0 x "valuex" db/tx0 true]
[db/tx0 txInstant now db/tx0 true]]))
(is (result=
[[0 x "valuex" db/tx0] ;; TODO: include added.
[db/tx0 txInstant now db/tx0]]
(<? (db/<?q db '[:find ?e ?a ?v ?tx :in $ :where [?e ?a ?v ?tx]] {})))))
(finally
(<? (db/close db)))))))

View file

@ -3,6 +3,7 @@
[doo.runner :refer-macros [doo-tests doo-all-tests]]
[cljs.test :as t :refer-macros [is are deftest testing]]
datomish.promise-sqlite-test
datomish.db-test
datomish.sqlite-user-version-test
datomish.test.util
datomish.test.transforms
@ -11,6 +12,7 @@
(doo-tests
'datomish.promise-sqlite-test
'datomish.db-test
'datomish.sqlite-user-version-test
'datomish.test.util
'datomish.test.transforms