Skip to content

Commit

Permalink
Split integration & unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Apr 22, 2024
1 parent 1d4a92d commit 858e268
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 35 deletions.
100 changes: 100 additions & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
name: CI Integration

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
setup:
name: test | setup dependencies
runs-on: ubuntu-20.04
env:
MIX_ENV: test
strategy:
matrix:
pair:
- elixir: 1.16
otp: 26.1

steps:
- name: Cancel previous runs
uses: styfle/[email protected]
with:
access_token: ${{ github.token }}
- name: Checkout Github repo
uses: actions/checkout@v2
- name: Setup elixir & erlang environment
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]

- name: Retrieve Mix Dependencies Cache
uses: actions/cache@v2
id: mix-cache # id to use in retrieve action
with:
path: deps
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Retrieve Mix Dependencies Compilation Cache
uses: actions/cache@v2
id: mix-deps-compile-cache # id to use in retrieve action
with:
path: _build
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
- name: Install Mix Dependencies
run: |
mix local.rebar --force
mix local.hex --force
mix deps.get
- name: Compile Mix Dependencies
run: mix deps.compile

test:
name: runner / Test
needs: [setup]

runs-on: ubuntu-20.04
env:
MIX_ENV: test

strategy:
fail-fast: false
matrix:
pair:
- elixir: 1.16
otp: 26.1

steps:
- uses: actions/checkout@v2
- name: Setup elixir & erlang environment
uses: erlef/setup-beam@v1
with:
elixir-version: ${{matrix.pair.elixir}} # Define the elixir version [required]
otp-version: ${{matrix.pair.otp}} # Define the OTP version [required]

- name: Retrieve Mix Dependencies Cache
uses: actions/cache@v2
id: mix-cache # id to use in retrieve action
with:
path: deps
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Retrieve Mix Dependencies Compilation Cache
uses: actions/cache@v2
id: mix-deps-compile-cache # id to use in retrieve action
with:
path: _build
key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}

- name: Docker-compose up
run: ./scripts/docker_up.sh

- name: Docker ps
run: docker ps -a

- name: Run Tests
run: ./scripts/ci_tests.sh
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,4 @@ jobs:
run: docker ps -a

- name: Run Tests
run: ./scripts/ci_tests.sh
run: mix test
10 changes: 10 additions & 0 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@ defmodule KafkaEx.New.Client do
end
end

# Backward compatibility, to be deleted once we delete legacy code
def handle_call({:offset, topic, partition, timestamp}, _from, state) do
partition_data = %{partition_num: partition, timestamp: timestamp}

case list_offset_request({topic, [partition_data]}, [], state) do
{:error, error} -> {:reply, {:error, error}, state}
{result, updated_state} -> {:reply, result, updated_state}
end
end

def handle_call({:describe_groups, [consumer_group_name], opts}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
case describe_group_request(consumer_group_name, opts, state) do
Expand Down
29 changes: 7 additions & 22 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,13 @@ defmodule KafkaEx.New.ClientCompatibility do
# note we also try to create the topic if it does not exist
state = ensure_topics_metadata(state, [produce_request.topic], true)

produce_request =
default_partitioner().assign_partition(
produce_request,
Adapter.metadata_response(state.cluster_metadata)
)
metadata_response = Adapter.metadata_response(state.cluster_metadata)
produce_request = default_partitioner().assign_partition(produce_request, metadata_response)

{request, topic, partition} = Adapter.produce_request(produce_request)

{response, updated_state} =
kayrock_network_request(
request,
NodeSelector.topic_partition(topic, partition),
state
)
node_selector = NodeSelector.topic_partition(topic, partition)
{response, updated_state} = kayrock_network_request(request, node_selector, state)

response =
case response do
Expand Down Expand Up @@ -256,18 +249,10 @@ defmodule KafkaEx.New.ClientCompatibility do
raise KafkaEx.ConsumerGroupRequiredError, offset_fetch
end

{request, consumer_group} =
Adapter.offset_fetch_request(
offset_fetch,
state.consumer_group_for_auto_commit
)
{request, consumer_group} = Adapter.offset_fetch_request(offset_fetch, state.consumer_group_for_auto_commit)

{response, updated_state} =
kayrock_network_request(
request,
NodeSelector.consumer_group(consumer_group),
state
)
node_selector = NodeSelector.consumer_group(consumer_group)
{response, updated_state} = kayrock_network_request(request, node_selector, state)

response =
case response do
Expand Down
7 changes: 4 additions & 3 deletions lib/kafka_ex/protocol/offset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ defmodule KafkaEx.Protocol.Offset do
defstruct topic: nil, partition_offsets: []
@type t :: %Response{topic: binary, partition_offsets: list}

def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]),
do: offset

