diff --git a/src/common/datomish/api.cljc b/src/common/datomish/api.cljc index 567335e0..2e012f84 100644 --- a/src/common/datomish/api.cljc +++ b/src/common/datomish/api.cljc @@ -26,6 +26,10 @@ (def db - query-context - (query/options-into-context limit order-by) - (query/find-into-context parsed)) + query-context + (query/options-into-context limit order-by) + (query/find-into-context parsed)) ;; We turn each row into either an array of values or an unadorned ;; value. The row-pair-transducer does this work. diff --git a/src/common/datomish/transact.cljc b/src/common/datomish/transact.cljc index f4ef7e10..81daefc8 100644 --- a/src/common/datomish/transact.cljc +++ b/src/common/datomish/transact.cljc @@ -6,7 +6,7 @@ #?(:cljs (:require-macros [datomish.pair-chan :refer [go-pair !]]]) + [clojure.core.async :as a :refer [chan go go-loop !]]]) #?@(:cljs [[datomish.pair-chan] [cljs.core.async :as a :refer [chan !]]])) #?(:clj @@ -35,7 +35,9 @@ (defprotocol IConnection (close [conn] - "Close this connection. Returns a pair channel of [nil error].") + "Close this connection. Returns a pair channel of [nil error]. + + Closing a closed connection is a no-op.") (db [conn] @@ -45,9 +47,21 @@ [conn] "Get the full transaction history DB associated with this connection.")) -(defrecord Connection [current-db] +(defrecord Connection [closed? current-db transact-chan] IConnection - (close [conn] (db/close-db @(:current-db conn))) + (close [conn] + (go-pair ;; Always want to return a pair-chan. + (when (compare-and-set! (:closed? conn) false true) + (let [result (a/chan 1)] + ;; Ask for the underlying database to be closed while (usually, after) draining the queue. + ;; Invariant: we see :sentinel-close in the transactor queue at most once. + (a/put! (:transact-chan conn) [:sentinel-close nil result true]) + ;; This immediately stops Connection {:current-db (atom db)})) + ;; Puts to listener-source may park if listener-mult can't distribute them fast enough. Since the + ;; underlying taps are asserted to be be unblocking, the parking time should be very short. + (let [listener-source + (a/chan 1) -;; ;; TODO: persist max-tx and max-eid in SQLite. + listener-mult + (a/mult listener-source) ;; Just for tapping. + + connection + (map->Connection {:closed? (atom false) + :current-db (atom db) + :listener-source listener-source + :listener-mult listener-mult + :transact-chan (a/chan (util/unlimited-buffer)) + })] + (start-transactor connection) + connection)) (defn maybe-datom->entity [entity] (cond @@ -552,12 +580,99 @@ (:db-after (! token-chan (gensym "transactor-token")) + (loop [] + (when-let [token ( (! (:listener-source conn) report)) + report)))))] + ;; Even when report is nil (transaction not committed), pair is non-nil. + (>! result pair)) + (>! token-chan token) + (when close? + (a/close! result)) + (recur))))))) + +(defn listen-chan! + "Put reports successfully transacted against the given connection onto the given channel. + + The listener sink channel must be unblocking. + + Returns the channel listened to, for future unlistening." + [conn listener-sink] {:pre [(conn? conn)]} - (let [db (db conn)] ;; TODO: be careful with swapping atoms. - (db/in-transaction! - db - #(go-pair - (let [report (!]] + [clojure.core.async.impl.protocols]]) + #?@(:cljs [[cljs.core.async :as a :refer [!]] + [cljs.core.async.impl.protocols]]))) #?(:clj (defmacro raise-str @@ -101,3 +108,49 @@ (defn mapvals [f m] (into (empty m) (map #(vector (first %) (f (second %))) m))) + +(defn unblocking-chan? + "Returns true if the channel will never block. That is to say, puts + into this channel will never cause the buffer to be full." + [chan] + (a/unblocking-buffer? + ;; See http://dev.clojure.org/jira/browse/ASYNC-181. + (#?(:cljs .-buf :clj .buf) chan))) + +;; Modified from http://dev.clojure.org/jira/browse/ASYNC-23. +#?(:cljs + (deftype UnlimitedBuffer [buf] + cljs.core.async.impl.protocols/UnblockingBuffer + + cljs.core.async.impl.protocols/Buffer + (full? [this] + false) + (remove! [this] + (.pop buf)) + (add!* [this itm] + (.unshift buf itm)) + (close-buf! [this]) + + cljs.core/ICounted + (-count [this] + (.-length buf)))) + +#?(:clj + (deftype UnlimitedBuffer [^java.util.LinkedList buf] + clojure.core.async.impl.protocols/UnblockingBuffer + + clojure.core.async.impl.protocols/Buffer + (full? [this] + false) + (remove! [this] + (.removeLast buf)) + (add!* [this itm] + (.addFirst buf itm)) + (close-buf! [this]) + + clojure.lang.Counted + (count [this] + (.size buf)))) + +(defn unlimited-buffer [] + (UnlimitedBuffer. #?(:cljs (array) :clj (java.util.LinkedList.)))) diff --git a/test/datomish/test.cljs b/test/datomish/test.cljs index 92eb9430..ae43bf10 100644 --- a/test/datomish/test.cljs +++ b/test/datomish/test.cljs @@ -9,6 +9,7 @@ datomish.schema-test datomish.sqlite-user-version-test datomish.tofinoish-test + datomish.transact-test datomish.util-test datomish.test.transforms datomish.test.query @@ -23,6 +24,7 @@ 'datomish.schema-test 'datomish.sqlite-user-version-test 'datomish.tofinoish-test + 'datomish.transact-test 'datomish.util-test 'datomish.test.transforms 'datomish.test.query diff --git a/test/datomish/transact_test.cljc b/test/datomish/transact_test.cljc new file mode 100644 index 00000000..732e1a7c --- /dev/null +++ b/test/datomish/transact_test.cljc @@ -0,0 +1,281 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. + +(ns datomish.transact-test + #?(:cljs + (:require-macros + [datomish.pair-chan :refer [go-pair = !]]]) + #?@(:cljs [[datomish.js-sqlite] + [datomish.pair-chan] + [datomish.test-macros :refer-macros [deftest-async deftest-db]] + [datomish.node-tempfile :refer [tempfile]] + [cljs.test :as t :refer-macros [is are deftest testing async]] + [cljs.core.async :as a :refer [!]]])) + #?(:clj + (:import [clojure.lang ExceptionInfo])) + #?(:clj + (:import [datascript.db DB]))) + +#?(:cljs + (def Throwable js/Error)) + +(defn- tempids [tx] + (into {} (map (juxt (comp :idx first) second) (:tempids tx)))) + +(def test-schema + [{:db/id (d/id-literal :db.part/user) + :db/ident :x + :db/unique :db.unique/identity + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :name + :db/unique :db.unique/identity + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :y + :db/cardinality :db.cardinality/many + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :aka + :db/cardinality :db.cardinality/many + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :age + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :email + :db/unique :db.unique/identity + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :spouse + :db/unique :db.unique/value + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :friends + :db/cardinality :db.cardinality/many + :db/valueType :db.type/ref + :db.install/_attribute :db.part/db} + ]) + +(deftest-db test-overlapping-transacts conn + (let [{tx0 :tx} ( [[id0 :email (str "@" %) (+ 3 % tx0) 0] + [id0 :email (str "@" (inc %)) (+ 3 % tx0) 1]]) + (range 0 (dec n)))) + + (filter #(not= :db/txInstant (second %)) (! c1 :token-1) + (is (= :token-1 (! c2 :token-1) + (is (= :token-1 ( (count xs) 0)) + (is (> (count es) 0)) + (is (= {:error :transact/connection-closed} (ex-data e)))))) + (finally + ;; Closing a closed connection is a no-op. + (!]]]) + + #?@(:cljs [[cljs.test :as t :refer-macros [is are deftest testing]] + [cljs.core.async :as a :refer [!]]]))) (deftest test-var-translation (is (= :x (util/var->sql-var '?x))) @@ -35,3 +40,9 @@ (catch :default e e))] (is (= "succeed" (aget caught "message"))) (is (= {:foo 1} (aget caught "data")))))) + +(deftest test-unblocking-chan? + (is (util/unblocking-chan? (a/chan (a/dropping-buffer 10)))) + (is (util/unblocking-chan? (a/chan (a/sliding-buffer 10)))) + (is (util/unblocking-chan? (a/chan (util/unlimited-buffer)))) + (is (not (util/unblocking-chan? (a/chan (a/buffer 10))))))