Review comment: fail pending transactions after closing connection.
This is pretty difficult to test robustly, but here's a stab at it.
This commit is contained in:
parent
f02d508370
commit
032bfafec2
2 changed files with 71 additions and 19 deletions
|
@ -50,12 +50,18 @@
|
||||||
(defrecord Connection [closed? current-db transact-chan]
|
(defrecord Connection [closed? current-db transact-chan]
|
||||||
IConnection
|
IConnection
|
||||||
(close [conn]
|
(close [conn]
|
||||||
(if (compare-and-set! (:closed? conn) false true)
|
(go-pair ;; Always want to return a pair-chan.
|
||||||
(do
|
(when (compare-and-set! (:closed? conn) false true)
|
||||||
;; This immediately stops <transact! enqueueing new work.
|
(let [result (a/chan 1)]
|
||||||
|
;; Ask for the underlying database to be closed while (usually, after) draining the queue.
|
||||||
|
;; Invariant: we see :sentinel-close in the transactor queue at most once.
|
||||||
|
(a/put! (:transact-chan conn) [:sentinel-close nil result true])
|
||||||
|
;; This immediately stops <transact! enqueueing new transactions.
|
||||||
(a/close! (:transact-chan conn))
|
(a/close! (:transact-chan conn))
|
||||||
(db/close-db @(:current-db conn)))
|
;; The transactor will close the underlying DB after draining the queue; by waiting for
|
||||||
(go [nil nil])))
|
;; result, we can raise any error from closing the DB and ensure that the DB is really
|
||||||
|
;; closed after waiting for the connection to close.
|
||||||
|
(<? result)))))
|
||||||
|
|
||||||
(db [conn] @(:current-db conn))
|
(db [conn] @(:current-db conn))
|
||||||
|
|
||||||
|
@ -583,7 +589,7 @@
|
||||||
{:pre [(conn? conn)]}
|
{:pre [(conn? conn)]}
|
||||||
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
|
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
|
||||||
;; because the parked putter that is woken is non-deterministic.
|
;; because the parked putter that is woken is non-deterministic.
|
||||||
(let [closed? (not (a/put! (:transact-chan conn) [tx-data result close?]))]
|
(let [closed? (not (a/put! (:transact-chan conn) [:sentinel-transact tx-data result close?]))]
|
||||||
(go-pair
|
(go-pair
|
||||||
;; We want to return a pair-chan, no matter what kind of channel result is.
|
;; We want to return a pair-chan, no matter what kind of channel result is.
|
||||||
(if closed?
|
(if closed?
|
||||||
|
@ -596,9 +602,19 @@
|
||||||
(>! token-chan (gensym "transactor-token"))
|
(>! token-chan (gensym "transactor-token"))
|
||||||
(loop []
|
(loop []
|
||||||
(when-let [token (<! token-chan)]
|
(when-let [token (<! token-chan)]
|
||||||
(when-let [[tx-data result close?] (<! (:transact-chan conn))]
|
(when-let [[sentinel tx-data result close?] (<! (:transact-chan conn))]
|
||||||
(let [pair
|
(let [pair
|
||||||
(<! (go-pair ;; Catch exceptions, return the pair.
|
(<! (go-pair ;; Catch exceptions, return the pair.
|
||||||
|
(case sentinel
|
||||||
|
:sentinel-close
|
||||||
|
;; Time to close the underlying DB.
|
||||||
|
(<? (db/close-db @(:current-db conn)))
|
||||||
|
|
||||||
|
;; Default: process the transaction.
|
||||||
|
(do
|
||||||
|
(when @(:closed? conn)
|
||||||
|
;; Drain enqueued transactions.
|
||||||
|
(raise "Connection is closed" {:error :transact/connection-closed}))
|
||||||
(let [db (db conn)
|
(let [db (db conn)
|
||||||
report (<? (db/in-transaction!
|
report (<? (db/in-transaction!
|
||||||
db
|
db
|
||||||
|
@ -609,7 +625,7 @@
|
||||||
;; so the transaction has committed.
|
;; so the transaction has committed.
|
||||||
(reset! (:current-db conn) (:db-after report))
|
(reset! (:current-db conn) (:db-after report))
|
||||||
(>! (:listener-source conn) report))
|
(>! (:listener-source conn) report))
|
||||||
report)))]
|
report)))))]
|
||||||
;; Even when report is nil (transaction not committed), pair is non-nil.
|
;; Even when report is nil (transaction not committed), pair is non-nil.
|
||||||
(>! result pair))
|
(>! result pair))
|
||||||
(>! token-chan token)
|
(>! token-chan token)
|
||||||
|
|
|
@ -242,4 +242,40 @@
|
||||||
;; Closing a closed connection is a no-op.
|
;; Closing a closed connection is a no-op.
|
||||||
(<? (d/<close conn)))))
|
(<? (d/<close conn)))))
|
||||||
|
|
||||||
|
;; We don't use deftest-db in order to be able to close the connection ourselves.
|
||||||
|
(deftest-async test-transact-queued-before-close
|
||||||
|
(with-tempfile [t (tempfile)]
|
||||||
|
(let [conn (<? (d/<connect t))
|
||||||
|
{tx0 :tx} (<? (d/<transact! conn test-schema))
|
||||||
|
|
||||||
|
n 100
|
||||||
|
make-t (fn [i]
|
||||||
|
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1)
|
||||||
|
:name "Petr"
|
||||||
|
:email (str "@" i)}]))]
|
||||||
|
(try
|
||||||
|
(testing "close while outstanding transactions are pending"
|
||||||
|
;; It's not really possible to ensure that at least one of the transactions is not
|
||||||
|
;; serviced before we close, so we just start "a lot" and wait for them all to resolve.
|
||||||
|
(let [ts (mapv make-t (range n))]
|
||||||
|
;; Give a little time for some to succeed, and then wait for one, non-deterministically.
|
||||||
|
(<! (a/timeout 10))
|
||||||
|
(a/alts! ts)
|
||||||
|
(<? (d/<close conn))
|
||||||
|
|
||||||
|
;; We should have some successes and some failures.
|
||||||
|
(let [ps (a/into []
|
||||||
|
(a/merge ;; pair-chan's never stop providing values; use take to force close.
|
||||||
|
(map (partial a/take 1) ts)))
|
||||||
|
rs (group-by (comp some? second) (<! ps))
|
||||||
|
xs (get rs false)
|
||||||
|
es (get rs true)
|
||||||
|
[v e] (first es)]
|
||||||
|
(is (> (count xs) 0))
|
||||||
|
(is (> (count es) 0))
|
||||||
|
(is (= {:error :transact/connection-closed} (ex-data e))))))
|
||||||
|
(finally
|
||||||
|
;; Closing a closed connection is a no-op.
|
||||||
|
(<? (d/<close conn)))))))
|
||||||
|
|
||||||
#_ (time (t/run-tests))
|
#_ (time (t/run-tests))
|
||||||
|
|
Loading…
Reference in a new issue