diff --git a/scalardb/src/scalardb/core.clj b/scalardb/src/scalardb/core.clj index dd19343..2902b96 100644 --- a/scalardb/src/scalardb/core.clj +++ b/scalardb/src/scalardb/core.clj @@ -192,16 +192,10 @@ (when-not @(:transaction test) (prepare-transaction-service! test))) -(defn try-reconnection-for-transaction! - [test] - (when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION) - (prepare-transaction-service! test) - (reset! (:failures test) 0))) - -(defn try-reconnection-for-2pc! - [test] +(defn try-reconnection! + [test prepare-fn] (when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION) - (prepare-2pc-service! test) + (prepare-fn test) (reset! (:failures test) 0))) (defn start-transaction diff --git a/scalardb/src/scalardb/elle_append.clj b/scalardb/src/scalardb/elle_append.clj index 1bdf047..94c8e28 100644 --- a/scalardb/src/scalardb/elle_append.clj +++ b/scalardb/src/scalardb/elle_append.clj @@ -110,7 +110,7 @@ (swap! (:unknown-tx test) conj (.getId tx)) (assoc op :type :info :error {:unknown-tx-status (.getId tx)})) (catch Exception e - (scalar/try-reconnection-for-transaction! test) + (scalar/try-reconnection! test scalar/prepare-transaction-service!) (assoc op :type :fail :error {:crud-error (.getMessage e)}))))) (close! [_ _]) diff --git a/scalardb/src/scalardb/elle_append_2pc.clj b/scalardb/src/scalardb/elle_append_2pc.clj index 3b26362..9ff31fb 100644 --- a/scalardb/src/scalardb/elle_append_2pc.clj +++ b/scalardb/src/scalardb/elle_append_2pc.clj @@ -52,7 +52,7 @@ (assoc op :type :info :error {:unknown-tx-status (.getId tx1)})) (catch Exception e (scalar/rollback-txs [tx1 tx2]) - (scalar/try-reconnection-for-2pc! test) + (scalar/try-reconnection! test scalar/prepare-2pc-service!) (assoc op :type :fail :error {:crud-error (.getMessage e)}))))) (close! [_ _]) diff --git a/scalardb/src/scalardb/elle_write_read.clj b/scalardb/src/scalardb/elle_write_read.clj index 8de0516..137dcf3 100644 --- a/scalardb/src/scalardb/elle_write_read.clj +++ b/scalardb/src/scalardb/elle_write_read.clj @@ -105,7 +105,7 @@ (swap! (:unknown-tx test) conj (.getId tx)) (assoc op :type :info :error {:unknown-tx-status (.getId tx)})) (catch Exception e - (scalar/try-reconnection-for-transaction! test) + (scalar/try-reconnection! test scalar/prepare-transaction-service!) (assoc op :type :fail :error {:crud-error (.getMessage e)}))))) (close! [_ _]) diff --git a/scalardb/src/scalardb/elle_write_read_2pc.clj b/scalardb/src/scalardb/elle_write_read_2pc.clj index 61b23a2..da5c65e 100644 --- a/scalardb/src/scalardb/elle_write_read_2pc.clj +++ b/scalardb/src/scalardb/elle_write_read_2pc.clj @@ -44,7 +44,7 @@ (assoc op :type :info :error {:unknown-tx-status (.getId tx1)})) (catch Exception e (scalar/rollback-txs [tx1 tx2]) - (scalar/try-reconnection-for-2pc! test) + (scalar/try-reconnection! test scalar/prepare-2pc-service!) (assoc op :type :fail :error {:crud-error (.getMessage e)}))))) (close! [_ _]) diff --git a/scalardb/src/scalardb/transfer.clj b/scalardb/src/scalardb/transfer.clj index b2b5314..0fde6db 100644 --- a/scalardb/src/scalardb/transfer.clj +++ b/scalardb/src/scalardb/transfer.clj @@ -1,5 +1,6 @@ (ns scalardb.transfer (:require [clojure.core.reducers :as r] + [clojure.tools.logging :refer [warn]] [jepsen [client :as client] [checker :as checker] @@ -23,7 +24,7 @@ (def ^:const INITIAL_BALANCE 10000) (def ^:const NUM_ACCOUNTS 10) -(def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) +(def ^:const MAX_NUM_TXS 8) (def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) {:transaction true @@ -86,7 +87,7 @@ (-> r get-balance (+ amount))) (defn- tx-transfer - [tx {:keys [from to amount]}] + [tx from to amount] (let [fromResult (.get tx (prepare-get from)) toResult (.get tx (prepare-get to))] (->> (calc-new-balance fromResult (- amount)) @@ -97,6 +98,34 @@ (.put tx)) (.commit tx))) +(defn- try-tx-transfer + [test {:keys [from to amount]}] + (if-let [tx (scalar/start-transaction test)] + (try + (tx-transfer tx from to amount) + :commit + (catch UnknownTransactionStatusException _ + (swap! (:unknown-tx test) conj (.getId tx)) + (warn "Unknown transaction: " (.getId tx)) + :unknown-tx-status) + (catch Exception e + (warn (.getMessage e)) + :fail)) + :start-fail)) + +(defn exec-transfers + "Execute transfers in parallel. Give the transfer function." + [test op transfer-fn] + (let [results (pmap #(transfer-fn test %) (:value op))] + (if (some #{:commit} results) + ;; return :ok when at least 1 transaction is committed + (assoc op :type :ok :value {:results results}) + ;; :info type could be better in some cases + ;; However, our checker doesn't care about the type for now + (do + (scalar/try-reconnection! test scalar/prepare-transaction-service!) + (assoc op :type :fail :error {:results results}))))) + (defn- read-record "Read a record with a transaction. If read fails, this function returns nil." [tx storage i] @@ -110,15 +139,19 @@ [test n] (scalar/check-transaction-connection! test) (scalar/check-storage-connection! test) - (scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test + (scalar/with-retry + (fn [test] + (scalar/prepare-transaction-service! test) + (scalar/prepare-storage-service! test)) + test (let [tx (scalar/start-transaction test) results (map #(read-record tx @(:storage test) %) (range n))] (if (some nil? results) nil results)))) -(defrecord TransferClient [initialized? n initial-balance] +(defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _] - (TransferClient. initialized? n initial-balance)) + (TransferClient. initialized? n initial-balance max-txs)) (setup! [_ test] (locking initialized? @@ -129,22 +162,10 @@ (invoke! [_ test op] (case (:f op) - :transfer (if-let [tx (scalar/start-transaction test)] - (try - (tx-transfer tx (:value op)) - (assoc op :type :ok) - (catch UnknownTransactionStatusException _ - (swap! (:unknown-tx test) conj (.getId tx)) - (assoc op :type :info :error {:unknown-tx-status (.getId tx)})) - (catch Exception e - (scalar/try-reconnection-for-transaction! test) - (assoc op :type :fail :error (.getMessage e)))) - (do - (scalar/try-reconnection-for-transaction! test) - (assoc op :type :fail :error "Skipped due to no connection"))) + :transfer (exec-transfers test op try-tx-transfer) :get-all (do (wait-for-recovery (:db test) test) - (if-let [results (read-all-with-retry test (:num op))] + (if-let [results (read-all-with-retry test n)] (assoc op :type :ok :value {:balance (get-balances results) :version (get-versions results)}) (assoc op :type :fail :error "Failed to get balances"))) @@ -159,25 +180,30 @@ (teardown! [_ test] (scalar/close-all! test))) -(defn- transfer +(defn- generate-acc-pair + [n] + (loop [] + (let [from (rand-int n) + to (rand-int n)] + (if-not (= from to) [from to] (recur))))) + +(defn transfer [test _] - (let [n (-> test :client :n)] + (let [num-accs (-> test :client :n) + num-txs (-> test :client :max-txs rand-int inc)] {:type :invoke :f :transfer - :value {:from (rand-int n) - :to (rand-int n) - :amount (+ 1 (rand-int 1000))}})) - -(def diff-transfer - (gen/filter (fn [op] (not= (-> op :value :from) - (-> op :value :to))) - transfer)) + :value (repeatedly num-txs + (fn [] + (let [[from to] (generate-acc-pair num-accs)] + {:from from + :to to + :amount (+ 1 (rand-int 1000))})))})) (defn get-all - [test _] + [_ _] {:type :invoke - :f :get-all - :num (-> test :client :n)}) + :f :get-all}) (defn check-tx [_ _] @@ -188,7 +214,10 @@ [] (reify checker/Checker (check [_ test history _] - (let [read-result (->> history + (let [num-accs (-> test :client :n) + initial-balance (-> test :client :initial-balance) + total-balance (* num-accs initial-balance) + read-result (->> history (r/filter #(= :get-all (:f %))) (r/filter identity) (into []) @@ -196,9 +225,9 @@ :value) actual-balance (->> (:balance read-result) (reduce +)) - bad-balance (when-not (= actual-balance TOTAL_BALANCE) + bad-balance (when-not (= actual-balance total-balance) {:type :wrong-balance - :expected TOTAL_BALANCE + :expected total-balance :actual actual-balance}) actual-version (->> (:version read-result) (reduce +)) @@ -209,16 +238,18 @@ last ((fn [x] (if (= (:type x) :ok) (:value x) 0)))) - total-ok (->> history - (r/filter op/ok?) - (r/filter #(= :transfer (:f %))) - (r/filter identity) - (into []) - count - (+ checked-committed)) - expected-version (-> total-ok - (* 2) ; update 2 records per a transfer - (+ (-> test :client :n))) ; initial insertions + total-commits (->> history + (r/filter op/ok?) + (r/filter #(= :transfer (:f %))) + (r/reduce (fn [cnt op] + (->> op :value :results + (filter #{:commit}) + count + (+ cnt))) + checked-committed)) + expected-version (-> total-commits + (* 2) ; update 2 records per transfer + (+ num-accs)) ; initial insertions bad-version (when-not (= actual-version expected-version) {:type :wrong-version :expected expected-version @@ -231,8 +262,11 @@ (defn workload [_] - {:client (->TransferClient (atom false) NUM_ACCOUNTS INITIAL_BALANCE) - :generator [diff-transfer] + {:client (->TransferClient (atom false) + NUM_ACCOUNTS + INITIAL_BALANCE + MAX_NUM_TXS) + :generator [transfer] :final-generator (gen/phases (gen/once get-all) (gen/once check-tx)) diff --git a/scalardb/src/scalardb/transfer_2pc.clj b/scalardb/src/scalardb/transfer_2pc.clj index 96b8563..16575db 100644 --- a/scalardb/src/scalardb/transfer_2pc.clj +++ b/scalardb/src/scalardb/transfer_2pc.clj @@ -1,5 +1,6 @@ (ns scalardb.transfer-2pc - (:require [jepsen + (:require [clojure.tools.logging :refer [warn]] + [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] @@ -8,7 +9,7 @@ (:import (com.scalar.db.exception.transaction UnknownTransactionStatusException))) (defn- tx-transfer - [tx1 tx2 {:keys [from to amount]}] + [tx1 tx2 from to amount] (try (let [fromResult (.get tx1 (transfer/prepare-get from)) toResult (.get tx2 (transfer/prepare-get to))] @@ -25,10 +26,25 @@ (scalar/rollback-txs [tx1 tx2]) (throw e)))) -(defrecord TransferClient [initialized? n initial-balance] +(defn- try-tx-transfer + [test {:keys [from to amount]}] + (let [tx1 (scalar/start-2pc test) + tx2 (scalar/join-2pc test (.getId tx1))] + (try + (tx-transfer tx1 tx2 from to amount) + :commit + (catch UnknownTransactionStatusException _ + (swap! (:unknown-tx test) conj (.getId tx1)) + (warn "Unknown transaction: " (.getId tx1)) + :unknown-tx-status) + (catch Exception e + (warn (.getMessage e)) + :fail)))) + +(defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _] - (TransferClient. initialized? n initial-balance)) + (TransferClient. initialized? n initial-balance max-txs)) (setup! [_ test] (locking initialized? @@ -40,22 +56,10 @@ (invoke! [_ test op] (case (:f op) - :transfer (let [tx1 (scalar/start-2pc test) - tx2 (scalar/join-2pc test (.getId tx1))] - (try - (tx-transfer tx1 tx2 (:value op)) - (assoc op :type :ok) - (catch UnknownTransactionStatusException _ - (swap! (:unknown-tx test) conj (.getId tx1)) - (assoc op - :type :info - :error {:unknown-tx-status (.getId tx1)})) - (catch Exception e - (scalar/try-reconnection-for-2pc! test) - (assoc op :type :fail :error (.getMessage e))))) + :transfer (transfer/exec-transfers test op try-tx-transfer) :get-all (do (wait-for-recovery (:db test) test) - (if-let [results (transfer/read-all-with-retry test (:num op))] + (if-let [results (transfer/read-all-with-retry test n)] (assoc op :type :ok :value {:balance (transfer/get-balances results) :version @@ -76,8 +80,9 @@ [_] {:client (->TransferClient (atom false) transfer/NUM_ACCOUNTS - transfer/INITIAL_BALANCE) - :generator [transfer/diff-transfer] + transfer/INITIAL_BALANCE + transfer/MAX_NUM_TXS) + :generator [transfer/transfer] :final-generator (gen/phases (gen/once transfer/get-all) (gen/once transfer/check-tx)) diff --git a/scalardb/src/scalardb/transfer_append.clj b/scalardb/src/scalardb/transfer_append.clj index 02b423e..6b22ae0 100644 --- a/scalardb/src/scalardb/transfer_append.clj +++ b/scalardb/src/scalardb/transfer_append.clj @@ -1,12 +1,13 @@ (ns scalardb.transfer-append (:require [clojure.core.reducers :as r] - [clojure.tools.logging :refer [info]] + [clojure.tools.logging :refer [info warn]] [jepsen [client :as client] [checker :as checker] [generator :as gen]] [scalardb.core :as scalar :refer [KEYSPACE]] - [scalardb.db-extend :refer [wait-for-recovery]]) + [scalardb.db-extend :refer [wait-for-recovery]] + [scalardb.transfer :as transfer]) (:import (com.scalar.db.api Put Scan Scan$Ordering @@ -21,9 +22,6 @@ (def ^:private ^:const ACCOUNT_ID "account_id") (def ^:private ^:const BALANCE "balance") (def ^:private ^:const AGE "age") -(def ^:const INITIAL_BALANCE 10000) -(def ^:const NUM_ACCOUNTS 10) -(def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE)) (def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE)) {:transaction true :partition-key [ACCOUNT_ID] @@ -100,7 +98,7 @@ (-> r get-age inc)) (defn- tx-transfer - [tx {:keys [from to amount]}] + [tx from to amount] (let [^Result from-result (scan-for-latest tx (prepare-scan-for-latest from)) ^Result to-result (scan-for-latest tx (prepare-scan-for-latest to))] (info "fromID:" from "the latest age:" (get-age from-result)) @@ -115,6 +113,21 @@ (.put tx)) (.commit tx))) +(defn- try-tx-transfer + [test {:keys [from to amount]}] + (if-let [tx (scalar/start-transaction test)] + (try + (tx-transfer tx from to amount) + :commit + (catch UnknownTransactionStatusException _ + (swap! (:unknown-tx test) conj (.getId tx)) + (warn "Unknown transaction: " (.getId tx)) + :unknown-tx-status) + (catch Exception e + (warn (.getMessage e)) + :fail)) + :start-fail)) + (defn- scan-records [tx id] (try @@ -129,10 +142,10 @@ results (map #(scan-records tx %) (range n))] (if (some nil? results) nil results)))) -(defrecord TransferClient [initialized? n initial-balance] +(defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _] - (TransferClient. initialized? n initial-balance)) + (TransferClient. initialized? n initial-balance max-txs)) (setup! [_ test] (locking initialized? @@ -143,28 +156,17 @@ (invoke! [_ test op] (case (:f op) - :transfer (if-let [tx (scalar/start-transaction test)] - (try - (tx-transfer tx (:value op)) - (assoc op :type :ok) - (catch UnknownTransactionStatusException _ - (swap! (:unknown-tx test) conj (.getId tx)) - (assoc op :type :info, :error {:unknown-tx-status (.getId tx)})) - (catch Exception e - (scalar/try-reconnection-for-transaction! test) - (assoc op :type :fail, :error (.getMessage e)))) - (do - (scalar/try-reconnection-for-transaction! test) - (assoc op :type :fail, :error "Skipped due to no connection"))) + :transfer (transfer/exec-transfers test op try-tx-transfer) :get-all (do (wait-for-recovery (:db test) test) - (if-let [results (scan-all-records-with-retry test (:num op))] + (if-let [results (scan-all-records-with-retry test n)] (assoc op :type, :ok :value {:balance (get-balances results) :age (get-ages results) :num (get-nums results)}) (assoc op :type, :fail, :error "Failed to get all records"))) - :check-tx (if-let [num-committed (scalar/check-transaction-states test - @(:unknown-tx test))] + :check-tx (if-let [num-committed (scalar/check-transaction-states + test + @(:unknown-tx test))] (assoc op :type :ok, :value num-committed) (assoc op :type :fail, :error "Failed to check status")))) @@ -173,25 +175,10 @@ (teardown! [_ test] (scalar/close-all! test))) -(defn- transfer - [test _] - (let [n (-> test :client :n)] - {:type :invoke - :f :transfer - :value {:from (rand-int n) - :to (rand-int n) - :amount (+ 1 (rand-int 1000))}})) - -(def diff-transfer - (gen/filter (fn [op] (not= (-> op :value :from) - (-> op :value :to))) - transfer)) - (defn get-all - [test _] + [_ _] {:type :invoke - :f :get-all - :num (-> test :client :n)}) + :f :get-all}) (defn check-tx [_ _] @@ -201,17 +188,20 @@ (defn consistency-checker [] (reify checker/Checker - (check [_ _ history _] - (let [read-result (->> history + (check [_ test history _] + (let [num-accs (-> test :client :n) + initial-balance (-> test :client :initial-balance) + total-balance (* num-accs initial-balance) + read-result (->> history (r/filter #(= :get-all (:f %))) (into []) last :value) actual-balance (->> (:balance read-result) (reduce +)) - bad-balance (when-not (= actual-balance TOTAL_BALANCE) + bad-balance (when-not (= actual-balance total-balance) {:type :wrong-balance - :expected TOTAL_BALANCE + :expected total-balance :actual actual-balance}) actual-age (->> (:age read-result) (reduce +)) @@ -236,8 +226,11 @@ (defn workload [_] - {:client (->TransferClient (atom false) NUM_ACCOUNTS INITIAL_BALANCE) - :generator [diff-transfer] + {:client (->TransferClient (atom false) + transfer/NUM_ACCOUNTS + transfer/INITIAL_BALANCE + transfer/MAX_NUM_TXS) + :generator [transfer/transfer] :final-generator (gen/phases (gen/once get-all) (gen/once check-tx)) diff --git a/scalardb/src/scalardb/transfer_append_2pc.clj b/scalardb/src/scalardb/transfer_append_2pc.clj index 9f0767a..427f069 100644 --- a/scalardb/src/scalardb/transfer_append_2pc.clj +++ b/scalardb/src/scalardb/transfer_append_2pc.clj @@ -1,16 +1,17 @@ (ns scalardb.transfer-append-2pc - (:require [clojure.tools.logging :refer [info]] + (:require [clojure.tools.logging :refer [info warn]] [jepsen [client :as client] [generator :as gen]] [scalardb.core :as scalar] [scalardb.db-extend :refer [wait-for-recovery]] + [scalardb.transfer :as transfer] [scalardb.transfer-append :as t-append]) (:import (com.scalar.db.api Result) (com.scalar.db.exception.transaction UnknownTransactionStatusException))) (defn- tx-transfer - [tx1 tx2 {:keys [from to amount]}] + [tx1 tx2 from to amount] (try (let [^Result from-result (t-append/scan-for-latest tx1 (t-append/prepare-scan-for-latest from)) @@ -34,10 +35,25 @@ (scalar/rollback-txs [tx1 tx2]) (throw e)))) -(defrecord TransferClient [initialized? n initial-balance] +(defn- try-tx-transfer + [test {:keys [from to amount]}] + (let [tx1 (scalar/start-2pc test) + tx2 (scalar/join-2pc test (.getId tx1))] + (try + (tx-transfer tx1 tx2 from to amount) + :commit + (catch UnknownTransactionStatusException _ + (swap! (:unknown-tx test) conj (.getId tx1)) + (warn "Unknown transaction: " (.getId tx1)) + :unknown-tx-status) + (catch Exception e + (warn (.getMessage e)) + :fail)))) + +(defrecord TransferClient [initialized? n initial-balance max-txs] client/Client (open! [_ _ _] - (TransferClient. initialized? n initial-balance)) + (TransferClient. initialized? n initial-balance max-txs)) (setup! [_ test] (locking initialized? @@ -49,23 +65,10 @@ (invoke! [_ test op] (case (:f op) - :transfer (let [tx1 (scalar/start-2pc test) - tx2 (scalar/join-2pc test (.getId tx1))] - (try - (tx-transfer tx1 tx2 (:value op)) - (assoc op :type :ok) - (catch UnknownTransactionStatusException _ - (swap! (:unknown-tx test) conj (.getId tx1)) - (assoc op - :type :info - :error {:unknown-tx-status (.getId tx1)})) - (catch Exception e - (scalar/try-reconnection-for-2pc! test) - (assoc op :type :fail :error (.getMessage e))))) + :transfer (transfer/exec-transfers test op try-tx-transfer) :get-all (do (wait-for-recovery (:db test) test) - (if-let [results (t-append/scan-all-records-with-retry - test (:num op))] + (if-let [results (t-append/scan-all-records-with-retry test n)] (assoc op :type :ok :value {:balance (t-append/get-balances results) :age (t-append/get-ages results) @@ -85,9 +88,10 @@ (defn workload [_] {:client (->TransferClient (atom false) - t-append/NUM_ACCOUNTS - t-append/INITIAL_BALANCE) - :generator [t-append/diff-transfer] + transfer/NUM_ACCOUNTS + transfer/INITIAL_BALANCE + transfer/MAX_NUM_TXS) + :generator [transfer/transfer] :final-generator (gen/phases (gen/once t-append/get-all) (gen/once t-append/check-tx)) diff --git a/scalardb/test/scalardb/core_test.clj b/scalardb/test/scalardb/core_test.clj index 9dd86db..5c39604 100644 --- a/scalardb/test/scalardb/core_test.clj +++ b/scalardb/test/scalardb/core_test.clj @@ -123,13 +123,13 @@ mock-tx-manager)))] (let [test {:transaction (atom nil) :failures (atom 999)}] - (scalar/try-reconnection-for-transaction! test) + (scalar/try-reconnection! test scalar/prepare-transaction-service!) (is (spy/called-once? scalar/prepare-transaction-service!)) (is (= mock-tx-manager @(:transaction test))) (is (= 0 @(:failures test))) ;; the next one doesn't reconnect - (scalar/try-reconnection-for-transaction! test) + (scalar/try-reconnection! test scalar/prepare-transaction-service!) (is (spy/called-once? scalar/prepare-transaction-service!)) (is (= 1 @(:failures test)))))) diff --git a/scalardb/test/scalardb/elle_append_2pc_test.clj b/scalardb/test/scalardb/elle_append_2pc_test.clj index 8f610d5..f9c261f 100644 --- a/scalardb/test/scalardb/elle_append_2pc_test.clj +++ b/scalardb/test/scalardb/elle_append_2pc_test.clj @@ -139,7 +139,7 @@ (binding [rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-exception) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-append/->AppendClient (atom false)) nil nil) result (client/invoke! client @@ -151,7 +151,7 @@ :value [0 [[:r 1 nil]]]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/called-once? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @rollback-count)) (is (= :fail (:type result))))))) @@ -163,7 +163,7 @@ rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-unknown) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-append/->AppendClient (atom false)) nil nil) result (client/invoke! client @@ -176,7 +176,7 @@ :value [0 [[:r 1 nil] [:append 1 0]]]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/not-called? scalar/try-reconnection-for-2pc!)) + (is (spy/not-called? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 1 @put-count)) (is (= 2 @prepare-count)) diff --git a/scalardb/test/scalardb/elle_append_test.clj b/scalardb/test/scalardb/elle_append_test.clj index 9c76e5d..93ec843 100644 --- a/scalardb/test/scalardb/elle_append_test.clj +++ b/scalardb/test/scalardb/elle_append_test.clj @@ -115,7 +115,7 @@ (deftest append-client-invoke-crud-exception-test (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-exception) - scalar/try-reconnection-for-transaction! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-append/->AppendClient (atom false)) nil nil) result (client/invoke! client @@ -126,14 +126,14 @@ :f :txn :value [0 [[:r 1 nil]]]})] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest append-client-invoke-unknown-exception-test (binding [get-count (atom 0) put-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-unknown) - scalar/try-reconnection-for-transaction! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-append/->AppendClient (atom false)) nil nil) result (client/invoke! client @@ -147,7 +147,7 @@ [[:r 1 nil] [:append 1 0]]]})] (is (spy/called-once? scalar/start-transaction)) - (is (spy/not-called? scalar/try-reconnection-for-transaction!)) + (is (spy/not-called? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 1 @put-count)) (is (= :info (:type result))) diff --git a/scalardb/test/scalardb/elle_write_read_2pc_test.clj b/scalardb/test/scalardb/elle_write_read_2pc_test.clj index 0ece448..546e0b1 100644 --- a/scalardb/test/scalardb/elle_write_read_2pc_test.clj +++ b/scalardb/test/scalardb/elle_write_read_2pc_test.clj @@ -138,7 +138,7 @@ (binding [rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-exception) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-wr/->WriteReadClient (atom false)) nil nil) result (client/invoke! client @@ -150,7 +150,7 @@ :value [0 [[:r 1 nil]]]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/called-once? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @rollback-count)) (is (= :fail (:type result))))))) @@ -162,7 +162,7 @@ rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-unknown) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-wr/->WriteReadClient (atom false)) nil nil) result (client/invoke! client @@ -175,7 +175,7 @@ :value [0 [[:r 1 nil] [:w 1 1]]]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/not-called? scalar/try-reconnection-for-2pc!)) + (is (spy/not-called? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 1 @put-count)) (is (= 2 @prepare-count)) diff --git a/scalardb/test/scalardb/elle_write_read_test.clj b/scalardb/test/scalardb/elle_write_read_test.clj index c3c43cf..a399fab 100644 --- a/scalardb/test/scalardb/elle_write_read_test.clj +++ b/scalardb/test/scalardb/elle_write_read_test.clj @@ -114,7 +114,7 @@ (deftest write-read-client-invoke-crud-exception-test (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-exception) - scalar/try-reconnection-for-transaction! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-wr/->WriteReadClient (atom false)) nil nil) result (client/invoke! client @@ -125,14 +125,14 @@ :f :txn :value [0 [[:r 1 nil]]]})] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest write-read-client-invoke-unknown-exception-test (binding [get-count (atom 0) put-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-unknown) - scalar/try-reconnection-for-transaction! (spy/spy)] + scalar/try-reconnection! (spy/spy)] (let [client (client/open! (elle-wr/->WriteReadClient (atom false)) nil nil) result (client/invoke! client @@ -146,7 +146,7 @@ [[:r 1 nil] [:w 1 1]]]})] (is (spy/called-once? scalar/start-transaction)) - (is (spy/not-called? scalar/try-reconnection-for-transaction!)) + (is (spy/not-called? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 1 @put-count)) (is (= :info (:type result))) diff --git a/scalardb/test/scalardb/transfer_2pc_test.clj b/scalardb/test/scalardb/transfer_2pc_test.clj index b2695d2..09a82f6 100644 --- a/scalardb/test/scalardb/transfer_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_2pc_test.clj @@ -117,7 +117,7 @@ scalar/prepare-2pc-service! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil)] (client/setup! client nil) (is (true? @(:initialized? client))) @@ -142,13 +142,13 @@ commit-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc) scalar/join-2pc (spy/stub mock-2pc)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil {:type :invoke :f :transfer - :value {:from 0 :to 1 :amount 10}})] + :value [{:from 0 :to 1 :amount 10}]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) (is (= 2 @get-count)) @@ -163,16 +163,16 @@ (binding [rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-exception) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client - nil + {:failures (atom 0)} (#'transfer/transfer {:client client} nil))] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/called-once? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @rollback-count)) (is (= :fail (:type result))))))) @@ -184,31 +184,31 @@ rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-unknown) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client - {:unknown-tx (atom #{})} + {:unknown-tx (atom #{}) + :failures (atom 0)} (#'transfer/transfer {:client client} nil))] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/not-called? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 2 @put-count)) (is (= 2 @prepare-count)) (is (= 2 @validate-count)) (is (= 0 @rollback-count)) - (is (= :info (:type result))) - (is (= "unknown-state-tx" (get-in result - [:error :unknown-tx-status]))))))) + (is (= :fail (:type result))) + (is (= [:unknown-tx-status] (get-in result [:error :results]))))))) (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:db mock-db :storage (ref mock-storage)} @@ -227,7 +227,7 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db @@ -240,7 +240,7 @@ (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} (#'transfer/check-tx {:client client} @@ -251,7 +251,7 @@ (deftest transfer-client-check-tx-fail-test (with-redefs [scalar/check-transaction-states (spy/stub nil)] - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} (#'transfer/check-tx {:client client} @@ -260,20 +260,20 @@ (is (= :fail (:type result)))))) (def correct-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10000 10500 9820 8700 10620 10360] :version [2 3 2 3 1 2 2 4 2 3]}} {:type :ok :f :check-tx :value 1}]) (deftest consistency-checker-test - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 10 10000 1) nil nil) checker (#'transfer/consistency-checker) result (checker/check checker {:client client} correct-history nil)] @@ -284,20 +284,20 @@ (is (nil? (:bad-version result))))) (def bad-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10001 10500 9820 8700 10620 10360] :version [2 3 2 3 1 2 2 4 2 3]}} {:type :fail :f :check-tx}]) (deftest consistency-checker-fail-test - (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer-2pc/->TransferClient (atom false) 10 10000 1) nil nil) checker (#'transfer/consistency-checker) result (checker/check checker {:client client} bad-history nil)] diff --git a/scalardb/test/scalardb/transfer_append_2pc_test.clj b/scalardb/test/scalardb/transfer_append_2pc_test.clj index 2824920..b4db93f 100644 --- a/scalardb/test/scalardb/transfer_append_2pc_test.clj +++ b/scalardb/test/scalardb/transfer_append_2pc_test.clj @@ -4,6 +4,7 @@ [jepsen.checker :as checker] [scalardb.core :as scalar] [scalardb.core-test :refer [mock-db]] + [scalardb.transfer :as t] [scalardb.transfer-append :as transfer] [scalardb.transfer-append-2pc :as t-append-2pc] [spy.core :as spy]) @@ -120,7 +121,7 @@ scalar/prepare-2pc-service! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil)] (client/setup! client nil) (is (true? @(:initialized? client))) @@ -151,13 +152,13 @@ commit-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc) scalar/join-2pc (spy/stub mock-2pc)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 2 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 2 100 1) nil nil) result (client/invoke! client nil {:type :invoke :f :transfer - :value {:from 0 :to 1 :amount 10}})] + :value [{:from 0 :to 1 :amount 10}]})] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) (is (= 2 @scan-count)) @@ -174,16 +175,15 @@ (binding [rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-exception) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil - (#'transfer/transfer {:client client} - nil))] + (t/transfer {:client client} nil))] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/called-once? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @rollback-count)) (is (= :fail (:type result))))))) @@ -195,24 +195,22 @@ rollback-count (atom 0)] (with-redefs [scalar/start-2pc (spy/stub mock-2pc-throws-unknown) scalar/join-2pc (spy/stub mock-2pc) - scalar/try-reconnection-for-2pc! (spy/spy)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{})} - (#'transfer/transfer {:client client} - nil))] + (t/transfer {:client client} nil))] (is (spy/called-once? scalar/start-2pc)) (is (spy/called-once? scalar/join-2pc)) - (is (spy/not-called? scalar/try-reconnection-for-2pc!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @scan-count)) (is (= 2 @put-count)) (is (= 2 @prepare-count)) (is (= 2 @validate-count)) (is (= 0 @rollback-count)) - (is (= :info (:type result))) - (is (= "unknown-state-tx" (get-in result - [:error :unknown-tx-status]))))))) + (is (= :fail (:type result))) + (is (= '(:unknown-tx-status) (get-in result [:error :results]))))))) (deftest transfer-client-get-all-test (binding [test-records (atom {0 [{:age 1 :balance 0} @@ -223,11 +221,10 @@ 2 [{:age 1 :balance 1}]})] (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 3 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 3 100 1) nil nil) result (client/invoke! client {:db mock-db} - (#'transfer/get-all {:client client} - nil))] + (transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) (is (= :ok (:type result))) (is (= [1000 10 1] (get-in result [:value :balance]))) @@ -239,44 +236,41 @@ scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db} - (#'transfer/get-all {:client client} - nil)))) + (transfer/get-all {:client client} nil)))) (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} - (#'transfer/check-tx {:client client} - nil))] + (transfer/check-tx {:client client} nil))] (is (spy/called-once? scalar/check-transaction-states)) (is (= :ok (:type result))) (is (= 1 (:value result)))))) (deftest transfer-client-check-tx-fail-test (with-redefs [scalar/check-transaction-states (spy/stub nil)] - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} - (#'transfer/check-tx {:client client} - nil))] + (transfer/check-tx {:client client} nil))] (is (spy/called-once? scalar/check-transaction-states)) (is (= :fail (:type result)))))) (def correct-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10000 10500 9820 8700 10620 10360] :age [2 3 2 3 1 2 2 4 2 3] @@ -284,9 +278,9 @@ {:type :ok :f :check-tx :value 1}]) (deftest consistency-checker-test - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 10 10000) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 10 10000 1) nil nil) - checker (#'transfer/consistency-checker) + checker (transfer/consistency-checker) result (checker/check checker {:client client} correct-history nil)] (is (true? (:valid? result))) (is (= 24 (:total-age result))) @@ -295,13 +289,14 @@ (is (nil? (:bad-version result))))) (def bad-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10001 10500 9820 8700 10620 10360] :age [2 3 2 3 1 2 2 4 2 3] @@ -309,9 +304,9 @@ {:type :fail :f :check-tx}]) (deftest consistency-checker-fail-test - (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 10 10000) + (let [client (client/open! (t-append-2pc/->TransferClient (atom false) 10 10000 1) nil nil) - checker (#'transfer/consistency-checker) + checker (transfer/consistency-checker) result (checker/check checker {:client client} bad-history nil)] (is (false? (:valid? result))) (is (= 24 (:total-age result))) diff --git a/scalardb/test/scalardb/transfer_append_test.clj b/scalardb/test/scalardb/transfer_append_test.clj index c2c6586..8ef72d0 100644 --- a/scalardb/test/scalardb/transfer_append_test.clj +++ b/scalardb/test/scalardb/transfer_append_test.clj @@ -4,6 +4,7 @@ [jepsen.checker :as checker] [scalardb.core :as scalar] [scalardb.core-test :refer [mock-db]] + [scalardb.transfer :as t] [scalardb.transfer-append :as transfer] [spy.core :as spy]) (:import (com.scalar.db.api DistributedTransaction @@ -87,7 +88,7 @@ (with-redefs [scalar/setup-transaction-tables (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (client/setup! client nil) (is (true? @(:initialized? client))) @@ -113,7 +114,7 @@ (with-redefs [scalar/setup-transaction-tables (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? CrudException (client/setup! client nil))))))) @@ -124,13 +125,13 @@ put-count (atom 0) commit-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 2 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 2 100 1) nil nil) result (client/invoke! client nil {:type :invoke :f :transfer - :value {:from 0 :to 1 :amount 10}})] + :value [{:from 0 :to 1 :amount 10}]})] (is (spy/called-once? scalar/start-transaction)) (is (= 2 @scan-count)) (is (= 2 @put-count)) @@ -142,48 +143,44 @@ (deftest transfer-client-transfer-no-tx-test (with-redefs [scalar/start-transaction (spy/stub nil) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil - (#'transfer/transfer {:client client} - nil))] + (t/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest transfer-client-transfer-crud-exception-test (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-exception) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil - (#'transfer/transfer {:client client} - nil))] + (t/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest transfer-client-transfer-unknown-exception-test (binding [scan-count (atom 0) put-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-unknown) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{})} - (#'transfer/transfer {:client client} - nil))] + (t/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/not-called? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @scan-count)) (is (= 2 @put-count)) - (is (= :info (:type result))) - (is (= "unknown-state-tx" (get-in result - [:error :unknown-tx-status]))))))) + (is (= :fail (:type result))) + (is (= '(:unknown-tx-status) (get-in result [:error :results]))))))) (deftest transfer-client-get-all-test (binding [test-records (atom {0 [{:age 1 :balance 0} @@ -194,11 +191,10 @@ 2 [{:age 1 :balance 1}]})] (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 3 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 3 100 1) nil nil) result (client/invoke! client {:db mock-db} - (#'transfer/get-all {:client client} - nil))] + (transfer/get-all {:client client} nil))] (is (spy/called-once? scalar/check-transaction-connection!)) (is (= :ok (:type result))) (is (= [1000 10 1] (get-in result [:value :balance]))) @@ -210,44 +206,41 @@ scalar/check-transaction-connection! (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db} - (#'transfer/get-all {:client client} - nil)))) + (transfer/get-all {:client client} nil)))) (is (spy/called-n-times? scalar/exponential-backoff scalar/RETRIES)) (is (spy/called-n-times? scalar/prepare-transaction-service! scalar/RETRIES_FOR_RECONNECTION))))) (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} - (#'transfer/check-tx {:client client} - nil))] + (transfer/check-tx {:client client} nil))] (is (spy/called-once? scalar/check-transaction-states)) (is (= :ok (:type result))) (is (= 1 (:value result)))))) (deftest transfer-client-check-tx-fail-test (with-redefs [scalar/check-transaction-states (spy/stub nil)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} - (#'transfer/check-tx {:client client} - nil))] + (transfer/check-tx {:client client} nil))] (is (spy/called-once? scalar/check-transaction-states)) (is (= :fail (:type result)))))) (def correct-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10000 10500 9820 8700 10620 10360] :age [2 3 2 3 1 2 2 4 2 3] @@ -255,9 +248,9 @@ {:type :ok :f :check-tx :value 1}]) (deftest consistency-checker-test - (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000 1) nil nil) - checker (#'transfer/consistency-checker) + checker (transfer/consistency-checker) result (checker/check checker {:client client} correct-history nil)] (is (true? (:valid? result))) (is (= 24 (:total-age result))) @@ -266,13 +259,13 @@ (is (nil? (:bad-version result))))) (def bad-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10001 10500 9820 8700 10620 10360] :age [2 3 2 3 1 2 2 4 2 3] @@ -280,9 +273,9 @@ {:type :fail :f :check-tx}]) (deftest consistency-checker-fail-test - (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000 1) nil nil) - checker (#'transfer/consistency-checker) + checker (transfer/consistency-checker) result (checker/check checker {:client client} bad-history nil)] (is (false? (:valid? result))) (is (= 24 (:total-age result))) diff --git a/scalardb/test/scalardb/transfer_test.clj b/scalardb/test/scalardb/transfer_test.clj index 9f9f268..f4a9cb2 100644 --- a/scalardb/test/scalardb/transfer_test.clj +++ b/scalardb/test/scalardb/transfer_test.clj @@ -84,7 +84,7 @@ (with-redefs [scalar/setup-transaction-tables (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (client/setup! client nil) (is (true? @(:initialized? client))) @@ -105,7 +105,7 @@ (with-redefs [scalar/setup-transaction-tables (spy/spy) scalar/prepare-transaction-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? CrudException (client/setup! client nil))))))) @@ -115,13 +115,13 @@ put-count (atom 0) commit-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil {:type :invoke :f :transfer - :value {:from 0 :to 1 :amount 10}})] + :value [{:from 0 :to 1 :amount 10}]})] (is (spy/called-once? scalar/start-transaction)) (is (= 2 @get-count)) (is (= 2 @put-count)) @@ -131,55 +131,54 @@ (deftest transfer-client-transfer-no-tx-test (with-redefs [scalar/start-transaction (spy/stub nil) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil (#'transfer/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest transfer-client-transfer-crud-exception-test (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-exception) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client nil (#'transfer/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/called-once? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= :fail (:type result)))))) (deftest transfer-client-transfer-unknown-exception-test (binding [get-count (atom 0) put-count (atom 0)] (with-redefs [scalar/start-transaction (spy/stub mock-transaction-throws-unknown) - scalar/try-reconnection-for-transaction! (spy/spy)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + scalar/try-reconnection! (spy/spy)] + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{})} (#'transfer/transfer {:client client} nil))] (is (spy/called-once? scalar/start-transaction)) - (is (spy/not-called? scalar/try-reconnection-for-transaction!)) + (is (spy/called-once? scalar/try-reconnection!)) (is (= 2 @get-count)) (is (= 2 @put-count)) - (is (= :info (:type result))) - (is (= "unknown-state-tx" (get-in result - [:error :unknown-tx-status]))))))) + (is (= :fail (:type result))) + (is (= [:unknown-tx-status] (get-in result [:error :results]))))))) (deftest transfer-client-get-all-test (binding [test-records (atom {0 1000 1 100 2 10 3 1 4 0})] (with-redefs [scalar/check-transaction-connection! (spy/spy) scalar/check-storage-connection! (spy/spy) scalar/start-transaction (spy/stub mock-transaction)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:db mock-db :storage (ref mock-storage)} @@ -198,7 +197,7 @@ scalar/prepare-transaction-service! (spy/spy) scalar/prepare-storage-service! (spy/spy) scalar/start-transaction (spy/stub mock-transaction-throws-exception)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil)] (is (thrown? clojure.lang.ExceptionInfo (client/invoke! client {:db mock-db @@ -211,7 +210,7 @@ (deftest transfer-client-check-tx-test (with-redefs [scalar/check-transaction-states (spy/stub 1)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} (#'transfer/check-tx {:client client} @@ -222,7 +221,7 @@ (deftest transfer-client-check-tx-fail-test (with-redefs [scalar/check-transaction-states (spy/stub nil)] - (let [client (client/open! (transfer/->TransferClient (atom false) 5 100) + (let [client (client/open! (transfer/->TransferClient (atom false) 5 100 1) nil nil) result (client/invoke! client {:unknown-tx (atom #{"tx1"})} (#'transfer/check-tx {:client client} @@ -231,20 +230,20 @@ (is (= :fail (:type result)))))) (def correct-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10000 10500 9820 8700 10620 10360] :version [2 3 2 3 1 2 2 4 2 3]}} {:type :ok :f :check-tx :value 1}]) (deftest consistency-checker-test - (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000 1) nil nil) checker (#'transfer/consistency-checker) result (checker/check checker {:client client} correct-history nil)] @@ -255,20 +254,20 @@ (is (nil? (:bad-version result))))) (def bad-history - [{:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :fail :f :transfer :error {:unknown-tx-status "unknown-state-tx"}} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} - {:type :ok :f :transfer} + [{:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :fail :f :transfer :error {:results [:unknown-tx-status]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} + {:type :ok :f :transfer :value {:results [:commit]}} {:type :ok :f :get-all :value {:balance [10120 10140 9980 9760 10001 10500 9820 8700 10620 10360] :version [2 3 2 3 1 2 2 4 2 3]}} {:type :fail :f :check-tx}]) (deftest consistency-checker-fail-test - (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000) + (let [client (client/open! (transfer/->TransferClient (atom false) 10 10000 1) nil nil) checker (#'transfer/consistency-checker) result (checker/check checker {:client client} bad-history nil)]