Pre: Add unlimited-buffer and unblocking-chan?.
This commit is contained in:
parent
e1b1abe2de
commit
2081ca4563
2 changed files with 69 additions and 5 deletions
|
@ -3,9 +3,16 @@
|
||||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
(ns datomish.util
|
(ns datomish.util
|
||||||
#?(:cljs (:require-macros datomish.util))
|
#?(:cljs
|
||||||
|
(:require-macros
|
||||||
|
[datomish.util]
|
||||||
|
[cljs.core.async.macros :refer [go go-loop]]))
|
||||||
(:require
|
(:require
|
||||||
[clojure.string :as str]))
|
[clojure.string :as str]
|
||||||
|
#?@(:clj [[clojure.core.async :as a :refer [go go-loop <! >!]]
|
||||||
|
[clojure.core.async.impl.protocols]])
|
||||||
|
#?@(:cljs [[cljs.core.async :as a :refer [<! >!]]
|
||||||
|
[cljs.core.async.impl.protocols]])))
|
||||||
|
|
||||||
#?(:clj
|
#?(:clj
|
||||||
(defmacro raise-str
|
(defmacro raise-str
|
||||||
|
@ -101,3 +108,49 @@
|
||||||
|
|
||||||
(defn mapvals [f m]
|
(defn mapvals [f m]
|
||||||
(into (empty m) (map #(vector (first %) (f (second %))) m)))
|
(into (empty m) (map #(vector (first %) (f (second %))) m)))
|
||||||
|
|
||||||
|
(defn unblocking-chan?
|
||||||
|
"Returns true if the channel will never block. That is to say, puts
|
||||||
|
into this channel will never cause the buffer to be full."
|
||||||
|
[chan]
|
||||||
|
(a/unblocking-buffer?
|
||||||
|
;; See http://dev.clojure.org/jira/browse/ASYNC-181.
|
||||||
|
(#?(:cljs .-buf :clj .buf) chan)))
|
||||||
|
|
||||||
|
;; Modified from http://dev.clojure.org/jira/browse/ASYNC-23.
|
||||||
|
#?(:cljs
|
||||||
|
(deftype UnlimitedBuffer [buf]
|
||||||
|
cljs.core.async.impl.protocols/UnblockingBuffer
|
||||||
|
|
||||||
|
cljs.core.async.impl.protocols/Buffer
|
||||||
|
(full? [this]
|
||||||
|
false)
|
||||||
|
(remove! [this]
|
||||||
|
(.pop buf))
|
||||||
|
(add!* [this itm]
|
||||||
|
(.unshift buf itm))
|
||||||
|
(close-buf! [this])
|
||||||
|
|
||||||
|
cljs.core/ICounted
|
||||||
|
(-count [this]
|
||||||
|
(.-length buf))))
|
||||||
|
|
||||||
|
#?(:clj
|
||||||
|
(deftype UnlimitedBuffer [^java.util.LinkedList buf]
|
||||||
|
clojure.core.async.impl.protocols/UnblockingBuffer
|
||||||
|
|
||||||
|
clojure.core.async.impl.protocols/Buffer
|
||||||
|
(full? [this]
|
||||||
|
false)
|
||||||
|
(remove! [this]
|
||||||
|
(.removeLast buf))
|
||||||
|
(add!* [this itm]
|
||||||
|
(.addFirst buf itm))
|
||||||
|
(close-buf! [this])
|
||||||
|
|
||||||
|
clojure.lang.Counted
|
||||||
|
(count [this]
|
||||||
|
(.size buf))))
|
||||||
|
|
||||||
|
(defn unlimited-buffer []
|
||||||
|
(UnlimitedBuffer. #?(:cljs (array) :clj (java.util.LinkedList.))))
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
(ns datomish.util-test
|
(ns datomish.util-test
|
||||||
|
#?(:cljs
|
||||||
|
(:require-macros
|
||||||
|
[cljs.core.async.macros :as a :refer [go go-loop]]))
|
||||||
(:require
|
(:require
|
||||||
[datomish.util :as util]
|
[datomish.util :as util]
|
||||||
#?(:clj [clojure.test :as t :refer [is are deftest testing]])
|
#?@(:clj [[clojure.test :as t :refer [is are deftest testing]]
|
||||||
#?(:cljs [cljs.test :as t :refer-macros [is are deftest testing]])
|
[clojure.core.async :as a :refer [go go-loop <! >!]]])
|
||||||
))
|
|
||||||
|
#?@(:cljs [[cljs.test :as t :refer-macros [is are deftest testing]]
|
||||||
|
[cljs.core.async :as a :refer [<! >!]]])))
|
||||||
|
|
||||||
(deftest test-var-translation
|
(deftest test-var-translation
|
||||||
(is (= :x (util/var->sql-var '?x)))
|
(is (= :x (util/var->sql-var '?x)))
|
||||||
|
@ -35,3 +40,9 @@
|
||||||
(catch :default e e))]
|
(catch :default e e))]
|
||||||
(is (= "succeed" (aget caught "message")))
|
(is (= "succeed" (aget caught "message")))
|
||||||
(is (= {:foo 1} (aget caught "data"))))))
|
(is (= {:foo 1} (aget caught "data"))))))
|
||||||
|
|
||||||
|
(deftest test-unblocking-chan?
|
||||||
|
(is (util/unblocking-chan? (a/chan (a/dropping-buffer 10))))
|
||||||
|
(is (util/unblocking-chan? (a/chan (a/sliding-buffer 10))))
|
||||||
|
(is (util/unblocking-chan? (a/chan (util/unlimited-buffer))))
|
||||||
|
(is (not (util/unblocking-chan? (a/chan (a/buffer 10))))))
|
||||||
|
|
Loading…
Reference in a new issue