Skip to content

Commit

Permalink
batching in v2
Browse files Browse the repository at this point in the history
  • Loading branch information
InoMurko committed Mar 12, 2021
1 parent 88c8fd0 commit ab4f276
Show file tree
Hide file tree
Showing 22 changed files with 386 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ start-watcher:
rm -f ./_build/${BAREBUILD_ENV}/rel/watcher/var/sys.config || true && \
echo "Init Watcher DBs" && \
_build/${BAREBUILD_ENV}/rel/watcher/bin/watcher eval "OMG.DB.ReleaseTasks.InitKeyValueDB.run()" && \
_build/${BAREBUILD_ENV}/rel/watcher_info/bin/watcher_info eval "OMG.DB.ReleaseTasks.InitKeysWithValues.run()" && \
_build/${BAREBUILD_ENV}/rel/watcher/bin/watcher eval "OMG.DB.ReleaseTasks.InitKeysWithValues.run()" && \
echo "Run Watcher" && \
. ${OVERRIDING_VARIABLES} && \
PORT=${WATCHER_PORT} _build/${BAREBUILD_ENV}/rel/watcher/bin/watcher $(OVERRIDING_START)
Expand Down
6 changes: 6 additions & 0 deletions apps/omg_watcher/lib/omg_watcher/api/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ defmodule OMG.Watcher.API.Transaction do
* transaction doesn't spend funds not yet mined
* etc...
"""
@spec submit(list(Transaction.Signed.t())) :: Client.response_t() | {:error, atom()}
def batch_submit(signed_txs) do
url = Application.get_env(:omg_watcher, :child_chain_url)
Client.batch_submit(signed_txs, url)
end

@spec submit(Transaction.Signed.t()) :: Client.response_t() | {:error, atom()}
def submit(%Transaction.Signed{} = signed_tx) do
url = Application.get_env(:omg_watcher, :child_chain_url)
Expand Down
7 changes: 4 additions & 3 deletions apps/omg_watcher/lib/omg_watcher/http_rpc/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule OMG.Watcher.HttpRPC.Adapter do
the structure in body is known, so we can try to deserialize it.
"""
@spec get_response_body(HTTPoison.Response.t() | {:error, HTTPoison.Error.t()}) ::
{:ok, map()} | {:error, atom() | tuple() | HTTPoison.Error.t()}
{:ok, map()} | {:ok, list(map())} | {:error, atom() | tuple() | HTTPoison.Error.t()}
def get_response_body(http_response) do
with {:ok, body} <- get_unparsed_response_body(http_response),
{:ok, response} <- Jason.decode(body),
Expand All @@ -78,8 +78,9 @@ defmodule OMG.Watcher.HttpRPC.Adapter do
end
end

defp convert_keys_to_atoms(data) when is_list(data),
do: Enum.map(data, &convert_keys_to_atoms/1)
defp convert_keys_to_atoms(data) when is_list(data) do
Enum.map(data, &convert_keys_to_atoms/1)
end

defp convert_keys_to_atoms(data) when is_map(data) do
data
Expand Down
34 changes: 30 additions & 4 deletions apps/omg_watcher/lib/omg_watcher/http_rpc/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ defmodule OMG.Watcher.HttpRPC.Client do
require Logger

@type response_t() ::
{:ok, %{required(atom()) => any()}}
list()
| {:ok, %{required(atom()) => any()}}
| {:error,
{:client_error | :server_error, any()}
| {:malformed_response, any() | {:error, :invalid}}}
Expand All @@ -40,8 +41,16 @@ defmodule OMG.Watcher.HttpRPC.Client do
@spec submit(binary(), binary()) :: response_t()
def submit(tx, url), do: call(%{transaction: Encoding.to_hex(tx)}, "transaction.submit", url)

@doc """
Submits a batch of transactions
"""
@spec batch_submit(list(binary()), binary()) :: response_t()
def batch_submit(txs, url) do
call(%{transactions: Enum.map(txs, &Encoding.to_hex(&1))}, "transaction.batch_submit", url)
end

defp call(params, path, url) do
Adapter.rpc_post(params, path, url) |> Adapter.get_response_body() |> decode_response()
params |> Adapter.rpc_post(path, url) |> Adapter.get_response_body() |> decode_response()
end

# Translates response's body to known elixir structure, either block or tx submission response or error.
Expand All @@ -54,12 +63,29 @@ defmodule OMG.Watcher.HttpRPC.Client do
}}
end

defp decode_response({:ok, %{tx_hash: _hash} = response}) do
{:ok, Map.update!(response, :tx_hash, &decode16!/1)}
defp decode_response({:ok, %{txhash: _hash} = response}) do
{:ok, Map.update!(response, :txhash, &decode16!/1)}
end

defp decode_response({:ok, response}) when is_list(response) do
decode_response(response, [])
end

defp decode_response(error), do: error

defp decode_response([], acc) do
Enum.reverse(acc)
end

defp decode_response([%{txhash: _hash} = transaction_response | response], acc) do
decode_response(response, [Map.update!(transaction_response, :txhash, &decode16!/1) | acc])
end

# all error tuples
defp decode_response([%{error: error} | response], acc) do
decode_response(response, [%{error: {:skip_hex_encode, error}} | acc])
end

defp decode16!(hexstr) do
{:ok, bin} = Encoding.from_hex(hexstr)
bin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,12 +561,12 @@ defmodule OMG.Watcher.ExitProcessor.StandardExitTest do