def extract_offset([%__MODULE__{partition_offsets: [%{offset: [offset]}]}]), do: offset
def extract_offset([%__MODULE__{partition_offsets: [%{offset: []}]}]), do: 0

# Forward Compatibility with Kayrock Client
def extract_offset({:ok, [%{partition_offsets: [%{offset: offset}]}]}) when is_integer(offset), do: offset
end

@spec create_request(integer, binary, binary, integer, term) :: iolist
Expand Down
27 changes: 27 additions & 0 deletions test/kafka_ex/new/structs/offset/partition_offset_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule KafkaEx.New.Structs.Offset.PartitionOffsetTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Structs.Offset.PartitionOffset

describe "build/1" do
test "returns struct with missing timestamp" do
result = PartitionOffset.build(%{partition: 1, offset: 2})

assert result == %PartitionOffset{
partition: 1,
offset: 2,
timestamp: -1
}
end

test "returns struct with timestamp" do
result = PartitionOffset.build(%{partition: 1, offset: 2, timestamp: 123})

assert result == %PartitionOffset{
partition: 1,
offset: 2,
timestamp: 123
}
end
end
end
40 changes: 40 additions & 0 deletions test/kafka_ex/new/structs/offset_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule KafkaEx.New.Structs.OffsetTest do
use ExUnit.Case, async: true

alias KafkaEx.New.Structs.Offset

describe "from_list_offset/2" do
test "creates offset with v0 partition responses" do
result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}])

assert result == %Offset{
topic: "test-topic",
partition_offsets: [
%Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1}
]
}
end

test "creates offset with v1 partition responses" do
result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2}])

assert result == %Offset{
topic: "test-topic",
partition_offsets: [
%Offset.PartitionOffset{offset: 1, partition: 2, timestamp: -1}
]
}
end

test "creates offset with v2 partition responses" do
result = Offset.from_list_offset("test-topic", [%{offset: 1, partition: 2, timestamp: 3}])

assert result == %Offset{
topic: "test-topic",
partition_offsets: [
%Offset.PartitionOffset{offset: 1, partition: 2, timestamp: 3}
]
}
end
end
end
22 changes: 13 additions & 9 deletions test/support/test_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ defmodule KafkaEx.TestHelpers do

defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10)

defp wait_for_topic_to_appear(_client, _topic_name, attempts)
when attempts <= 0 do
defp wait_for_topic_to_appear(_client, _topic_name, attempts) when attempts <= 0 do
raise "Timeout while waiting for topic to appear"
end

Expand All @@ -162,15 +161,17 @@ defmodule KafkaEx.TestHelpers do
end
end

defp first_partition_offset(:topic_not_found) do
nil
end
defp first_partition_offset({:ok, response}), do: first_partition_offset(response)
defp first_partition_offset({:error, error}), do: first_partition_offset(error)
defp first_partition_offset(:topic_not_found), do: nil

defp first_partition_offset(response) do
[%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = response
[%{partition_offsets: partition_offsets}] = response

first_partition = hd(partition_offsets)
first_partition.offset |> hd
case hd(partition_offsets) do
%{offset: [offset | _]} -> offset
%{offset: offset} -> offset
end
end

defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n)
Expand Down Expand Up @@ -198,7 +199,10 @@ defmodule KafkaEx.TestHelpers do
value =
wait_for_value(
value_getter,
fn v -> length(v) > 0 end,
fn
{:ok, v} -> length(v) > 0
v -> length(v) > 0
end,
dwell,
max_tries
)
Expand Down

0 comments on commit 858e268

Please sign in to comment.