Make <transact! run in a critical section. (#80)

This commit is contained in:
Nick Alexander 2016-10-11 20:25:40 -07:00
parent 2081ca4563
commit a8ad79d0e6
3 changed files with 160 additions and 15 deletions

View file

@ -6,7 +6,7 @@
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[cljs.core.async.macros :refer [go]]))
[cljs.core.async.macros :refer [go go-loop]]))
(:require
[datomish.query.context :as context]
[datomish.query.projection :as projection]
@ -25,7 +25,7 @@
[taoensso.tufte :as tufte
#?(:cljs :refer-macros :clj :refer) [defnp p profiled profile]]
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
[clojure.core.async :as a :refer [chan go <! >!]]])
[clojure.core.async :as a :refer [chan go go-loop <! >!]]])
#?@(:cljs [[datomish.pair-chan]
[cljs.core.async :as a :refer [chan <! >!]]]))
#?(:clj
@ -45,9 +45,11 @@
[conn]
"Get the full transaction history DB associated with this connection."))
(defrecord Connection [current-db]
(defrecord Connection [current-db transact-chan]
IConnection
(close [conn] (db/close-db @(:current-db conn)))
(close [conn]
(a/close! (:transact-chan conn))
(db/close-db @(:current-db conn)))
(db [conn] @(:current-db conn))
@ -98,12 +100,15 @@
;; #?(:cljs
;; (doseq [[tag cb] data-readers] (cljs.reader/register-tag-parser! tag cb)))
;; TODO: implement support for DB parts?
(declare start-transactor)
(defn connection-with-db [db]
(map->Connection {:current-db (atom db)}))
;; ;; TODO: persist max-tx and max-eid in SQLite.
(let [connection
(map->Connection {:current-db (atom db)
:transact-chan (a/chan (util/unlimited-buffer))
})]
(start-transactor connection)
connection))
(defn maybe-datom->entity [entity]
(cond
@ -552,12 +557,34 @@
(:db-after (<? (<with db tx-data)))))
(defn <transact!
"Submits a transaction to the database for writing.
Returns a pair-chan resolving to `[result error]`."
[conn tx-data]
{:pre [(conn? conn)]}
(let [db (db conn)] ;; TODO: be careful with swapping atoms.
(db/in-transaction!
db
#(go-pair
(let [report (<? (<with db tx-data))]
(reset! (:current-db conn) (:db-after report))
report)))))
(let [result (a/chan 1)]
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
;; because the parked putter that is woken is non-deterministic.
(a/put! (:transact-chan conn) [tx-data result])
result))
(defn- start-transactor [conn]
(let [token-chan (a/chan 1)]
(go
(>! token-chan (gensym "transactor-token"))
(loop []
(let [token (<! token-chan)]
(when-let [[tx-data result] (<! (:transact-chan conn))]
(let [pair
(<! (go-pair ;; Catch exceptions, return the pair.
(let [db (db conn)
report (<? (db/in-transaction!
db
#(-> (<with db tx-data))))]
;; We only get here if the transaction is committed.
(reset! (:current-db conn) (:db-after report))
report)))]
(>! result pair))
(a/close! result)
(>! token-chan token)
(recur)))))))

View file

@ -9,6 +9,7 @@
datomish.schema-test
datomish.sqlite-user-version-test
datomish.tofinoish-test
datomish.transact-test
datomish.util-test
datomish.test.transforms
datomish.test.query
@ -23,6 +24,7 @@
'datomish.schema-test
'datomish.sqlite-user-version-test
'datomish.tofinoish-test
'datomish.transact-test
'datomish.util-test
'datomish.test.transforms
'datomish.test.query

View file

@ -0,0 +1,116 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.transact-test
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[datomish.node-tempfile-macros :refer [with-tempfile]]
[cljs.core.async.macros :as a :refer [go go-loop]]))
(:require
[datomish.api :as d]
[datomish.db.debug :refer [<datoms-after <datoms>= <transactions-after <shallow-entity <fulltext-values]]
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise cond-let]]
[datomish.schema :as ds]
[datomish.simple-schema]
[datomish.sqlite :as s]
[datomish.sqlite-schema]
[datomish.datom]
#?@(:clj [[datomish.jdbc-sqlite]
[datomish.pair-chan :refer [go-pair <?]]
[tempfile.core :refer [tempfile with-tempfile]]
[datomish.test-macros :refer [deftest-async deftest-db]]
[clojure.test :as t :refer [is are deftest testing]]
[clojure.core.async :as a :refer [go go-loop <! >!]]])
#?@(:cljs [[datomish.js-sqlite]
[datomish.pair-chan]
[datomish.test-macros :refer-macros [deftest-async deftest-db]]
[datomish.node-tempfile :refer [tempfile]]
[cljs.test :as t :refer-macros [is are deftest testing async]]
[cljs.core.async :as a :refer [<! >!]]]))
#?(:clj
(:import [clojure.lang ExceptionInfo]))
#?(:clj
(:import [datascript.db DB])))
#?(:cljs
(def Throwable js/Error))
(defn- tempids [tx]
(into {} (map (juxt (comp :idx first) second) (:tempids tx))))
(def test-schema
[{:db/id (d/id-literal :db.part/user)
:db/ident :x
:db/unique :db.unique/identity
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :name
:db/unique :db.unique/identity
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :y
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :aka
:db/cardinality :db.cardinality/many
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :age
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :email
:db/unique :db.unique/identity
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :spouse
:db/unique :db.unique/value
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :friends
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref
:db.install/_attribute :db.part/db}
])
(deftest-db test-overlapping-transacts conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
report0 (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
:name "Petr"}]))
id0 (get (tempids report0) -1)
n 5
make-t (fn [i]
;; Be aware that a go block with a parking operation here
;; can change the order of transaction evaluation, since the
;; parking operation will be unparked non-deterministically.
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
:name "Petr"
:email (str "@" i)}]))]
;; Wait for all transactions to complete.
(<! (a/into []
(a/merge
(map make-t (range n)))))
;; Transactions should be processed in order. This is an awkward way to
;; express the expected data, but it's robust in the face of changing default
;; identities, transaction numbers, and values of n.
(is (= (concat [[id0 :name "Petr" (+ 1 tx0) 1]
[id0 :email "@0" (+ 2 tx0) 1]]
(mapcat
#(-> [[id0 :email (str "@" %) (+ 3 % tx0) 0]
[id0 :email (str "@" (inc %)) (+ 3 % tx0) 1]])
(range 0 (dec n))))
(filter #(not= :db/txInstant (second %)) (<? (<transactions-after (d/db conn) tx0)))))))
#_ (time (t/run-tests))