diff --git a/scalardb/project.clj b/scalardb/project.clj index 78b8388..79d5ee3 100644 --- a/scalardb/project.clj +++ b/scalardb/project.clj @@ -8,8 +8,8 @@ [net.java.dev.jna/jna-platform "5.11.0"] [org.slf4j/slf4j-jdk14 "2.0.6"] [cassandra "0.1.0-SNAPSHOT"] - [cc.qbits/alia "4.3.6"] - [cc.qbits/hayt "4.1.0"]] + [cheshire "5.12.0"] + [com.scalar-labs/scalardb-schema-loader "4.0.0-SNAPSHOT"]] :repositories {"sonartype" "https://oss.sonatype.org/content/repositories/snapshots/"} :profiles {:dev {:dependencies [[tortue/spy "2.0.0"]] :plugins [[lein-cloverage "1.1.2"]]} diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index 86225a3..fce0f4b 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -1,15 +1,14 @@ (ns scalardb.core - (:require [cassandra.core :as c] - [clojure.string :as string] + (:require [cheshire.core :as cheshire] [clojure.tools.logging :refer [info warn]] [jepsen.checker :as checker] [jepsen.independent :as independent] - [qbits.alia :as alia]) + [scalardb.db-extend :as ext]) (:import (com.scalar.db.api TransactionState) + (com.scalar.db.schemaloader SchemaLoader) (com.scalar.db.service TransactionFactory StorageFactory) - (com.scalar.db.transaction.consensuscommit Coordinator) - (java.util Properties))) + (com.scalar.db.transaction.consensuscommit Coordinator))) (def ^:const RETRIES 8) (def ^:const RETRIES_FOR_RECONNECTION 3) @@ -18,43 +17,31 @@ (def ^:const DEFAULT_TABLE_COUNT 3) (def ^:const KEYSPACE "jepsen") -(def ^:private ^:const COORDINATOR "coordinator") -(def ^:private ^:const STATE_TABLE "state") (def ^:const VERSION "tx_version") -(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT" - :serializable "SERIALIZABLE"}) - -(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ" - :extra-write "EXTRA_WRITE"}) +(defn exponential-backoff + [r] + (Thread/sleep (reduce * 1000 (repeat r 2)))) (defn setup-transaction-tables [test schemata] - (let [cluster (alia/cluster {:contact-points (:nodes test)}) - session (alia/connect cluster)] + (let [properties (ext/create-properties (:db test) test) + options (ext/create-table-opts (:db test) test)] (doseq [schema schemata] - (c/create-my-keyspace session test schema) - (c/create-my-table session schema)) - - (c/create-my-keyspace session test {:keyspace COORDINATOR}) - (c/create-my-table session {:keyspace COORDINATOR - :table STATE_TABLE - :schema {:tx_id :text - :tx_state :int - :tx_created_at :bigint - :primary-key [:tx_id]}}) - (c/close-cassandra cluster session))) - -(defn- create-properties - [test nodes] - (doto (Properties.) - (.setProperty "scalar.db.contact_points" (string/join "," nodes)) - (.setProperty "scalar.db.username" "cassandra") - (.setProperty "scalar.db.password" "cassandra") - (.setProperty "scalar.db.isolation_level" - ((:isolation-level test) ISOLATION_LEVELS)) - (.setProperty "scalar.db.consensus_commit.serializable_strategy" - ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))) + (loop [retries RETRIES] + (when (zero? retries) + (throw (ex-info "Failed to set up tables" {:schema schema}))) + (let [result (try + (SchemaLoader/load properties + (cheshire/generate-string schema) + options + true) + :success + (catch Exception e + (warn (.getMessage e)) + :fail))] + (when (= result :fail) + (recur (dec retries)))))))) (defn- close-storage! [test] @@ -90,9 +77,7 @@ (defn- create-service-instance [test mode] - (when-let [properties (some->> (c/live-nodes test) - not-empty - (create-properties test))] + (when-let [properties (ext/create-properties (:db test) test)] (try (condp = mode :storage (.getStorage (StorageFactory/create properties)) @@ -110,7 +95,7 @@ (info "reconnecting to the cluster") (loop [tries RETRIES] (when (< tries RETRIES) - (c/exponential-backoff (- RETRIES tries))) + (exponential-backoff (- RETRIES tries))) (if-not (pos? tries) (warn "Failed to connect to the cluster") (if-let [instance (create-service-instance test mode)] @@ -190,7 +175,7 @@ [connect-fn test & body] `(loop [tries# RETRIES] (when (< tries# RETRIES) - (c/exponential-backoff (- RETRIES tries#))) + (exponential-backoff (- RETRIES tries#))) (when (zero? (mod tries# RETRIES_FOR_RECONNECTION)) (~connect-fn ~test)) (if-let [results# ~@body] @@ -212,7 +197,7 @@ (do (warn e) (when fallback (fallback)) - (c/exponential-backoff (- RETRIES tries)) + (exponential-backoff (- RETRIES tries)) (recur (dec tries) f args fallback)) (:value res))))) diff --git a/scalardb/src/scalardb/db_extend.clj b/scalardb/src/scalardb/db_extend.clj new file mode 100644 index 0000000..a5ccb36 --- /dev/null +++ b/scalardb/src/scalardb/db_extend.clj @@ -0,0 +1,66 @@ +(ns scalardb.db-extend + (:require [cassandra.core :as cassandra] + [clojure.string :as string] + [jepsen.db :as db]) + (:import (com.scalar.db.storage.cassandra CassandraAdmin + CassandraAdmin$ReplicationStrategy + CassandraAdmin$CompactionStrategy) + (java.util Properties))) + +(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT" + :serializable "SERIALIZABLE"}) + +(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ" + :extra-write "EXTRA_WRITE"}) + +(defprotocol DbExtension + (live-nodes [this test]) + (wait-for-recovery [this test]) + (create-table-opts [this test]) + (create-properties [this test])) + +(defrecord ExtCassandra [] + DbExtension + (live-nodes [_ test] (cassandra/live-nodes test)) + (wait-for-recovery [_ test] (cassandra/wait-rf-nodes test)) + (create-table-opts + [_ test] + {(keyword CassandraAdmin/REPLICATION_STRATEGY) + (str CassandraAdmin$ReplicationStrategy/SIMPLE_STRATEGY) + (keyword CassandraAdmin/COMPACTION_STRATEGY) + (str CassandraAdmin$CompactionStrategy/LCS) + (keyword CassandraAdmin/REPLICATION_FACTOR) (:rf test)}) + (create-properties + [_ test] + (let [nodes (cassandra/live-nodes test)] + (when (nil? nodes) + (throw (ex-info "No living node" {:test test}))) + (doto (Properties.) + (.setProperty "scalar.db.contact_points" (string/join "," nodes)) + (.setProperty "scalar.db.username" "cassandra") + (.setProperty "scalar.db.password" "cassandra") + (.setProperty "scalar.db.consensus_commit.isolation_level" + ((:isolation-level test) ISOLATION_LEVELS)) + (.setProperty "scalar.db.consensus_commit.serializable_strategy" + ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))))) + +(def ^:private ext-dbs + {:cassandra (->ExtCassandra)}) + +(defn extend-db + [db db-type] + (let [ext-db (db-type ext-dbs)] + (reify + db/DB + (setup! [_ test node] (db/setup! db test node)) + (teardown! [_ test node] (db/teardown! db test node)) + db/Primary + (primaries [_ test] (db/primaries db test)) + (setup-primary! [_ test node] (db/setup-primary! db test node)) + db/LogFiles + (log-files [_ test node] (db/log-files db test node)) + DbExtension + (live-nodes [_ test] (live-nodes ext-db test)) + (wait-for-recovery [_ test] (wait-for-recovery ext-db test)) + (create-table-opts [_ test] (create-table-opts ext-db test)) + (create-properties [_ test] (create-properties ext-db test))))) diff --git a/scalardb/src/scalardb/elle_append.clj b/scalardb/src/scalardb/elle_append.clj index 8c01d32..1c182c3 100644 --- a/scalardb/src/scalardb/elle_append.clj +++ b/scalardb/src/scalardb/elle_append.clj @@ -17,22 +17,12 @@ UnknownTransactionStatusException))) (def ^:const TABLE "txn") -(def ^:const SCHEMA {:id :int - :val :text - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_val :text - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:id]}) (def ^:private ^:const ID "id") (def ^:private ^:const VALUE "val") +(def ^:const SCHEMA {:transaction true + :partition-key [ID] + :clustering-key [] + :columns {(keyword ID) "INT" (keyword VALUE) "INT"}}) (defn prepare-get [table id] @@ -80,9 +70,8 @@ [test] (doseq [id (range (inc INITIAL_TABLE_ID)) i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE id \_ i) - :schema SCHEMA}]))) + (scalar/setup-transaction-tables + test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}]))) (defn add-tables [test next-id] @@ -92,12 +81,9 @@ (when (compare-and-set! (:table-id test) current-id next-id) (info (str "Creating new tables for " next-id)) (doseq [i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE - next-id - \_ - i) - :schema SCHEMA}]))))))) + (scalar/setup-transaction-tables + test + [{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}]))))))) (defrecord AppendClient [initialized?] client/Client diff --git a/scalardb/src/scalardb/elle_write_read.clj b/scalardb/src/scalardb/elle_write_read.clj index f7bc339..8de0516 100644 --- a/scalardb/src/scalardb/elle_write_read.clj +++ b/scalardb/src/scalardb/elle_write_read.clj @@ -15,22 +15,12 @@ UnknownTransactionStatusException))) (def ^:const TABLE "txn") -(def ^:private ^:const SCHEMA {:id :int - :val :int - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_val :int - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:id]}) (def ^:private ^:const ID "id") (def ^:private ^:const VALUE "val") +(def ^:const SCHEMA {:transaction true + :partition-key [ID] + :clustering-key [] + :columns {(keyword ID) "INT" (keyword VALUE) "INT"}}) (defn prepare-get [table id] @@ -75,9 +65,8 @@ [test] (doseq [id (range (inc INITIAL_TABLE_ID)) i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE id \_ i) - :schema SCHEMA}]))) + (scalar/setup-transaction-tables + test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}]))) (defn add-tables [test next-id] @@ -87,12 +76,9 @@ (when (compare-and-set! (:table-id test) current-id next-id) (info (str "Creating new tables for " next-id)) (doseq [i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE - next-id - \_ - i) - :schema SCHEMA}]))))))) + (scalar/setup-transaction-tables + test + [{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}]))))))) (defrecord WriteReadClient [initialized?] client/Client diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index 71b60f1..42e4cbd 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -1,9 +1,14 @@ (ns scalardb.runner (:gen-class) - (:require [cassandra.runner :as car] + (:require [cassandra.core :as cassandra] + [cassandra.nemesis :as cn] + [cassandra.runner :as car] + [clojure.string :as string] [jepsen [core :as jepsen] - [cli :as cli]] + [cli :as cli] + [generator :as gen] + [tests :as tests]] [scalardb [core :refer [INITIAL_TABLE_ID]] [transfer] @@ -13,7 +18,28 @@ [transfer-2pc] [transfer-append-2pc] [elle-append-2pc] - [elle-write-read-2pc]])) + [elle-write-read-2pc] + [db-extend :refer [extend-db]]])) + +(def db-keys + "The map of test DBs." + {"cassandra" :cassandra}) + +(defn- gen-db + [db-key faults admin] + (case db-key + :cassandra (let [db (extend-db (cassandra/db) :cassandra)] + [db + (cn/nemesis-package + {:db db + :faults faults + :admin admin + :partition {:targets [:one + :primaries + :majority + :majorities-ring + :minority-third]}})]) + (throw (ex-info "Unsupported DB" {:db db-key})))) (def workload-keys "A map of test workload keys." @@ -38,7 +64,10 @@ :elle-write-read-2pc scalardb.elle-write-read-2pc/workload}) (def test-opt-spec - [(cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys) + [(cli/repeated-opt nil "--db NAME" "DB(s) on which the test is run" + [:cassandra] db-keys) + + (cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys) [nil "--isolation-level ISOLATION_LEVEL" "isolation level" :default :snapshot @@ -57,6 +86,48 @@ "consistency model to be checked" ["snapshot-isolation"])]) +(defn- test-name + [workload-key faults admin] + (-> ["scalardb" (name workload-key)] + (into (map name faults)) + (into (map name admin)) + (->> (remove nil?) (string/join "-")))) + +(def ^:private scalardb-opts + {:storage (atom nil) + :transaction (atom nil) + :2pc (atom nil) + :table-id (atom INITIAL_TABLE_ID) + :unknown-tx (atom #{}) + :failures (atom 0) + :decommissioned (atom #{})}) + +(defn scalardb-test + [base-opts db-key workload-key faults admin] + (let [[db nemesis] (gen-db db-key faults admin) + consistency-model (->> base-opts :consistency-model (mapv keyword)) + workload-opts (merge base-opts + scalardb-opts + {:consistency-model consistency-model}) + workload ((workload-key workloads) workload-opts)] + (merge tests/noop-test + workload-opts + {:name (test-name workload-key faults admin) + :client (:client workload) + :db db + :pure-generators true + :generator (gen/phases + (->> (:generator workload) + (gen/nemesis + (gen/phases + (gen/sleep 5) + (:generator nemesis))) + (gen/time-limit (:time-limit base-opts))) + (gen/nemesis (:final-generator nemesis)) + (gen/clients (:final-generator workload))) + :nemesis (:nemesis nemesis) + :checker (:checker workload)}))) + (defn test-cmd [] {"test" {:opt-spec (->> test-opt-spec @@ -65,28 +136,19 @@ :opt-fn (fn [parsed] (-> parsed cli/test-opt-fn)) :usage (cli/test-usage) :run (fn [{:keys [options]}] - (with-redefs [car/workloads workloads] - (doseq [_ (range (:test-count options)) - workload (:workload options) - nemesis (:nemesis options) - admin (:admin options)] - (let [test (-> options - (assoc :target "scalardb" - :workload workload - :nemesis nemesis - :admin admin - :storage (atom nil) - :transaction (atom nil) - :2pc (atom nil) - :table-id (atom INITIAL_TABLE_ID) - :unknown-tx (atom #{}) - :failures (atom 0)) - (update :consistency-model - (fn [ms] (mapv keyword ms))) - car/cassandra-test - jepsen/run!)] - (when-not (:valid? (:results test)) - (System/exit 1))))))}}) + (doseq [_ (range (:test-count options)) + db-key (:db options) + workload-key (:workload options) + faults (:nemesis options) + admin (:admin options)] + (let [test (-> options + (scalardb-test db-key + workload-key + faults + admin) + jepsen/run!)] + (when-not (:valid? (:results test)) + (System/exit 1)))))}}) (defn -main [& args] diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index 9a6fe38..b2b5314 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -1,12 +1,12 @@ (ns scalardb.transfer - (:require [cassandra.core :as cassandra] - [clojure.core.reducers :as r] + (:require [clojure.core.reducers :as r] [jepsen [client :as client] [checker :as checker] [generator :as gen]] [knossos.op :as op] - [scalardb.core :as scalar :refer [KEYSPACE]]) + [scalardb.core :as scalar :refer [KEYSPACE]] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.api Consistency Get Put @@ -25,26 +25,16 @@ (def ^:const NUM_ACCOUNTS 10) (def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) -(def ^:const SCHEMA {:account_id :int - :balance :int - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_balance :int - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:account_id]}) +(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) + {:transaction true + :partition-key [ACCOUNT_ID] + :clustering-key [] + :columns {(keyword ACCOUNT_ID) "INT" + (keyword BALANCE) "INT"}}}) (defn setup-tables [test] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table TABLE - :schema SCHEMA}])) + (scalar/setup-transaction-tables test [SCHEMA])) (defn prepare-get [id] @@ -153,7 +143,7 @@ (scalar/try-reconnection-for-transaction! test) (assoc op :type :fail :error "Skipped due to no connection"))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (read-all-with-retry test (:num op))] (assoc op :type :ok :value {:balance (get-balances results) :version (get-versions results)}) diff --git a/scalardb/src/scalardb/transfer_2pc.clj b/scalardb/src/scalardb/transfer_2pc.clj index 5886cf2..96b8563 100644 --- a/scalardb/src/scalardb/transfer_2pc.clj +++ b/scalardb/src/scalardb/transfer_2pc.clj @@ -1,10 +1,10 @@ (ns scalardb.transfer-2pc - (:require [cassandra.core :as cassandra] - [jepsen + (:require [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] - [scalardb.transfer :as transfer]) + [scalardb.transfer :as transfer] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.exception.transaction UnknownTransactionStatusException))) (defn- tx-transfer @@ -54,7 +54,7 @@ (scalar/try-reconnection-for-2pc! test) (assoc op :type :fail :error (.getMessage e))))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (transfer/read-all-with-retry test (:num op))] (assoc op :type :ok :value {:balance (transfer/get-balances results) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 78e92d0..02b423e 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -1,12 +1,12 @@ (ns scalardb.transfer-append - (:require [cassandra.core :as cassandra] - [clojure.core.reducers :as r] + (:require [clojure.core.reducers :as r] [clojure.tools.logging :refer [info]] [jepsen [client :as client] [checker :as checker] [generator :as gen]] - [scalardb.core :as scalar :refer [KEYSPACE]]) + [scalardb.core :as scalar :refer [KEYSPACE]] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.api Put Scan Scan$Ordering @@ -24,27 +24,17 @@ (def ^:const INITIAL_BALANCE 10000) (def ^:const NUM_ACCOUNTS 10) (def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) -(def ^:private ^:const SCHEMA {:account_id :int - :age :int - :balance :int - :tx_id :text - :tx_prepared_at :bigint - :tx_committed_at :bigint - :tx_state :int - :tx_version :int - :before_balance :int - :before_tx_committed_at :bigint - :before_tx_id :text - :before_tx_prepared_at :bigint - :before_tx_state :int - :before_tx_version :int - :primary-key [:account_id :age]}) +(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) + {:transaction true + :partition-key [ACCOUNT_ID] + :clustering-key [AGE] + :columns {(keyword ACCOUNT_ID) "INT" + (keyword AGE) "INT" + (keyword BALANCE) "INT"}}}) (defn setup-tables [test] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table TABLE - :schema SCHEMA}])) + (scalar/setup-transaction-tables test [SCHEMA])) (defn- prepare-scan [id] @@ -167,7 +157,7 @@ (scalar/try-reconnection-for-transaction! test) (assoc op :type :fail, :error "Skipped due to no connection"))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (scan-all-records-with-retry test (:num op))] (assoc op :type, :ok :value {:balance (get-balances results) :age (get-ages results) diff --git a/scalardb/src/scalardb/transfer_append_2pc.clj b/scalardb/src/scalardb/transfer_append_2pc.clj index ab62f6e..9f0767a 100644 --- a/scalardb/src/scalardb/transfer_append_2pc.clj +++ b/scalardb/src/scalardb/transfer_append_2pc.clj @@ -1,10 +1,10 @@ (ns scalardb.transfer-append-2pc - (:require [cassandra.core :as cassandra] - [clojure.tools.logging :refer [info]] + (:require [clojure.tools.logging :refer [info]] [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] + [scalardb.db-extend :refer [wait-for-recovery]] [scalardb.transfer-append :as t-append]) (:import (com.scalar.db.api Result) (com.scalar.db.exception.transaction UnknownTransactionStatusException))) @@ -63,7 +63,7 @@ (scalar/try-reconnection-for-2pc! test) (assoc op :type :fail :error (.getMessage e))))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (t-append/scan-all-records-with-retry test (:num op))] (assoc op :type :ok diff --git a/scalardb/test/scalardb/core_test.clj b/scalardb/test/scalardb/core_test.clj index b04c8d7..9dd86db 100644 --- a/scalardb/test/scalardb/core_test.clj +++ b/scalardb/test/scalardb/core_test.clj @@ -1,8 +1,8 @@ (ns scalardb.core-test (:require [clojure.test :refer [deftest is]] - [qbits.alia :as alia] - [cassandra.core :as c] + [jepsen.db :as db] [scalardb.core :as scalar] + [scalardb.db-extend :as ext] [spy.core :as spy]) (:import (com.scalar.db.api DistributedStorage DistributedTransaction @@ -16,45 +16,6 @@ TextValue) (java.util Optional))) -(deftest setup-transaction-tables-test - (with-redefs [alia/cluster (spy/stub "cluster") - alia/connect (spy/stub "session") - alia/shutdown (spy/spy) - c/create-my-keyspace (spy/spy) - c/create-my-table (spy/spy)] - (scalar/setup-transaction-tables {:nodes ["n1" "n2" "n3"]} - [{:keyspace "test-keyspace1" - :table "test-table2" - :schema {:id :text - :val :int - :primary-key [:id]}} - {:keyspace "test-keyspace2" - :table "test-table2" - :schema {:id :text - :val :int - :val2 :int - :primary-key [:id]}}]) - (is (spy/called-once? alia/connect)) - (is (spy/called-n-times? alia/shutdown 2)) - (is (spy/called-n-times? c/create-my-keyspace 3)) - (is (spy/called-n-times? c/create-my-table 3)))) - -(deftest create-properties-test - (let [nodes ["n1" "n2" "n3"] - properties (#'scalar/create-properties - {:isolation-level :serializable - :serializable-strategy :extra-write} nodes)] - (is (= "n1,n2,n3" - (.getProperty properties "scalar.db.contact_points"))) - (is (= "cassandra" - (.getProperty properties "scalar.db.username"))) - (is (= "cassandra" - (.getProperty properties "scalar.db.password"))) - (is (= "SERIALIZABLE" - (.getProperty properties "scalar.db.isolation_level"))) - (is (= "EXTRA_WRITE" - (.getProperty properties "scalar.db.consensus_commit.serializable_strategy"))))) - (defn- mock-result "This is only for Coordinator/get and this returns ID as `tx_state`" [id] @@ -66,6 +27,22 @@ "tx_created_at" (Optional/of (BigIntValue. column (long 1566376246))) "tx_state" (Optional/of (IntValue. column (Integer/parseInt id))))))) +(def mock-db + (reify + db/DB + (setup! [_ _ _]) + (teardown! [_ _ _]) + db/Primary + (primaries [_ _]) + (setup-primary! [_ _ _]) + db/LogFiles + (log-files [_ _ _]) + ext/DbExtension + (live-nodes [_ _] ["n1" "n2" "n3"]) + (wait-for-recovery [_ _]) + (create-table-opts [_ _]) + (create-properties [_ _]))) + (def mock-storage (reify DistributedStorage @@ -108,26 +85,23 @@ (is (nil? @(:2pc test))))) (deftest prepare-storage-service-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - scalar/create-service-instance (spy/stub mock-storage)] - (let [test {:storage (atom nil)}] + (with-redefs [scalar/create-service-instance (spy/stub mock-storage)] + (let [test {:db mock-db :storage (atom nil)}] (scalar/prepare-storage-service! test) (is (= mock-storage @(:storage test)))))) (deftest prepare-transaction-service-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - scalar/create-service-instance (spy/stub mock-tx-manager)] - (let [test {:transaction (atom nil)}] + (with-redefs [scalar/create-service-instance (spy/stub mock-tx-manager)] + (let [test {:db mock-db :transaction (atom nil)}] (scalar/prepare-transaction-service! test) (is (= mock-tx-manager @(:transaction test)))))) (deftest prepare-service-fail-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - c/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/create-service-instance (spy/stub nil)] - (let [test {:storage (atom nil)}] + (let [test {:db mock-db :storage (atom nil)}] (scalar/prepare-storage-service! test) - (is (spy/called-n-times? c/exponential-backoff 8)) + (is (spy/called-n-times? scalar/exponential-backoff 8)) (is (nil? @(:storage test)))))) (deftest check-connection-test @@ -169,10 +143,10 @@ (is (= 1 (scalar/check-transaction-states test #{"3" "4"})))))) (deftest check-transaction-states-fail-test - (with-redefs [c/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/prepare-storage-service! (spy/spy)] (let [test {:storage (atom mock-storage)}] (is (thrown? clojure.lang.ExceptionInfo (scalar/check-transaction-states test #{"tx"}))) - (is (spy/called-n-times? c/exponential-backoff 8)) + (is (spy/called-n-times? scalar/exponential-backoff 8)) (is (spy/called-n-times? scalar/prepare-storage-service! 3))))) diff --git a/scalardb/test/scalardb/db_extend_test.clj b/scalardb/test/scalardb/db_extend_test.clj new file mode 100644 index 0000000..93ede4d --- /dev/null +++ b/scalardb/test/scalardb/db_extend_test.clj @@ -0,0 +1,26 @@ +(ns scalardb.db-extend-test + (:require [cassandra.core :as cassandra] + [clojure.test :refer [deftest is]] + [scalardb.db-extend :as ext] + [spy.core :as spy])) + +(deftest create-properties-test + (with-redefs [cassandra/live-nodes (spy/stub ["n1" "n2" "n3"])] + (let [db (ext/extend-db (cassandra/db) :cassandra) + properties (ext/create-properties db + {:isolation-level :serializable + :serializable-strategy :extra-write})] + (is (= "n1,n2,n3" + (.getProperty properties "scalar.db.contact_points"))) + (is (= "cassandra" + (.getProperty properties "scalar.db.username"))) + (is (= "cassandra" + (.getProperty properties "scalar.db.password"))) + (is (= "SERIALIZABLE" + (.getProperty + properties + "scalar.db.consensus_commit.isolation_level"))) + (is (= "EXTRA_WRITE" + (.getProperty + properties + "scalar.db.consensus_commit.serializable_strategy")))))) diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index 3662246..b2695d2 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer :as transfer] [scalardb.transfer-2pc :as transfer-2pc] [spy.core :as spy]) @@ -205,13 +205,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) nil nil) - result (client/invoke! client {:storage (ref mock-storage)} + result (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -221,8 +221,7 @@ (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) @@ -231,10 +230,11 @@ (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:storage (ref mock-storage)} + (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION)) (is (spy/called-n-times? scalar/prepare-storage-service! scalar/RETRIES_FOR_RECONNECTION))))) diff --git a/scalardb/test/scalardb/transfer_append_2pc_test.clj b/scalardb/test/scalardb/transfer_append_2pc_test.clj index 59f8d57..2824920 100644 --- a/scalardb/test/scalardb/transfer_append_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_append_2pc_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer-append :as transfer] [scalardb.transfer-append-2pc :as t-append-2pc] [spy.core :as spy]) @@ -221,12 +221,11 @@ {:age 2 :balance 100} {:age 3 :balance 10}] 2 [{:age 1 :balance 1}]})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 3 100) nil nil) - result (client/invoke! client {} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -236,18 +235,17 @@ (is (= [2 3 1] (get-in result [:value :num]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test diff --git a/scalardb/test/scalardb/transfer_append_test.clj b/scalardb/test/scalardb/transfer_append_test.clj index 46e398c..c2c6586 100644 --- a/scalardb/test/scalardb/transfer_append_test.clj +++ b/scalardb/test/scalardb/transfer_append_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer-append :as transfer] [spy.core :as spy]) (:import (com.scalar.db.api DistributedTransaction @@ -192,12 +192,11 @@ {:age 2 :balance 100} {:age 3 :balance 10}] 2 [{:age 1 :balance 1}]})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer/->TransferClient (atom false) 3 100) nil nil) - result (client/invoke! client {} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -207,18 +206,17 @@ (is (= [2 3 1] (get-in result [:value :num]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index df4b7d2..9f9f268 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -2,7 +2,7 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] + [scalardb.core-test :refer [mock-db]] [scalardb.core :as scalar] [scalardb.transfer :as transfer] [spy.core :as spy]) @@ -176,13 +176,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil) - result (client/invoke! client {:storage (ref mock-storage)} + result (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -192,8 +192,7 @@ (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) @@ -202,10 +201,11 @@ (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:storage (ref mock-storage)} + (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION)) (is (spy/called-n-times? scalar/prepare-storage-service! scalar/RETRIES_FOR_RECONNECTION)))))