diff --git a/.circleci/config.yml b/.circleci/config.yml index 15a59f9836..2be4a868b9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -74,12 +74,6 @@ commands: steps: - run: printf "%s\\n" "$DOCKER_PASS" | docker login -u "$DOCKER_USER" --password-stdin - gcloud_login: - steps: - - run: | - echo $GCLOUD_SERVICE_KEY | gcloud auth activate-service-account --key-file=- - gcloud auth configure-docker --quiet - make_docker_images: description: Builds docker images steps: @@ -132,6 +126,19 @@ commands: sleep 5 done + run_test_in_docker: + description: "Quick test runs" + parameters: + test_command: + type: string + test_name: + type: string + steps: + - run: + name: "Docker run <>" + command: | + docker run --rm -it --network=chain_net -e DOCKER=true -e CHILD_CHAIN_URL=http://172.27.0.108:9656/v1 -e ETHEREUM_RPC_URL=http://172.27.0.108:80 -e DOCKER_GETH=true -e TEST_DATABASE_URL=postgresql://omisego_dev:omisego_dev@172.27.0.107:5432/omisego_test -e SHELL=/bin/sh -v $(pwd):/app --entrypoint /bin/sh omisegoimages/elixir-omg-builder:stable-20201207 -c "cd /app && mix deps.get && <>" + install_elixir: description: Installs elixir and checks if docker is healthy steps: @@ -158,19 +165,6 @@ commands: - restore_cache: key: v2-mix-specs-cache-{{ .Branch }}-{{ checksum "mix.lock" }} - run_test_in_docker: - description: "Quick test runs" - parameters: - test_command: - type: string - test_name: - type: string - steps: - - run: - name: "Docker run <>" - command: | - docker run --rm -it --network=project_chain_net -e DOCKER=true -e CHILD_CHAIN_URL=http://172.27.0.108:9656/v1 -e ETHEREUM_RPC_URL=http://172.27.0.108:80 -e DOCKER_GETH=true -e TEST_DATABASE_URL=postgresql://omisego_dev:omisego_dev@172.27.0.107:5432/omisego_test -e SHELL=/bin/sh -v $(pwd):/app --entrypoint /bin/sh omisegoimages/elixir-omg-builder:stable-20201207 -c "cd /app && mix deps.get && <>" - install_deps: description: Install linux dependencies steps: @@ -185,6 +179,20 @@ commands: sudo apt-get update && ./bin/setup no_output_timeout: 2400 + install_and_setup_gcloud: + description: Installs and sets up gcloud to fetch feefeed + steps: + - run: | + export LD_LIBRARY_PATH=/usr/local/lib + export CLOUDSDK_PYTHON=/usr/bin/python + wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-323.0.0-linux-x86_64.tar.gz -O gcloud-sdk.tar.gz + tar zxf gcloud-sdk.tar.gz google-cloud-sdk + mv google-cloud-sdk ~/.google-cloud-sdk + ~/.google-cloud-sdk/install.sh --quiet + echo $GCP_KEY_FILE | gcloud auth activate-service-account $GCP_SERVICE_EMAIL --key-file=- + gcloud --quiet config set project ${GCP_PROJECT} + gcloud --quiet config set compute/zone ${GCP_ZONE} + gcloud --quiet auth configure-docker jobs: barebuild: @@ -230,8 +238,10 @@ jobs: source ~/.bash_profile asdf plugin-add elixir https://github.com/asdf-vm/asdf-elixir.git asdf plugin-add erlang https://github.com/asdf-vm/asdf-erlang.git + asdf plugin-add rust || asdf plugin-update rust asdf install - run: make init_test + - install_rust - run: command: ./bin/setup no_output_timeout: 2400 @@ -257,9 +267,11 @@ jobs: paths: - "deps_docker/" - "deps_docker/rocksdb" - - "_build_docker/" + - "_build_docker/test/lib/rocksdb/" + - "_build_docker/test/dev/rocksdb/" - "deps/" - - "_build/" + - "_build/test/lib/rocksdb/" + - "_build/test/dev/rocksdb/" - persist_to_workspace: name: Persist workspace root: ~/src @@ -315,32 +327,11 @@ jobs: _counter=$(mix credo --only Credo.Check.Readability.SinglePipe | grep -c "Use a function call when a pipeline is only one function long") echo "Current Credo.Check.Readability.SinglePipe occurrences:" echo $_counter - if [ $_counter -gt 271 ]; then + if [ $_counter -gt 273 ]; then echo "Have you been naughty or nice? Find out if Santa knows." exit 1 fi - - lint_version: - executor: builder - steps: - - setup_elixir-omg_workspace - - run: - command: | - if [ -n "$CIRCLE_TAG" ]; then - _tagged_version="${CIRCLE_TAG#*v}" - _tagged_version_ignoring_pre="${_tagged_version%%-pre.*}" - _filed_version="$(head -n 1 ./VERSION | sed 's/^[ \t]*//;s/[ \t]*$//')" - - if [ "$_tagged_version_ignoring_pre" != "$_filed_version" ]; then - echo "The git tag \"${CIRCLE_TAG}\" expects the VERSION to be \"${_tagged_version_ignoring_pre}\". Got \"${_filed_version}\"." - exit 1 - fi - else - echo "This build is not version-tagged. Skipping version lint." - exit 0 - fi - sobelow: executor: builder_pg environment: @@ -559,12 +550,13 @@ jobs: - run: name: Compile command: mix compile + - install_rust - run: name: Integration Tests command: | # Slow, serial integration test, run nightly. Here to make sure the standard `mix test --only integration` works export SHELL=/bin/bash - mix test --only integration + mix test --only integration dialyzer: executor: builder_pg @@ -608,7 +600,11 @@ jobs: image: ubuntu-2004:202010-01 environment: SNAPSHOT: SNAPSHOT_MIX_EXIT_PERIOD_SECONDS_120 - parallelism: 4 + LD_LIBRARY_PATH: /usr/local/lib + CLOUDSDK_PYTHON: /usr/bin/python + EXIT_ID_SIZE: 168 + LOCALCHAIN_CONTRACT_ADDRESSES: ~/project/localchain_contract_addresses.env + parallelism: 5 steps: - checkout - run: @@ -621,39 +617,50 @@ jobs: command: | [ -d data ] || mkdir data && chmod 777 data - docker_login - - gcloud_login - - run: pip3 install docker-compose --upgrade - make_docker_images + - install_and_setup_gcloud - run: name: Start daemon services - command: | - make init_test - cp ./localchain_contract_addresses.env ./priv/cabbage/apps/itest/localchain_contract_addresses.env - docker-compose -f docker-compose.yml -f docker-compose.specs.yml up -d || (START_RESULT=$?; docker-compose logs; exit $START_RESULT;) + command: make init_test && docker-compose -f docker-compose.yml -f docker-compose.specs.yml up -d || (START_RESULT=$?; docker-compose logs; exit $START_RESULT;) - run: name: Log daemon services command: make cabbage-logs background: true - check_docker_status - - install_elixir - run: sh .circleci/status.sh + - restore_cache: + key: docker_compose_release-cabbage-{{ checksum "~/project/priv/cabbage/mix.lock" }} - run: - name: Run specs - environment: - CHILD_CHAIN_URL: http://127.0.0.1:9656/v1 - EXIT_ID_SIZE: "168" - LOCALCHAIN_CONTRACT_ADDRESSES: "/home/circleci/project/localchain_contract_addresses.env" - PLASMA_CONTRACTS_DIR: "/home/circleci/project/data/plasma-contracts/contracts/" - FEE_CLAIMER_ADDRESS: "0x24F3402Cd22F03ff81B56941fE048AD0F4EED5A1" + no_output_timeout: 30m command: | - cd priv/cabbage - make install - make generate_api_code - mix deps.get - grep -r "@moduletag" ./apps/itest/ | awk '{print "--include " $3}' | tr -d ':' > tags.txt - echo $(circleci tests split ./tags.txt) - circleci tests split ./tags.txt > /tmp/tests-to-run - mix test --exclude test $(cat /tmp/tests-to-run) + cd ~/project/priv/cabbage + TESTFILES=$(circleci tests glob "apps/itest/test/itest/*_test.exs" | circleci tests split --split-by=timings --show-counts | tr '\r\n' ' ') + echo ${TESTFILES} + cd ~/project/ + docker run --rm -it --network=chain_net \ + --user=root \ + -e FEE_CLAIMER_ADDRESS=0x24F3402Cd22F03ff81B56941fE048AD0F4EED5A1 \ + -e MIX_ENV=test \ + -e PLASMA_CONTRACTS_DIR=/app/data/plasma-contracts/contracts/ \ + -e LOCALCHAIN_CONTRACT_ADDRESSES=/app/localchain_contract_addresses.env \ + -e DOCKER=true \ + -e WATCHER_URL=http://172.27.0.104:7434 \ + -e WATCHER_INFO_URL=http://172.27.0.105:7534 \ + -e CHILD_CHAIN_URL=http://172.27.0.108:9656/v1 \ + -e ETHEREUM_RPC_URL=http://172.27.0.108:80 \ + -e ETHEREUM_WS_URL=ws://172.27.0.108:81 \ + -e EXIT_ID_SIZE=168 \ + -e SHELL=/bin/sh \ + -v $(pwd):/app \ + --entrypoint /bin/sh \ + "omisego/childchain-builder:dev-6b9e25f" -c "cd /app/priv/cabbage && apk add maven && apk add jq && make install && make generate_api_code && mix deps.get && mix test ${TESTFILES} --trace" + - store_test_results: + path: ~/project/priv/cabbage/_build/test/lib/itest/ + - save_cache: + key: docker_compose_release-cabbage-{{ checksum "~/project/priv/cabbage/mix.lock" }} + paths: + - ~/project/priv/cabbage/deps + - ~/project/priv/cabbage/_build test_docker_compose_performance: description: "These are not actually performance tests, we're checking if the scripts work" @@ -662,8 +669,9 @@ jobs: environment: PERF_IMAGE_NAME: "omisego/perf-v2:latest" STATIX_TAG: "env:perf_circleci" - CHILD_CHAIN_URL: http://127.0.0.1:9656/v1 - FEE_AMOUNT: 1 + LD_LIBRARY_PATH: /usr/local/lib + CLOUDSDK_PYTHON: /usr/bin/python + EXIT_ID_SIZE: 168 steps: - checkout - run: @@ -672,53 +680,55 @@ jobs: [ -d data ] || mkdir data && chmod 777 data - docker_login - make_docker_images + - install_and_setup_gcloud - run: name: Build perf docker image command: make docker-perf IMAGE_NAME=$PERF_IMAGE_NAME - - install_elixir - - run: - name: Start daemon services - command: | - cd priv/perf - make start-services - - run: - name: docker services logs - background: true - command: | - cd priv/perf - make log-services - - run: sh .circleci/status.sh - - run: - name: Run load test - command: | - cd priv/perf - make init - export $(cat ../../localchain_contract_addresses.env | xargs) - make test - - run: - name: Show help information - command: docker run -it $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- help - - run: - name: Run perf smoke test (transactions) - command: | - docker run -it --env-file ./localchain_contract_addresses.env -e CHILD_CHAIN_URL=http://localhost:9656/v1 -e FEE_AMOUNT=1 --env DD_API_KEY --env DD_APP_KEY --env STATIX_TAG --network host $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- "transactions" 1 200 + # - install_elixir + # - run: + # name: Start daemon services + # command: | + # cd priv/perf + # make start-services + # - run: + # name: docker services logs + # background: true + # command: | + # cd priv/perf + # make log-services + # - run: sh .circleci/status.sh + # - run: + # name: Run load test + # command: | + # cd priv/perf + # make init + # export $(cat ../../localchain_contract_addresses.env | xargs) + # make test + # - run: + # name: Show help information + # command: docker run -it $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- help - run: name: Run perf smoke test (deposits) command: | - docker run -it --env-file ./localchain_contract_addresses.env -e CHILD_CHAIN_URL=http://localhost:9656/v1 -e FEE_AMOUNT=1 --env DD_API_KEY --env DD_APP_KEY --env STATIX_TAG --network host $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- "deposits" 1 200 + docker run -it --env-file ./localchain_contract_addresses.env -e FEE_AMOUNT=1 --env DD_API_KEY --env DD_APP_KEY --env STATIX_TAG --network host $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- "deposits" 1 200 - run: - name: (Perf) Format generated code and check for warnings + name: Run perf smoke test (transactions) command: | - cd priv/perf - # run format ONLY on formatted code so that it cleans up quoted atoms because - # we cannot exclude folders to --warnings-as-errors - mix format apps/*_api/lib/*_api/model/*.ex - export $(cat ../../localchain_contract_addresses.env | xargs) - make format-code-check-warnings + docker run -it --env-file ./localchain_contract_addresses.env -e FEE_AMOUNT=1 --env DD_API_KEY --env DD_APP_KEY --env STATIX_TAG --network host $PERF_IMAGE_NAME mix run -e "LoadTest.TestRunner.run()" -- "transactions" 1 200 + # - run: + # name: (Perf) Format generated code and check for warnings + # command: | + # cd priv/perf + # # run format ONLY on formatted code so that it cleans up quoted atoms because + # # we cannot exclude folders to --warnings-as-errors + # mix format apps/*_api/lib/*_api/model/*.ex + # export $(cat ../../localchain_contract_addresses.env | xargs) + # make format-code-check-warnings - save_cache: key: v2-mix-specs-cache-{{ .Branch }}-{{ checksum "mix.lock" }} paths: - "priv/perf/deps" + - run: name: (Perf) Credo and formatting command: | @@ -730,6 +740,8 @@ jobs: image: ubuntu-2004:202010-01 environment: REORG: true + LD_LIBRARY_PATH: /usr/local/lib + CLOUDSDK_PYTHON: /usr/bin/python steps: - checkout - run: @@ -745,6 +757,7 @@ jobs: [ -d data ] || mkdir data && chmod 777 data - docker_login - make_docker_images + - install_and_setup_gcloud - run: name: Start daemon services command: | @@ -756,7 +769,6 @@ jobs: command: make cabbage-logs-reorg background: true - check_docker_status - - install_elixir - run: sh .circleci/status.sh - run: name: Print watcher logs @@ -778,27 +790,43 @@ jobs: name: Print reorg logs command: make cabbage-reorgs-logs background: true + - restore_cache: + key: docker_compose_release-cabbage-{{ checksum "~/project/priv/cabbage/mix.lock" }} - run: - name: Run specs - environment: - CHILD_CHAIN_URL: http://127.0.0.1:9656/v1 - EXIT_ID_SIZE: "168" - LOCALCHAIN_CONTRACT_ADDRESSES: "/home/circleci/project/localchain_contract_addresses.env" - PLASMA_CONTRACTS_DIR: "/home/circleci/project/data/plasma-contracts/contracts/" - FEE_CLAIMER_ADDRESS: "0x24F3402Cd22F03ff81B56941fE048AD0F4EED5A1" command: | - cd priv/cabbage - make install - make generate_api_code - mix deps.get - mix test --only deposit --trace - no_output_timeout: 30m + cd ~/project/ + docker run --rm -it --network=chain_net \ + --user=root \ + -e FEE_CLAIMER_ADDRESS=0x24F3402Cd22F03ff81B56941fE048AD0F4EED5A1 \ + -e MIX_ENV=test \ + -e PLASMA_CONTRACTS_DIR=/app/data/plasma-contracts/contracts/ \ + -e LOCALCHAIN_CONTRACT_ADDRESSES=/app/localchain_contract_addresses.env \ + -e DOCKER=true \ + -e WATCHER_URL=http://172.27.0.104:7434 \ + -e WATCHER_INFO_URL=http://172.27.0.105:7534 \ + -e CHILD_CHAIN_URL=http://172.27.0.108:9656/v1 \ + -e ETHEREUM_RPC_URL=http://172.27.0.108:80 \ + -e ETHEREUM_WS_URL=ws://172.27.0.108:81 \ + -e EXIT_ID_SIZE=168 \ + -e SHELL=/bin/sh \ + -e REORG=true \ + -v $(pwd):/app \ + --entrypoint /bin/sh \ + "omisego/childchain-builder:dev-6b9e25f" -c "cd /app/priv/cabbage && apk add maven && apk add jq && make install && make generate_api_code && mix deps.get && mix test --only deposit --trace" + - save_cache: + key: docker_compose_release-cabbage-{{ checksum "~/project/priv/cabbage/mix.lock" }} + paths: + - ~/project/priv/cabbage/deps + - ~/project/priv/cabbage/_build test_barebone_release: machine: image: ubuntu-2004:202010-01 environment: TERM: xterm-256color + LD_LIBRARY_PATH: /usr/local/lib + CLOUDSDK_PYTHON: /usr/bin/python + EXIT_ID_SIZE: 168 steps: - checkout - run: @@ -807,6 +835,7 @@ jobs: git submodule init git submodule update --remote - run: echo 'export PATH=~/.cargo/bin:$PATH' >> $BASH_ENV + - install_and_setup_gcloud - docker_login - run: name: Start geth, postgres, feefeed and pull in blockchain snapshot @@ -1044,7 +1073,7 @@ workflows: - integration_tests: requires: [build] - barebuild_macos - - test_barebone_release + #- test_barebone_release build-test-deploy: jobs: - build: @@ -1053,12 +1082,12 @@ workflows: only: /.+/ tags: only: /.+/ - - test_barebone_release: - filters: *all_branches_and_tags + # - test_barebone_release: + # filters: *all_branches_and_tags - notify_services: - # requires: - # - increase_chart_version_watcher_master - # - increase_chart_version_watcher_info_master + requires: + - increase_chart_version_watcher_master + - increase_chart_version_watcher_info_master filters: branches: only: @@ -1080,8 +1109,8 @@ workflows: filters: *all_branches_and_tags - test_docker_compose_release: filters: *all_branches_and_tags - - test_docker_compose_performance: - filters: *all_branches_and_tags + # - test_docker_compose_performance: + # filters: *all_branches_and_tags - test_docker_compose_reorg: filters: branches: @@ -1093,9 +1122,6 @@ workflows: - lint: requires: [build] filters: *all_branches_and_tags - - lint_version: - requires: [build] - filters: *all_branches_and_tags - sobelow: requires: [build] filters: *all_branches_and_tags @@ -1107,22 +1133,29 @@ workflows: filters: *all_branches_and_tags - property_tests: requires: [build] - filters: *all_branches_and_tags + filters: &master_and_version_branches_and_all_tags + branches: + only: + - master-v2 + # vMAJOR.MINOR (e.g. v0.1, v0.2, v1.0, v2.1, etc.) + - /^v[0-9]+\.[0-9]+/ + tags: + only: + - /.+/ - watcher_mix_based_childchain: filters: *all_branches_and_tags - publish_watcher: requires: [ - test_barebone_release, + # test_barebone_release, test_docker_compose_release, watcher_coveralls_and_integration_tests, watcher_info_coveralls_and_integration_tests, common_coveralls_and_integration_tests, test, - property_tests, + # property_tests, dialyzer, lint, - lint_version, audit_deps ] filters: &master_and_version_branches_and_all_tags @@ -1137,7 +1170,7 @@ workflows: - publish_watcher_info: requires: [ - test_barebone_release, + # test_barebone_release, test_docker_compose_release, watcher_coveralls_and_integration_tests, watcher_info_coveralls_and_integration_tests, @@ -1146,35 +1179,34 @@ workflows: property_tests, dialyzer, lint, - lint_version, audit_deps ] filters: *master_and_version_branches_and_all_tags - - publish_perf: - requires: [test_docker_compose_performance] - filters: - branches: - only: - - master-v2 - # vMAJOR.MINOR (e.g. v0.1, v0.2, v1.0, v2.1, etc.) - - /^v[0-9]+\.[0-9]+/ - tags: - only: - - /.+/ - # Increase chart version for master, this will end up trigger deployment on dev - # - increase_chart_version_watcher_master: - # requires: [publish_watcher, publish_watcher_info] + # - publish_perf: + # requires: [test_docker_compose_performance] # filters: # branches: # only: - # - master-v2 - # - increase_chart_version_watcher_info_master: - # requires: [publish_watcher, publish_watcher_info] - # filters: - # branches: + # - master + # # vMAJOR.MINOR (e.g. v0.1, v0.2, v1.0, v2.1, etc.) + # - /^v[0-9]+\.[0-9]+/ + # tags: # only: - # - master-v2 + # - /.+/ + # Increase chart version for master, this will end up trigger deployment on dev + - increase_chart_version_watcher_master: + requires: [publish_watcher, publish_watcher_info] + filters: + branches: + only: + - master + - increase_chart_version_watcher_info_master: + requires: [publish_watcher, publish_watcher_info] + filters: + branches: + only: + - master # Increase chart version for new release - increase_chart_version_watcher_release: requires: [publish_watcher, publish_watcher_info] @@ -1188,24 +1220,23 @@ workflows: - increase_chart_version_watcher_info_release: requires: [publish_watcher, publish_watcher_info] filters: *only_release_tag - # - release: - # requires: [ - # test_barebone_release, - # test_docker_compose_release, - # watcher_coveralls_and_integration_tests, - # watcher_info_coveralls_and_integration_tests, - # common_coveralls_and_integration_tests, - # test, - # property_tests, - # dialyzer, - # lint, - # lint_version, - # audit_deps - # ] - # context: - # - shared-semantic-release - # filters: - # branches: - # only: /^master-v2$/ - # tags: - # ignore: /.*/ + - release: + requires: [ + # test_barebone_release, + test_docker_compose_release, + watcher_coveralls_and_integration_tests, + watcher_info_coveralls_and_integration_tests, + common_coveralls_and_integration_tests, + test, + property_tests, + dialyzer, + lint, + audit_deps + ] + context: + - shared-semantic-release + filters: + branches: + only: /^master-v2$/ + tags: + ignore: /.*/ \ No newline at end of file diff --git a/apps/omg/lib/omg/state.ex b/apps/omg/lib/omg/state.ex index d05c8a2153..76fcdeaf93 100644 --- a/apps/omg/lib/omg/state.ex +++ b/apps/omg/lib/omg/state.ex @@ -193,6 +193,10 @@ defmodule OMG.State do end def handle_call({:deposit, deposits}, _from, state) do + if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent) do + :ok = Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_deposits!, [deposits]) + end + {:ok, db_updates, new_state} = Core.deposit(deposits, state) {:reply, {:ok, db_updates}, new_state} diff --git a/apps/omg/test/omg/state/persistence_test.exs b/apps/omg/test/omg/state/persistence_test.exs index c708d3ad52..2e477c877f 100644 --- a/apps/omg/test/omg/state/persistence_test.exs +++ b/apps/omg/test/omg/state/persistence_test.exs @@ -23,6 +23,7 @@ defmodule OMG.State.PersistenceTest do require OMG.Utxo + alias Ecto.Adapters.SQL.Sandbox alias OMG.Block alias OMG.Eth.Configuration alias OMG.State.Transaction @@ -57,6 +58,20 @@ defmodule OMG.State.PersistenceTest do strategy: :one_for_one ) + Application.ensure_all_started(:postgrex) + Application.ensure_all_started(:spandex_ecto) + Application.ensure_all_started(:ecto) + + {:ok, _} = + Supervisor.start_link( + [%{id: OMG.WatcherInfo.DB.Repo, start: {OMG.WatcherInfo.DB.Repo, :start_link, []}, type: :supervisor}], + strategy: :one_for_one, + name: WatcherInfo.Supervisor + ) + + :ok = Sandbox.checkout(OMG.WatcherInfo.DB.Repo) + Sandbox.mode(OMG.WatcherInfo.DB.Repo, {:shared, self()}) + on_exit(fn -> Application.put_env(:omg_db, :path, nil) @@ -226,7 +241,8 @@ defmodule OMG.State.PersistenceTest do owner: owner.addr, currency: currency, amount: amount, - blknum: blknum + blknum: blknum, + eth_height: 1 } end) end diff --git a/apps/omg/test/omg/state_test.exs b/apps/omg/test/omg/state_test.exs index 20f5715e48..8bef42637d 100644 --- a/apps/omg/test/omg/state_test.exs +++ b/apps/omg/test/omg/state_test.exs @@ -21,6 +21,7 @@ defmodule OMG.StateTest do use OMG.DB.Fixtures + alias Ecto.Adapters.SQL.Sandbox alias OMG.State alias OMG.TestHelper alias OMG.Utxo @@ -38,6 +39,20 @@ defmodule OMG.StateTest do # the pubsub is required, because `OMG.State` is broadcasting to the `OMG.Bus` {:ok, bus_apps} = Application.ensure_all_started(:omg_bus) + Application.ensure_all_started(:postgrex) + Application.ensure_all_started(:spandex_ecto) + Application.ensure_all_started(:ecto) + + {:ok, _} = + Supervisor.start_link( + [%{id: OMG.WatcherInfo.DB.Repo, start: {OMG.WatcherInfo.DB.Repo, :start_link, []}, type: :supervisor}], + strategy: :one_for_one, + name: WatcherInfo.Supervisor + ) + + :ok = Sandbox.checkout(OMG.WatcherInfo.DB.Repo) + Sandbox.mode(OMG.WatcherInfo.DB.Repo, {:shared, self()}) + on_exit(fn -> (started_apps ++ bus_apps) |> Enum.reverse() @@ -68,7 +83,19 @@ defmodule OMG.StateTest do fee = %{@eth => [1]} # deposits, transactions, utxo existence - assert {:ok, _} = State.deposit([%{owner: alice.addr, currency: @eth, amount: 10, blknum: 1}]) + assert {:ok, _} = + State.deposit([ + %{ + owner: alice.addr, + currency: @eth, + amount: 10, + blknum: 1, + root_chain_txhash: <<1::256>>, + eth_height: 1, + log_index: 0 + } + ]) + assert true == State.utxo_exists?(Utxo.position(1, 0, 0)) assert {:ok, _} = State.exec(TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 9}]), fee) diff --git a/apps/omg_status/lib/omg_status/metric/event.ex b/apps/omg_status/lib/omg_status/metric/event.ex index ba83c932ac..c082d0706c 100644 --- a/apps/omg_status/lib/omg_status/metric/event.ex +++ b/apps/omg_status/lib/omg_status/metric/event.ex @@ -28,6 +28,7 @@ defmodule OMG.Status.Metric.Event do :ife_exit_finalizer, :in_flight_exit, :in_flight_exit_processor, + :in_flight_exit_deleted_processor, :piggyback, :piggyback_challenges_processor, :piggyback_processor, @@ -54,7 +55,6 @@ defmodule OMG.Status.Metric.Event do :watcher_exit_processor_message_queue_len - OMG.Watcher.ExitProcessor message queue length :eventer_message_queue_len - OMG.Watcher.Eventer message queue length :db_message_queue_len - OMG.DB server implementation (OMG.DB.LevelDB.Server, or OMG.DB.RocksDB.Server,) message queue length - :pending_block_queue_length - OMG.WatcherInfo.DB.PendingBlock queue length :write - OMG.DB KV layer has three types of actions: write, read, multiread :read - OMG.DB KV layer has three types of actions: write, read, multiread :multiread - OMG.DB KV layer has three types of actions: write, read, multiread @@ -78,7 +78,6 @@ defmodule OMG.Status.Metric.Event do def name(:watcher_exit_processor_message_queue_len), do: "watcher_exit_processor_message_queue_len" def name(:eventer_message_queue_len), do: "eventer_message_queue_len" def name(:db_message_queue_len), do: "db_message_queue_len" - def name(:pending_block_queue_length), do: "pending_block_queue_length" def name(:write), do: "db_write" def name(:read), do: "db_read" def name(:multiread), do: "db_multiread" @@ -100,6 +99,7 @@ defmodule OMG.Status.Metric.Event do defp events_name(:exit_finalizer), do: "exit_finalizer_ethereum_events" defp events_name(:exit_challenger), do: "exit_challenger_ethereum_events" defp events_name(:in_flight_exit_processor), do: "in_flight_exit_processor_ethereum_events" + defp events_name(:in_flight_exit_deleted_processor), do: "in_flight_exit_deleted_processor_ethereum_events" defp events_name(:piggyback_processor), do: "piggyback_processor_ethereum_events" defp events_name(:competitor_processor), do: "competitor_processor_ethereum_events" defp events_name(:challenges_responds_processor), do: "challenges_responds_processor_ethereum_events" diff --git a/apps/omg_watcher/lib/omg_watcher/block_getter.ex b/apps/omg_watcher/lib/omg_watcher/block_getter.ex index d95d95bff5..6a99d33fef 100644 --- a/apps/omg_watcher/lib/omg_watcher/block_getter.ex +++ b/apps/omg_watcher/lib/omg_watcher/block_getter.ex @@ -141,10 +141,8 @@ defmodule OMG.Watcher.BlockGetter do case Core.validate_executions(tx_exec_results, block_application, state) do {:ok, state} -> - :ok = - {:child_chain, "block.get"} - |> OMG.Bus.Event.new(:block_received, block_application) - |> OMG.Bus.direct_local_broadcast() + if Code.ensure_loaded?(OMG.WatcherInfo.DB.Block), + do: Kernel.apply(OMG.WatcherInfo.BlockApplicator, :insert_block!, [block_application]) {:noreply, state, {:continue, {:apply_block_step, :run_block_download_task, block_application}}} diff --git a/apps/omg_watcher/lib/omg_watcher/exit_processor.ex b/apps/omg_watcher/lib/omg_watcher/exit_processor.ex index 93043ad916..d78a9612c1 100644 --- a/apps/omg_watcher/lib/omg_watcher/exit_processor.ex +++ b/apps/omg_watcher/lib/omg_watcher/exit_processor.ex @@ -391,6 +391,9 @@ defmodule OMG.Watcher.ExitProcessor do ) |> Enum.map(fn {:ok, result} -> result end) + if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent), + do: Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_exits!, [exits, :standard_exit, nil]) + {new_state, db_updates} = Core.new_exits(state, exit_maps, exit_contract_statuses) {:reply, {:ok, db_updates}, new_state} end @@ -404,13 +407,17 @@ defmodule OMG.Watcher.ExitProcessor do end) # Prepare events data for internal bus - :ok = + events = exits |> Enum.map(fn %{input_utxos_pos: inputs} = event -> {event, inputs} end) |> Tools.to_bus_events_data() - |> publish_internal_bus_events("InFlightExitStarted") + + :ok = publish_internal_bus_events(events, :InFlightExitStarted) + + if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent), + do: Kernel.apply(OMG.WatcherInfo.DB.EthEvent, :insert_exits!, [events, :in_flight_exit, :InFlightExitStarted]) {:ok, statuses} = Eth.RootChain.get_in_flight_exit_structs(contract_ife_ids) @@ -446,10 +453,16 @@ defmodule OMG.Watcher.ExitProcessor do _ = if not Enum.empty?(exits), do: Logger.info("Recognized #{Enum.count(exits)} piggybacks: #{inspect(exits)}") {new_state, db_updates} = Core.new_piggybacks(state, exits) - :ok = - exits - |> Tools.to_bus_events_data() - |> publish_internal_bus_events("InFlightTxOutputPiggybacked") + events = Tools.to_bus_events_data(exits) + :ok = publish_internal_bus_events(events, :InFlightTxOutputPiggybacked) + + if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent), + do: + Kernel.apply( + OMG.WatcherInfo.DB.EthEvent, + :insert_exits!, + [events, :in_flight_exit, :InFlightTxOutputPiggybacked] + ) {:reply, {:ok, db_updates}, new_state} end @@ -503,10 +516,16 @@ defmodule OMG.Watcher.ExitProcessor do {:ok, state3, db_updates} = Core.finalize_in_flight_exits(state2, finalizations, invalidities) - :ok = - events_with_utxos - |> Tools.to_bus_events_data() - |> publish_internal_bus_events("InFlightExitOutputWithdrawn") + events = Tools.to_bus_events_data(events_with_utxos) + :ok = publish_internal_bus_events(events, :InFlightExitOutputWithdrawn) + + if Code.ensure_loaded?(OMG.WatcherInfo.DB.EthEvent), + do: + Kernel.apply( + OMG.WatcherInfo.DB.EthEvent, + :insert_exits!, + [events, :in_flight_exit, :InFlightExitOutputWithdrawn] + ) {:reply, {:ok, state_db_updates ++ db_updates}, state3} end @@ -709,7 +728,7 @@ defmodule OMG.Watcher.ExitProcessor do defp publish_internal_bus_events([], _), do: :ok - defp publish_internal_bus_events(events_data, topic) when is_list(events_data) and is_binary(topic) do + defp publish_internal_bus_events(events_data, topic) when is_list(events_data) and is_atom(topic) do {:watcher, topic} |> OMG.Bus.Event.new(:data, events_data) |> OMG.Bus.direct_local_broadcast() diff --git a/apps/omg_watcher/test/omg_watcher/integration/monitor_test.exs b/apps/omg_watcher/test/omg_watcher/integration/monitor_test.exs index c9d136ca41..f4ff1ae82d 100644 --- a/apps/omg_watcher/test/omg_watcher/integration/monitor_test.exs +++ b/apps/omg_watcher/test/omg_watcher/integration/monitor_test.exs @@ -51,7 +51,7 @@ defmodule OMG.Watcher.MonitorTest do test "that a child process gets restarted after alarm is cleared" do child = ChildProcess.prepare_child() - {:ok, monitor_pid} = Monitor.start_link([Alarm, child]) + {:ok, monitor_pid} = start_and_attach_a_child([Alarm, child]) app_alarm = Alarm.ethereum_connection_error(__MODULE__) # the monitor is now started, we raise an alarm and kill it's child @@ -85,7 +85,7 @@ defmodule OMG.Watcher.MonitorTest do test "that a child process does not get restarted if an alarm is cleared but it was not down" do child = ChildProcess.prepare_child() - {:ok, monitor_pid} = Monitor.start_link([Alarm, child]) + {:ok, monitor_pid} = start_and_attach_a_child([Alarm, child]) app_alarm = Alarm.ethereum_connection_error(__MODULE__) :ok = :alarm_handler.set_alarm(app_alarm) :erlang.trace(monitor_pid, true, [:receive]) @@ -102,6 +102,17 @@ defmodule OMG.Watcher.MonitorTest do assert Process.info(monitor_pid, :links) == {:links, links} end + defp start_and_attach_a_child(opts) do + case Monitor.start_link(opts) do + {:ok, monitor_pid} -> + {:ok, monitor_pid} + + {:error, {{:badmatch, {:error, {:already_started, _}}}, _}} -> + Process.sleep(500) + start_and_attach_a_child(opts) + end + end + defmodule ChildProcess do @moduledoc """ Mocking a child process to Monitor diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/block_application_consumer.ex b/apps/omg_watcher_info/lib/omg_watcher_info/block_application_consumer.ex deleted file mode 100644 index b48ad41dc8..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/block_application_consumer.ex +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.BlockApplicationConsumer do - @moduledoc """ - Subscribes for new blocks and inserts them into a pending queue of blocks waiting to be processed. - """ - require Logger - - use GenServer - - alias OMG.WatcherInfo.DB - - @default_bus_module OMG.Bus - - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: Keyword.get(args, :name, __MODULE__)) - end - - def init(args) do - _ = Logger.info("Started #{inspect(__MODULE__)}") - - bus_module = Keyword.get(args, :bus_module, @default_bus_module) - :ok = bus_module.subscribe({:child_chain, "block.get"}, link: true) - - {:ok, %{}} - end - - # Listens for blocks and insert them to the WatcherDB. - def handle_info({:internal_event_bus, :block_received, block_application}, state) do - {:ok, _} = - block_application - |> to_pending_block() - |> DB.PendingBlock.insert() - - {:noreply, state} - end - - defp to_pending_block(%{} = block) do - data = %{ - eth_height: block.eth_height, - blknum: block.number, - blkhash: block.hash, - timestamp: block.timestamp, - transactions: block.transactions - } - - %{ - data: :erlang.term_to_binary(data), - blknum: block.number - } - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/block_applicator.ex b/apps/omg_watcher_info/lib/omg_watcher_info/block_applicator.ex new file mode 100644 index 0000000000..fbc0754a59 --- /dev/null +++ b/apps/omg_watcher_info/lib/omg_watcher_info/block_applicator.ex @@ -0,0 +1,48 @@ +# Copyright 2019-2020 OMG Network Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.WatcherInfo.BlockApplicator do + @moduledoc """ + Handles new block applications from Watcher's `BlockGetter` and persists them for further processing + """ + + alias OMG.WatcherInfo.DB + + @type block_application_t :: %{ + eth_height: pos_integer(), + hash: binary(), + number: pos_integer(), + timestamp: pos_integer(), + transactions: [OMG.State.Transaction.Recovered.t()] + } + + @doc """ + Inserts a block along with transactions and outputs, does not break when block already exists. + """ + @spec insert_block!(block_application_t()) :: :ok + def insert_block!(block) do + block + |> DB.Block.insert_from_block_application() + |> case do + {:ok, _} -> + :ok + + # Ensures insert idempotency. Trying to add block with the same `blknum` that already exists takes no effect. + # See also [comment](https://github.com/omgnetwork/elixir-omg/pull/1769#discussion_r528700434) + {:error, "current_block", changeset, _explain} -> + [{:blknum, {_msg, [constraint: :unique, constraint_name: _name]}}] = changeset.errors() + :ok + end + end +end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/configuration.ex b/apps/omg_watcher_info/lib/omg_watcher_info/configuration.ex deleted file mode 100644 index eca71707f2..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/configuration.ex +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.Configuration do - @moduledoc """ - Provides access to applications configuration - """ - @app :omg_watcher_info - - def pending_block_processing_interval() do - Application.fetch_env!(@app, :pending_block_processing_interval) - end - - def block_queue_check_interval() do - Application.fetch_env!(@app, :block_queue_check_interval) - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/db/block.ex b/apps/omg_watcher_info/lib/omg_watcher_info/db/block.ex index 0af7a1bd4e..3f4a0ef8f8 100644 --- a/apps/omg_watcher_info/lib/omg_watcher_info/db/block.ex +++ b/apps/omg_watcher_info/lib/omg_watcher_info/db/block.ex @@ -25,7 +25,6 @@ defmodule OMG.WatcherInfo.DB.Block do alias OMG.Utils.Paginator alias OMG.WatcherInfo.DB alias OMG.WatcherInfo.DB.Block.Chunk - alias OMG.WatcherInfo.DB.PendingBlock import Ecto.Query, only: [from: 2] @@ -152,27 +151,21 @@ defmodule OMG.WatcherInfo.DB.Block do end @doc """ - Takes a pending block, decode its data and inserts its content in the database. + Takes a block application and inserts block into the database. """ # sobelow_skip ["Misc.BinToTerm"] - @spec insert_from_pending_block(PendingBlock.t()) :: {:ok, %__MODULE__{}} | {:error, any()} + @spec insert_from_block_application(map()) :: {:ok, %__MODULE__{}} | {:error, binary(), Ecto.Changeset.t(), any()} @decorate trace(service: :ecto, type: :db, tracer: OMG.WatcherInfo.Tracer) - def insert_from_pending_block(pending_block) do - %{ - transactions: transactions, - blknum: block_number, - blkhash: blkhash, - timestamp: timestamp, - eth_height: eth_height - } = :erlang.binary_to_term(pending_block.data) + def insert_from_block_application(block_application) do + %{number: block_number, transactions: transactions} = block_application {db_txs, db_outputs, db_inputs} = prepare_db_transactions(transactions, block_number) current_block = %{ blknum: block_number, - hash: blkhash, - timestamp: timestamp, - eth_height: eth_height + hash: block_application.hash, + timestamp: block_application.timestamp, + eth_height: block_application.eth_height } db_txs_stream = Chunk.chunk(db_txs) @@ -184,7 +177,6 @@ defmodule OMG.WatcherInfo.DB.Block do |> prepare_inserts(db_txs_stream, "db_txs_", DB.Transaction) |> prepare_inserts(db_outputs_stream, "db_outputs_", DB.TxOutput) |> DB.TxOutput.spend_utxos(db_inputs) - |> Ecto.Multi.delete("pending_block", pending_block, []) {insert_duration, result} = :timer.tc(DB.Repo, :transaction, [multi]) diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/db/eth_event.ex b/apps/omg_watcher_info/lib/omg_watcher_info/db/eth_event.ex index 0c8a99e167..550cd41f2a 100644 --- a/apps/omg_watcher_info/lib/omg_watcher_info/db/eth_event.ex +++ b/apps/omg_watcher_info/lib/omg_watcher_info/db/eth_event.ex @@ -84,15 +84,17 @@ defmodule OMG.WatcherInfo.DB.EthEvent do @spec insert_deposit!(OMG.State.Core.deposit()) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()} @decorate trace(service: :ecto, type: :db, tracer: OMG.WatcherInfo.Tracer) - defp insert_deposit!(%{ - root_chain_txhash: root_chain_txhash, - log_index: log_index, - blknum: blknum, - owner: owner, - eth_height: eth_height, - currency: currency, - amount: amount - }) do + defp insert_deposit!(params) do + %{ + root_chain_txhash: root_chain_txhash, + log_index: log_index, + blknum: blknum, + owner: owner, + eth_height: eth_height, + currency: currency, + amount: amount + } = params + event_type = :deposit position = Utxo.position(blknum, 0, 0) root_chain_txhash_event = generate_root_chain_txhash_event(root_chain_txhash, log_index) @@ -134,11 +136,13 @@ defmodule OMG.WatcherInfo.DB.EthEvent do @doc """ Uses a list of encoded `Utxo.Position`s to insert the exits (if not already inserted before) """ - @spec insert_exits!([non_neg_integer()], available_event_type_t()) :: :ok - def insert_exits!(exits, event_type) do + @spec insert_exits!([map()], available_event_type_t(), atom()) :: :ok + def insert_exits!(exits, event_type, event_type_detailed) do + ensure_output = expect_output_existence?(event_type, event_type_detailed) + exits |> Stream.map(&utxo_exit_from_exit_event/1) - |> Enum.each(&insert_exit!(&1, event_type)) + |> Enum.each(&insert_exit!(&1, event_type, ensure_output)) end def txoutput_changeset(txoutput, params, ethevent) do @@ -220,17 +224,17 @@ defmodule OMG.WatcherInfo.DB.EthEvent do output_pointer: {:utxo_position, Utxo.Position.t()} | {:output_id, tuple()}, eth_height: pos_integer() }, - available_event_type_t() - ) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()} | :noop - defp insert_exit!( - %{ - root_chain_txhash: root_chain_txhash, - log_index: log_index, - eth_height: eth_height, - output_pointer: output_pointer - }, - event_type - ) do + available_event_type_t(), + boolean() + ) :: :ok | :noop + defp insert_exit!(event, event_type, ensure_output) do + %{ + root_chain_txhash: root_chain_txhash, + log_index: log_index, + eth_height: eth_height, + output_pointer: output_pointer + } = event + root_chain_txhash_event = generate_root_chain_txhash_event(root_chain_txhash, log_index) ethevent = @@ -248,23 +252,26 @@ defmodule OMG.WatcherInfo.DB.EthEvent do event end - tx_output = resolve_tx_output(output_pointer) + case resolve_tx_output(output_pointer) do + %DB.TxOutput{} = tx_output -> + :ok = insert_exit_if_not_exist(ethevent, tx_output) - insert_exit_if_not_exist(ethevent, tx_output) + # The transaction's output is expected to be found in the DB unless explicitly allowed by `ensure_output = false` + # More explanation can be found in [the issue discussion](https://github.com/omgnetwork/elixir-omg/issues/1760#issuecomment-722313713). + nil when not ensure_output -> + :noop + end end @spec resolve_tx_output(tuple()) :: %DB.TxOutput{} | nil defp resolve_tx_output({:utxo_position, utxo_pos}), do: DB.TxOutput.get_by_position(utxo_pos) defp resolve_tx_output({:output_id, {txhash, oindex}}), do: DB.TxOutput.get_by_output_id(txhash, oindex) - @spec insert_exit_if_not_exist(%__MODULE__{}, %DB.TxOutput{} | nil) :: - {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()} | :noop - defp insert_exit_if_not_exist(_, nil), do: :noop - + @spec insert_exit_if_not_exist(%__MODULE__{}, %DB.TxOutput{} | nil) :: :ok defp insert_exit_if_not_exist(ethevent, tx_output) do # if TxOutput is already assiociated with this (or any other) spending event no action is needed if output_spent?(tx_output), - do: :noop, + do: :ok, else: do_insert_exit(ethevent, tx_output) end @@ -278,11 +285,9 @@ defmodule OMG.WatcherInfo.DB.EthEvent do tx_output |> txoutput_changeset(%{child_chain_utxohash: generate_child_chain_utxohash(decoded_utxo_position)}, ethevent) - |> DB.Repo.update() + |> DB.Repo.update!() - # a txoutput row just got updated, but we need to return the associated ethevent with all populated - # fields including those populated by the DB (eg: inserted_at, updated_at, ...) - {:ok, get(ethevent.root_chain_txhash_event)} + :ok end defp base_query() do @@ -326,4 +331,10 @@ defmodule OMG.WatcherInfo.DB.EthEvent do end defp output_spent?(%DB.TxOutput{}), do: true + + # Tells whether processing exit event, exited outout has to be present in the database + # For more see: https://github.com/omgnetwork/elixir-omg/issues/1760#issuecomment-722313713 + defp expect_output_existence?(:standard_exit, _), do: true + defp expect_output_existence?(:in_flight_exit, :InFlightTxOutputPiggybacked), do: false + defp expect_output_existence?(:in_flight_exit, _any), do: true end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/db/pending_block.ex b/apps/omg_watcher_info/lib/omg_watcher_info/db/pending_block.ex deleted file mode 100644 index 0745bed150..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/db/pending_block.ex +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.DB.PendingBlock do - @moduledoc """ - Ecto schema for pending block data - These are valid block data received by the internal bus that should be stored in the database. - This intermediate table is needed as the messages received by the bus are not persisted - and if for some reason the writing to the database fails, we would lose these data. - """ - use Ecto.Schema - use OMG.Utils.LoggerExt - - import Ecto.Changeset - import Ecto.Query, only: [from: 2] - - alias OMG.WatcherInfo.DB.Repo - - @type t() :: %{ - blknum: pos_integer(), - data: binary() - } - - @primary_key {:blknum, :integer, []} - - schema "pending_blocks" do - field(:data, :binary) - - timestamps(type: :utc_datetime_usec) - end - - @spec insert(map()) :: {:ok, %__MODULE__{}} | {:error, Ecto.Changeset.t()} - def insert(params) do - params - |> insert_changeset() - |> Repo.insert() - end - - @spec get_next_to_process() :: nil | %__MODULE__{} - def get_next_to_process() do - (pending_block in __MODULE__) - |> from(order_by: :blknum, limit: 1) - |> Repo.one() - end - - @spec get_count() :: non_neg_integer() - def get_count(), do: Repo.aggregate(__MODULE__, :count) - - defp insert_changeset(params) do - %__MODULE__{} - |> cast(params, [:blknum, :data]) - |> validate_required([:blknum, :data]) - |> unique_constraint(:blknum, name: :pending_blocks_pkey) - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/db/txoutput.ex b/apps/omg_watcher_info/lib/omg_watcher_info/db/txoutput.ex index 3e8ba999ef..8736e6000c 100644 --- a/apps/omg_watcher_info/lib/omg_watcher_info/db/txoutput.ex +++ b/apps/omg_watcher_info/lib/omg_watcher_info/db/txoutput.ex @@ -78,7 +78,6 @@ defmodule OMG.WatcherInfo.DB.TxOutput do Repo.one( from(txoutput in __MODULE__, preload: [:ethevents], - left_join: ethevent in assoc(txoutput, :ethevents), where: txoutput.blknum == ^blknum and txoutput.txindex == ^txindex and txoutput.oindex == ^oindex ) ) @@ -90,7 +89,6 @@ defmodule OMG.WatcherInfo.DB.TxOutput do Repo.one( from(txoutput in __MODULE__, preload: [:ethevents], - left_join: ethevent in assoc(txoutput, :ethevents), where: txoutput.creating_txhash == ^txhash and txoutput.oindex == ^oindex ) ) diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/deposit_consumer.ex b/apps/omg_watcher_info/lib/omg_watcher_info/deposit_consumer.ex deleted file mode 100644 index 7e64760840..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/deposit_consumer.ex +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.DepositConsumer do - @moduledoc """ - Subscribes to deposit events and inserts them to WatcherInfo.DB. - """ - require Logger - - ### Client - - def start_link(_args) do - GenServer.start_link(__MODULE__, :ok, name: __MODULE__) - end - - ### Server - - use GenServer - - def init(:ok) do - :ok = OMG.Bus.subscribe({:root_chain, "DepositCreated"}, link: true) - - _ = Logger.info("Started #{inspect(__MODULE__)}") - {:ok, %{}} - end - - def handle_info({:internal_event_bus, :data, data}, state) do - _ = OMG.WatcherInfo.DB.EthEvent.insert_deposits!(data) - {:noreply, state} - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/exit_consumer.ex b/apps/omg_watcher_info/lib/omg_watcher_info/exit_consumer.ex deleted file mode 100644 index 8bf195a5a5..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/exit_consumer.ex +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.ExitConsumer do - @moduledoc """ - Subscribes to exit events and inserts them to WatcherInfo.DB. - """ - require Logger - alias OMG.WatcherInfo.DB.EthEvent - - @default_bus_module OMG.Bus - - def start_link(args) do - GenServer.start_link(__MODULE__, args) - end - - ### Server - - use GenServer - - def init(args) do - bus_module = Keyword.get(args, :bus_module, @default_bus_module) - - state = %{ - topic: Keyword.fetch!(args, :topic), - event_type: Keyword.fetch!(args, :event_type) - } - - :ok = bus_module.subscribe(state.topic, link: true) - - _ = Logger.info("Started #{inspect(__MODULE__)}, listen to #{inspect(state.topic)}") - {:ok, state} - end - - def handle_info({:internal_event_bus, :data, data}, state) do - _ = - Logger.debug( - "Received event from #{inspect(state.topic)} typeof #{inspect(state.event_type)} Data:\n#{inspect(data)}" - ) - - _ = EthEvent.insert_exits!(data, state.event_type) - {:noreply, state} - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor.ex b/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor.ex deleted file mode 100644 index eaefcf9329..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor.ex +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockProcessor do - @moduledoc """ - This module is in charge of processing the queue of pending blocks that are waiting to - be inserted in the database. - It internally relies on a timer that will check the queue state at every iteration and - will process pending blocks one by one until the queue is empty. - """ - - require Logger - - use GenServer - - alias OMG.WatcherInfo.PendingBlockProcessor.Storage - - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: Keyword.get(args, :name, __MODULE__)) - end - - def init(args) do - interval = Keyword.fetch!(args, :processing_interval) - _ = Logger.info("Started #{inspect(__MODULE__)}") - {:ok, %{interval: interval, block: nil}, interval} - end - - def handle_info(:timeout, state) do - block = Storage.get_next_pending_block() - {:noreply, %{state | block: block}, {:continue, :process_block}} - end - - def handle_continue(:process_block, %{block: nil} = state) do - {:noreply, state, state.interval} - end - - def handle_continue(:process_block, %{block: block} = state) do - {:ok, _} = Storage.process_block(block) - - {:noreply, %{state | block: nil}, 1} - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor/storage.ex b/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor/storage.ex deleted file mode 100644 index d549640643..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_processor/storage.ex +++ /dev/null @@ -1,32 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockProcessor.Storage do - @moduledoc """ - Contains storage related functions of the PendingBlockProcessor - """ - - alias OMG.WatcherInfo.DB.Block - alias OMG.WatcherInfo.DB.PendingBlock - - @spec get_next_pending_block() :: nil | %PendingBlock{} - def get_next_pending_block() do - PendingBlock.get_next_to_process() - end - - @spec process_block(PendingBlock.t()) :: {:ok, %Block{}} | {:error, any()} - def process_block(block) do - Block.insert_from_pending_block(block) - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker.ex b/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker.ex deleted file mode 100644 index d5edfa0b33..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker.ex +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockQueueLengthChecker do - @moduledoc """ - Periodically checks the size of the pending block queue and reports it to telemetry. - """ - - require Logger - - use GenServer - - alias OMG.WatcherInfo.PendingBlockQueueLengthChecker.Storage - - def start_link(args) do - GenServer.start_link(__MODULE__, args, name: Keyword.get(args, :name, __MODULE__)) - end - - def init(args) do - interval = Keyword.fetch!(args, :check_interval) - _ = Logger.info("Started #{inspect(__MODULE__)}") - {:ok, %{interval: interval}, interval} - end - - def handle_info(:timeout, state) do - length = Storage.get_queue_length() - _ = :telemetry.execute([:pending_block_queue_length, __MODULE__], %{length: length}, %{}) - {:noreply, state, state.interval} - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker/storage.ex b/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker/storage.ex deleted file mode 100644 index 2041248322..0000000000 --- a/apps/omg_watcher_info/lib/omg_watcher_info/pending_block_queue_length_checker/storage.ex +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockQueueLengthChecker.Storage do - @moduledoc """ - Contains storage related functions of the PendingBlockQueueLengthChecker - """ - - alias OMG.WatcherInfo.DB.PendingBlock - - @spec get_queue_length() :: non_neg_integer() - def get_queue_length() do - PendingBlock.get_count() - end -end diff --git a/apps/omg_watcher_info/lib/omg_watcher_info/supervisor.ex b/apps/omg_watcher_info/lib/omg_watcher_info/supervisor.ex index ede17f4fb5..a96bafcb75 100644 --- a/apps/omg_watcher_info/lib/omg_watcher_info/supervisor.ex +++ b/apps/omg_watcher_info/lib/omg_watcher_info/supervisor.ex @@ -21,7 +21,6 @@ defmodule OMG.WatcherInfo.Supervisor do use OMG.Utils.LoggerExt alias OMG.WatcherInfo - alias OMG.WatcherInfo.Configuration if Mix.env() == :test do defmodule Sandbox do @@ -62,31 +61,8 @@ defmodule OMG.WatcherInfo.Supervisor do } ] ++ @children_run_after_repo - children = [ - {OMG.WatcherInfo.BlockApplicationConsumer, []}, - {OMG.WatcherInfo.PendingBlockProcessor, [processing_interval: Configuration.pending_block_processing_interval()]}, - {OMG.WatcherInfo.PendingBlockQueueLengthChecker, [check_interval: Configuration.block_queue_check_interval()]}, - {OMG.WatcherInfo.DepositConsumer, []}, - Supervisor.child_spec( - {OMG.WatcherInfo.ExitConsumer, [topic: {:root_chain, "ExitStarted"}, event_type: :standard_exit]}, - id: :std_exit_consumer - ), - Supervisor.child_spec( - {OMG.WatcherInfo.ExitConsumer, [topic: {:watcher, "InFlightExitStarted"}, event_type: :in_flight_exit]}, - id: :ife_exit_consumer - ), - Supervisor.child_spec( - {OMG.WatcherInfo.ExitConsumer, [topic: {:watcher, "InFlightTxOutputPiggybacked"}, event_type: :in_flight_exit]}, - id: :ife_exit_output_piggybacked_consumer - ), - Supervisor.child_spec( - {OMG.WatcherInfo.ExitConsumer, [topic: {:watcher, "InFlightExitOutputWithdrawn"}, event_type: :in_flight_exit]}, - id: :ife_exit_processed_consumer - ) - ] - opts = [strategy: :one_for_one] _ = Logger.info("Starting #{inspect(__MODULE__)}") - Supervisor.init(top_children ++ children, opts) + Supervisor.init(top_children, opts) end end diff --git a/apps/omg_watcher_info/test/fixtures.exs b/apps/omg_watcher_info/test/fixtures.exs index 225902c8cf..eb7446f680 100644 --- a/apps/omg_watcher_info/test/fixtures.exs +++ b/apps/omg_watcher_info/test/fixtures.exs @@ -92,7 +92,12 @@ defmodule OMG.WatcherInfo.Fixtures do ]}, {2000, [ - OMG.TestHelper.create_recovered([{1000, 1, 0, alice}], @eth, [{bob, 99}, {alice, 1}], <<1337::256>>) + OMG.TestHelper.create_recovered( + [{1000, 1, 0, alice}], + @eth, + [{bob, 99}, {alice, 1}], + <<1337::256>> + ) ]}, {3000, [ @@ -142,25 +147,21 @@ defmodule OMG.WatcherInfo.Fixtures do end defp prepare_one_block({blknum, recovered_txs}) do - mined_block = %{ + block_application = %{ transactions: recovered_txs, - blknum: blknum, - blkhash: "##{blknum}", + number: blknum, + hash: <>, timestamp: 1_540_465_606, eth_height: 1 } - {:ok, pending_block} = - DB.PendingBlock.insert(%{ - data: :erlang.term_to_binary(mined_block), - blknum: blknum - }) - - {:ok, _} = DB.Block.insert_from_pending_block(pending_block) + {:ok, _} = DB.Block.insert_from_block_application(block_application) recovered_txs |> Enum.with_index() - |> Enum.map(fn {recovered_tx, txindex} -> {blknum, txindex, recovered_tx.tx_hash, recovered_tx} end) + |> Enum.map(fn {recovered_tx, txindex} -> + {blknum, txindex, recovered_tx.tx_hash, recovered_tx} + end) end defp ensure_web_started(module, function, args, counter) do diff --git a/apps/omg_watcher_info/test/omg_watcher_info/block_application_consumer_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/block_application_consumer_test.exs deleted file mode 100644 index 78da1b0d4a..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/block_application_consumer_test.exs +++ /dev/null @@ -1,71 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.BlockApplicationConsumerTest do - use ExUnitFixtures - use ExUnit.Case, async: true - - alias OMG.Watcher.BlockGetter.BlockApplication - alias OMG.WatcherInfo.BlockApplicationConsumer - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.DB.PendingBlock - - setup tags do - {:ok, pid} = - BlockApplicationConsumer.start_link( - bus_module: __MODULE__.FakeBus, - name: BlockApplicationConsumerTest - ) - - Map.put(tags, :pid, pid) - end - - describe "handle_info/2" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "inserts the given block application into pending block", %{pid: pid} do - block_application = %BlockApplication{ - number: 1_000, - eth_height: 1, - eth_height_done: true, - hash: "0x1000", - transactions: [], - timestamp: 1_576_500_000 - } - - send_events_and_wait_until_processed(block_application, pid) - - expected_data = - :erlang.term_to_binary(%{ - eth_height: block_application.eth_height, - blknum: block_application.number, - blkhash: block_application.hash, - timestamp: block_application.timestamp, - transactions: block_application.transactions - }) - - assert [%PendingBlock{blknum: 1000, data: ^expected_data}] = DB.Repo.all(PendingBlock) - end - end - - defp send_events_and_wait_until_processed(block, pid) do - Process.send(pid, {:internal_event_bus, :block_received, block}, [:noconnect]) - - # this waits for all messages in process inbox is processed - _ = :sys.get_state(pid) - end - - defmodule FakeBus do - def subscribe(_topic, _args), do: :ok - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/block_applicator_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/block_applicator_test.exs new file mode 100644 index 0000000000..057244735c --- /dev/null +++ b/apps/omg_watcher_info/test/omg_watcher_info/block_applicator_test.exs @@ -0,0 +1,66 @@ +# Copyright 2019-2020 OMG Network Pte Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule OMG.WatcherInfo.BlockApplicatorTest do + use ExUnitFixtures + use ExUnit.Case, async: false + + alias OMG.Watcher.BlockGetter.BlockApplication + alias OMG.WatcherInfo.BlockApplicator + alias OMG.WatcherInfo.DB + + import Ecto.Query, only: [where: 2] + + setup do + eth = <<0::160>> + alice = OMG.TestHelper.generate_entity() + tx = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], eth, [{alice, 100}]) + + block_application = %BlockApplication{ + number: 1_000, + eth_height: 1, + eth_height_done: true, + hash: "0x1000", + transactions: [tx], + timestamp: 1_576_500_000 + } + + {:ok, block_application: block_application} + end + + describe "insert_block!" do + @tag fixtures: [:phoenix_ecto_sandbox] + test "inserts the given block application into pending block", %{block_application: block_application} do + assert :ok = BlockApplicator.insert_block!(block_application) + + assert [%DB.Block{blknum: 1_000}] = DB.Repo.all(DB.Block) + end + + @tag fixtures: [:phoenix_ecto_sandbox] + test "insert block operation is idempotent", %{block_application: block_application} do + blknum = block_application.number + :ok = BlockApplicator.insert_block!(block_application) + + assert :ok = BlockApplicator.insert_block!(block_application) + assert %DB.Block{blknum: ^blknum} = DB.Block |> where(blknum: ^blknum) |> DB.Repo.one() + end + + @tag fixtures: [:phoenix_ecto_sandbox] + test "breaks when block application is invalid", %{block_application: block_application} do + block_application = %BlockApplication{block_application | number: "not an integer"} + + assert_raise MatchError, fn -> BlockApplicator.insert_block!(block_application) end + end + end +end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/db/block_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/db/block_test.exs index 60560bb4ba..c06a332c24 100644 --- a/apps/omg_watcher_info/test/omg_watcher_info/db/block_test.exs +++ b/apps/omg_watcher_info/test/omg_watcher_info/db/block_test.exs @@ -314,68 +314,56 @@ defmodule OMG.WatcherInfo.DB.BlockTest do end end - describe "insert_from_pending_block/1" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "inserts the block, its transactions and transaction outputs" do - alice = OMG.TestHelper.generate_entity() - bob = OMG.TestHelper.generate_entity() - + describe "insert_from_block_application/1" do + @tag fixtures: [:phoenix_ecto_sandbox, :alice, :bob] + test "inserts the block, its transactions and transaction outputs", %{alice: alice, bob: bob} do tx_1 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 300}]) tx_2 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 500}]) - mined_block = %{ + block_application = %{ transactions: [tx_1, tx_2], - blknum: 1000, - blkhash: "0x1000", + number: 1000, + hash: "0x1000", timestamp: 1_576_500_000, eth_height: 1 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 1000}) - - {:ok, block} = DB.Block.insert_from_pending_block(pending_block) + {:ok, block} = DB.Block.insert_from_block_application(block_application) assert %DB.Block{} = block["current_block"] current_block_hash = block["current_block"].hash - assert mined_block.blkhash == current_block_hash + assert block_application.hash == current_block_hash assert DB.Repo.get(DB.Transaction, tx_1.tx_hash) assert DB.Repo.get(DB.Transaction, tx_2.tx_hash) end - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns an error when inserting with an existing blknum" do - existing = insert(:block, blknum: 1000, hash: "0x1000", eth_height: 1, timestamp: 100) - alice = OMG.TestHelper.generate_entity() - bob = OMG.TestHelper.generate_entity() - + @tag fixtures: [:phoenix_ecto_sandbox, :alice, :bob] + test "returns an error when inserting with an existing blknum", %{alice: alice, bob: bob} do tx_1 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 300}]) tx_2 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 500}]) - mined_block = %{ + block_application = %{ transactions: [tx_1, tx_2], - blknum: existing.blknum, - blkhash: existing.hash, + number: 1000, + hash: "0x1000", timestamp: 1_576_500_000, - eth_height: existing.eth_height + eth_height: 1 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: existing.blknum}) + {:ok, _block} = DB.Block.insert_from_block_application(block_application) - {:error, "current_block", changeset, %{}} = DB.Block.insert_from_pending_block(pending_block) + assert {:error, "current_block", changeset, %{}} = DB.Block.insert_from_block_application(block_application) assert changeset.errors == [ blknum: {"has already been taken", [constraint: :unique, constraint_name: "blocks_pkey"]} ] end - @tag fixtures: [:phoenix_ecto_sandbox] + @tag fixtures: [:phoenix_ecto_sandbox, :alice, :bob] @tag :watcher_info @tag timeout: :infinity - test "full block test" do - alice = OMG.TestHelper.generate_entity() - bob = OMG.TestHelper.generate_entity() - + test "full block test", %{alice: alice, bob: bob} do tx_1 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 1}]) tx_2 = OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{bob, 2}]) @@ -387,21 +375,19 @@ defmodule OMG.WatcherInfo.DB.BlockTest do OMG.TestHelper.create_recovered([{1, 0, 0, a}], @eth, [{b, amount}]) end) - mined_block = %{ + block_application = %{ transactions: [tx_1, tx_2] ++ transactions, - blknum: 1000, - blkhash: "0x1000", + number: 1000, + hash: "0x1000", timestamp: 1_576_500_000, eth_height: 1 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 1000}) - - {:ok, block} = DB.Block.insert_from_pending_block(pending_block) + {:ok, block} = DB.Block.insert_from_block_application(block_application) assert %DB.Block{} = block["current_block"] current_block_hash = block["current_block"].hash - assert mined_block.blkhash == current_block_hash + assert block_application.hash == current_block_hash assert DB.Repo.get(DB.Transaction, tx_1.tx_hash) assert DB.Repo.get(DB.Transaction, tx_2.tx_hash) diff --git a/apps/omg_watcher_info/test/omg_watcher_info/db/eth_event_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/db/eth_event_test.exs index 8562848ac7..01abc9e675 100644 --- a/apps/omg_watcher_info/test/omg_watcher_info/db/eth_event_test.exs +++ b/apps/omg_watcher_info/test/omg_watcher_info/db/eth_event_test.exs @@ -264,7 +264,8 @@ defmodule OMG.WatcherInfo.DB.EthEventTest do eth_height: expected_exit_eth_height } ], - :standard_exit + :standard_exit, + nil ) %{data: utxos_after_exit} = DB.TxOutput.get_utxos(address: expected_owner) @@ -302,7 +303,7 @@ defmodule OMG.WatcherInfo.DB.EthEventTest do } ] - assert :ok = DB.EthEvent.insert_exits!(exits, :in_flight_exit) + assert :ok = DB.EthEvent.insert_exits!(exits, :in_flight_exit, :InFlightExitStarted) end @tag fixtures: [:alice, :initial_blocks] @@ -337,7 +338,7 @@ defmodule OMG.WatcherInfo.DB.EthEventTest do } ] - assert :ok = DB.EthEvent.insert_exits!(exits, expected_event_type) + assert :ok = DB.EthEvent.insert_exits!(exits, expected_event_type, :InFlightExitStarted) txo1 = DB.TxOutput.get_by_position(utxo_pos1) assert txo1 != nil @@ -399,7 +400,7 @@ defmodule OMG.WatcherInfo.DB.EthEventTest do } ] - assert :ok = DB.EthEvent.insert_exits!(exits, expected_event_type) + assert :ok = DB.EthEvent.insert_exits!(exits, expected_event_type, :InFlightExitOutputWithdrawn) assert_txoutput_spent_by_event( txhash1, @@ -420,6 +421,119 @@ defmodule OMG.WatcherInfo.DB.EthEventTest do ) end + @tag fixtures: [:initial_blocks] + test "Allows for missing output when the event explicitly allows it" do + max_blknum = DB.Repo.aggregate(DB.TxOutput, :max, :blknum) + pos_from_future = Utxo.position(max_blknum + 1, 0, 0) + + exits = [ + %{ + root_chain_txhash: Crypto.hash(<<6::256>>), + log_index: 0, + eth_height: 0, + utxo_pos: Utxo.Position.encode(pos_from_future) + } + ] + + assert :ok = DB.EthEvent.insert_exits!(exits, :in_flight_exit, :InFlightTxOutputPiggybacked) + end + + @tag fixtures: [:initial_blocks] + test "Fails when missing output disallowed" do + max_blknum = DB.Repo.aggregate(DB.TxOutput, :max, :blknum) + pos_from_future = Utxo.position(max_blknum + 1, 0, 0) + + exits = [ + %{ + root_chain_txhash: Crypto.hash(<<6::256>>), + log_index: 0, + eth_height: 0, + utxo_pos: Utxo.Position.encode(pos_from_future) + } + ] + + assert_raise CaseClauseError, fn -> + DB.EthEvent.insert_exits!(exits, :in_flight_exit, :InFlightExitOutputWithdrawn) + end + end + + @tag fixtures: [:phoenix_ecto_sandbox, :alice] + test "deposited and exited utxo is retrievable by position", %{alice: alice} do + assert :ok = + DB.EthEvent.insert_deposits!([ + %{ + root_chain_txhash: Crypto.hash(<<1000::256>>), + eth_height: 1, + log_index: 0, + owner: alice.addr, + currency: @eth, + amount: 333, + blknum: 1 + } + ]) + + assert :ok = + DB.EthEvent.insert_exits!( + [ + %{ + root_chain_txhash: Crypto.hash(<<1001::256>>), + eth_height: 2, + log_index: 1, + utxo_pos: Utxo.Position.encode(Utxo.position(1, 0, 0)) + } + ], + :in_flight_exit, + :InFlightExitStarted + ) + + assert %DB.TxOutput{ethevents: events} = DB.TxOutput.get_by_position(Utxo.position(1, 0, 0)) + + assert [ + %DB.EthEvent{event_type: :deposit}, + %DB.EthEvent{event_type: :in_flight_exit} + ] = Enum.sort(events, &(&1.eth_height < &2.eth_height)) + end + + @tag fixtures: [:initial_blocks] + test "only one exit type can be associated to output which is retrievable by output_id", + %{initial_blocks: blocks} do + # Get the transaction with _unspent_ output + {blknum, txindex, txhash, _tx} = Enum.find(blocks, fn {blknum, _, _, _} -> blknum == 2000 end) + utxo_pos = Utxo.Position.encode(Utxo.position(blknum, txindex, 0)) + + assert :ok = + DB.EthEvent.insert_exits!( + [ + %{ + root_chain_txhash: Crypto.hash(<<1001::256>>), + eth_height: 1, + log_index: 0, + utxo_pos: utxo_pos + } + ], + :standard_exit, + nil + ) + + assert :ok = + DB.EthEvent.insert_exits!( + [ + %{ + root_chain_txhash: Crypto.hash(<<1002::256>>), + eth_height: 2, + log_index: 1, + utxo_pos: utxo_pos + } + ], + :in_flight_exit, + :InFlightExitStarted + ) + + assert %DB.TxOutput{ethevents: events} = DB.TxOutput.get_by_output_id(txhash, 0) + + assert [%DB.EthEvent{event_type: :standard_exit}] = events + end + defp assert_txoutput_spent_by_event(txhash, oindex, log_index, eth_txhash, eth_height, event_type) do txo = DB.TxOutput.get_by_output_id(txhash, oindex) diff --git a/apps/omg_watcher_info/test/omg_watcher_info/db/pending_block_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/db/pending_block_test.exs deleted file mode 100644 index 4d97a1179f..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/db/pending_block_test.exs +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.DB.PendingBlockTest do - use ExUnitFixtures - use ExUnit.Case, async: false - use OMG.Fixtures - - import OMG.WatcherInfo.Factory - - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.DB.PendingBlock - - describe "insert/1" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "casts blknum and data" do - data = :erlang.term_to_binary(%{something: :nice}) - assert {:ok, block} = PendingBlock.insert(%{data: data, blknum: 1000, bad_key: :value}) - - assert %PendingBlock{blknum: 1000, data: _data} = block - end - - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns an error if blknum is already used" do - blknum = 1000 - insert(:pending_block, %{data: <<0>>, blknum: blknum}) - - assert {:error, %Ecto.Changeset{}} = PendingBlock.insert(%{data: <<1>>, blknum: blknum}) - end - end - - describe "get_next_to_process/0" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns the next pending block" do - b_1 = insert(:pending_block) - b_2 = insert(:pending_block) - _b_3 = insert(:pending_block) - - assert PendingBlock.get_next_to_process() == b_1 - - {:ok, _} = DB.Repo.delete(b_1) - - assert PendingBlock.get_next_to_process() == b_2 - end - - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns nil if no block to process" do - assert PendingBlock.get_next_to_process() == nil - end - end - - describe "get_count/0" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns the count of pending blocks" do - assert PendingBlock.get_count() == 0 - - insert(:pending_block) - insert(:pending_block) - insert(:pending_block) - - assert PendingBlock.get_count() == 3 - end - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/db/txoutput_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/db/txoutput_test.exs index 0b7f133a48..83ce698711 100644 --- a/apps/omg_watcher_info/test/omg_watcher_info/db/txoutput_test.exs +++ b/apps/omg_watcher_info/test/omg_watcher_info/db/txoutput_test.exs @@ -34,17 +34,15 @@ defmodule OMG.WatcherInfo.DB.TxOutputTest do big_amount = power_of_2.(256) - 1 - mined_block = %{ + block_application = %{ transactions: [OMG.TestHelper.create_recovered([], @eth, [{alice, big_amount}])], - blknum: 11_000, - blkhash: <>, + number: 11_000, + hash: <>, timestamp: :os.system_time(:second), eth_height: 10 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 11_000}) - - DB.Block.insert_from_pending_block(pending_block) + {:ok, _} = DB.Block.insert_from_block_application(block_application) utxo = DB.TxOutput.get_by_position(Utxo.position(11_000, 0, 0)) assert not is_nil(utxo) @@ -100,6 +98,7 @@ defmodule OMG.WatcherInfo.DB.TxOutputTest do spend_utxo_params = spend_uxto_params_from_txoutput(txinput) _ = DB.Repo.transaction(DB.TxOutput.spend_utxos(Ecto.Multi.new(), [spend_utxo_params])) + spent_txoutput = DB.TxOutput.get_by_position(Utxo.position(txinput.blknum, txinput.txindex, txinput.oindex)) assert :eq == DateTime.compare(txinput.inserted_at, spent_txoutput.inserted_at) diff --git a/apps/omg_watcher_info/test/omg_watcher_info/exit_consumer_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/exit_consumer_test.exs deleted file mode 100644 index 8002b1a72a..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/exit_consumer_test.exs +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.ExitConsumerTest do - use ExUnitFixtures - use ExUnit.Case, async: false - use OMG.Fixtures - - alias OMG.Crypto - alias OMG.Utxo - alias OMG.Utxo.Position - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.ExitConsumer - - require Utxo - - @eth <<0::160>> - @root_chain_txhash1 <<11::256>> - @root_chain_txhash2 <<12::256>> - - setup_all do - {:ok, _} = - GenServer.start_link( - ExitConsumer, - [topic: :watcher_test_topic, bus_module: __MODULE__.FakeBus, event_type: :in_flight_exit], - name: TestExitConsumer - ) - - _ = - on_exit(fn -> - with pid when is_pid(pid) <- GenServer.whereis(TestExitConsumer) do - :ok = GenServer.stop(TestExitConsumer) - end - end) - end - - describe "ExitConsumer.handle_info/2" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "spending non-existing output does not break" do - pos_1 = Position.encode(Utxo.position(5001, 0, 1)) - txhash = Crypto.hash(<>) - - event_data = [ - %{log_index: 2, eth_height: 2, root_chain_txhash: @root_chain_txhash2, utxo_pos: pos_1}, - %{log_index: 1, eth_height: 1, root_chain_txhash: @root_chain_txhash1, txhash: txhash, oindex: 1} - ] - - send_events_and_wait_until_processed(event_data) - - assert_consumer_alive() - end - - @tag fixtures: [:alice, :initial_blocks] - test "receiving ife started or finalized events spends given outputs", %{alice: alice} do - spent_utxos_pos = alice.addr |> get_utxos_pos() |> Enum.take(2) - [pos_1, pos_2] = Enum.map(spent_utxos_pos, &Position.encode/1) - - # Note: event data is the same for InFlightExitStarted and InFlightExitOutputWithdrawn events - event_data = [ - %{log_index: 2, eth_height: 2, root_chain_txhash: @root_chain_txhash2, utxo_pos: pos_2}, - %{log_index: 1, eth_height: 1, root_chain_txhash: @root_chain_txhash1, utxo_pos: pos_1} - ] - - send_events_and_wait_until_processed(event_data) - - assert alice.addr |> get_utxos_pos() |> none_in(spent_utxos_pos) - end - - @tag fixtures: [:alice, :initial_blocks] - test "receiving IFE output piggybacked event spends this output", %{alice: alice} do - %{creating_txhash: txhash, oindex: oindex} = output = alice.addr |> get_utxos_for() |> hd() - spent_utxos_pos = get_utxos_pos([output]) - - event_data = [ - %{ - log_index: 2, - eth_height: 2, - root_chain_txhash: @root_chain_txhash2, - txhash: txhash, - oindex: oindex - } - ] - - send_events_and_wait_until_processed(event_data) - - assert alice.addr |> get_utxos_pos() |> none_in(spent_utxos_pos) - end - - @tag fixtures: [:alice, :initial_blocks] - test "receiving IFE output piggybacked event is reflected in balance", %{alice: alice} do - %{creating_txhash: txhash, oindex: oindex, amount: exiting_amount} = alice.addr |> get_utxos_for() |> hd() - [%{currency: @eth, amount: initial_balance}] = DB.TxOutput.get_balance(alice.addr) - - event_data = [ - %{ - log_index: 2, - eth_height: 2, - root_chain_txhash: @root_chain_txhash2, - txhash: txhash, - oindex: oindex - } - ] - - send_events_and_wait_until_processed(event_data) - - expected_balance = initial_balance - exiting_amount - assert [%{currency: @eth, amount: expected_balance}] == DB.TxOutput.get_balance(alice.addr) - end - - @tag fixtures: [:alice, :initial_blocks] - test "spending output more than once does not overwrite first event", %{alice: alice} do - %{creating_txhash: txhash, oindex: oindex} = output = alice.addr |> get_utxos_for() |> hd() - txo_pos = get_utxos_pos([output]) |> hd() - - expected_log_index = 1 - expected_eth_height = 1 - expected_root_hash = <<1::256>> - - send_events_and_wait_until_processed([ - %{ - log_index: expected_log_index, - eth_height: expected_eth_height, - root_chain_txhash: expected_root_hash, - txhash: txhash, - oindex: oindex - } - ]) - - assert %DB.TxOutput{ - ethevents: [ - %DB.EthEvent{ - log_index: ^expected_log_index, - eth_height: ^expected_eth_height, - root_chain_txhash: ^expected_root_hash - } - ] - } = DB.TxOutput.get_by_position(txo_pos) - - event_data = [ - %{ - log_index: 2, - root_chain_txhash: @root_chain_txhash2, - eth_height: expected_eth_height, - utxo_pos: Position.encode(txo_pos) - } - ] - - send_events_and_wait_until_processed(event_data) - - assert %DB.TxOutput{ - ethevents: [ - %DB.EthEvent{ - log_index: ^expected_log_index, - root_chain_txhash: ^expected_root_hash, - eth_height: ^expected_eth_height - } - ] - } = DB.TxOutput.get_by_position(txo_pos) - end - - defp get_utxos_for(address) do - [address: address] - |> DB.TxOutput.get_utxos() - |> Map.get(:data) - end - - defp get_utxos_pos(<<_::160>> = address) do - address - |> get_utxos_for() - |> get_utxos_pos() - end - - defp get_utxos_pos(outputs) when is_list(outputs) do - Enum.map(outputs, fn %{blknum: blknum, txindex: txindex, oindex: oindex} -> - Utxo.position(blknum, txindex, oindex) - end) - end - - defp none_in(address_utxos, pos_to_check) do - Enum.all?(pos_to_check, &(&1 not in address_utxos)) - end - - defp send_events_and_wait_until_processed(data) do - pid = assert_consumer_alive() - - Process.send(pid, {:internal_event_bus, :data, data}, [:noconnect]) - - # this waits for all messages in process inbox is processed - _ = :sys.get_state(pid) - end - - defp assert_consumer_alive() do - pid = GenServer.whereis(TestExitConsumer) - assert is_pid(pid) and Process.alive?(pid) - pid - end - end - - defmodule FakeBus do - def subscribe(_topic, _args), do: :ok - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor/storage_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor/storage_test.exs deleted file mode 100644 index 32a0bafb50..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor/storage_test.exs +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockProcessor.StorageTest do - use ExUnitFixtures - use ExUnit.Case, async: false - - import OMG.WatcherInfo.Factory - import Ecto.Query, only: [from: 2] - - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.DB.PendingBlock - alias OMG.WatcherInfo.PendingBlockProcessor.Storage - - describe "get_next_pending_block/0" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns the next block to process when exist" do - %{blknum: blknum} = insert(:pending_block) - - assert %{blknum: ^blknum} = Storage.get_next_pending_block() - end - - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns nil when no pending block" do - assert Storage.get_next_pending_block() == nil - end - end - - describe "process_block/1" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "insert the block into the storage and deletes it" do - block = insert(:pending_block) - - assert {:ok, _} = Storage.process_block(block) - - assert get_all() == [] - end - - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns an error when failing" do - %{data: data, blknum: blknum_1} = block_1 = insert(:pending_block) - assert {:ok, _} = Storage.process_block(block_1) - - # inserting a second block with the same data params - block_2 = insert(:pending_block, %{data: data, blknum: blknum_1 + 1000}) - - assert {:error, _, _, _} = Storage.process_block(block_2) - end - end - - defp get_all() do - PendingBlock |> from(order_by: :blknum) |> DB.Repo.all() - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor_test.exs deleted file mode 100644 index f102b6a562..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_processor_test.exs +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockProcessorTest do - use ExUnitFixtures - use ExUnit.Case, async: false - - import Ecto.Query, only: [from: 2] - import OMG.WatcherInfo.Factory - - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.DB.Block - alias OMG.WatcherInfo.DB.PendingBlock - alias OMG.WatcherInfo.PendingBlockProcessor - - @interval 200 - - describe "handle_info/2" do - setup tags do - {:ok, pid} = - PendingBlockProcessor.start_link( - processing_interval: @interval, - name: PendingBlockProcessorTest - ) - - Map.put(tags, :pid, pid) - end - - @tag fixtures: [:phoenix_ecto_sandbox] - test "processes the next pending block in queue", %{pid: pid} do - :erlang.trace(pid, true, [:receive]) - - assert get_all_pending_blocks() == [] - assert get_all_blocks() == [] - - %{blknum: blknum_1} = insert(:pending_block) - %{blknum: blknum_2} = insert(:pending_block) - %{blknum: blknum_3} = insert(:pending_block) - - assert_receive {:trace, ^pid, :receive, :timeout}, @interval + 50 - get_state(pid) - - assert [%{blknum: ^blknum_1}] = get_all_blocks() - assert [%{blknum: ^blknum_2}, %{blknum: ^blknum_3}] = get_all_pending_blocks() - - assert_receive {:trace, ^pid, :receive, :timeout} - get_state(pid) - - assert [%{blknum: ^blknum_3}] = get_all_pending_blocks() - assert [%{blknum: ^blknum_1}, %{blknum: ^blknum_2}] = get_all_blocks() - - assert_receive {:trace, ^pid, :receive, :timeout} - get_state(pid) - - assert get_all_pending_blocks() == [] - assert [%{blknum: ^blknum_1}, %{blknum: ^blknum_2}, %{blknum: ^blknum_3}] = get_all_blocks() - end - end - - defp get_all_pending_blocks() do - PendingBlock |> from(order_by: :blknum) |> DB.Repo.all() - end - - defp get_all_blocks() do - Block |> from(order_by: :blknum) |> DB.Repo.all() - end - - defp get_state(pid) do - :sys.get_state(pid) - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker/storage_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker/storage_test.exs deleted file mode 100644 index 69e5124e54..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker/storage_test.exs +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockQueueLengthChecker.StorageTest do - use ExUnitFixtures - use ExUnit.Case, async: false - - import OMG.WatcherInfo.Factory - - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.PendingBlockQueueLengthChecker.Storage - - describe "get_queue_length/0" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "returns the queue length" do - assert Storage.get_queue_length() == 0 - - block_1 = insert(:pending_block) - block_2 = insert(:pending_block) - block_3 = insert(:pending_block) - - assert Storage.get_queue_length() == 3 - - DB.Repo.delete!(block_1) - DB.Repo.delete!(block_2) - DB.Repo.delete!(block_3) - - assert Storage.get_queue_length() == 0 - end - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker_test.exs deleted file mode 100644 index 6ee69b5ff4..0000000000 --- a/apps/omg_watcher_info/test/omg_watcher_info/pending_block_queue_length_checker_test.exs +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.PendingBlockQueueLengthCheckerTest do - use ExUnitFixtures - use ExUnit.Case, async: false - - import OMG.WatcherInfo.Factory - - alias OMG.WatcherInfo.DB - alias OMG.WatcherInfo.PendingBlockQueueLengthChecker - - @interval 100 - - setup tags do - {:ok, _pid} = - PendingBlockQueueLengthChecker.start_link( - check_interval: @interval, - name: PendingBlockQueueLengthCheckerTest - ) - - handler_id = {__MODULE__, :rand.uniform(100)} - - on_exit(fn -> - :telemetry.detach(handler_id) - end) - - Map.put(tags, :handler_id, handler_id) - end - - describe "handle_info/2" do - @tag fixtures: [:phoenix_ecto_sandbox] - test "emits a pending_block_queue_length event with the length", %{handler_id: handler_id} do - attach(handler_id, [:pending_block_queue_length, PendingBlockQueueLengthChecker]) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 0}, %{}}, - @interval * 2 - ) - - block_1 = insert(:pending_block) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 1}, %{}}, - @interval * 2 - ) - - block_2 = insert(:pending_block) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 2}, %{}}, - @interval * 2 - ) - - block_3 = insert(:pending_block) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 3}, %{}}, - @interval * 2 - ) - - DB.Repo.delete!(block_1) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 2}, %{}}, - @interval * 2 - ) - - DB.Repo.delete!(block_2) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 1}, %{}}, - @interval * 2 - ) - - DB.Repo.delete!(block_3) - - assert_receive( - {:telemetry_event, [:pending_block_queue_length, PendingBlockQueueLengthChecker], %{length: 0}, %{}}, - @interval * 2 - ) - end - end - - defp attach(handler_id, event) do - pid = self() - - :telemetry.attach( - handler_id, - event, - fn received_event, measurements, metadata, _ -> - send(pid, {:telemetry_event, received_event, measurements, metadata}) - end, - nil - ) - end -end diff --git a/apps/omg_watcher_info/test/omg_watcher_info/transaction_test.exs b/apps/omg_watcher_info/test/omg_watcher_info/transaction_test.exs index 8d14b8b232..f0a6c87bcb 100644 --- a/apps/omg_watcher_info/test/omg_watcher_info/transaction_test.exs +++ b/apps/omg_watcher_info/test/omg_watcher_info/transaction_test.exs @@ -14,7 +14,7 @@ defmodule OMG.WatcherInfo.TransactionTest do use ExUnitFixtures - use ExUnit.Case, async: true + use ExUnit.Case, async: false use OMG.Fixtures alias OMG.Utxo diff --git a/apps/omg_watcher_info/test/support/factories/pending_block_factory.ex b/apps/omg_watcher_info/test/support/factories/pending_block_factory.ex deleted file mode 100644 index fb605c1371..0000000000 --- a/apps/omg_watcher_info/test/support/factories/pending_block_factory.ex +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2019-2020 OMG Network Pte Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -defmodule OMG.WatcherInfo.Factory.PendingBlock do - @moduledoc """ - Pending Block factory. - - Generates a pending block that will need to be processed and inserted to the database. - """ - - defmacro __using__(_opts) do - quote do - alias OMG.TestHelper - alias OMG.WatcherInfo.DB - - @eth <<0::160>> - - def pending_block_factory() do - blknum = sequence(:block_blknum, fn seq -> (seq + 1) * 1000 end) - alice = TestHelper.generate_entity() - bob = TestHelper.generate_entity() - - tx_1 = TestHelper.create_recovered([{blknum + 1, 0, 0, alice}], @eth, [{bob, 300}]) - tx_2 = TestHelper.create_recovered([{blknum + 1, 0, 0, alice}], @eth, [{bob, 500}]) - - block = %DB.PendingBlock{ - data: - :erlang.term_to_binary(%{ - blknum: blknum, - blkhash: insecure_random_bytes(32), - eth_height: sequence(:block_eth_height, fn seq -> seq + 1 end), - timestamp: sequence(:block_timestamp, fn seq -> seq + 1 end), - transactions: [tx_1, tx_2], - tx_count: 2 - }), - blknum: blknum - } - end - end - end -end diff --git a/apps/omg_watcher_info/test/support/factory.ex b/apps/omg_watcher_info/test/support/factory.ex index 6bc2edaffa..a61b600a13 100644 --- a/apps/omg_watcher_info/test/support/factory.ex +++ b/apps/omg_watcher_info/test/support/factory.ex @@ -64,7 +64,6 @@ defmodule OMG.WatcherInfo.Factory do use OMG.WatcherInfo.Factory.Block use OMG.WatcherInfo.Factory.DataHelper use OMG.WatcherInfo.Factory.EthEvent - use OMG.WatcherInfo.Factory.PendingBlock use OMG.WatcherInfo.Factory.Transaction use OMG.WatcherInfo.Factory.TxOutput end diff --git a/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/account_test.exs b/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/account_test.exs index a0a383b9ea..a8579f8e7e 100644 --- a/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/account_test.exs +++ b/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/account_test.exs @@ -18,8 +18,6 @@ defmodule OMG.WatcherRPC.Web.Controller.AccountTest do use OMG.Fixtures use OMG.WatcherInfo.Fixtures - import OMG.WatcherInfo.Factory - alias OMG.Crypto alias OMG.Utils.HttpRPC.Encoding alias OMG.Utxo @@ -232,17 +230,15 @@ defmodule OMG.WatcherRPC.Web.Controller.AccountTest do [] = WatcherHelper.get_utxos(carol.addr) # bob spends his utxo to carol - mined_block = %{ + block_application = %{ transactions: [OMG.TestHelper.create_recovered([{2000, 0, 0, bob}], @eth, [{bob, 49}, {carol, 50}])], - blknum: 11_000, - blkhash: <>, + number: 11_000, + hash: <>, timestamp: :os.system_time(:second), eth_height: 10 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 11_000}) - - DB.Block.insert_from_pending_block(pending_block) + {:ok, _} = DB.Block.insert_from_block_application(block_application) assert [ %{ @@ -292,17 +288,15 @@ defmodule OMG.WatcherRPC.Web.Controller.AccountTest do "oindex" => 0 } = utxos |> Enum.find(&(&1["blknum"] < 1000)) - mined_block = %{ + block_application = %{ transactions: [OMG.TestHelper.create_recovered([{blknum, 0, 0, bob}], @eth, [{carol, 100}])], - blknum: 11_000, - blkhash: <>, + number: 11_000, + hash: <>, timestamp: :os.system_time(:second), eth_height: 10 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 11_000}) - - DB.Block.insert_from_pending_block(pending_block) + {:ok, _} = DB.Block.insert_from_block_application(block_application) utxos = WatcherHelper.get_utxos(bob.addr) diff --git a/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/challenge_test.exs b/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/challenge_test.exs index 4296582f7e..6a66fed765 100644 --- a/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/challenge_test.exs +++ b/apps/omg_watcher_rpc/test/omg_watcher_rpc/web/controllers/challenge_test.exs @@ -18,8 +18,6 @@ defmodule OMG.WatcherRPC.Web.Controller.ChallengeTest do use OMG.Fixtures use OMG.WatcherInfo.Fixtures - import OMG.WatcherInfo.Factory - alias OMG.Utxo alias OMG.WatcherInfo.DB alias Support.WatcherHelper @@ -33,21 +31,17 @@ defmodule OMG.WatcherRPC.Web.Controller.ChallengeTest do test "challenge data is properly formatted", %{alice: alice} do DB.EthEvent.insert_deposits!([%{owner: alice.addr, currency: @eth, amount: 100, blknum: 1, eth_height: 1}]) - mined_block = %{ - transactions: [ - OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 100}]) - ], - blknum: 1000, - blkhash: <>, + block_application = %{ + transactions: [OMG.TestHelper.create_recovered([{1, 0, 0, alice}], @eth, [{alice, 100}])], + number: 1000, + hash: <>, timestamp: :os.system_time(:second), eth_height: 1 } - pending_block = insert(:pending_block, %{data: :erlang.term_to_binary(mined_block), blknum: 11_000}) - - DB.Block.insert_from_pending_block(pending_block) + {:ok, _} = DB.Block.insert_from_block_application(block_application) - utxo_pos = Utxo.position(1, 0, 0) |> Utxo.Position.encode() + utxo_pos = Utxo.Position.encode(Utxo.position(1, 0, 0)) %{ "input_index" => _input_index, @@ -60,7 +54,7 @@ defmodule OMG.WatcherRPC.Web.Controller.ChallengeTest do @tag skip: true @tag fixtures: [:phoenix_ecto_sandbox] test "challenging non-existent utxo returns error" do - utxo_pos = Utxo.position(1, 1, 0) |> Utxo.Position.encode() + utxo_pos = Utxo.Position.encode(Utxo.position(1, 1, 0)) %{ "code" => "challenge:invalid", diff --git a/config/config.exs b/config/config.exs index 7109f559b0..64bd54464a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -149,9 +149,7 @@ config :omg_watcher_info, child_chain_url: "http://localhost:9656/v1", namespace: OMG.WatcherInfo, ecto_repos: [OMG.WatcherInfo.DB.Repo], - metrics_collection_interval: 60_000, - pending_block_processing_interval: 1000, - block_queue_check_interval: 10_000 + metrics_collection_interval: 60_000 # Configures the endpoint diff --git a/config/test.exs b/config/test.exs index 62fba17835..ab50cbba7c 100644 --- a/config/test.exs +++ b/config/test.exs @@ -128,7 +128,7 @@ config :omg_watcher, OMG.Watcher.Tracer, config :omg_watcher_info, child_chain_url: System.get_env("CHILD_CHAIN_URL", "http://localhost:9656/v1") config :omg_watcher_info, OMG.WatcherInfo.DB.Repo, - ownership_timeout: 180_000, + ownership_timeout: 500_000, pool: Ecto.Adapters.SQL.Sandbox, # DATABASE_URL format is following `postgres://{user_name}:{password}@{host:port}/{database_name}` url: System.get_env("TEST_DATABASE_URL", "postgres://omisego_dev:omisego_dev@localhost:5432/omisego_test") diff --git a/docker-compose.yml b/docker-compose.yml index a318518758..ebdc41860d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -218,6 +218,7 @@ services: networks: chain_net: + name: "chain_net" driver: bridge ipam: config: diff --git a/mix.exs b/mix.exs index fed576b03e..8d8e9467d8 100644 --- a/mix.exs +++ b/mix.exs @@ -124,7 +124,7 @@ defmodule OMG.Umbrella.MixProject do defp aliases() do [ - test: ["test --no-start"], + test: ["ecto.create", "ecto.migrate", "test --no-start"], coveralls: ["coveralls --no-start"], "coveralls.html": ["coveralls.html --no-start"], "coveralls.detail": ["coveralls.detail --no-start"], diff --git a/priv/cabbage b/priv/cabbage index ee6dcf38ea..4f0b13b629 160000 --- a/priv/cabbage +++ b/priv/cabbage @@ -1 +1 @@ -Subproject commit ee6dcf38ea1a31791974b1e8cc0d97a8853f9779 +Subproject commit 4f0b13b629b3b682a014fb744f4f66bbabea86b8