From 2a4be8fc5572e561a777019b1a779fc0e268a6ef Mon Sep 17 00:00:00 2001 From: yito88 Date: Sat, 21 Oct 2023 22:54:39 +0200 Subject: [PATCH 1/8] use SchemaLoader --- scalardb/project.clj | 4 +- scalardb/src/scalardb/core.clj | 59 +++++++++--------------------- scalardb/src/scalardb/runner.clj | 43 ++++++++++++++-------- scalardb/src/scalardb/transfer.clj | 24 ++++-------- 4 files changed, 54 insertions(+), 76 deletions(-) diff --git a/scalardb/project.clj b/scalardb/project.clj index 78b8388..7ba01d0 100644 --- a/scalardb/project.clj +++ b/scalardb/project.clj @@ -9,7 +9,9 @@ [org.slf4j/slf4j-jdk14 "2.0.6"] [cassandra "0.1.0-SNAPSHOT"] [cc.qbits/alia "4.3.6"] - [cc.qbits/hayt "4.1.0"]] + [cc.qbits/hayt "4.1.0"] + [cheshire "5.12.0"] + [com.scalar-labs/scalardb-schema-loader "3.10.1"]] :repositories {"sonartype" "https://oss.sonatype.org/content/repositories/snapshots/"} :profiles {:dev {:dependencies [[tortue/spy "2.0.0"]] :plugins [[lein-cloverage "1.1.2"]]} diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index 86225a3..65a797d 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -1,15 +1,14 @@ (ns scalardb.core - (:require [cassandra.core :as c] - [clojure.string :as string] + (:require [cheshire.core :as cheshire] [clojure.tools.logging :refer [info warn]] [jepsen.checker :as checker] [jepsen.independent :as independent] - [qbits.alia :as alia]) + [scalardb.db-extend :as ext]) (:import (com.scalar.db.api TransactionState) + (com.scalar.db.schemaloader SchemaLoader) (com.scalar.db.service TransactionFactory StorageFactory) - (com.scalar.db.transaction.consensuscommit Coordinator) - (java.util Properties))) + (com.scalar.db.transaction.consensuscommit Coordinator))) (def ^:const RETRIES 8) (def ^:const RETRIES_FOR_RECONNECTION 3) @@ -18,43 +17,21 @@ (def ^:const DEFAULT_TABLE_COUNT 3) (def ^:const KEYSPACE "jepsen") -(def ^:private ^:const COORDINATOR "coordinator") -(def ^:private ^:const STATE_TABLE "state") (def ^:const VERSION "tx_version") -(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT" - :serializable "SERIALIZABLE"}) - -(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ" - :extra-write "EXTRA_WRITE"}) +(defn- exponential-backoff + [r] + (Thread/sleep (reduce * 1000 (repeat r 2)))) (defn setup-transaction-tables [test schemata] - (let [cluster (alia/cluster {:contact-points (:nodes test)}) - session (alia/connect cluster)] + (let [properties (ext/create-properties (:db test) test) + options (ext/create-table-opts (:db test) test)] (doseq [schema schemata] - (c/create-my-keyspace session test schema) - (c/create-my-table session schema)) - - (c/create-my-keyspace session test {:keyspace COORDINATOR}) - (c/create-my-table session {:keyspace COORDINATOR - :table STATE_TABLE - :schema {:tx_id :text - :tx_state :int - :tx_created_at :bigint - :primary-key [:tx_id]}}) - (c/close-cassandra cluster session))) - -(defn- create-properties - [test nodes] - (doto (Properties.) - (.setProperty "scalar.db.contact_points" (string/join "," nodes)) - (.setProperty "scalar.db.username" "cassandra") - (.setProperty "scalar.db.password" "cassandra") - (.setProperty "scalar.db.isolation_level" - ((:isolation-level test) ISOLATION_LEVELS)) - (.setProperty "scalar.db.consensus_commit.serializable_strategy" - ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))) + (SchemaLoader/load properties + (cheshire/generate-string schema) + options + true)))) (defn- close-storage! [test] @@ -90,9 +67,7 @@ (defn- create-service-instance [test mode] - (when-let [properties (some->> (c/live-nodes test) - not-empty - (create-properties test))] + (when-let [properties (ext/create-properties (:db test) test)] (try (condp = mode :storage (.getStorage (StorageFactory/create properties)) @@ -110,7 +85,7 @@ (info "reconnecting to the cluster") (loop [tries RETRIES] (when (< tries RETRIES) - (c/exponential-backoff (- RETRIES tries))) + (exponential-backoff (- RETRIES tries))) (if-not (pos? tries) (warn "Failed to connect to the cluster") (if-let [instance (create-service-instance test mode)] @@ -190,7 +165,7 @@ [connect-fn test & body] `(loop [tries# RETRIES] (when (< tries# RETRIES) - (c/exponential-backoff (- RETRIES tries#))) + (exponential-backoff (- RETRIES tries#))) (when (zero? (mod tries# RETRIES_FOR_RECONNECTION)) (~connect-fn ~test)) (if-let [results# ~@body] @@ -212,7 +187,7 @@ (do (warn e) (when fallback (fallback)) - (c/exponential-backoff (- RETRIES tries)) + (exponential-backoff (- RETRIES tries)) (recur (dec tries) f args fallback)) (:value res))))) diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index 71b60f1..971fe76 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -13,7 +13,8 @@ [transfer-2pc] [transfer-append-2pc] [elle-append-2pc] - [elle-write-read-2pc]])) + [elle-write-read-2pc] + [db-extend :refer [extend-db]]])) (def workload-keys "A map of test workload keys." @@ -57,6 +58,22 @@ "consistency model to be checked" ["snapshot-isolation"])]) +(defn- init-scalardb-test + [opts workload nemesis admin] + (-> opts + (assoc :target "scalardb" + :workload workload + :nemesis nemesis + :admin admin + :storage (atom nil) + :transaction (atom nil) + :2pc (atom nil) + :table-id (atom INITIAL_TABLE_ID) + :unknown-tx (atom #{}) + :failures (atom 0)) + (update :consistency-model + (fn [ms] (mapv keyword ms))))) + (defn test-cmd [] {"test" {:opt-spec (->> test-opt-spec @@ -70,21 +87,15 @@ workload (:workload options) nemesis (:nemesis options) admin (:admin options)] - (let [test (-> options - (assoc :target "scalardb" - :workload workload - :nemesis nemesis - :admin admin - :storage (atom nil) - :transaction (atom nil) - :2pc (atom nil) - :table-id (atom INITIAL_TABLE_ID) - :unknown-tx (atom #{}) - :failures (atom 0)) - (update :consistency-model - (fn [ms] (mapv keyword ms))) - car/cassandra-test - jepsen/run!)] + (let [opts (init-scalardb-test options + workload + nemesis + admin) + updated-opts (-> opts + car/cassandra-test + (update :db + #(extend-db % :cassandra))) + test (jepsen/run! updated-opts)] (when-not (:valid? (:results test)) (System/exit 1))))))}}) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index 9a6fe38..d739acf 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -25,26 +25,16 @@ (def ^:const NUM_ACCOUNTS 10) (def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) -(def ^:const SCHEMA {:account_id :int - :balance :int - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_balance :int - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:account_id]}) +(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) + {:transaction true + :partition-key [ACCOUNT_ID] + :clustering-key [] + :columns {(keyword ACCOUNT_ID) "INT" + (keyword BALANCE) "INT"}}}) (defn setup-tables [test] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table TABLE - :schema SCHEMA}])) + (scalar/setup-transaction-tables test [SCHEMA])) (defn prepare-get [id] From 9faa79b16cdb98323dd6b1a89a96d4f9afccfdc7 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sat, 21 Oct 2023 22:55:19 +0200 Subject: [PATCH 2/8] add db_extend.clj --- scalardb/src/scalardb/db_extend.clj | 62 +++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 scalardb/src/scalardb/db_extend.clj diff --git a/scalardb/src/scalardb/db_extend.clj b/scalardb/src/scalardb/db_extend.clj new file mode 100644 index 0000000..95f62b1 --- /dev/null +++ b/scalardb/src/scalardb/db_extend.clj @@ -0,0 +1,62 @@ +(ns scalardb.db-extend + (:require [cassandra.core :as cassandra] + [clojure.string :as string] + [jepsen.db :as db]) + (:import (com.scalar.db.storage.cassandra CassandraAdmin + CassandraAdmin$ReplicationStrategy + CassandraAdmin$CompactionStrategy) + (java.util Properties))) + +(def ^:private ISOLATION_LEVELS {:snapshot "SNAPSHOT" + :serializable "SERIALIZABLE"}) + +(def ^:private SERIALIZABLE_STRATEGIES {:extra-read "EXTRA_READ" + :extra-write "EXTRA_WRITE"}) + +(defprotocol DbExtension + (live-nodes [this test]) + (create-table-opts [this test]) + (create-properties [this test])) + +(def ^:private cassandra-functions + {:live-nodes (fn [_ test] (cassandra/live-nodes test)) + :create-table-opts + (fn [_ test] + {(keyword CassandraAdmin/REPLICATION_STRATEGY) + (str CassandraAdmin$ReplicationStrategy/SIMPLE_STRATEGY) + (keyword CassandraAdmin/COMPACTION_STRATEGY) + (str CassandraAdmin$CompactionStrategy/LCS) + (keyword CassandraAdmin/REPLICATION_FACTOR) (:rf test)}) + :create-properties + (fn [_ test] + (let [nodes (cassandra/live-nodes test)] + (when (nil? nodes) + (throw (ex-info "No living node" {:test test}))) + (doto (Properties.) + (.setProperty "scalar.db.contact_points" (string/join "," nodes)) + (.setProperty "scalar.db.username" "cassandra") + (.setProperty "scalar.db.password" "cassandra") + (.setProperty "scalar.db.isolation_level" + ((:isolation-level test) ISOLATION_LEVELS)) + (.setProperty "scalar.db.consensus_commit.serializable_strategy" + ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))))}) + +(def ext-db-functions + {:cassandra cassandra-functions}) + +(defn extend-db + [db db-type] + (let [ext-fns (db-type ext-db-functions)] + (reify + db/DB + (db/setup! [_ test node] (db/setup! db test node)) + (db/teardown! [_ test node] (db/teardown! db test node)) + db/Primary + (primaries [_ test] (db/primaries db test)) + (setup-primary! [_ test node] (db/setup-primary! db test node)) + db/LogFiles + (log-files [_ test node] (db/log-files db test node)) + DbExtension + (live-nodes [_ test] ((:live-nodes ext-fns) db test)) + (create-table-opts [_ test] ((:create-table-opts ext-fns) db test)) + (create-properties [_ test] ((:create-properties ext-fns) db test))))) From 10c8506ddc63c11030d03c0da6ca106ab3f92e26 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sun, 22 Oct 2023 10:42:40 +0200 Subject: [PATCH 3/8] remove cassandra-test --- scalardb/src/scalardb/runner.clj | 117 ++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 33 deletions(-) diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index 971fe76..f73eaa8 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -1,9 +1,14 @@ (ns scalardb.runner (:gen-class) (:require [cassandra.runner :as car] + [cassandra.core :as cassandra] + [cassandra.nemesis :as cn] + [clojure.string :as string] [jepsen [core :as jepsen] - [cli :as cli]] + [cli :as cli] + [generator :as gen] + [tests :as tests]] [scalardb [core :refer [INITIAL_TABLE_ID]] [transfer] @@ -16,6 +21,26 @@ [elle-write-read-2pc] [db-extend :refer [extend-db]]])) +(def db-keys + "The map of test DBs." + {"cassandra" :cassandra}) + +(defn- gen-db + [db-key faults admin] + (case db-key + :cassandra (let [db (extend-db (cassandra/db) :cassandra)] + [db + (cn/nemesis-package + {:db db + :faults faults + :admin admin + :partition {:targets [:one + :primaries + :majority + :majorities-ring + :minority-third]}})]) + (throw (ex-info "Unsupported DB" {:db db-key})))) + (def workload-keys "A map of test workload keys." {"transfer" :transfer @@ -39,7 +64,10 @@ :elle-write-read-2pc scalardb.elle-write-read-2pc/workload}) (def test-opt-spec - [(cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys) + [(cli/repeated-opt nil "--db NAME" "DB(s) on which the test is run" + ["cassandra"] db-keys) + + (cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys) [nil "--isolation-level ISOLATION_LEVEL" "isolation level" :default :snapshot @@ -58,21 +86,47 @@ "consistency model to be checked" ["snapshot-isolation"])]) -(defn- init-scalardb-test - [opts workload nemesis admin] - (-> opts - (assoc :target "scalardb" - :workload workload - :nemesis nemesis - :admin admin - :storage (atom nil) - :transaction (atom nil) - :2pc (atom nil) - :table-id (atom INITIAL_TABLE_ID) - :unknown-tx (atom #{}) - :failures (atom 0)) - (update :consistency-model - (fn [ms] (mapv keyword ms))))) +(defn- test-name + [workload-key faults admin] + (-> ["scalardb" (name workload-key)] + (into (map name faults)) + (into (map name admin)) + (->> (remove nil?) (string/join "-")))) + +(def ^:private scalardb-opts + {:storage (atom nil) + :transaction (atom nil) + :2pc (atom nil) + :table-id (atom INITIAL_TABLE_ID) + :unknown-tx (atom #{}) + :failures (atom 0) + :decommissioned (atom #{})}) + +(defn scalardb-test + [base-opts db-key workload-key faults admin] + (let [workload ((workload-key workloads) base-opts) + [db nemesis] (gen-db db-key faults admin) + consistency-model (->> base-opts :consistency-model (mapv keyword))] + (merge tests/noop-test + base-opts + scalardb-opts + {:consistency-model consistency-model} + {:name (test-name workload-key faults admin) + :client (:client workload) + :db db + :pure-generators true + :generator (gen/phases + (->> (:generator workload) + gen/mix + (gen/nemesis + (gen/phases + (gen/sleep 5) + (:generator nemesis))) + (gen/time-limit (:time-limit base-opts))) + (gen/nemesis (:final-generator nemesis)) + (gen/clients (:final-generator workload))) + :nemesis (:nemesis nemesis) + :checker (:checker workload)}))) (defn test-cmd [] @@ -82,22 +136,19 @@ :opt-fn (fn [parsed] (-> parsed cli/test-opt-fn)) :usage (cli/test-usage) :run (fn [{:keys [options]}] - (with-redefs [car/workloads workloads] - (doseq [_ (range (:test-count options)) - workload (:workload options) - nemesis (:nemesis options) - admin (:admin options)] - (let [opts (init-scalardb-test options - workload - nemesis - admin) - updated-opts (-> opts - car/cassandra-test - (update :db - #(extend-db % :cassandra))) - test (jepsen/run! updated-opts)] - (when-not (:valid? (:results test)) - (System/exit 1))))))}}) + (doseq [_ (range (:test-count options)) + db-key (:db options) + workload-key (:workload options) + faults (:nemesis options) + admin (:admin options)] + (let [test (-> options + (scalardb-test db-key + workload-key + faults + admin) + jepsen/run!)] + (when-not (:valid? (:results test)) + (System/exit 1)))))}}) (defn -main [& args] From 47145935ef24dca17ce350b8c57f0f208f52774a Mon Sep 17 00:00:00 2001 From: yito88 Date: Sun, 22 Oct 2023 17:18:59 +0200 Subject: [PATCH 4/8] fix default db --- scalardb/src/scalardb/runner.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index f73eaa8..400f006 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -65,7 +65,7 @@ (def test-opt-spec [(cli/repeated-opt nil "--db NAME" "DB(s) on which the test is run" - ["cassandra"] db-keys) + [:cassandra] db-keys) (cli/repeated-opt nil "--workload NAME" "Test(s) to run" [] workload-keys) From 950d132729465f8f39a5c6c322c332f21da1a667 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sun, 22 Oct 2023 23:21:28 +0200 Subject: [PATCH 5/8] fix extend-db --- scalardb/project.clj | 2 - scalardb/src/scalardb/core.clj | 2 +- scalardb/src/scalardb/db_extend.clj | 80 ++++++++++--------- scalardb/src/scalardb/elle_append.clj | 32 +++----- scalardb/src/scalardb/elle_write_read.clj | 32 +++----- scalardb/src/scalardb/runner.clj | 4 +- scalardb/src/scalardb/transfer.clj | 8 +- scalardb/src/scalardb/transfer_2pc.clj | 8 +- scalardb/src/scalardb/transfer_append.clj | 34 +++----- scalardb/src/scalardb/transfer_append_2pc.clj | 6 +- scalardb/test/scalardb/core_test.clj | 80 +++++++------------ scalardb/test/scalardb/db_extend_test.clj | 24 ++++++ scalardb/test/scalardb/transfer_2pc_test.clj | 16 ++-- .../scalardb/transfer_append_2pc_test.clj | 14 ++-- .../test/scalardb/transfer_append_test.clj | 14 ++-- scalardb/test/scalardb/transfer_test.clj | 16 ++-- 16 files changed, 165 insertions(+), 207 deletions(-) create mode 100644 scalardb/test/scalardb/db_extend_test.clj diff --git a/scalardb/project.clj b/scalardb/project.clj index 7ba01d0..bf79161 100644 --- a/scalardb/project.clj +++ b/scalardb/project.clj @@ -8,8 +8,6 @@ [net.java.dev.jna/jna-platform "5.11.0"] [org.slf4j/slf4j-jdk14 "2.0.6"] [cassandra "0.1.0-SNAPSHOT"] - [cc.qbits/alia "4.3.6"] - [cc.qbits/hayt "4.1.0"] [cheshire "5.12.0"] [com.scalar-labs/scalardb-schema-loader "3.10.1"]] :repositories {"sonartype" "https://oss.sonatype.org/content/repositories/snapshots/"} diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index 65a797d..7b773cb 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -19,7 +19,7 @@ (def ^:const KEYSPACE "jepsen") (def ^:const VERSION "tx_version") -(defn- exponential-backoff +(defn exponential-backoff [r] (Thread/sleep (reduce * 1000 (repeat r 2)))) diff --git a/scalardb/src/scalardb/db_extend.clj b/scalardb/src/scalardb/db_extend.clj index 95f62b1..a5ccb36 100644 --- a/scalardb/src/scalardb/db_extend.clj +++ b/scalardb/src/scalardb/db_extend.clj @@ -15,48 +15,52 @@ (defprotocol DbExtension (live-nodes [this test]) + (wait-for-recovery [this test]) (create-table-opts [this test]) (create-properties [this test])) -(def ^:private cassandra-functions - {:live-nodes (fn [_ test] (cassandra/live-nodes test)) - :create-table-opts - (fn [_ test] - {(keyword CassandraAdmin/REPLICATION_STRATEGY) - (str CassandraAdmin$ReplicationStrategy/SIMPLE_STRATEGY) - (keyword CassandraAdmin/COMPACTION_STRATEGY) - (str CassandraAdmin$CompactionStrategy/LCS) - (keyword CassandraAdmin/REPLICATION_FACTOR) (:rf test)}) - :create-properties - (fn [_ test] - (let [nodes (cassandra/live-nodes test)] - (when (nil? nodes) - (throw (ex-info "No living node" {:test test}))) - (doto (Properties.) - (.setProperty "scalar.db.contact_points" (string/join "," nodes)) - (.setProperty "scalar.db.username" "cassandra") - (.setProperty "scalar.db.password" "cassandra") - (.setProperty "scalar.db.isolation_level" - ((:isolation-level test) ISOLATION_LEVELS)) - (.setProperty "scalar.db.consensus_commit.serializable_strategy" - ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))))}) +(defrecord ExtCassandra [] + DbExtension + (live-nodes [_ test] (cassandra/live-nodes test)) + (wait-for-recovery [_ test] (cassandra/wait-rf-nodes test)) + (create-table-opts + [_ test] + {(keyword CassandraAdmin/REPLICATION_STRATEGY) + (str CassandraAdmin$ReplicationStrategy/SIMPLE_STRATEGY) + (keyword CassandraAdmin/COMPACTION_STRATEGY) + (str CassandraAdmin$CompactionStrategy/LCS) + (keyword CassandraAdmin/REPLICATION_FACTOR) (:rf test)}) + (create-properties + [_ test] + (let [nodes (cassandra/live-nodes test)] + (when (nil? nodes) + (throw (ex-info "No living node" {:test test}))) + (doto (Properties.) + (.setProperty "scalar.db.contact_points" (string/join "," nodes)) + (.setProperty "scalar.db.username" "cassandra") + (.setProperty "scalar.db.password" "cassandra") + (.setProperty "scalar.db.consensus_commit.isolation_level" + ((:isolation-level test) ISOLATION_LEVELS)) + (.setProperty "scalar.db.consensus_commit.serializable_strategy" + ((:serializable-strategy test) SERIALIZABLE_STRATEGIES)))))) -(def ext-db-functions - {:cassandra cassandra-functions}) +(def ^:private ext-dbs + {:cassandra (->ExtCassandra)}) (defn extend-db [db db-type] - (let [ext-fns (db-type ext-db-functions)] - (reify - db/DB - (db/setup! [_ test node] (db/setup! db test node)) - (db/teardown! [_ test node] (db/teardown! db test node)) - db/Primary - (primaries [_ test] (db/primaries db test)) - (setup-primary! [_ test node] (db/setup-primary! db test node)) - db/LogFiles - (log-files [_ test node] (db/log-files db test node)) - DbExtension - (live-nodes [_ test] ((:live-nodes ext-fns) db test)) - (create-table-opts [_ test] ((:create-table-opts ext-fns) db test)) - (create-properties [_ test] ((:create-properties ext-fns) db test))))) + (let [ext-db (db-type ext-dbs)] + (reify + db/DB + (setup! [_ test node] (db/setup! db test node)) + (teardown! [_ test node] (db/teardown! db test node)) + db/Primary + (primaries [_ test] (db/primaries db test)) + (setup-primary! [_ test node] (db/setup-primary! db test node)) + db/LogFiles + (log-files [_ test node] (db/log-files db test node)) + DbExtension + (live-nodes [_ test] (live-nodes ext-db test)) + (wait-for-recovery [_ test] (wait-for-recovery ext-db test)) + (create-table-opts [_ test] (create-table-opts ext-db test)) + (create-properties [_ test] (create-properties ext-db test))))) diff --git a/scalardb/src/scalardb/elle_append.clj b/scalardb/src/scalardb/elle_append.clj index 8c01d32..1c182c3 100644 --- a/scalardb/src/scalardb/elle_append.clj +++ b/scalardb/src/scalardb/elle_append.clj @@ -17,22 +17,12 @@ UnknownTransactionStatusException))) (def ^:const TABLE "txn") -(def ^:const SCHEMA {:id :int - :val :text - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_val :text - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:id]}) (def ^:private ^:const ID "id") (def ^:private ^:const VALUE "val") +(def ^:const SCHEMA {:transaction true + :partition-key [ID] + :clustering-key [] + :columns {(keyword ID) "INT" (keyword VALUE) "INT"}}) (defn prepare-get [table id] @@ -80,9 +70,8 @@ [test] (doseq [id (range (inc INITIAL_TABLE_ID)) i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE id \_ i) - :schema SCHEMA}]))) + (scalar/setup-transaction-tables + test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}]))) (defn add-tables [test next-id] @@ -92,12 +81,9 @@ (when (compare-and-set! (:table-id test) current-id next-id) (info (str "Creating new tables for " next-id)) (doseq [i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE - next-id - \_ - i) - :schema SCHEMA}]))))))) + (scalar/setup-transaction-tables + test + [{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}]))))))) (defrecord AppendClient [initialized?] client/Client diff --git a/scalardb/src/scalardb/elle_write_read.clj b/scalardb/src/scalardb/elle_write_read.clj index f7bc339..8de0516 100644 --- a/scalardb/src/scalardb/elle_write_read.clj +++ b/scalardb/src/scalardb/elle_write_read.clj @@ -15,22 +15,12 @@ UnknownTransactionStatusException))) (def ^:const TABLE "txn") -(def ^:private ^:const SCHEMA {:id :int - :val :int - :tx_id :text - :tx_version :int - :tx_state :int - :tx_prepared_at :bigint - :tx_committed_at :bigint - :before_val :int - :before_tx_id :text - :before_tx_version :int - :before_tx_state :int - :before_tx_prepared_at :bigint - :before_tx_committed_at :bigint - :primary-key [:id]}) (def ^:private ^:const ID "id") (def ^:private ^:const VALUE "val") +(def ^:const SCHEMA {:transaction true + :partition-key [ID] + :clustering-key [] + :columns {(keyword ID) "INT" (keyword VALUE) "INT"}}) (defn prepare-get [table id] @@ -75,9 +65,8 @@ [test] (doseq [id (range (inc INITIAL_TABLE_ID)) i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE id \_ i) - :schema SCHEMA}]))) + (scalar/setup-transaction-tables + test [{(keyword (str KEYSPACE \. TABLE id \_ i)) SCHEMA}]))) (defn add-tables [test next-id] @@ -87,12 +76,9 @@ (when (compare-and-set! (:table-id test) current-id next-id) (info (str "Creating new tables for " next-id)) (doseq [i (range DEFAULT_TABLE_COUNT)] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table (str TABLE - next-id - \_ - i) - :schema SCHEMA}]))))))) + (scalar/setup-transaction-tables + test + [{(keyword (str KEYSPACE \. TABLE next-id \_ i)) SCHEMA}]))))))) (defrecord WriteReadClient [initialized?] client/Client diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index 400f006..dcb1de4 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -1,8 +1,8 @@ (ns scalardb.runner (:gen-class) - (:require [cassandra.runner :as car] - [cassandra.core :as cassandra] + (:require [cassandra.core :as cassandra] [cassandra.nemesis :as cn] + [cassandra.runner :as car] [clojure.string :as string] [jepsen [core :as jepsen] diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index d739acf..b2b5314 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -1,12 +1,12 @@ (ns scalardb.transfer - (:require [cassandra.core :as cassandra] - [clojure.core.reducers :as r] + (:require [clojure.core.reducers :as r] [jepsen [client :as client] [checker :as checker] [generator :as gen]] [knossos.op :as op] - [scalardb.core :as scalar :refer [KEYSPACE]]) + [scalardb.core :as scalar :refer [KEYSPACE]] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.api Consistency Get Put @@ -143,7 +143,7 @@ (scalar/try-reconnection-for-transaction! test) (assoc op :type :fail :error "Skipped due to no connection"))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (read-all-with-retry test (:num op))] (assoc op :type :ok :value {:balance (get-balances results) :version (get-versions results)}) diff --git a/scalardb/src/scalardb/transfer_2pc.clj b/scalardb/src/scalardb/transfer_2pc.clj index 5886cf2..96b8563 100644 --- a/scalardb/src/scalardb/transfer_2pc.clj +++ b/scalardb/src/scalardb/transfer_2pc.clj @@ -1,10 +1,10 @@ (ns scalardb.transfer-2pc - (:require [cassandra.core :as cassandra] - [jepsen + (:require [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] - [scalardb.transfer :as transfer]) + [scalardb.transfer :as transfer] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.exception.transaction UnknownTransactionStatusException))) (defn- tx-transfer @@ -54,7 +54,7 @@ (scalar/try-reconnection-for-2pc! test) (assoc op :type :fail :error (.getMessage e))))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (transfer/read-all-with-retry test (:num op))] (assoc op :type :ok :value {:balance (transfer/get-balances results) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 78e92d0..02b423e 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -1,12 +1,12 @@ (ns scalardb.transfer-append - (:require [cassandra.core :as cassandra] - [clojure.core.reducers :as r] + (:require [clojure.core.reducers :as r] [clojure.tools.logging :refer [info]] [jepsen [client :as client] [checker :as checker] [generator :as gen]] - [scalardb.core :as scalar :refer [KEYSPACE]]) + [scalardb.core :as scalar :refer [KEYSPACE]] + [scalardb.db-extend :refer [wait-for-recovery]]) (:import (com.scalar.db.api Put Scan Scan$Ordering @@ -24,27 +24,17 @@ (def ^:const INITIAL_BALANCE 10000) (def ^:const NUM_ACCOUNTS 10) (def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) -(def ^:private ^:const SCHEMA {:account_id :int - :age :int - :balance :int - :tx_id :text - :tx_prepared_at :bigint - :tx_committed_at :bigint - :tx_state :int - :tx_version :int - :before_balance :int - :before_tx_committed_at :bigint - :before_tx_id :text - :before_tx_prepared_at :bigint - :before_tx_state :int - :before_tx_version :int - :primary-key [:account_id :age]}) +(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) + {:transaction true + :partition-key [ACCOUNT_ID] + :clustering-key [AGE] + :columns {(keyword ACCOUNT_ID) "INT" + (keyword AGE) "INT" + (keyword BALANCE) "INT"}}}) (defn setup-tables [test] - (scalar/setup-transaction-tables test [{:keyspace KEYSPACE - :table TABLE - :schema SCHEMA}])) + (scalar/setup-transaction-tables test [SCHEMA])) (defn- prepare-scan [id] @@ -167,7 +157,7 @@ (scalar/try-reconnection-for-transaction! test) (assoc op :type :fail, :error "Skipped due to no connection"))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (scan-all-records-with-retry test (:num op))] (assoc op :type, :ok :value {:balance (get-balances results) :age (get-ages results) diff --git a/scalardb/src/scalardb/transfer_append_2pc.clj b/scalardb/src/scalardb/transfer_append_2pc.clj index ab62f6e..9f0767a 100644 --- a/scalardb/src/scalardb/transfer_append_2pc.clj +++ b/scalardb/src/scalardb/transfer_append_2pc.clj @@ -1,10 +1,10 @@ (ns scalardb.transfer-append-2pc - (:require [cassandra.core :as cassandra] - [clojure.tools.logging :refer [info]] + (:require [clojure.tools.logging :refer [info]] [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] + [scalardb.db-extend :refer [wait-for-recovery]] [scalardb.transfer-append :as t-append]) (:import (com.scalar.db.api Result) (com.scalar.db.exception.transaction UnknownTransactionStatusException))) @@ -63,7 +63,7 @@ (scalar/try-reconnection-for-2pc! test) (assoc op :type :fail :error (.getMessage e))))) :get-all (do - (cassandra/wait-rf-nodes test) + (wait-for-recovery (:db test) test) (if-let [results (t-append/scan-all-records-with-retry test (:num op))] (assoc op :type :ok diff --git a/scalardb/test/scalardb/core_test.clj b/scalardb/test/scalardb/core_test.clj index b04c8d7..9dd86db 100644 --- a/scalardb/test/scalardb/core_test.clj +++ b/scalardb/test/scalardb/core_test.clj @@ -1,8 +1,8 @@ (ns scalardb.core-test (:require [clojure.test :refer [deftest is]] - [qbits.alia :as alia] - [cassandra.core :as c] + [jepsen.db :as db] [scalardb.core :as scalar] + [scalardb.db-extend :as ext] [spy.core :as spy]) (:import (com.scalar.db.api DistributedStorage DistributedTransaction @@ -16,45 +16,6 @@ TextValue) (java.util Optional))) -(deftest setup-transaction-tables-test - (with-redefs [alia/cluster (spy/stub "cluster") - alia/connect (spy/stub "session") - alia/shutdown (spy/spy) - c/create-my-keyspace (spy/spy) - c/create-my-table (spy/spy)] - (scalar/setup-transaction-tables {:nodes ["n1" "n2" "n3"]} - [{:keyspace "test-keyspace1" - :table "test-table2" - :schema {:id :text - :val :int - :primary-key [:id]}} - {:keyspace "test-keyspace2" - :table "test-table2" - :schema {:id :text - :val :int - :val2 :int - :primary-key [:id]}}]) - (is (spy/called-once? alia/connect)) - (is (spy/called-n-times? alia/shutdown 2)) - (is (spy/called-n-times? c/create-my-keyspace 3)) - (is (spy/called-n-times? c/create-my-table 3)))) - -(deftest create-properties-test - (let [nodes ["n1" "n2" "n3"] - properties (#'scalar/create-properties - {:isolation-level :serializable - :serializable-strategy :extra-write} nodes)] - (is (= "n1,n2,n3" - (.getProperty properties "scalar.db.contact_points"))) - (is (= "cassandra" - (.getProperty properties "scalar.db.username"))) - (is (= "cassandra" - (.getProperty properties "scalar.db.password"))) - (is (= "SERIALIZABLE" - (.getProperty properties "scalar.db.isolation_level"))) - (is (= "EXTRA_WRITE" - (.getProperty properties "scalar.db.consensus_commit.serializable_strategy"))))) - (defn- mock-result "This is only for Coordinator/get and this returns ID as `tx_state`" [id] @@ -66,6 +27,22 @@ "tx_created_at" (Optional/of (BigIntValue. column (long 1566376246))) "tx_state" (Optional/of (IntValue. column (Integer/parseInt id))))))) +(def mock-db + (reify + db/DB + (setup! [_ _ _]) + (teardown! [_ _ _]) + db/Primary + (primaries [_ _]) + (setup-primary! [_ _ _]) + db/LogFiles + (log-files [_ _ _]) + ext/DbExtension + (live-nodes [_ _] ["n1" "n2" "n3"]) + (wait-for-recovery [_ _]) + (create-table-opts [_ _]) + (create-properties [_ _]))) + (def mock-storage (reify DistributedStorage @@ -108,26 +85,23 @@ (is (nil? @(:2pc test))))) (deftest prepare-storage-service-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - scalar/create-service-instance (spy/stub mock-storage)] - (let [test {:storage (atom nil)}] + (with-redefs [scalar/create-service-instance (spy/stub mock-storage)] + (let [test {:db mock-db :storage (atom nil)}] (scalar/prepare-storage-service! test) (is (= mock-storage @(:storage test)))))) (deftest prepare-transaction-service-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - scalar/create-service-instance (spy/stub mock-tx-manager)] - (let [test {:transaction (atom nil)}] + (with-redefs [scalar/create-service-instance (spy/stub mock-tx-manager)] + (let [test {:db mock-db :transaction (atom nil)}] (scalar/prepare-transaction-service! test) (is (= mock-tx-manager @(:transaction test)))))) (deftest prepare-service-fail-test - (with-redefs [c/live-nodes (spy/stub ["n1" "n2" "n3"]) - c/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/create-service-instance (spy/stub nil)] - (let [test {:storage (atom nil)}] + (let [test {:db mock-db :storage (atom nil)}] (scalar/prepare-storage-service! test) - (is (spy/called-n-times? c/exponential-backoff 8)) + (is (spy/called-n-times? scalar/exponential-backoff 8)) (is (nil? @(:storage test)))))) (deftest check-connection-test @@ -169,10 +143,10 @@ (is (= 1 (scalar/check-transaction-states test #{"3" "4"})))))) (deftest check-transaction-states-fail-test - (with-redefs [c/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/prepare-storage-service! (spy/spy)] (let [test {:storage (atom mock-storage)}] (is (thrown? clojure.lang.ExceptionInfo (scalar/check-transaction-states test #{"tx"}))) - (is (spy/called-n-times? c/exponential-backoff 8)) + (is (spy/called-n-times? scalar/exponential-backoff 8)) (is (spy/called-n-times? scalar/prepare-storage-service! 3))))) diff --git a/scalardb/test/scalardb/db_extend_test.clj b/scalardb/test/scalardb/db_extend_test.clj new file mode 100644 index 0000000..5511c72 --- /dev/null +++ b/scalardb/test/scalardb/db_extend_test.clj @@ -0,0 +1,24 @@ +(ns scalardb.db-extend-test + (:require [cassandra.core :as cassandra] + [clojure.test :refer [deftest is]] + [scalardb.db-extend :as ext] + [spy.core :as spy])) + +(deftest create-properties-test + (with-redefs [cassandra/live-nodes (spy/stub ["n1" "n2" "n3"])] + (let [db (ext/extend-db (cassandra/db) :cassandra) + properties (ext/create-properties db + {:isolation-level :serializable + :serializable-strategy :extra-write})] + (is (= "n1,n2,n3" + (.getProperty properties "scalar.db.contact_points"))) + (is (= "cassandra" + (.getProperty properties "scalar.db.username"))) + (is (= "cassandra" + (.getProperty properties "scalar.db.password"))) + (is (= "SERIALIZABLE" + (.getProperty properties "scalar.db.isolation_level"))) + (is (= "EXTRA_WRITE" + (.getProperty + properties + "scalar.db.consensus_commit.serializable_strategy")))))) diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index 3662246..b2695d2 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer :as transfer] [scalardb.transfer-2pc :as transfer-2pc] [spy.core :as spy]) @@ -205,13 +205,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) nil nil) - result (client/invoke! client {:storage (ref mock-storage)} + result (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -221,8 +221,7 @@ (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) @@ -231,10 +230,11 @@ (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:storage (ref mock-storage)} + (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION)) (is (spy/called-n-times? scalar/prepare-storage-service! scalar/RETRIES_FOR_RECONNECTION))))) diff --git a/scalardb/test/scalardb/transfer_append_2pc_test.clj b/scalardb/test/scalardb/transfer_append_2pc_test.clj index 59f8d57..2824920 100644 --- a/scalardb/test/scalardb/transfer_append_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_append_2pc_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer-append :as transfer] [scalardb.transfer-append-2pc :as t-append-2pc] [spy.core :as spy]) @@ -221,12 +221,11 @@ {:age 2 :balance 100} {:age 3 :balance 10}] 2 [{:age 1 :balance 1}]})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 3 100) nil nil) - result (client/invoke! client {} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -236,18 +235,17 @@ (is (= [2 3 1] (get-in result [:value :num]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test diff --git a/scalardb/test/scalardb/transfer_append_test.clj b/scalardb/test/scalardb/transfer_append_test.clj index 46e398c..c2c6586 100644 --- a/scalardb/test/scalardb/transfer_append_test.clj +++ b/scalardb/test/scalardb/transfer_append_test.clj @@ -2,8 +2,8 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] [scalardb.core :as scalar] + [scalardb.core-test :refer [mock-db]] [scalardb.transfer-append :as transfer] [spy.core :as spy]) (:import (com.scalar.db.api DistributedTransaction @@ -192,12 +192,11 @@ {:age 2 :balance 100} {:age 3 :balance 10}] 2 [{:age 1 :balance 1}]})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer/->TransferClient (atom false) 3 100) nil nil) - result (client/invoke! client {} + result (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -207,18 +206,17 @@ (is (= [2 3 1] (get-in result [:value :num]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {} + (client/invoke! client {:db mock-db} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index df4b7d2..9f9f268 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -2,7 +2,7 @@ (:require [clojure.test :refer [deftest is]] [jepsen.client :as client] [jepsen.checker :as checker] - [cassandra.core :as cass] + [scalardb.core-test :refer [mock-db]] [scalardb.core :as scalar] [scalardb.transfer :as transfer] [spy.core :as spy]) @@ -176,13 +176,13 @@ (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] - (with-redefs [cass/wait-rf-nodes (spy/spy) - scalar/check-transaction-connection! (spy/spy) + (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil) - result (client/invoke! client {:storage (ref mock-storage)} + result (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) @@ -192,8 +192,7 @@ (is (= [1000 100 10 1 0] (get-in result [:value :version]))))))) (deftest transfer-client-get-all-fail-test - (with-redefs [cass/wait-rf-nodes (spy/spy) - cass/exponential-backoff (spy/spy) + (with-redefs [scalar/exponential-backoff (spy/spy) scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) @@ -202,10 +201,11 @@ (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) nil nil)] (is (thrown? clojure.lang.ExceptionInfo - (client/invoke! client {:storage (ref mock-storage)} + (client/invoke! client {:db mock-db + :storage (ref mock-storage)} (#'transfer/get-all {:client client} nil)))) - (is (spy/called-n-times? cass/exponential-backoff scalar/RETRIES)) + (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION)) (is (spy/called-n-times? scalar/prepare-storage-service! scalar/RETRIES_FOR_RECONNECTION))))) From 6113ba05c6a5f978a80217c4152cfa8446fbd0e5 Mon Sep 17 00:00:00 2001 From: yito88 Date: Mon, 23 Oct 2023 00:08:31 +0200 Subject: [PATCH 6/8] fix opts --- scalardb/src/scalardb/runner.clj | 13 +++++++------ scalardb/test/scalardb/db_extend_test.clj | 4 +++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index dcb1de4..c9e7608 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -104,13 +104,14 @@ (defn scalardb-test [base-opts db-key workload-key faults admin] - (let [workload ((workload-key workloads) base-opts) - [db nemesis] (gen-db db-key faults admin) - consistency-model (->> base-opts :consistency-model (mapv keyword))] + (let [[db nemesis] (gen-db db-key faults admin) + consistency-model (->> base-opts :consistency-model (mapv keyword)) + workload-opts (merge base-opts + scalardb-opts + {:consistency-model consistency-model}) + workload ((workload-key workloads) workload-opts)] (merge tests/noop-test - base-opts - scalardb-opts - {:consistency-model consistency-model} + workload-opts {:name (test-name workload-key faults admin) :client (:client workload) :db db diff --git a/scalardb/test/scalardb/db_extend_test.clj b/scalardb/test/scalardb/db_extend_test.clj index 5511c72..93ede4d 100644 --- a/scalardb/test/scalardb/db_extend_test.clj +++ b/scalardb/test/scalardb/db_extend_test.clj @@ -17,7 +17,9 @@ (is (= "cassandra" (.getProperty properties "scalar.db.password"))) (is (= "SERIALIZABLE" - (.getProperty properties "scalar.db.isolation_level"))) + (.getProperty + properties + "scalar.db.consensus_commit.isolation_level"))) (is (= "EXTRA_WRITE" (.getProperty properties From 5d2964d67f4b38f02b582ccd478677ddc338789f Mon Sep 17 00:00:00 2001 From: yito88 Date: Sun, 29 Oct 2023 20:22:07 +0100 Subject: [PATCH 7/8] fix generator --- scalardb/src/scalardb/core.clj | 18 ++++++++++++++---- scalardb/src/scalardb/runner.clj | 1 - 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index 7b773cb..fce0f4b 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -28,10 +28,20 @@ (let [properties (ext/create-properties (:db test) test) options (ext/create-table-opts (:db test) test)] (doseq [schema schemata] - (SchemaLoader/load properties - (cheshire/generate-string schema) - options - true)))) + (loop [retries RETRIES] + (when (zero? retries) + (throw (ex-info "Failed to set up tables" {:schema schema}))) + (let [result (try + (SchemaLoader/load properties + (cheshire/generate-string schema) + options + true) + :success + (catch Exception e + (warn (.getMessage e)) + :fail))] + (when (= result :fail) + (recur (dec retries)))))))) (defn- close-storage! [test] diff --git a/scalardb/src/scalardb/runner.clj b/scalardb/src/scalardb/runner.clj index c9e7608..42e4cbd 100644 --- a/scalardb/src/scalardb/runner.clj +++ b/scalardb/src/scalardb/runner.clj @@ -118,7 +118,6 @@ :pure-generators true :generator (gen/phases (->> (:generator workload) - gen/mix (gen/nemesis (gen/phases (gen/sleep 5) From 2492632744e2be224fc39fd79d78c145842829d6 Mon Sep 17 00:00:00 2001 From: yito88 Date: Fri, 17 Nov 2023 11:17:27 +0100 Subject: [PATCH 8/8] use scalardb-schema-loader 4.0.0-SNAPSHOT --- scalardb/project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalardb/project.clj b/scalardb/project.clj index bf79161..79d5ee3 100644 --- a/scalardb/project.clj +++ b/scalardb/project.clj @@ -9,7 +9,7 @@ [org.slf4j/slf4j-jdk14 "2.0.6"] [cassandra "0.1.0-SNAPSHOT"] [cheshire "5.12.0"] - [com.scalar-labs/scalardb-schema-loader "3.10.1"]] + [com.scalar-labs/scalardb-schema-loader "4.0.0-SNAPSHOT"]] :repositories {"sonartype" "https://oss.sonatype.org/content/repositories/snapshots/"} :profiles {:dev {:dependencies [[tortue/spy "2.0.0"]] :plugins [[lein-cloverage "1.1.2"]]}