defp start_se_from_deposit(processor, exiting_pos, alice) do
tx = TestHelper.create_recovered([], [{alice, @eth, 10}])
processor |> start_se_from(tx, exiting_pos)
start_se_from(processor, tx, exiting_pos)
end

defp start_se_from_block_tx(processor, exiting_pos, alice) do
tx = TestHelper.create_recovered([Tuple.append(@deposit_input2, alice)], [{alice, @eth, 10}])
processor |> start_se_from(tx, exiting_pos)
start_se_from(processor, tx, exiting_pos)
end

defp get_bytes_sig(tx, sig_idx \\ 0), do: {Transaction.raw_txbytes(tx), Enum.at(tx.signed_tx.sigs, sig_idx)}
Expand Down
26 changes: 22 additions & 4 deletions apps/omg_watcher_rpc/lib/web/controllers/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ defmodule OMG.WatcherRPC.Web.Controller.Transaction do
end
end

@doc """
Submits transaction to child chain
"""
def batch_submit(conn, params) do
with {:ok, txbytes} <- expect(params, "transactions", list: &to_transaction/1, optional: false) do
submit_tx_sec(txbytes, conn)
end
end

@doc """
Thin-client version of `/transaction.submit` that accepts json encoded transaction
"""
Expand Down Expand Up @@ -109,6 +118,12 @@ defmodule OMG.WatcherRPC.Web.Controller.Transaction do
end

# Provides extra validation (recover_from) and passes transaction to API layer
defp submit_tx_sec(txbytes, conn) when is_list(txbytes) do
txbytes
|> SecurityApiTransaction.batch_submit()
|> api_response(conn, :batch_submission)
end

defp submit_tx_sec(txbytes, conn) do
with {:ok, recovered_tx} <- Transaction.Recovered.recover_from(txbytes),
:ok <- is_supported(recovered_tx) do
Expand All @@ -119,10 +134,13 @@ defmodule OMG.WatcherRPC.Web.Controller.Transaction do
end
end

defp is_supported(%Transaction.Recovered{
signed_tx: %Transaction.Signed{raw_tx: %Transaction.Fee{}}
}),
do: {:error, :transaction_not_supported}
defp is_supported(%Transaction.Recovered{signed_tx: %Transaction.Signed{raw_tx: %Transaction.Fee{}}}) do
{:error, :transaction_not_supported}
end

defp is_supported(%Transaction.Recovered{}), do: :ok

defp to_transaction(transaction) do
expect(%{"transaction" => transaction}, "transaction", :hex)
end
end
2 changes: 1 addition & 1 deletion apps/omg_watcher_rpc/lib/web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule OMG.WatcherRPC.Web.Router do
post("/utxo.get_challenge_data", Controller.Challenge, :get_utxo_challenge)

post("/transaction.submit", Controller.Transaction, :submit)

post("/transaction.batch_submit", Controller.Transaction, :batch_submit)
post("/in_flight_exit.get_data", Controller.InFlightExit, :get_in_flight_exit)
post("/in_flight_exit.get_competitor", Controller.InFlightExit, :get_competitor)
post("/in_flight_exit.prove_canonical", Controller.InFlightExit, :prove_canonical)
Expand Down
6 changes: 6 additions & 0 deletions apps/omg_watcher_rpc/lib/web/views/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ defmodule OMG.WatcherRPC.Web.View.Transaction do
|> WatcherRPCResponse.add_app_infos()
end

def render("batch_submission.json", %{response: transactions}) do
transactions
|> Response.serialize()
|> WatcherRPCResponse.add_app_infos()
end

def render("transactions.json", %{response: %Paginator{data: transactions, data_paging: data_paging}}) do
transactions
|> Enum.map(&render_transaction/1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
transaction.batch_submit:
post:
tags:
- Transaction
summary: This endpoint submits an array of signed transaction to the child chain.
description: >
Normally you should call the Watcher's Transaction - Submit instead of this.
The Watcher's version performs various security and validation checks (TO DO) before submitting the transaction,
so is much safer. However, if the Watcher is not available this version exists.
operationId: batch_submit
requestBody:
$ref: 'request_bodies.yaml#/TransactionBatchSubmitBodySchema'
responses:
200:
$ref: 'responses.yaml#/TransactionBatchSubmitResponse'
500:
$ref: '../responses.yaml#/InternalServerError'
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
TransactionBatchSubmitBodySchema:
description: Array of signed transactions, RLP-encoded to bytes, and HEX-encoded to string
required: true
content:
application/json:
schema:
title: 'TransactionBatchSubmitBodySchema'
type: object
properties:
transactions:
type: array
items:
type: string
required:
- transactions
example:
transactions: ['0xf8d083015ba98080808080940000...', '0xf8d083a15ba98080808080920000...']
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
TransactionBatchSubmitResponseSchema:
allOf:
- $ref: '../response_schemas.yaml#/WatcherBaseResponseSchema'
- type: object
properties:
data:
type: array
$ref: 'schemas.yaml#/TransactionBatchSubmitSchema '
example:
data:
-
blknum: 123000
txindex: 111
txhash: '0xbdf562c24ace032176e27621073df58ce1c6f65de3b5932343b70ba03c72132d'
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
TransactionBatchSubmitResponse:
description: Transaction batch submission successful response
content:
application/json:
schema:
$ref: 'response_schemas.yaml#/TransactionBatchSubmitResponseSchema'
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
TransactionBatchSubmitSchema:
type: array
items:
type: object
properties:
blknum:
type: integer
format: int64
txindex:
type: integer
format: int16
txhash:
type: string
Loading

0 comments on commit ab4f276

Please sign in to comment.