Review comment: return pair-chan; accept a result chan and close? flag.

This commit is contained in:
Nick Alexander 2016-10-13 12:00:49 -07:00
parent e5917406b4
commit cea0e3d60f
2 changed files with 15 additions and 11 deletions

View file

@ -570,13 +570,16 @@
"Submits a transaction to the database for writing. "Submits a transaction to the database for writing.
Returns a pair-chan resolving to `[result error]`." Returns a pair-chan resolving to `[result error]`."
[conn tx-data] ([conn tx-data]
{:pre [(conn? conn)]} (<transact! conn tx-data (a/chan 1) true))
(let [result (a/chan 1)] ([conn tx-data result close?]
;; Any race to put! is a real race between callers of <transact!. We can't just park on put!, {:pre [(conn? conn)]}
;; because the parked putter that is woken is non-deterministic. ;; Any race to put! is a real race between callers of <transact!. We can't just park on put!,
(a/put! (:transact-chan conn) [tx-data result]) ;; because the parked putter that is woken is non-deterministic.
result)) (a/put! (:transact-chan conn) [tx-data result close?])
(go-pair
;; We want to return a pair-chan, no matter what kind of channel result is.
(<? result))))
(defn- start-transactor [conn] (defn- start-transactor [conn]
(let [token-chan (a/chan 1)] (let [token-chan (a/chan 1)]
@ -584,7 +587,7 @@
(>! token-chan (gensym "transactor-token")) (>! token-chan (gensym "transactor-token"))
(loop [] (loop []
(let [token (<! token-chan)] (let [token (<! token-chan)]
(when-let [[tx-data result] (<! (:transact-chan conn))] (when-let [[tx-data result close?] (<! (:transact-chan conn))]
(let [pair (let [pair
(<! (go-pair ;; Catch exceptions, return the pair. (<! (go-pair ;; Catch exceptions, return the pair.
(let [db (db conn) (let [db (db conn)
@ -596,8 +599,9 @@
(>! (:listener-source conn) report) (>! (:listener-source conn) report)
report)))] report)))]
(>! result pair)) (>! result pair))
(a/close! result)
(>! token-chan token) (>! token-chan token)
(when close?
(a/close! result))
(recur))))))) (recur)))))))
(defn listen-chan! (defn listen-chan!

View file

@ -98,8 +98,8 @@
;; Wait for all transactions to complete. ;; Wait for all transactions to complete.
(<! (a/into [] (<! (a/into []
(a/merge (a/merge ;; pair-chan's never stop providing values; use take to force close.
(map make-t (range n))))) (map #(a/take 1 (make-t %)) (range n)))))
;; Transactions should be processed in order. This is an awkward way to ;; Transactions should be processed in order. This is an awkward way to
;; express the expected data, but it's robust in the face of changing default ;; express the expected data, but it's robust in the face of changing default