From e1b1abe2dea1787ebca86f48636e90c917bc8ba6 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 11 Oct 2016 20:21:48 -0700 Subject: [PATCH 1/9] Pre: clarify comments. --- src/common/datomish/db.cljc | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/common/datomish/db.cljc b/src/common/datomish/db.cljc index 3fe7107d..f0d9c0d5 100644 --- a/src/common/datomish/db.cljc +++ b/src/common/datomish/db.cljc @@ -111,9 +111,12 @@ (in-transaction! [db chan-fn] - "Evaluate the given pair-chan `chan-fn` in an exclusive transaction. If it returns non-nil, - commit the transaction; otherwise, rollback the transaction. Returns a pair-chan resolving to - the pair-chan returned by `chan-fn`.") + "Evaluate the given `chan-fn` in an exclusive transaction. If it returns non-nil, + commit the transaction; otherwise, rollback the transaction. + + `chan-fn` should be a function of no arguments returning a pair-chan. + + Returns a pair-chan resolving to the same pair as the pair-chan returned by `chan-fn`.") ( db - query-context - (query/options-into-context limit order-by) - (query/find-into-context parsed)) + query-context + (query/options-into-context limit order-by) + (query/find-into-context parsed)) ;; We turn each row into either an array of values or an unadorned ;; value. The row-pair-transducer does this work. From 2081ca4563fb990adc4924adb48c66a16bb8e998 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 11 Oct 2016 13:12:27 -0700 Subject: [PATCH 2/9] Pre: Add unlimited-buffer and unblocking-chan?. --- src/common/datomish/util.cljc | 57 +++++++++++++++++++++++++++++++++-- test/datomish/util_test.cljc | 17 +++++++++-- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/src/common/datomish/util.cljc b/src/common/datomish/util.cljc index e279be2e..13a9ee09 100644 --- a/src/common/datomish/util.cljc +++ b/src/common/datomish/util.cljc @@ -3,9 +3,16 @@ ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. (ns datomish.util - #?(:cljs (:require-macros datomish.util)) + #?(:cljs + (:require-macros + [datomish.util] + [cljs.core.async.macros :refer [go go-loop]])) (:require - [clojure.string :as str])) + [clojure.string :as str] + #?@(:clj [[clojure.core.async :as a :refer [go go-loop !]] + [clojure.core.async.impl.protocols]]) + #?@(:cljs [[cljs.core.async :as a :refer [!]] + [cljs.core.async.impl.protocols]]))) #?(:clj (defmacro raise-str @@ -101,3 +108,49 @@ (defn mapvals [f m] (into (empty m) (map #(vector (first %) (f (second %))) m))) + +(defn unblocking-chan? + "Returns true if the channel will never block. That is to say, puts + into this channel will never cause the buffer to be full." + [chan] + (a/unblocking-buffer? + ;; See http://dev.clojure.org/jira/browse/ASYNC-181. + (#?(:cljs .-buf :clj .buf) chan))) + +;; Modified from http://dev.clojure.org/jira/browse/ASYNC-23. +#?(:cljs + (deftype UnlimitedBuffer [buf] + cljs.core.async.impl.protocols/UnblockingBuffer + + cljs.core.async.impl.protocols/Buffer + (full? [this] + false) + (remove! [this] + (.pop buf)) + (add!* [this itm] + (.unshift buf itm)) + (close-buf! [this]) + + cljs.core/ICounted + (-count [this] + (.-length buf)))) + +#?(:clj + (deftype UnlimitedBuffer [^java.util.LinkedList buf] + clojure.core.async.impl.protocols/UnblockingBuffer + + clojure.core.async.impl.protocols/Buffer + (full? [this] + false) + (remove! [this] + (.removeLast buf)) + (add!* [this itm] + (.addFirst buf itm)) + (close-buf! [this]) + + clojure.lang.Counted + (count [this] + (.size buf)))) + +(defn unlimited-buffer [] + (UnlimitedBuffer. #?(:cljs (array) :clj (java.util.LinkedList.)))) diff --git a/test/datomish/util_test.cljc b/test/datomish/util_test.cljc index 898b49f4..87f25ce1 100644 --- a/test/datomish/util_test.cljc +++ b/test/datomish/util_test.cljc @@ -1,9 +1,14 @@ (ns datomish.util-test + #?(:cljs + (:require-macros + [cljs.core.async.macros :as a :refer [go go-loop]])) (:require [datomish.util :as util] - #?(:clj [clojure.test :as t :refer [is are deftest testing]]) - #?(:cljs [cljs.test :as t :refer-macros [is are deftest testing]]) - )) + #?@(:clj [[clojure.test :as t :refer [is are deftest testing]] + [clojure.core.async :as a :refer [go go-loop !]]]) + + #?@(:cljs [[cljs.test :as t :refer-macros [is are deftest testing]] + [cljs.core.async :as a :refer [!]]]))) (deftest test-var-translation (is (= :x (util/var->sql-var '?x))) @@ -35,3 +40,9 @@ (catch :default e e))] (is (= "succeed" (aget caught "message"))) (is (= {:foo 1} (aget caught "data")))))) + +(deftest test-unblocking-chan? + (is (util/unblocking-chan? (a/chan (a/dropping-buffer 10)))) + (is (util/unblocking-chan? (a/chan (a/sliding-buffer 10)))) + (is (util/unblocking-chan? (a/chan (util/unlimited-buffer)))) + (is (not (util/unblocking-chan? (a/chan (a/buffer 10)))))) From a8ad79d0e6169f404445692b8f16ad3e435b9eab Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Tue, 11 Oct 2016 20:25:40 -0700 Subject: [PATCH 3/9] Make !]]]) + [clojure.core.async :as a :refer [chan go go-loop !]]]) #?@(:cljs [[datomish.pair-chan] [cljs.core.async :as a :refer [chan !]]])) #?(:clj @@ -45,9 +45,11 @@ [conn] "Get the full transaction history DB associated with this connection.")) -(defrecord Connection [current-db] +(defrecord Connection [current-db transact-chan] IConnection - (close [conn] (db/close-db @(:current-db conn))) + (close [conn] + (a/close! (:transact-chan conn)) + (db/close-db @(:current-db conn))) (db [conn] @(:current-db conn)) @@ -98,12 +100,15 @@ ;; #?(:cljs ;; (doseq [[tag cb] data-readers] (cljs.reader/register-tag-parser! tag cb))) -;; TODO: implement support for DB parts? +(declare start-transactor) (defn connection-with-db [db] - (map->Connection {:current-db (atom db)})) - -;; ;; TODO: persist max-tx and max-eid in SQLite. + (let [connection + (map->Connection {:current-db (atom db) + :transact-chan (a/chan (util/unlimited-buffer)) + })] + (start-transactor connection) + connection)) (defn maybe-datom->entity [entity] (cond @@ -552,12 +557,34 @@ (:db-after (! token-chan (gensym "transactor-token")) + (loop [] + (let [token ( (! result pair)) + (a/close! result) + (>! token-chan token) + (recur))))))) diff --git a/test/datomish/test.cljs b/test/datomish/test.cljs index 92eb9430..ae43bf10 100644 --- a/test/datomish/test.cljs +++ b/test/datomish/test.cljs @@ -9,6 +9,7 @@ datomish.schema-test datomish.sqlite-user-version-test datomish.tofinoish-test + datomish.transact-test datomish.util-test datomish.test.transforms datomish.test.query @@ -23,6 +24,7 @@ 'datomish.schema-test 'datomish.sqlite-user-version-test 'datomish.tofinoish-test + 'datomish.transact-test 'datomish.util-test 'datomish.test.transforms 'datomish.test.query diff --git a/test/datomish/transact_test.cljc b/test/datomish/transact_test.cljc new file mode 100644 index 00000000..a23a2f25 --- /dev/null +++ b/test/datomish/transact_test.cljc @@ -0,0 +1,116 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. + +(ns datomish.transact-test + #?(:cljs + (:require-macros + [datomish.pair-chan :refer [go-pair = !]]]) + #?@(:cljs [[datomish.js-sqlite] + [datomish.pair-chan] + [datomish.test-macros :refer-macros [deftest-async deftest-db]] + [datomish.node-tempfile :refer [tempfile]] + [cljs.test :as t :refer-macros [is are deftest testing async]] + [cljs.core.async :as a :refer [!]]])) + #?(:clj + (:import [clojure.lang ExceptionInfo])) + #?(:clj + (:import [datascript.db DB]))) + +#?(:cljs + (def Throwable js/Error)) + +(defn- tempids [tx] + (into {} (map (juxt (comp :idx first) second) (:tempids tx)))) + +(def test-schema + [{:db/id (d/id-literal :db.part/user) + :db/ident :x + :db/unique :db.unique/identity + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :name + :db/unique :db.unique/identity + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :y + :db/cardinality :db.cardinality/many + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :aka + :db/cardinality :db.cardinality/many + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :age + :db/valueType :db.type/long + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :email + :db/unique :db.unique/identity + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :spouse + :db/unique :db.unique/value + :db/valueType :db.type/string + :db.install/_attribute :db.part/db} + {:db/id (d/id-literal :db.part/user) + :db/ident :friends + :db/cardinality :db.cardinality/many + :db/valueType :db.type/ref + :db.install/_attribute :db.part/db} + ]) + +(deftest-db test-overlapping-transacts conn + (let [{tx0 :tx} ( [[id0 :email (str "@" %) (+ 3 % tx0) 0] + [id0 :email (str "@" (inc %)) (+ 3 % tx0) 1]]) + (range 0 (dec n)))) + + (filter #(not= :db/txInstant (second %)) ( Date: Tue, 11 Oct 2016 20:29:43 -0700 Subject: [PATCH 4/9] Add {un}listen{-chan}! to connection. (#61) --- src/common/datomish/api.cljc | 4 ++ src/common/datomish/transact.cljc | 53 ++++++++++++++++- test/datomish/transact_test.cljc | 96 +++++++++++++++++++++++++++++++ 3 files changed, 152 insertions(+), 1 deletion(-) diff --git a/src/common/datomish/api.cljc b/src/common/datomish/api.cljc index 567335e0..2e012f84 100644 --- a/src/common/datomish/api.cljc +++ b/src/common/datomish/api.cljc @@ -26,6 +26,10 @@ (def Connection {:current-db (atom db) + :listener-source listener-source + :listener-mult listener-mult :transact-chan (a/chan (util/unlimited-buffer)) })] (start-transactor connection) @@ -583,8 +593,49 @@ #(-> (! (:listener-source conn) report) report)))] (>! result pair)) (a/close! result) (>! token-chan token) (recur))))))) + +(defn listen-chan! + "Put reports successfully transacted against the given connection onto the given channel. + + The listener sink channel must be unblocking. + + Returns the channel listened to, for future unlistening." + [conn listener-sink] + {:pre [(conn? conn)]} + (when-not (util/unblocking-chan? listener-sink) + (raise "Listener sinks must be channels backed by unblocking buffers" + {:error :transact/bad-listener :listener-sink listener-sink})) + ;; Tapping an already registered sink is a no-op. + (a/tap (:listener-mult conn) listener-sink) + listener-sink) + +(defn- -listen-chan [f] + (let [c (a/chan (a/sliding-buffer 10))] + (go-loop [] + (when-let [v (! c1 :token-1) + (is (= :token-1 (! c2 :token-1) + (is (= :token-1 ( Date: Thu, 13 Oct 2016 12:00:49 -0700 Subject: [PATCH 5/9] Review comment: return pair-chan; accept a result chan and close? flag. --- src/common/datomish/transact.cljc | 22 +++++++++++++--------- test/datomish/transact_test.cljc | 4 ++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/common/datomish/transact.cljc b/src/common/datomish/transact.cljc index 58e0d33a..bc62ff05 100644 --- a/src/common/datomish/transact.cljc +++ b/src/common/datomish/transact.cljc @@ -570,13 +570,16 @@ "Submits a transaction to the database for writing. Returns a pair-chan resolving to `[result error]`." - [conn tx-data] - {:pre [(conn? conn)]} - (let [result (a/chan 1)] - ;; Any race to put! is a real race between callers of ! token-chan (gensym "transactor-token")) (loop [] (let [token (! (:listener-source conn) report) report)))] (>! result pair)) - (a/close! result) (>! token-chan token) + (when close? + (a/close! result)) (recur))))))) (defn listen-chan! diff --git a/test/datomish/transact_test.cljc b/test/datomish/transact_test.cljc index 05e28698..1436eea9 100644 --- a/test/datomish/transact_test.cljc +++ b/test/datomish/transact_test.cljc @@ -98,8 +98,8 @@ ;; Wait for all transactions to complete. ( Date: Thu, 13 Oct 2016 12:23:25 -0700 Subject: [PATCH 6/9] Review comment: ensure report is non-nil after in-transaction!. --- src/common/datomish/transact.cljc | 12 ++++++++---- test/datomish/transact_test.cljc | 12 ++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/common/datomish/transact.cljc b/src/common/datomish/transact.cljc index bc62ff05..68da51dc 100644 --- a/src/common/datomish/transact.cljc +++ b/src/common/datomish/transact.cljc @@ -586,7 +586,7 @@ (go (>! token-chan (gensym "transactor-token")) (loop [] - (let [token ( (! (:listener-source conn) report) + (when report + ;; ! (:listener-source conn) report)) report)))] + ;; Even when report is nil (transaction not committed), pair is non-nil. (>! result pair)) (>! token-chan token) (when close? diff --git a/test/datomish/transact_test.cljc b/test/datomish/transact_test.cljc index 1436eea9..34970f8b 100644 --- a/test/datomish/transact_test.cljc +++ b/test/datomish/transact_test.cljc @@ -209,4 +209,16 @@ (tempids r))) (is (= nil (a/poll! lc))))))) +(deftest-db test-failing-transacts conn + (let [{tx0 :tx} ( Date: Thu, 13 Oct 2016 12:45:29 -0700 Subject: [PATCH 7/9] Review comment: ensure Connection {:current-db (atom db) + (map->Connection {:closed? (atom false) + :current-db (atom db) :listener-source listener-source :listener-mult listener-mult :transact-chan (a/chan (util/unlimited-buffer)) @@ -576,10 +583,12 @@ {:pre [(conn? conn)]} ;; Any race to put! is a real race between callers of Date: Thu, 13 Oct 2016 14:24:49 -0700 Subject: [PATCH 8/9] Review comment: fail pending transactions after closing connection. This is pretty difficult to test robustly, but here's a stab at it. --- src/common/datomish/transact.cljc | 54 ++++++++++++++++++++----------- test/datomish/transact_test.cljc | 36 +++++++++++++++++++++ 2 files changed, 71 insertions(+), 19 deletions(-) diff --git a/src/common/datomish/transact.cljc b/src/common/datomish/transact.cljc index 04c56e69..3c369ba7 100644 --- a/src/common/datomish/transact.cljc +++ b/src/common/datomish/transact.cljc @@ -50,12 +50,18 @@ (defrecord Connection [closed? current-db transact-chan] IConnection (close [conn] - (if (compare-and-set! (:closed? conn) false true) - (do - ;; This immediately stops ! token-chan (gensym "transactor-token")) (loop [] (when-let [token ( (! (:listener-source conn) report)) - report)))] + (case sentinel + :sentinel-close + ;; Time to close the underlying DB. + ( (! (:listener-source conn) report)) + report)))))] ;; Even when report is nil (transaction not committed), pair is non-nil. (>! result pair)) (>! token-chan token) diff --git a/test/datomish/transact_test.cljc b/test/datomish/transact_test.cljc index ed211711..732e1a7c 100644 --- a/test/datomish/transact_test.cljc +++ b/test/datomish/transact_test.cljc @@ -242,4 +242,40 @@ ;; Closing a closed connection is a no-op. ( (count xs) 0)) + (is (> (count es) 0)) + (is (= {:error :transact/connection-closed} (ex-data e)))))) + (finally + ;; Closing a closed connection is a no-op. + ( Date: Thu, 13 Oct 2016 14:28:23 -0700 Subject: [PATCH 9/9] Review comment: make a large-ish dropping buffer for JS listen! consumers. --- src/common/datomish/transact.cljc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/common/datomish/transact.cljc b/src/common/datomish/transact.cljc index 3c369ba7..81daefc8 100644 --- a/src/common/datomish/transact.cljc +++ b/src/common/datomish/transact.cljc @@ -648,8 +648,9 @@ (a/tap (:listener-mult conn) listener-sink) listener-sink) -(defn- -listen-chan [f] - (let [c (a/chan (a/sliding-buffer 10))] +(defn- -listen-chan + [f n] + (let [c (a/chan (a/dropping-buffer n))] (go-loop [] (when-let [v (