Serialize <transact!; add {un}listen{-chan}!. r=rnewman (#61, #80)

This commit is contained in:
Nick Alexander 2016-10-13 16:29:24 -07:00
commit ed545d4a11
7 changed files with 498 additions and 29 deletions

View file

@ -26,6 +26,10 @@
(def <transact! transact/<transact!)
(def listen! transact/listen!)
(def listen-chan! transact/listen-chan!)
(def unlisten-chan! transact/unlisten-chan!)
;; TODO: use Potemkin, or a subset of Potemkin that is CLJS friendly (like
;; https://github.com/ztellman/potemkin/issues/31) to improve this re-exporting process.
(def <close transact/close)

View file

@ -111,9 +111,12 @@
(in-transaction!
[db chan-fn]
"Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil,
commit the transaction; otherwise, rollback the transaction. Returns a pair-chan resolving to
the pair-chan returned by `chan-fn`.")
"Evaluate the given `chan-fn` in an exclusive transaction. If it returns non-nil,
commit the transaction; otherwise, rollback the transaction.
`chan-fn` should be a function of no arguments returning a pair-chan.
Returns a pair-chan resolving to the same pair as the pair-chan returned by `chan-fn`.")
(<bootstrapped? [db]
"Return true if this database has no transactions yet committed.")

View file

@ -6,7 +6,7 @@
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[cljs.core.async.macros :refer [go]]))
[cljs.core.async.macros :refer [go go-loop]]))
(:require
[datomish.query.context :as context]
[datomish.query.projection :as projection]
@ -25,7 +25,7 @@
[taoensso.tufte :as tufte
#?(:cljs :refer-macros :clj :refer) [defnp p profiled profile]]
#?@(:clj [[datomish.pair-chan :refer [go-pair <?]]
[clojure.core.async :as a :refer [chan go <! >!]]])
[clojure.core.async :as a :refer [chan go go-loop <! >!]]])
#?@(:cljs [[datomish.pair-chan]
[cljs.core.async :as a :refer [chan <! >!]]]))
#?(:clj
@ -35,7 +35,9 @@
(defprotocol IConnection
(close
[conn]
"Close this connection. Returns a pair channel of [nil error].")
"Close this connection. Returns a pair channel of [nil error].
Closing a closed connection is a no-op.")
(db
[conn]
@ -45,9 +47,21 @@
[conn]
"Get the full transaction history DB associated with this connection."))
(defrecord Connection [current-db]
(defrecord Connection [closed? current-db transact-chan]
IConnection
(close [conn] (db/close-db @(:current-db conn)))
(close [conn]
(go-pair ;; Always want to return a pair-chan.
(when (compare-and-set! (:closed? conn) false true)
(let [result (a/chan 1)]
;; Ask for the underlying database to be closed while (usually, after) draining the queue.
;; Invariant: we see :sentinel-close in the transactor queue at most once.
(a/put! (:transact-chan conn) [:sentinel-close nil result true])
;; This immediately stops <transact! enqueueing new transactions.
(a/close! (:transact-chan conn))
;; The transactor will close the underlying DB after draining the queue; by waiting for
;; result, we can raise any error from closing the DB and ensure that the DB is really
;; closed after waiting for the connection to close.
(<? result)))))
(db [conn] @(:current-db conn))
@ -98,12 +112,26 @@
;; #?(:cljs
;; (doseq [[tag cb] data-readers] (cljs.reader/register-tag-parser! tag cb)))
;; TODO: implement support for DB parts?
(declare start-transactor)
(defn connection-with-db [db]
(map->Connection {:current-db (atom db)}))
;; Puts to listener-source may park if listener-mult can't distribute them fast enough. Since the
;; underlying taps are asserted to be be unblocking, the parking time should be very short.
(let [listener-source
(a/chan 1)
;; ;; TODO: persist max-tx and max-eid in SQLite.
listener-mult
(a/mult listener-source) ;; Just for tapping.
connection
(map->Connection {:closed? (atom false)
:current-db (atom db)
:listener-source listener-source
:listener-mult listener-mult
:transact-chan (a/chan (util/unlimited-buffer))
})]
(start-transactor connection)
connection))
(defn maybe-datom->entity [entity]
(cond
@ -552,12 +580,99 @@
(:db-after (<? (<with db tx-data)))))
(defn <transact!
[conn tx-data]
"Submits a transaction to the database for writing.
Returns a pair-chan resolving to `[result error]`."
([conn tx-data]
(<transact! conn tx-data (a/chan 1) true))
([conn tx-data result close?]
{:pre [(conn? conn)]}
(let [db (db conn)] ;; TODO: be careful with swapping atoms.
(db/in-transaction!
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
;; because the parked putter that is woken is non-deterministic.
(let [closed? (not (a/put! (:transact-chan conn) [:sentinel-transact tx-data result close?]))]
(go-pair
;; We want to return a pair-chan, no matter what kind of channel result is.
(if closed?
(raise "Connection is closed" {:error :transact/connection-closed})
(<? result))))))
(defn- start-transactor [conn]
(let [token-chan (a/chan 1)]
(go
(>! token-chan (gensym "transactor-token"))
(loop []
(when-let [token (<! token-chan)]
(when-let [[sentinel tx-data result close?] (<! (:transact-chan conn))]
(let [pair
(<! (go-pair ;; Catch exceptions, return the pair.
(case sentinel
:sentinel-close
;; Time to close the underlying DB.
(<? (db/close-db @(:current-db conn)))
;; Default: process the transaction.
(do
(when @(:closed? conn)
;; Drain enqueued transactions.
(raise "Connection is closed" {:error :transact/connection-closed}))
(let [db (db conn)
report (<? (db/in-transaction!
db
#(go-pair
(let [report (<? (<with db tx-data))]
#(-> (<with db tx-data))))]
(when report
;; <with returns non-nil or throws, but we still check report just in
;; case. Here, in-transaction! function completed and returned non-nil,
;; so the transaction has committed.
(reset! (:current-db conn) (:db-after report))
report)))))
(>! (:listener-source conn) report))
report)))))]
;; Even when report is nil (transaction not committed), pair is non-nil.
(>! result pair))
(>! token-chan token)
(when close?
(a/close! result))
(recur)))))))
(defn listen-chan!
"Put reports successfully transacted against the given connection onto the given channel.
The listener sink channel must be unblocking.
Returns the channel listened to, for future unlistening."
[conn listener-sink]
{:pre [(conn? conn)]}
(when-not (util/unblocking-chan? listener-sink)
(raise "Listener sinks must be channels backed by unblocking buffers"
{:error :transact/bad-listener :listener-sink listener-sink}))
;; Tapping an already registered sink is a no-op.
(a/tap (:listener-mult conn) listener-sink)
listener-sink)
(defn- -listen-chan
[f n]
(let [c (a/chan (a/dropping-buffer n))]
(go-loop []
(when-let [v (<! c)]
(do
(f v)
(recur))))
c))
(defn listen!
"Evaluate the given function with reports successfully transacted against the given connection.
`f` should be a function of one argument, the transaction report.
Returns the channel listened to, for future calls to `unlisten-chan!`."
([conn f]
;; Decently large buffer before dropping, for JS consumers.
(listen! conn f 1024))
([conn f n]
{:pre [(fn? f) (pos? n)]}
(listen-chan! conn (-listen-chan f n))))
(defn unlisten-chan! [conn listener-sink]
"Stop putting reports successfully transacted against the given connection onto the given channel."
{:pre [(conn? conn)]}
;; Untapping an un-registered sink is a no-op.
(a/untap (:listener-mult conn) listener-sink))

View file

@ -3,9 +3,16 @@
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.util
#?(:cljs (:require-macros datomish.util))
#?(:cljs
(:require-macros
[datomish.util]
[cljs.core.async.macros :refer [go go-loop]]))
(:require
[clojure.string :as str]))
[clojure.string :as str]
#?@(:clj [[clojure.core.async :as a :refer [go go-loop <! >!]]
[clojure.core.async.impl.protocols]])
#?@(:cljs [[cljs.core.async :as a :refer [<! >!]]
[cljs.core.async.impl.protocols]])))
#?(:clj
(defmacro raise-str
@ -101,3 +108,49 @@
(defn mapvals [f m]
(into (empty m) (map #(vector (first %) (f (second %))) m)))
(defn unblocking-chan?
"Returns true if the channel will never block. That is to say, puts
into this channel will never cause the buffer to be full."
[chan]
(a/unblocking-buffer?
;; See http://dev.clojure.org/jira/browse/ASYNC-181.
(#?(:cljs .-buf :clj .buf) chan)))
;; Modified from http://dev.clojure.org/jira/browse/ASYNC-23.
#?(:cljs
(deftype UnlimitedBuffer [buf]
cljs.core.async.impl.protocols/UnblockingBuffer
cljs.core.async.impl.protocols/Buffer
(full? [this]
false)
(remove! [this]
(.pop buf))
(add!* [this itm]
(.unshift buf itm))
(close-buf! [this])
cljs.core/ICounted
(-count [this]
(.-length buf))))
#?(:clj
(deftype UnlimitedBuffer [^java.util.LinkedList buf]
clojure.core.async.impl.protocols/UnblockingBuffer
clojure.core.async.impl.protocols/Buffer
(full? [this]
false)
(remove! [this]
(.removeLast buf))
(add!* [this itm]
(.addFirst buf itm))
(close-buf! [this])
clojure.lang.Counted
(count [this]
(.size buf))))
(defn unlimited-buffer []
(UnlimitedBuffer. #?(:cljs (array) :clj (java.util.LinkedList.))))

View file

@ -9,6 +9,7 @@
datomish.schema-test
datomish.sqlite-user-version-test
datomish.tofinoish-test
datomish.transact-test
datomish.util-test
datomish.test.transforms
datomish.test.query
@ -23,6 +24,7 @@
'datomish.schema-test
'datomish.sqlite-user-version-test
'datomish.tofinoish-test
'datomish.transact-test
'datomish.util-test
'datomish.test.transforms
'datomish.test.query

View file

@ -0,0 +1,281 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
(ns datomish.transact-test
#?(:cljs
(:require-macros
[datomish.pair-chan :refer [go-pair <?]]
[datomish.node-tempfile-macros :refer [with-tempfile]]
[cljs.core.async.macros :as a :refer [go go-loop]]))
(:require
[datomish.api :as d]
[datomish.db.debug :refer [<datoms-after <datoms>= <transactions-after <shallow-entity <fulltext-values]]
[datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise cond-let]]
[datomish.schema :as ds]
[datomish.simple-schema]
[datomish.sqlite :as s]
[datomish.sqlite-schema]
[datomish.datom]
#?@(:clj [[datomish.jdbc-sqlite]
[datomish.pair-chan :refer [go-pair <?]]
[tempfile.core :refer [tempfile with-tempfile]]
[datomish.test-macros :refer [deftest-async deftest-db]]
[clojure.test :as t :refer [is are deftest testing]]
[clojure.core.async :as a :refer [go go-loop <! >!]]])
#?@(:cljs [[datomish.js-sqlite]
[datomish.pair-chan]
[datomish.test-macros :refer-macros [deftest-async deftest-db]]
[datomish.node-tempfile :refer [tempfile]]
[cljs.test :as t :refer-macros [is are deftest testing async]]
[cljs.core.async :as a :refer [<! >!]]]))
#?(:clj
(:import [clojure.lang ExceptionInfo]))
#?(:clj
(:import [datascript.db DB])))
#?(:cljs
(def Throwable js/Error))
(defn- tempids [tx]
(into {} (map (juxt (comp :idx first) second) (:tempids tx))))
(def test-schema
[{:db/id (d/id-literal :db.part/user)
:db/ident :x
:db/unique :db.unique/identity
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :name
:db/unique :db.unique/identity
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :y
:db/cardinality :db.cardinality/many
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :aka
:db/cardinality :db.cardinality/many
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :age
:db/valueType :db.type/long
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :email
:db/unique :db.unique/identity
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :spouse
:db/unique :db.unique/value
:db/valueType :db.type/string
:db.install/_attribute :db.part/db}
{:db/id (d/id-literal :db.part/user)
:db/ident :friends
:db/cardinality :db.cardinality/many
:db/valueType :db.type/ref
:db.install/_attribute :db.part/db}
])
(deftest-db test-overlapping-transacts conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
report0 (<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
:name "Petr"}]))
id0 (get (tempids report0) -1)
n 5
make-t (fn [i]
;; Be aware that a go block with a parking operation here
;; can change the order of transaction evaluation, since the
;; parking operation will be unparked non-deterministically.
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
:name "Petr"
:email (str "@" i)}]))]
;; Wait for all transactions to complete.
(<! (a/into []
(a/merge ;; pair-chan's never stop providing values; use take to force close.
(map #(a/take 1 (make-t %)) (range n)))))
;; Transactions should be processed in order. This is an awkward way to
;; express the expected data, but it's robust in the face of changing default
;; identities, transaction numbers, and values of n.
(is (= (concat [[id0 :name "Petr" (+ 1 tx0) 1]
[id0 :email "@0" (+ 2 tx0) 1]]
(mapcat
#(-> [[id0 :email (str "@" %) (+ 3 % tx0) 0]
[id0 :email (str "@" (inc %)) (+ 3 % tx0) 1]])
(range 0 (dec n))))
(filter #(not= :db/txInstant (second %)) (<? (<transactions-after (d/db conn) tx0)))))))
(deftest-db test-listeners conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
c1 (a/chan (a/dropping-buffer 5))
c2 (a/chan (a/dropping-buffer 5))]
(testing "no listeners is okay"
;; So that we can upsert to concrete entids.
(<? (d/<transact! conn [[:db/add 101 :name "Ivan"]
[:db/add 102 :name "Petr"]])))
(testing "listeners are added, not accidentally notified of events before they were added"
(d/listen-chan! conn c1)
(d/listen-chan! conn c2)
;; This is not authoritative, because in an error situation a report may
;; be put! to a listener tap outside the expected flow. We should witness
;; such an occurrence later in the test.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2))))
(testing "unlistening to unrecognized key is ignored"
(d/unlisten-chan! conn (a/chan)))
(testing "listeners observe reports"
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -1) :name "Ivan"]]))
(is (= {-1 101}
(tempids (<! c1))))
(is (= {-1 101}
(tempids (<! c2))))
;; Again, not authoritative.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2))))
(testing "unlisten removes correct listener"
(d/unlisten-chan! conn c1)
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -2) :name "Petr"]]))
(is (= {-2 102}
(tempids (<! c2))))
;; Again, not authoritative.
(is (= nil (a/poll! c1))))
(testing "returning to no listeners is okay"
(d/unlisten-chan! conn c2)
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -1) :name "Petr"]]))
;; Again, not authoritative.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2)))
;; This should be authoritative, however. We should be able to put! due
;; to the size of the buffer, and we should take! what we put!.
(>! c1 :token-1)
(is (= :token-1 (<! c1)))
(>! c2 :token-1)
(is (= :token-1 (<! c2))))
(testing "complains about blocking channels"
(is (thrown-with-msg?
ExceptionInfo #"unblocking buffers"
(d/listen-chan! conn (a/chan 1)))))
))
(deftest-db test-transact-in-listener conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
;; So that we can see all transactions.
lc (a/chan (a/dropping-buffer 5))
;; A oneshot listener, to prevent infinite recursion.
ofl (atom false)
ol (fn [report]
(when (compare-and-set! ofl false true)
;; Asynchronously throw another transaction at the wall. This
;; upserts to the earlier one.
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :name "Ivan" :email "@1"}])))
]
(testing "that we can invoke <transact! from within a listener"
(d/listen-chan! conn lc)
(d/listen! conn ol)
;; Transact once to get started, and so that we can upsert against concrete ids.
(<? (d/<transact! conn [{:db/id 101 :name "Ivan"}]))
(is (= (+ 1 tx0) (:tx (<! lc))))
;; The listener should have kicked off another transaction, but we can't
;; wait for it explicitly. However, we can wait for the report to hit the
;; listening channel.
(let [r (<! lc)]
(is (= (+ 2 tx0) (:tx r)))
(is (= {-1 101}
(tempids r)))
(is (= nil (a/poll! lc)))))))
(deftest-db test-failing-transacts conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))]
(testing "failing transact throws"
(is (thrown-with-msg?
ExceptionInfo #"expected :db.type/string"
(<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :name 1}])))))
(testing "transaction after bad transaction is applied"
(<? (d/<transact! conn [{:db/id 101 :name "Petr"}]))
(is (= (<? (<datoms-after (d/db conn) tx0))
#{[101 :name "Petr"]})))))
;; We don't use deftest-db in order to be able to close the connection ourselves.
(deftest-async test-transact-after-close
(with-tempfile [t (tempfile)]
(let [conn (<? (d/<connect t))
{tx0 :tx} (<? (d/<transact! conn test-schema))]
(try
(testing "transaction before close is applied"
(<? (d/<transact! conn [{:db/id 101 :name "Petr"}]))
(is (= (<? (<datoms-after (d/db conn) tx0))
#{[101 :name "Petr"]})))
(finally
(<? (d/<close conn))))
(testing "transact after close throws"
(is (thrown-with-msg?
ExceptionInfo #"Connection is closed"
(<? (d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :name "Petr"}])))))
;; Closing a closed connection is a no-op.
(<? (d/<close conn)))))
;; We don't use deftest-db in order to be able to close the connection ourselves.
(deftest-async test-transact-queued-before-close
(with-tempfile [t (tempfile)]
(let [conn (<? (d/<connect t))
{tx0 :tx} (<? (d/<transact! conn test-schema))
n 100
make-t (fn [i]
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
:name "Petr"
:email (str "@" i)}]))]
(try
(testing "close while outstanding transactions are pending"
;; It's not really possible to ensure that at least one of the transactions is not
;; serviced before we close, so we just start "a lot" and wait for them all to resolve.
(let [ts (mapv make-t (range n))]
;; Give a little time for some to succeed, and then wait for one, non-deterministically.
(<! (a/timeout 10))
(a/alts! ts)
(<? (d/<close conn))
;; We should have some successes and some failures.
(let [ps (a/into []
(a/merge ;; pair-chan's never stop providing values; use take to force close.
(map (partial a/take 1) ts)))
rs (group-by (comp some? second) (<! ps))
xs (get rs false)
es (get rs true)
[v e] (first es)]
(is (> (count xs) 0))
(is (> (count es) 0))
(is (= {:error :transact/connection-closed} (ex-data e))))))
(finally
;; Closing a closed connection is a no-op.
(<? (d/<close conn)))))))
#_ (time (t/run-tests))

View file

@ -1,9 +1,14 @@
(ns datomish.util-test
#?(:cljs
(:require-macros
[cljs.core.async.macros :as a :refer [go go-loop]]))
(:require
[datomish.util :as util]
#?(:clj [clojure.test :as t :refer [is are deftest testing]])
#?(:cljs [cljs.test :as t :refer-macros [is are deftest testing]])
))
#?@(:clj [[clojure.test :as t :refer [is are deftest testing]]
[clojure.core.async :as a :refer [go go-loop <! >!]]])
#?@(:cljs [[cljs.test :as t :refer-macros [is are deftest testing]]
[cljs.core.async :as a :refer [<! >!]]])))
(deftest test-var-translation
(is (= :x (util/var->sql-var '?x)))
@ -35,3 +40,9 @@
(catch :default e e))]
(is (= "succeed" (aget caught "message")))
(is (= {:foo 1} (aget caught "data"))))))
(deftest test-unblocking-chan?
(is (util/unblocking-chan? (a/chan (a/dropping-buffer 10))))
(is (util/unblocking-chan? (a/chan (a/sliding-buffer 10))))
(is (util/unblocking-chan? (a/chan (util/unlimited-buffer))))
(is (not (util/unblocking-chan? (a/chan (a/buffer 10))))))