From 4cdda8b0a258d511ac68478b6574294e9e22075b Mon Sep 17 00:00:00 2001 From: Argonus <9743549+Argonus@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:02:49 +0100 Subject: [PATCH] Split integration & unit tests --- .github/workflows/integration.yml | 100 ++++++++++++++++++ .github/workflows/test.yml | 2 +- lib/kafka_ex/new/client.ex | 10 ++ lib/kafka_ex/new/client_compatibility.ex | 29 ++--- lib/kafka_ex/protocol/offset.ex | 5 +- .../structs/offset/partition_offset_test.exs | 27 +++++ test/kafka_ex/new/structs/offset_test.exs | 40 +++++++ test/support/test_helpers.ex | 22 ++-- 8 files changed, 200 insertions(+), 35 deletions(-) create mode 100644 .github/workflows/integration.yml create mode 100644 test/kafka_ex/new/structs/offset/partition_offset_test.exs create mode 100644 test/kafka_ex/new/structs/offset_test.exs diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 00000000..ffc4708f --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,100 @@ +name: CI Integration + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + setup: + name: test | setup dependencies + runs-on: ubuntu-20.04 + env: + MIX_ENV: test + strategy: + matrix: + pair: + - elixir: 1.16 + otp: 26.1 + + steps: + - name: Cancel previous runs + uses: styfle/cancel-workflow-action@0.9.0 + with: + access_token: ${{ github.token }} + - name: Checkout Github repo + uses: actions/checkout@v2 + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.pair.otp}} # Define the OTP version [required] + + - name: Retrieve Mix Dependencies Cache + uses: actions/cache@v2 + id: mix-cache # id to use in retrieve action + with: + path: deps + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Retrieve Mix Dependencies Compilation Cache + uses: actions/cache@v2 + id: mix-deps-compile-cache # id to use in retrieve action + with: + path: _build + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + - name: Install Mix Dependencies + run: | + mix local.rebar --force + mix local.hex --force + mix deps.get + + - name: Compile Mix Dependencies + run: mix deps.compile + + test: + name: runner / Test + needs: [setup] + + runs-on: ubuntu-20.04 + env: + MIX_ENV: test + + strategy: + fail-fast: false + matrix: + pair: + - elixir: 1.16 + otp: 26.1 + + steps: + - uses: actions/checkout@v2 + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.pair.otp}} # Define the OTP version [required] + + - name: Retrieve Mix Dependencies Cache + uses: actions/cache@v2 + id: mix-cache # id to use in retrieve action + with: + path: deps + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Retrieve Mix Dependencies Compilation Cache + uses: actions/cache@v2 + id: mix-deps-compile-cache # id to use in retrieve action + with: + path: _build + key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + + - name: Docker-compose up + run: ./scripts/docker_up.sh + + - name: Docker ps + run: docker ps -a + + - name: Run Tests + run: ./scripts/ci_tests.sh \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0a8719cc..13f1eadf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -105,4 +105,4 @@ jobs: run: docker ps -a - name: Run Tests - run: ./scripts/ci_tests.sh \ No newline at end of file + run: mix test \ No newline at end of file diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 3fce96e8..da9528fb 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -163,6 +163,16 @@ defmodule KafkaEx.New.Client do end end + # Backward compatibility, to be deleted once we delete legacy code + def handle_call({:offset, topic, partition, timestamp}, _from, state) do + partition_data = %{partition_num: partition, timestamp: timestamp} + + case list_offset_request({topic, [partition_data]}, [], state) do + {:error, error} -> {:reply, {:error, error}, state} + {result, updated_state} -> {:reply, result, updated_state} + end + end + def handle_call({:describe_groups, [consumer_group_name], opts}, _from, state) do if KafkaEx.valid_consumer_group?(consumer_group_name) do case describe_group_request(consumer_group_name, opts, state) do diff --git a/lib/kafka_ex/new/client_compatibility.ex b/lib/kafka_ex/new/client_compatibility.ex index d296f3f4..d359977f 100644 --- a/lib/kafka_ex/new/client_compatibility.ex +++ b/lib/kafka_ex/new/client_compatibility.ex @@ -37,20 +37,13 @@ defmodule KafkaEx.New.ClientCompatibility do # note we also try to create the topic if it does not exist state = ensure_topics_metadata(state, [produce_request.topic], true) - produce_request = - default_partitioner().assign_partition( - produce_request, - Adapter.metadata_response(state.cluster_metadata) - ) + metadata_response = Adapter.metadata_response(state.cluster_metadata) + produce_request = default_partitioner().assign_partition(produce_request, metadata_response) {request, topic, partition} = Adapter.produce_request(produce_request) - {response, updated_state} = - kayrock_network_request( - request, - NodeSelector.topic_partition(topic, partition), - state - ) + node_selector = NodeSelector.topic_partition(topic, partition) + {response, updated_state} = kayrock_network_request(request, node_selector, state) response = case response do @@ -256,18 +249,10 @@ defmodule KafkaEx.New.ClientCompatibility do raise KafkaEx.ConsumerGroupRequiredError, offset_fetch end - {request, consumer_group} = - Adapter.offset_fetch_request( - offset_fetch, - state.consumer_group_for_auto_commit - ) + {request, consumer_group} = Adapter.offset_fetch_request(offset_fetch, state.consumer_group_for_auto_commit) - {response, updated_state} = - kayrock_network_request( - request, - NodeSelector.consumer_group(consumer_group), - state - ) + node_selector = NodeSelector.consumer_group(consumer_group) + {response, updated_state} = kayrock_network_request(request, node_selector, state) response = case response do diff --git a/lib/kafka_ex/protocol/offset.ex b/lib/kafka_ex/protocol/offset.ex index 62b0bf38..d75aa161 100644 --- a/lib/kafka_ex/protocol/offset.ex +++ b/lib/kafka_ex/protocol/offset.ex @@ -27,10 +27,9 @@ defmodule KafkaEx.Protocol.Offset do defstruct topic: nil, partition_offsets: [] @type t :: %Response{topic: binary, partition_offsets: list} - def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]), - do: offset - + def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]), do: offset def extract_offset([%__MODULE__{partition_offsets: [%{offset: []}]}]), do: 0 + def extract_offset([%{partition_offsets: [%{offset: offset}]}]) when is_integer(offset), do: offset end @spec create_request(integer, binary, binary, integer, term) :: iolist diff --git a/test/kafka_ex/new/structs/offset/partition_offset_test.exs b/test/kafka_ex/new/structs/offset/partition_offset_test.exs new file mode 100644 index 00000000..b720d0bf --- /dev/null +++ b/test/kafka_ex/new/structs/offset/partition_offset_test.exs @@ -0,0 +1,27 @@ +defmodule KafkaEx.New.Structs.Offset.PartitionOffsetTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.Offset.PartitionOffset + + describe "build/1" do + test "returns struct with missing timestamp" do + result = PartitionOffset.build(%{partition: 1, offset: 2}) + + assert result == %PartitionOffset{ + partition: 1, + offset: 2, + timestamp: -1 + } + end + + test "returns struct with timestamp" do + result = PartitionOffset.build(%{partition: 1, offset: 2, timestamp: 123}) + + assert result == %PartitionOffset{ + partition: 1, + offset: 2, + timestamp: 123 + } + end + end +end diff --git a/test/kafka_ex/new/structs/offset_test.exs b/test/kafka_ex/new/structs/offset_test.exs new file mode 100644 index 00000000..fe222f44 --- /dev/null +++ b/test/kafka_ex/new/structs/offset_test.exs @@ -0,0 +1,40 @@ +defmodule KafkaEx.New.Structs.OffsetTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.Offset + + describe "from_list_offset/2" do + test "creates offset with v0 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + ] + } + end + + test "creates offset with v1 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1} + ] + } + end + + test "creates offset with v2 partition responses" do + result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2, timestamp: 3}]) + + assert result == %Offset{ + topic: "test-topic", + partition_offsets: [ + %Offset.PartitionOffset{offset: 1, partition: 2, timestamp: 3} + ] + } + end + end +end diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index 88cafad0..38edba3c 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -142,8 +142,7 @@ defmodule KafkaEx.TestHelpers do defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10) - defp wait_for_topic_to_appear(_client, _topic_name, attempts) - when attempts <= 0 do + defp wait_for_topic_to_appear(_client, _topic_name, attempts) when attempts <= 0 do raise "Timeout while waiting for topic to appear" end @@ -162,15 +161,17 @@ defmodule KafkaEx.TestHelpers do end end - defp first_partition_offset(:topic_not_found) do - nil - end + defp first_partition_offset({:ok, response}), do: first_partition_offset(response) + defp first_partition_offset({:error, error}), do: first_partition_offset(error) + defp first_partition_offset(:topic_not_found), do: nil defp first_partition_offset(response) do - [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = response + [%{partition_offsets: partition_offsets}] = response - first_partition = hd(partition_offsets) - first_partition.offset |> hd + case hd(partition_offsets) do + %{offset: [offset | _]} -> offset + %{offset: offset} -> offset + end end defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n) @@ -198,7 +199,10 @@ defmodule KafkaEx.TestHelpers do value = wait_for_value( value_getter, - fn v -> length(v) > 0 end, + fn + {:ok, v} -> length(v) > 0 + v -> length(v) > 0 + end, dwell, max_tries )