From baec3815b0d0cab4b93c1d4556617498e4a22f91 Mon Sep 17 00:00:00 2001 From: Nick Alexander Date: Wed, 27 Jul 2016 14:29:16 -0700 Subject: [PATCH] Implement transactions. --- src/datomish/datom.cljc | 136 ++++++++++ src/datomish/db.cljc | 507 ++++++++++++++++++++++++++++++++++++- src/datomish/schema.cljc | 96 +++++++ test/datomish/db_test.cljc | 149 +++++++++++ test/datomish/test.cljs | 2 + 5 files changed, 876 insertions(+), 14 deletions(-) create mode 100644 src/datomish/datom.cljc create mode 100644 src/datomish/schema.cljc create mode 100644 test/datomish/db_test.cljc diff --git a/src/datomish/datom.cljc b/src/datomish/datom.cljc new file mode 100644 index 00000000..78b61b9f --- /dev/null +++ b/src/datomish/datom.cljc @@ -0,0 +1,136 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. + +;; Purloined from DataScript. + +(ns datomish.datom) + +(declare hash-datom equiv-datom seq-datom val-at-datom nth-datom assoc-datom) + +(deftype Datom [e a v tx added] + #?@(:cljs + [IHash + (-hash [d] (or (.-__hash d) + (set! (.-__hash d) (hash-datom d)))) + IEquiv + (-equiv [d o] (and (instance? Datom o) (equiv-datom d o))) + + ISeqable + (-seq [d] (seq-datom d)) + + ILookup + (-lookup [d k] (val-at-datom d k nil)) + (-lookup [d k nf] (val-at-datom d k nf)) + + IIndexed + (-nth [this i] (nth-datom this i)) + (-nth [this i not-found] (nth-datom this i not-found)) + + IAssociative + (-assoc [d k v] (assoc-datom d k v)) + + IPrintWithWriter + (-pr-writer [d writer opts] + (pr-sequential-writer writer pr-writer + "#datascript/Datom [" " " "]" + opts [(.-e d) (.-a d) (.-v d) (.-tx d) (.-added d)])) + ] + :clj + [Object + (hashCode [d] (hash-datom d)) + + clojure.lang.IHashEq + (hasheq [d] (hash-datom d)) + + clojure.lang.Seqable + (seq [d] (seq-datom d)) + + clojure.lang.IPersistentCollection + (equiv [d o] (and (instance? Datom o) (equiv-datom d o))) + (empty [d] (throw (UnsupportedOperationException. "empty is not supported on Datom"))) + (count [d] 5) + (cons [d [k v]] (assoc-datom d k v)) + + clojure.lang.Indexed + (nth [this i] (nth-datom this i)) + (nth [this i not-found] (nth-datom this i not-found)) + + clojure.lang.ILookup + (valAt [d k] (val-at-datom d k nil)) + (valAt [d k nf] (val-at-datom d k nf)) + + clojure.lang.Associative + (entryAt [d k] (some->> (val-at-datom d k nil) (clojure.lang.MapEntry k))) + (containsKey [e k] (#{:e :a :v :tx :added} k)) + (assoc [d k v] (assoc-datom d k v)) + ])) + +(defn ^Datom datom + ([e a v tx] (Datom. e a v tx true)) + ([e a v tx added] (Datom. e a v tx added))) + +(defn datom? [x] (instance? Datom x)) + +(defn- hash-datom [^Datom d] + (-> (hash (.-e d)) + (hash-combine (hash (.-a d))) + (hash-combine (hash (.-v d))))) + +(defn- equiv-datom [^Datom d ^Datom o] + (and (= (.-e d) (.-e o)) + (= (.-a d) (.-a o)) + (= (.-v d) (.-v o)))) + +(defn- seq-datom [^Datom d] + (list (.-e d) (.-a d) (.-v d) (.-tx d) (.-added d))) + +;; keep it fast by duplicating for both keyword and string cases +;; instead of using sets or some other matching func +(defn- val-at-datom [^Datom d k not-found] + (case k + :e (.-e d) "e" (.-e d) + :a (.-a d) "a" (.-a d) + :v (.-v d) "v" (.-v d) + :tx (.-tx d) "tx" (.-tx d) + :added (.-added d) "added" (.-added d) + not-found)) + +(defn- nth-datom + ([^Datom d ^long i] + (case i + 0 (.-e d) + 1 (.-a d) + 2 (.-v d) + 3 (.-tx d) + 4 (.-added d) + #?(:clj (throw (IndexOutOfBoundsException.)) + :cljs (throw (js/Error. (str "Datom/-nth: Index out of bounds: " i)))))) + ([^Datom d ^long i not-found] + (case i + 0 (.-e d) + 1 (.-a d) + 2 (.-v d) + 3 (.-tx d) + 4 (.-added d) + not-found))) + +(defn- ^Datom assoc-datom [^Datom d k v] + (case k + :e (Datom. v (.-a d) (.-v d) (.-tx d) (.-added d)) + :a (Datom. (.-e d) v (.-v d) (.-tx d) (.-added d)) + :v (Datom. (.-e d) (.-a d) v (.-tx d) (.-added d)) + :tx (Datom. (.-e d) (.-a d) (.-v d) v (.-added d)) + :added (Datom. (.-e d) (.-a d) (.-v d) (.-tx d) v) + (throw (IllegalArgumentException. (str "invalid key for #datascript/Datom: " k))))) + +;; printing and reading + +(defn ^Datom datom-from-reader [vec] + (apply datom vec)) + +#?(:clj + (defmethod print-method Datom [^Datom d, ^java.io.Writer w] + (.write w (str "#datascript/Datom ")) + (binding [*out* w] + (pr [(.-e d) (.-a d) (.-v d) (.-tx d) (.-added d)])))) diff --git a/src/datomish/db.cljc b/src/datomish/db.cljc index cd5c0531..f2a61ca4 100644 --- a/src/datomish/db.cljc +++ b/src/datomish/db.cljc @@ -12,31 +12,308 @@ [datomish.query.projection :as projection] [datomish.query.source :as source] [datomish.query :as query] + [honeysql.core :as sql] + [datomish.datom :as dd :refer [datom datom? #?@(:cljs [Datom])]] + [datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str cond-let]] + [datomish.schema :as ds] [datomish.sqlite :as s] [datomish.sqlite-schema :as sqlite-schema] - [datomish.util :as util #?(:cljs :refer-macros :clj :refer) [raise raise-str]] #?@(:clj [[datomish.pair-chan :refer [go-pair !]]]) #?@(:cljs [[datomish.pair-chan] - [cljs.core.async :as a :refer [chan !]]]))) + [cljs.core.async :as a :refer [chan !]]])) + #?(:clj + (:import + [datomish.datom Datom]))) + +#?(:clj + ;; From https://stuartsierra.com/2015/05/27/clojure-uncaught-exceptions + ;; Assuming require [clojure.tools.logging :as log] + (Thread/setDefaultUncaughtExceptionHandler + (reify Thread$UncaughtExceptionHandler + (uncaughtException [_ thread ex] + (println ex "Uncaught exception on" (.getName thread)))))) (defprotocol IDB (query-context [db]) - (close - [db] - "Close this database. Returns a pair channel of [nil error].")) -(defrecord DB [sqlite-connection] + (close-db + [db] + "Close this database. Returns a pair channel of [nil error].") + + (schema + [db] + "Return the schema of this database.") + + (idents + [db] + "Return the known idents of this database, as a map from keyword idents to entids.") + + (current-tx + [db] + "TODO: document this interface.") + + (sql-clause [pattern] + (merge + {:select [:*] ;; e :a :v :tx] ;; TODO: generalize columns. + :from [:datoms]} + (if-not (empty? pattern) + {:where (cons :and (map #(vector := %1 (if (keyword? %2) (str %2) %2)) [:e :a :v :tx] pattern))} ;; TODO: use schema to process v. + {}))) + +(defrecord DB [sqlite-connection schema idents current-tx] + ;; idents is map {ident -> entid} of known idents. See http://docs.datomic.com/identity.html#idents. IDB (query-context [db] (context/->Context (source/datoms-source db) nil nil)) - (close [db] (s/close (.-sqlite-connection db)))) -(defn > + (search->sql-clause pattern) + (sql/format) + (s/all-rows (:sqlite-connection db))))] + (mapv #(Datom. (:e %) (:a %) (:v %) (:tx %) true) rows)))) ;; TODO: map values according to schema. + + (> + {:select [:*] :from [:datoms] :where [:and [:= :a a] [:= :v v]]} + (sql/format) + (s/all-rows (:sqlite-connection db))))] + (mapv #(Datom. (:e %) (:a %) (:v %) (:tx %) true) rows)))) ;; TODO: map values according to schema. + + (TempId part (swap! -id-literal-idx dec)))) + ([part idx] + (->TempId part idx))) + +(defn id-literal? [x] + (and (instance? TempId x))) + +(defn temp-literal? [x] + (and (id-literal? x) + (= :db.part/temp (:part x)))) + +;; (def data-readers {'db/id id-literal}) + +;; #?(:cljs +;; (doseq [[tag cb] data-readers] (cljs.reader/register-tag-parser! tag cb))) + +;; TODO: implement support for DB parts? +(def tx0 0x2000000) + +(defn DB sqlite-connection))) + (let [rows (> + {:select [:e :v] :from [:datoms] :where [:= :a ":db/ident"]} ;; TODO: don't stringify? + (sql/format) + (s/all-rows sqlite-connection)))] + (into {} (map #(-> {(keyword (:v %)) (:e %)})) rows)))) + +(defn DB + {:sqlite-connection sqlite-connection + :idents idents + :schema (ds/schema (into {} (map (fn [[k v]] [(k idents) v]) schema))) + :current-tx tx0}))))) + +(defn connection-with-db [db] + (map->Connection {:current-db (atom db)})) + +(defrecord TxReport [db-before db-after entities tx-data tempids]) + +(defn- report? [x] + (and (instance? TxReport x))) + +;; ;; TODO: persist max-tx and max-eid in SQLite. + +(defn maybe-datom->entity [entity] + (cond + (datom? entity) + (-> + (let [[e a v tx added] entity] + (if added + [:db/add [e a v tx]] + [:db/retract [e a v tx]])) + (with-meta (get (meta entity) :source))) + + true + entity)) + +(defn maybe-explode [schema entity] ;; TODO db? schema? + (cond + (map? entity) + ;; TODO: reverse refs, lists, nested maps + (let [eid (or (:db/id entity) + (id-literal :db.part/temp))] ;; Must upsert if no ID given. TODO: check if this fails in Datomic/DS. + (for [[a v] (dissoc entity :db/id)] + [:db/add eid a v])) + + ;; (raise "Map entities are not yet supported, got " entity + ;; {:error :transact/syntax + ;; :op entity }) + + true + [entity])) + +(defn maybe-ident->entid [db [op & entity :as orig]] + ;; TODO: use something faster than `into` here. + (-> + (into [op] (for [field entity] + (get (idents db) field field))) ;; TODO: schema, not db. + ;; (with-meta (get (meta orig) :source {:source orig})) + )) + +(defrecord Transaction [db tempids entities]) + +(defn- tx-entity [db] + (let [tx (current-tx db)] + [:db/add tx :db/txInstant 0xdeadbeef tx])) ;; TODO: now. + +(defn maybe-add-current-tx [current-tx entity] + (let [[op e a v tx] entity] + [op e a v (or tx current-tx)])) + +(defn preprocess [db report] + (let [initial-es (conj (or (:entities report) []) (tx-entity db))] + (when-not (sequential? initial-es) + (raise "Bad transaction data " initial-es ", expected sequential collection" + {:error :transact/syntax, :tx-data initial-es})) + + (->> + (-> + (comp + ;; Track the provenance of each assertion for error reporting. + (map #(with-meta % {:source %})) + + ;; Normalize Datoms into :db/add or :db/retract vectors. + (map maybe-datom->entity) + + ;; Explode map shorthand, such as {:db/id e :attr value :_reverse ref}, + ;; to a list of vectors, like + ;; [[:db/add e :attr value] [:db/add ref :reverse e]]. + (mapcat (partial maybe-explode (schema db))) + + ;; Replace idents with entids where possible. + (map (partial maybe-ident->entid db)) + + ;; Add tx if not given. + (map (partial maybe-add-current-tx (current-tx db)))) + (transduce conj [] initial-es)) + (assoc-in report [:entities])))) (defn db - query-context - (query/find-into-context parsed)) + query-context + (query/find-into-context parsed)) row-pair-transducer (projection/row-pair-transducer context) sql (query/context->sql-string context args) chan (chan 50 row-pair-transducer)] @@ -67,4 +344,206 @@ [db find args] (a/reduce (partial reduce-error-pair conj) [[] nil] ( + report + (assoc-in [:tempids tempid] upserted-eid) + (assoc-in [:entities] es))))) + +(defn- transact-entity [report entity] + (update-in report [:entities] conj entity)) + +(defn report + (transact-report (datom e a (.-v old-datom) tx false)) + (transact-report (datom e a v tx true))) + entities)) + (recur (transact-report report (datom e a v tx true)) entities))) + + (= op :db/retract) + (if (first (> initial-report + (preprocess db) + + (TxReport + {:db-before db + :db-after db + ;; :current-tx current-tx + :entities tx-data + :tx-data [] + :tempids {}}))) + db-after (-> + db + + ( report + (assoc-in [:db-after] db-after))))) + +(defn properties [k v] + (cond + (= [k v] [:db/isComponent true]) [:db/isComponent] + (= v :db.type/ref) [:db.type/ref :db/index] + (= v :db.cardinality/many) [:db.cardinality/many] + (= v :db.unique/identity) [:db/unique :db.unique/identity :db/index] + (= v :db.unique/value) [:db/unique :db.unique/value :db/index] + (= [k v] [:db/index true]) [:db/index])) + +(defn- multimap [e m] + (reduce + (fn [acc [k v]] + (update-in acc [k] (fnil conj e) v)) + {} m)) + +(defn- rschema [schema] + (->> + (for [[a kv] schema + [k v] kv + prop (attr->properties k v)] + [prop a]) + (multimap #{}))) + +(defn- validate-schema-key [a k v expected] + (when-not (or (nil? v) + (contains? expected v)) + (throw (ex-info (str "Bad attribute specification for " (pr-str {a {k v}}) ", expected one of " expected) + {:error :schema/validation + :attribute a + :key k + :value v})))) + +(defn- validate-schema [schema] + (doseq [[a kv] schema] + (let [comp? (:db/isComponent kv false)] + (validate-schema-key a :db/isComponent (:db/isComponent kv) #{true false}) + (when (and comp? (not= (:db/valueType kv) :db.type/ref)) + (throw (ex-info (str "Bad attribute specification for " a ": {:db/isComponent true} should also have {:db/valueType :db.type/ref}") + {:error :schema/validation + :attribute a + :key :db/isComponent})))) + (validate-schema-key a :db/unique (:db/unique kv) #{:db.unique/value :db.unique/identity}) + (validate-schema-key a :db/valueType (:db/valueType kv) #{:db.type/ref}) + (validate-schema-key a :db/cardinality (:db/cardinality kv) #{:db.cardinality/one :db.cardinality/many})) + schema) + +(defn schema [schema] + {:pre [(or (nil? schema) (map? schema))]} + (map->Schema {:schema (validate-schema schema) + :rschema (rschema schema)})) diff --git a/test/datomish/db_test.cljc b/test/datomish/db_test.cljc new file mode 100644 index 00000000..4255bd14 --- /dev/null +++ b/test/datomish/db_test.cljc @@ -0,0 +1,149 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. + +(ns datomish.db-test + #?(:cljs + (:require-macros + [datomish.pair-chan :refer [go-pair !]]]) + #?@(:cljs [[datomish.pair-chan] + [datomish.test-macros :refer-macros [deftest-async]] + [datomish.node-tempfile :refer [tempfile]] + [cljs.test :as t :refer-macros [is are deftest testing async]] + [cljs.core.async :as a :refer [!]]])) + #?(:clj + (:import [clojure.lang ExceptionInfo])) + #?(:clj + (:import [datascript.db DB]))) + +(defn- > + (s/all-rows (:sqlite-connection db) ["SELECT e, a, v, tx FROM datoms"]) + (> + (s/all-rows (:sqlite-connection db) ["SELECT e, a, v, tx, added FROM transactions ORDER BY tx ASC, e, a, v, added"]) + (