Add {un}listen{-chan}! to connection. (#61)

This commit is contained in:
Nick Alexander 2016-10-11 20:29:43 -07:00
parent a8ad79d0e6
commit e5917406b4
3 changed files with 152 additions and 1 deletions

View file

@ -26,6 +26,10 @@
(def <transact! transact/<transact!) (def <transact! transact/<transact!)
(def listen! transact/listen!)
(def listen-chan! transact/listen-chan!)
(def unlisten-chan! transact/unlisten-chan!)
;; TODO: use Potemkin, or a subset of Potemkin that is CLJS friendly (like ;; TODO: use Potemkin, or a subset of Potemkin that is CLJS friendly (like
;; https://github.com/ztellman/potemkin/issues/31) to improve this re-exporting process. ;; https://github.com/ztellman/potemkin/issues/31) to improve this re-exporting process.
(def <close transact/close) (def <close transact/close)

View file

@ -103,8 +103,18 @@
(declare start-transactor) (declare start-transactor)
(defn connection-with-db [db] (defn connection-with-db [db]
(let [connection ;; Puts to listener-source may park if listener-mult can't distribute them fast enough. Since the
;; underlying taps are asserted to be be unblocking, the parking time should be very short.
(let [listener-source
(a/chan 1)
listener-mult
(a/mult listener-source) ;; Just for tapping.
connection
(map->Connection {:current-db (atom db) (map->Connection {:current-db (atom db)
:listener-source listener-source
:listener-mult listener-mult
:transact-chan (a/chan (util/unlimited-buffer)) :transact-chan (a/chan (util/unlimited-buffer))
})] })]
(start-transactor connection) (start-transactor connection)
@ -583,8 +593,49 @@
#(-> (<with db tx-data))))] #(-> (<with db tx-data))))]
;; We only get here if the transaction is committed. ;; We only get here if the transaction is committed.
(reset! (:current-db conn) (:db-after report)) (reset! (:current-db conn) (:db-after report))
(>! (:listener-source conn) report)
report)))] report)))]
(>! result pair)) (>! result pair))
(a/close! result) (a/close! result)
(>! token-chan token) (>! token-chan token)
(recur))))))) (recur)))))))
(defn listen-chan!
"Put reports successfully transacted against the given connection onto the given channel.
The listener sink channel must be unblocking.
Returns the channel listened to, for future unlistening."
[conn listener-sink]
{:pre [(conn? conn)]}
(when-not (util/unblocking-chan? listener-sink)
(raise "Listener sinks must be channels backed by unblocking buffers"
{:error :transact/bad-listener :listener-sink listener-sink}))
;; Tapping an already registered sink is a no-op.
(a/tap (:listener-mult conn) listener-sink)
listener-sink)
(defn- -listen-chan [f]
(let [c (a/chan (a/sliding-buffer 10))]
(go-loop []
(when-let [v (<! c)]
(do
(f v)
(recur))))
c))
(defn listen!
"Evaluate the given function with reports successfully transacted against the given connection.
`f` should be a function of one argument, the transaction report.
Returns the channel listened to, for future calls to `unlisten-chan!`."
([conn f]
{:pre [(fn? f)]}
(listen-chan! conn (-listen-chan f))))
(defn unlisten-chan! [conn listener-sink]
"Stop putting reports successfully transacted against the given connection onto the given channel."
{:pre [(conn? conn)]}
;; Untapping an un-registered sink is a no-op.
(a/untap (:listener-mult conn) listener-sink))

View file

@ -113,4 +113,100 @@
(filter #(not= :db/txInstant (second %)) (<? (<transactions-after (d/db conn) tx0))))))) (filter #(not= :db/txInstant (second %)) (<? (<transactions-after (d/db conn) tx0)))))))
(deftest-db test-listeners conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
c1 (a/chan (a/dropping-buffer 5))
c2 (a/chan (a/dropping-buffer 5))]
(testing "no listeners is okay"
;; So that we can upsert to concrete entids.
(<? (d/<transact! conn [[:db/add 101 :name "Ivan"]
[:db/add 102 :name "Petr"]])))
(testing "listeners are added, not accidentally notified of events before they were added"
(d/listen-chan! conn c1)
(d/listen-chan! conn c2)
;; This is not authoritative, because in an error situation a report may
;; be put! to a listener tap outside the expected flow. We should witness
;; such an occurrence later in the test.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2))))
(testing "unlistening to unrecognized key is ignored"
(d/unlisten-chan! conn (a/chan)))
(testing "listeners observe reports"
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -1) :name "Ivan"]]))
(is (= {-1 101}
(tempids (<! c1))))
(is (= {-1 101}
(tempids (<! c2))))
;; Again, not authoritative.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2))))
(testing "unlisten removes correct listener"
(d/unlisten-chan! conn c1)
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -2) :name "Petr"]]))
(is (= {-2 102}
(tempids (<! c2))))
;; Again, not authoritative.
(is (= nil (a/poll! c1))))
(testing "returning to no listeners is okay"
(d/unlisten-chan! conn c2)
(<? (d/<transact! conn [[:db/add (d/id-literal :db.part/user -1) :name "Petr"]]))
;; Again, not authoritative.
(is (= nil (a/poll! c1)))
(is (= nil (a/poll! c2)))
;; This should be authoritative, however. We should be able to put! due
;; to the size of the buffer, and we should take! what we put!.
(>! c1 :token-1)
(is (= :token-1 (<! c1)))
(>! c2 :token-1)
(is (= :token-1 (<! c2))))
(testing "complains about blocking channels"
(is (thrown-with-msg?
ExceptionInfo #"unblocking buffers"
(d/listen-chan! conn (a/chan 1)))))
))
(deftest-db test-transact-in-listener conn
(let [{tx0 :tx} (<? (d/<transact! conn test-schema))
;; So that we can see all transactions.
lc (a/chan (a/dropping-buffer 5))
;; A oneshot listener, to prevent infinite recursion.
ofl (atom false)
ol (fn [report]
(when (compare-and-set! ofl false true)
;; Asynchronously throw another transaction at the wall. This
;; upserts to the earlier one.
(d/<transact! conn [{:db/id (d/id-literal :db.part/user -1) :name "Ivan" :email "@1"}])))
]
(testing "that we can invoke <transact! from within a listener"
(d/listen-chan! conn lc)
(d/listen! conn ol)
;; Transact once to get started, and so that we can upsert against concrete ids.
(<? (d/<transact! conn [{:db/id 101 :name "Ivan"}]))
(is (= (+ 1 tx0) (:tx (<! lc))))
;; The listener should have kicked off another transaction, but we can't
;; wait for it explicitly. However, we can wait for the report to hit the
;; listening channel.
(let [r (<! lc)]
(is (= (+ 2 tx0) (:tx r)))
(is (= {-1 101}
(tempids r)))
(is (= nil (a/poll! lc)))))))
#_ (time (t/run-tests)) #_ (time (t/run-tests))