Allow callers to run functions within the scope of a transaction.
This generalizes the transactor loop to allow callers to run an arbitrary function within an `in-transaction!` body. Combined with exposing `<report-transact-tx-data!`, this allows an admittedly sophisticated consumer to conditionally query and transact in a consistent way -- for example, cleaning up inconsistent data then transacting a new schema version.
This commit is contained in:
parent
bd0a56e501
commit
3212be565c
1 changed files with 52 additions and 28 deletions
|
@ -791,14 +791,14 @@
|
||||||
(-> report
|
(-> report
|
||||||
(assoc-in [:db-after] db-after)))))
|
(assoc-in [:db-after] db-after)))))
|
||||||
|
|
||||||
(defn- <with [db tx-data]
|
(defn <transact-tx-data-in-transaction! [db tx-data]
|
||||||
(let [fail-touch-attr (fn [old new] (raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old
|
(let [fail-touch-attr (fn [old new] (raise "Altering schema attributes is not yet supported, got " new " altering existing schema attribute " old
|
||||||
{:error :schema/alter-schema :old old :new new}))]
|
{:error :schema/alter-schema :old old :new new}))]
|
||||||
(<with-internal db tx-data fail-touch-attr)))
|
(<with-internal db tx-data fail-touch-attr)))
|
||||||
|
|
||||||
(defn <db-with [db tx-data]
|
(defn <db-transact-tx-data! [db tx-data]
|
||||||
(go-pair
|
(go-pair
|
||||||
(:db-after (<? (<with db tx-data)))))
|
(:db-after (<? (<transact-tx-data-in-transaction! db tx-data)))))
|
||||||
|
|
||||||
(defn <transact!
|
(defn <transact!
|
||||||
"Submits a transaction to the database for writing.
|
"Submits a transaction to the database for writing.
|
||||||
|
@ -806,11 +806,15 @@
|
||||||
Returns a pair-chan resolving to `[result error]`."
|
Returns a pair-chan resolving to `[result error]`."
|
||||||
([conn tx-data]
|
([conn tx-data]
|
||||||
(<transact! conn tx-data (a/chan 1) true))
|
(<transact! conn tx-data (a/chan 1) true))
|
||||||
([conn tx-data result close?]
|
([conn tx-data-or-fn result close?]
|
||||||
{:pre [(conn? conn)]}
|
{:pre [(conn? conn)]}
|
||||||
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
|
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
|
||||||
;; because the parked putter that is woken is non-deterministic.
|
;; because the parked putter that is woken is non-deterministic.
|
||||||
(let [closed? (not (a/put! (:transact-chan conn) [:sentinel-transact tx-data result close?]))]
|
(let [op (if (fn? tx-data-or-fn)
|
||||||
|
:sentinel-fn
|
||||||
|
:sentinel-transact)
|
||||||
|
closed? (not (a/put! (:transact-chan conn)
|
||||||
|
[op tx-data-or-fn result close?]))]
|
||||||
(go-pair
|
(go-pair
|
||||||
;; We want to return a pair-chan, no matter what kind of channel result is.
|
;; We want to return a pair-chan, no matter what kind of channel result is.
|
||||||
(if closed?
|
(if closed?
|
||||||
|
@ -823,30 +827,50 @@
|
||||||
(>! token-chan (gensym "transactor-token"))
|
(>! token-chan (gensym "transactor-token"))
|
||||||
(loop []
|
(loop []
|
||||||
(when-let [token (<! token-chan)]
|
(when-let [token (<! token-chan)]
|
||||||
(when-let [[sentinel tx-data result close?] (<! (:transact-chan conn))]
|
(when-let [[sentinel tx-data-or-fn result close?] (<! (:transact-chan conn))]
|
||||||
(let [pair
|
(let
|
||||||
(<! (go-pair ;; Catch exceptions, return the pair.
|
[pair
|
||||||
(case sentinel
|
(<!
|
||||||
:sentinel-close
|
(go-pair ;; Catch exceptions, return the pair.
|
||||||
;; Time to close the underlying DB.
|
(case sentinel
|
||||||
(<? (db/close-db @(:current-db conn)))
|
:sentinel-close
|
||||||
|
;; Time to close the underlying DB.
|
||||||
|
(<? (db/close-db @(:current-db conn)))
|
||||||
|
|
||||||
;; Default: process the transaction.
|
;; Default: process the transaction.
|
||||||
(do
|
(do
|
||||||
(when @(:closed? conn)
|
(when @(:closed? conn)
|
||||||
;; Drain enqueued transactions.
|
;; Drain enqueued transactions.
|
||||||
(raise "Connection is closed" {:error :transact/connection-closed}))
|
(raise "Connection is closed" {:error :transact/connection-closed}))
|
||||||
(let [db (db conn)
|
|
||||||
report (<? (db/in-transaction!
|
(let [db (db conn)
|
||||||
db
|
in-transaction-fn
|
||||||
#(-> (<with db tx-data))))]
|
(case sentinel
|
||||||
(when report
|
:sentinel-fn
|
||||||
;; <with returns non-nil or throws, but we still check report just in
|
;; This is a function that we'd like to run
|
||||||
;; case. Here, in-transaction! function completed and returned non-nil,
|
;; within a database transaction. See
|
||||||
;; so the transaction has committed.
|
;; db/in-transaction! for details.
|
||||||
(reset! (:current-db conn) (:db-after report))
|
;; The function is invoked with two arguments:
|
||||||
(>! (:listener-source conn) report))
|
;; the db and a function that takes (db,
|
||||||
report)))))]
|
;; tx-data) and transacts it to return a
|
||||||
|
;; TxReport.
|
||||||
|
;; The function must return a TxReport.
|
||||||
|
;; The function must not itself call
|
||||||
|
;; `in-transaction!` or `<transact!`.
|
||||||
|
(partial tx-data-or-fn db <transact-tx-data-in-transaction!)
|
||||||
|
|
||||||
|
:sentinel-transact
|
||||||
|
;; This is data. Apply it with `<transact-tx-data-in-transaction!`.
|
||||||
|
(partial <transact-tx-data-in-transaction! db tx-data-or-fn))
|
||||||
|
|
||||||
|
report (<? (db/in-transaction! db in-transaction-fn))]
|
||||||
|
(when report
|
||||||
|
;; <r-t-t-d! returns non-nil or throws, but we still check report just in
|
||||||
|
;; case. Here, in-transaction! function completed and returned non-nil,
|
||||||
|
;; so the transaction has committed.
|
||||||
|
(reset! (:current-db conn) (:db-after report))
|
||||||
|
(>! (:listener-source conn) report))
|
||||||
|
report)))))]
|
||||||
;; Even when report is nil (transaction not committed), pair is non-nil.
|
;; Even when report is nil (transaction not committed), pair is non-nil.
|
||||||
(>! result pair))
|
(>! result pair))
|
||||||
(>! token-chan token)
|
(>! token-chan token)
|
||||||
|
|
Loading…
Reference in a new issue