Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request transfer transactions in parallel #125

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,10 @@
(when-not @(:transaction test)
(prepare-transaction-service! test)))

(defn try-reconnection-for-transaction!
[test]
(when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION)
(prepare-transaction-service! test)
(reset! (:failures test) 0)))

(defn try-reconnection-for-2pc!
[test]
(defn try-reconnection!
[test prepare-fn]
(when (= (swap! (:failures test) inc) NUM_FAILURES_FOR_RECONNECTION)
(prepare-2pc-service! test)
(prepare-fn test)
(reset! (:failures test) 0)))

(defn start-transaction
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_append.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_append_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_write_read.clj
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
2 changes: 1 addition & 1 deletion scalardb/src/scalardb/elle_write_read_2pc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
(assoc op :type :info :error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/rollback-txs [tx1 tx2])
(scalar/try-reconnection-for-2pc! test)
(scalar/try-reconnection! test scalar/prepare-2pc-service!)
(assoc op :type :fail :error {:crud-error (.getMessage e)})))))

(close! [_ _])
Expand Down
128 changes: 81 additions & 47 deletions scalardb/src/scalardb/transfer.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns scalardb.transfer
(:require [clojure.core.reducers :as r]
[clojure.tools.logging :refer [warn]]
[jepsen
[client :as client]
[checker :as checker]
Expand All @@ -23,7 +24,7 @@

(def ^:const INITIAL_BALANCE 10000)
(def ^:const NUM_ACCOUNTS 10)
(def ^:private ^:const TOTAL_BALANCE (* NUM_ACCOUNTS INITIAL_BALANCE))
(def ^:const MAX_NUM_TXS 8)

(def ^:const SCHEMA {(keyword (str KEYSPACE \. TABLE))
{:transaction true
Expand Down Expand Up @@ -86,7 +87,7 @@
(-> r get-balance (+ amount)))

(defn- tx-transfer
[tx {:keys [from to amount]}]
[tx from to amount]
(let [fromResult (.get tx (prepare-get from))
toResult (.get tx (prepare-get to))]
(->> (calc-new-balance fromResult (- amount))
Expand All @@ -97,6 +98,34 @@
(.put tx))
(.commit tx)))

(defn- try-tx-transfer
[test {:keys [from to amount]}]
(if-let [tx (scalar/start-transaction test)]
(try
(tx-transfer tx from to amount)
:commit
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx))
(warn "Unknown transaction: " (.getId tx))
:unknown-tx-status)
(catch Exception e
(warn (.getMessage e))
:fail))
:start-fail))

(defn exec-transfers
"Execute transfers in parallel. Give the transfer function."
[test op transfer-fn]
(let [results (pmap #(transfer-fn test %) (:value op))]
(if (some #{:commit} results)
;; return :ok when at least 1 transaction is committed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:ok needs to be returned for this filtering even if only one transaction is committed?

Copy link
Member Author

@yito88 yito88 Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it's just for the checker, Jepsen requires the type though

(assoc op :type :ok :value {:results results})
;; :info type could be better in some cases
;; However, our checker doesn't care about the type for now
(do
(scalar/try-reconnection! test scalar/prepare-transaction-service!)
(assoc op :type :fail :error {:results results})))))

(defn- read-record
"Read a record with a transaction. If read fails, this function returns nil."
[tx storage i]
Expand All @@ -110,15 +139,19 @@
[test n]
(scalar/check-transaction-connection! test)
(scalar/check-storage-connection! test)
(scalar/with-retry (fn [test] (scalar/prepare-transaction-service! test) (scalar/prepare-storage-service! test)) test
(scalar/with-retry
(fn [test]
(scalar/prepare-transaction-service! test)
(scalar/prepare-storage-service! test))
test
(let [tx (scalar/start-transaction test)
results (map #(read-record tx @(:storage test) %) (range n))]
(if (some nil? results) nil results))))

(defrecord TransferClient [initialized? n initial-balance]
(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
(TransferClient. initialized? n initial-balance))
(TransferClient. initialized? n initial-balance max-txs))

(setup! [_ test]
(locking initialized?
Expand All @@ -129,22 +162,10 @@

(invoke! [_ test op]
(case (:f op)
:transfer (if-let [tx (scalar/start-transaction test)]
(try
(tx-transfer tx (:value op))
(assoc op :type :ok)
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx))
(assoc op :type :info :error {:unknown-tx-status (.getId tx)}))
(catch Exception e
(scalar/try-reconnection-for-transaction! test)
(assoc op :type :fail :error (.getMessage e))))
(do
(scalar/try-reconnection-for-transaction! test)
(assoc op :type :fail :error "Skipped due to no connection")))
:transfer (exec-transfers test op try-tx-transfer)
:get-all (do
(wait-for-recovery (:db test) test)
(if-let [results (read-all-with-retry test (:num op))]
(if-let [results (read-all-with-retry test n)]
(assoc op :type :ok :value {:balance (get-balances results)
:version (get-versions results)})
(assoc op :type :fail :error "Failed to get balances")))
Expand All @@ -159,25 +180,30 @@
(teardown! [_ test]
(scalar/close-all! test)))

(defn- transfer
(defn- generate-acc-pair
[n]
(loop []
(let [from (rand-int n)
to (rand-int n)]
(if-not (= from to) [from to] (recur)))))

(defn transfer
[test _]
(let [n (-> test :client :n)]
(let [num-accs (-> test :client :n)
num-txs (-> test :client :max-txs rand-int inc)]
{:type :invoke
:f :transfer
:value {:from (rand-int n)
:to (rand-int n)
:amount (+ 1 (rand-int 1000))}}))

(def diff-transfer
(gen/filter (fn [op] (not= (-> op :value :from)
(-> op :value :to)))
transfer))
:value (repeatedly num-txs
(fn []
(let [[from to] (generate-acc-pair num-accs)]
{:from from
:to to
:amount (+ 1 (rand-int 1000))})))}))

(defn get-all
[test _]
[_ _]
{:type :invoke
:f :get-all
:num (-> test :client :n)})
:f :get-all})

(defn check-tx
[_ _]
Expand All @@ -188,17 +214,20 @@
[]
(reify checker/Checker
(check [_ test history _]
(let [read-result (->> history
(let [num-accs (-> test :client :n)
initial-balance (-> test :client :initial-balance)
total-balance (* num-accs initial-balance)
read-result (->> history
(r/filter #(= :get-all (:f %)))
(r/filter identity)
(into [])
last
:value)
actual-balance (->> (:balance read-result)
(reduce +))
bad-balance (when-not (= actual-balance TOTAL_BALANCE)
bad-balance (when-not (= actual-balance total-balance)
{:type :wrong-balance
:expected TOTAL_BALANCE
:expected total-balance
:actual actual-balance})
actual-version (->> (:version read-result)
(reduce +))
Expand All @@ -209,16 +238,18 @@
last
((fn [x]
(if (= (:type x) :ok) (:value x) 0))))
total-ok (->> history
(r/filter op/ok?)
(r/filter #(= :transfer (:f %)))
(r/filter identity)
(into [])
count
(+ checked-committed))
expected-version (-> total-ok
(* 2) ; update 2 records per a transfer
(+ (-> test :client :n))) ; initial insertions
total-commits (->> history
(r/filter op/ok?)
(r/filter #(= :transfer (:f %)))
(r/reduce (fn [cnt op]
(->> op :value :results
(filter #{:commit})
count
(+ cnt)))
checked-committed))
expected-version (-> total-commits
(* 2) ; update 2 records per transfer
(+ num-accs)) ; initial insertions
bad-version (when-not (= actual-version expected-version)
{:type :wrong-version
:expected expected-version
Expand All @@ -231,8 +262,11 @@

(defn workload
[_]
{:client (->TransferClient (atom false) NUM_ACCOUNTS INITIAL_BALANCE)
:generator [diff-transfer]
{:client (->TransferClient (atom false)
NUM_ACCOUNTS
INITIAL_BALANCE
MAX_NUM_TXS)
:generator [transfer]
:final-generator (gen/phases
(gen/once get-all)
(gen/once check-tx))
Expand Down
45 changes: 25 additions & 20 deletions scalardb/src/scalardb/transfer_2pc.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns scalardb.transfer-2pc
(:require [jepsen
(:require [clojure.tools.logging :refer [warn]]
[jepsen
[client :as client]
[generator :as gen]]
[scalardb.core :as scalar]
Expand All @@ -8,7 +9,7 @@
(:import (com.scalar.db.exception.transaction UnknownTransactionStatusException)))

(defn- tx-transfer
[tx1 tx2 {:keys [from to amount]}]
[tx1 tx2 from to amount]
(try
(let [fromResult (.get tx1 (transfer/prepare-get from))
toResult (.get tx2 (transfer/prepare-get to))]
Expand All @@ -25,10 +26,25 @@
(scalar/rollback-txs [tx1 tx2])
(throw e))))

(defrecord TransferClient [initialized? n initial-balance]
(defn- try-tx-transfer
[test {:keys [from to amount]}]
(let [tx1 (scalar/start-2pc test)
tx2 (scalar/join-2pc test (.getId tx1))]
(try
(tx-transfer tx1 tx2 from to amount)
:commit
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(warn "Unknown transaction: " (.getId tx1))
:unknown-tx-status)
(catch Exception e
(warn (.getMessage e))
:fail))))

(defrecord TransferClient [initialized? n initial-balance max-txs]
client/Client
(open! [_ _ _]
(TransferClient. initialized? n initial-balance))
(TransferClient. initialized? n initial-balance max-txs))

(setup! [_ test]
(locking initialized?
Expand All @@ -40,22 +56,10 @@

(invoke! [_ test op]
(case (:f op)
:transfer (let [tx1 (scalar/start-2pc test)
tx2 (scalar/join-2pc test (.getId tx1))]
(try
(tx-transfer tx1 tx2 (:value op))
(assoc op :type :ok)
(catch UnknownTransactionStatusException _
(swap! (:unknown-tx test) conj (.getId tx1))
(assoc op
:type :info
:error {:unknown-tx-status (.getId tx1)}))
(catch Exception e
(scalar/try-reconnection-for-2pc! test)
(assoc op :type :fail :error (.getMessage e)))))
:transfer (transfer/exec-transfers test op try-tx-transfer)
:get-all (do
(wait-for-recovery (:db test) test)
(if-let [results (transfer/read-all-with-retry test (:num op))]
(if-let [results (transfer/read-all-with-retry test n)]
(assoc op :type :ok :value {:balance
(transfer/get-balances results)
:version
Expand All @@ -76,8 +80,9 @@
[_]
{:client (->TransferClient (atom false)
transfer/NUM_ACCOUNTS
transfer/INITIAL_BALANCE)
:generator [transfer/diff-transfer]
transfer/INITIAL_BALANCE
transfer/MAX_NUM_TXS)
:generator [transfer/transfer]
:final-generator (gen/phases
(gen/once transfer/get-all)
(gen/once transfer/check-tx))
Expand Down
Loading
Loading