From fbaa3bf54cee6b90cf4d239413b8a9bd235b9ef0 Mon Sep 17 00:00:00 2001 From: Argonus Date: Sat, 21 Oct 2023 12:39:40 +0200 Subject: [PATCH] [Kayrock] Refactor test helpers, add random ports & fix compile issues --- .../consumer_group_implementation_test.exs | 2 +- test/integration/consumer_group_test.exs | 23 +- test/integration/integration_test.exs | 8 +- .../compatibility_0_p_10_and_later_test.exs | 4 +- .../kayrock/compatibility_0_p_8_p_0_test.exs | 2 +- .../kayrock/compatibility_0_p_9_p_0_test.exs | 8 +- ...ity_consumer_group_implementation_test.exs | 14 +- .../compatibility_consumer_group_test.exs | 16 +- .../kayrock/compatibility_streaming_test.exs | 4 +- .../kayrock/compatibility_test.exs | 36 +-- test/integration/kayrock/offset_test.exs | 16 +- .../integration/kayrock/record_batch_test.exs | 32 +-- test/integration/kayrock/timestamp_test.exs | 28 +-- test/integration/new_client_test.exs | 6 +- .../server0_p_10_and_later_test.exs | 2 +- test/integration/server0_p_8_p_0_test.exs | 2 +- test/integration/server0_p_9_p_0_test.exs | 2 +- test/kafka_ex/network_client_test.exs | 13 +- .../new/client/request_builder_test.exs | 3 +- .../kayrock/describe_groups/request_test.exs | 17 +- .../kayrock/describe_groups/response_test.exs | 36 +-- .../kayrock/describe_groups_test.exs | 17 +- test/kafka_ex/new/structs/broker_test.exs | 15 +- .../consumer_group/member_assignment_test.exs | 4 +- .../structs/consumer_group/member_test.exs | 4 +- .../partition_assignment_test.exs | 4 +- .../new/structs/consumer_group_test.exs | 2 +- test/kafka_ex/socket_test.exs | 6 +- test/support/test_helpers.ex | 209 ++++++++++++++++++ test/test_helper.exs | 177 --------------- 30 files changed, 382 insertions(+), 330 deletions(-) create mode 100644 test/support/test_helpers.ex diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index 09b049e1..243a53f3 100644 --- a/test/integration/consumer_group_implementation_test.exs 
+++ b/test/integration/consumer_group_implementation_test.exs @@ -3,7 +3,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do alias KafkaEx.ConsumerGroup alias KafkaEx.GenConsumer - import TestHelper + import KafkaEx.TestHelpers require Logger diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index cde96195..bb253d10 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -2,7 +2,7 @@ defmodule KafkaEx.ConsumerGroup.Test do alias KafkaEx.Protocol, as: Proto alias KafkaEx.Config use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :consumer_group @@ -237,7 +237,8 @@ defmodule KafkaEx.ConsumerGroup.Test do consumer_group: consumer_group ) - offset_before = TestHelper.latest_offset_number(topic, 0, worker_name) + offset_before = + KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name) Enum.each(1..10, fn _ -> msg = %Proto.Produce.Message{value: "hey #{inspect(:os.timestamp())}"} @@ -253,7 +254,9 @@ defmodule KafkaEx.ConsumerGroup.Test do ) end) - offset_after = TestHelper.latest_offset_number(topic, 0, worker_name) + offset_after = + KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name) + assert offset_after == offset_before + 10 [logs] = @@ -471,7 +474,7 @@ defmodule KafkaEx.ConsumerGroup.Test do offset: 0 ) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) @@ -520,9 +523,9 @@ defmodule KafkaEx.ConsumerGroup.Test do # make sure the offset commit is actually committed before we # start streaming again :ok = - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> 3 == - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( random_string, 0, consumer_group, @@ -531,7 +534,7 @@ defmodule KafkaEx.ConsumerGroup.Test do end) stream = KafkaEx.stream(random_string, 0, 
worker_name: worker_name) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) first_message = log |> hd @@ -574,7 +577,7 @@ defmodule KafkaEx.ConsumerGroup.Test do assert "message 3" == m2.value offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, consumer_group, @@ -623,7 +626,7 @@ defmodule KafkaEx.ConsumerGroup.Test do assert "message 5" == m4.value offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, consumer_group, @@ -649,7 +652,7 @@ defmodule KafkaEx.ConsumerGroup.Test do Enum.map(map_stream, fn m -> m.value end) offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, other_consumer_group, diff --git a/test/integration/integration_test.exs b/test/integration/integration_test.exs index a7c9e364..0b83f1e4 100644 --- a/test/integration/integration_test.exs +++ b/test/integration/integration_test.exs @@ -2,7 +2,7 @@ defmodule KafkaEx.Integration.Test do alias KafkaEx.Protocol, as: Proto alias KafkaEx.Config use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :integration @@ -203,7 +203,7 @@ defmodule KafkaEx.Integration.Test do random_string = generate_random_string() metadata = - TestHelper.wait_for_value( + KafkaEx.TestHelpers.wait_for_value( fn -> KafkaEx.metadata(topic: random_string) end, fn metadata -> metadata != nil && length(metadata.topic_metadatas) > 0 @@ -365,7 +365,9 @@ defmodule KafkaEx.Integration.Test do }) [offset_response] = - TestHelper.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0) end) + KafkaEx.TestHelpers.wait_for_any(fn -> + KafkaEx.latest_offset(random_string, 0) + end) offset = offset_response.partition_offsets |> hd |> Map.get(:offset) |> hd diff --git 
a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs index c69bd39d..7a3ff8bd 100644 --- a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs +++ b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs @@ -36,7 +36,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = create_topic(name, config, client) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name]) length(metadatas) > 0 end) @@ -56,7 +56,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client) assert {:no_error, name} = parse_delete_topic_resp(resp) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:ok, []} == KafkaExAPI.topics_metadata(client, [name]) end) end diff --git a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs index 8525d670..9fb79a22 100644 --- a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs @@ -27,7 +27,7 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do partition = 0 :ok = KafkaEx.produce(@topic, partition, msg, worker_name: client) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> [got] = KafkaEx.fetch( @topic, diff --git a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs index 45d73625..3e01c8d2 100644 --- a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs @@ -25,7 +25,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do end test "can join a consumer group", %{client: client} do - random_group = TestHelper.generate_random_string() + 
random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -45,7 +45,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can send a simple leader sync for a consumer group", %{client: client} do # A lot of repetition with the previous test. Leaving it in now, waiting for # how this pans out eventually as we add more and more 0.9 consumer group code - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -81,7 +81,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can leave a consumer group", %{client: client} do # A lot of repetition with the previous tests. Leaving it in now, waiting for # how this pans out eventually as we add more and more 0.9 consumer group code - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -107,7 +107,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can heartbeat", %{client: client} do # See sync test. 
Removing repetition in the next iteration - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, diff --git a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs index 92cbe256..df1d56da 100644 --- a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs @@ -128,7 +128,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do end def sync_stop(pid) when is_pid(pid) do - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> if Process.alive?(pid) do Process.exit(pid, :normal) end @@ -172,7 +172,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do topic_name = "#{@topic_name_prefix}#{:rand.uniform(2_000_000)}" {:ok, topic_name} = - TestHelper.ensure_append_timestamp_topic(client_pid, topic_name) + KafkaEx.TestHelpers.ensure_append_timestamp_topic(client_pid, topic_name) {:ok, consumer_group_pid1} = ConsumerGroup.start_link( @@ -197,7 +197,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do ) # wait for both consumer groups to join - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> ConsumerGroup.active?(consumer_group_pid1, 30000) && ConsumerGroup.active?(consumer_group_pid2, 30000) end) @@ -291,7 +291,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do starting_offsets = partition_range |> Enum.map(fn px -> - {px, TestHelper.latest_offset_number(topic_name, px)} + {px, KafkaEx.TestHelpers.latest_offset_number(topic_name, px)} end) |> Enum.into(%{}) @@ -315,7 +315,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do |> Enum.map(fn px -> consumer_pid = Map.get(consumers, {topic_name, px}) - 
TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> message_set = TestConsumer.last_message_set(consumer_pid) correct_last_message?(message_set, messages[px], starting_offsets[px]) end) @@ -335,9 +335,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do # offsets should be committed on exit for px <- partition_range do - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> ending_offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, px, @consumer_group_name, diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index 81c46956..359a5488 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -31,7 +31,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "fetch with auto_commit doesn't blow up on no messages", %{ client: client } do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() consumer_group = "auto_commit_consumer_group" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -167,7 +167,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "fetch starts consuming from last committed offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() consumer_group = "auto_commit_consumer_group" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -204,7 +204,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "fetch does not commit offset with auto_commit is set to false", %{ client: client } do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() Enum.each(1..10, fn _ -> KafkaEx.produce( @@ -248,7 +248,7 @@ 
defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "offset_fetch does not override consumer_group", %{client: client} do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() consumer_group = "bar#{topic}" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -276,7 +276,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "offset_commit commits an offset and offset_fetch retrieves the committed offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() Enum.each(1..10, fn _ -> KafkaEx.produce( @@ -325,7 +325,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "stream auto_commits offset by default", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -348,7 +348,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do offset: 0 ) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) @@ -368,7 +368,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "streams with a consumer group begin at the last committed offset", %{ client: client } do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() consumer_group = "stream_test" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs index 0cddd4ea..7f6957fc 100644 --- a/test/integration/kayrock/compatibility_streaming_test.exs +++ b/test/integration/kayrock/compatibility_streaming_test.exs @@ -22,7 +22,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do 
partition = 0 consumer_group = "streamers" - {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic) + {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic) KafkaEx.produce(topic, partition, "foo 1", api_version: 3) KafkaEx.produce(topic, partition, "foo 2", api_version: 3) @@ -81,7 +81,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do topic_name = "kayrock_stream_with_empty_log" consumer_group = "streamers_with_empty_log" - {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic_name) + {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name) {:ok, agent} = Agent.start(fn -> [] end) diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index f81f5a9d..077bba9b 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -121,7 +121,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "produce creates log for a non-existing topic", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -150,14 +150,14 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "fetch returns ':topic_not_found' for non-existing topic", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() assert KafkaEx.fetch(random_string, 0, offset: 0, worker_name: client) == :topic_not_found end test "fetch works", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -188,7 +188,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch with empty topic", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = 
KafkaEx.TestHelpers.generate_random_string() response = KafkaEx.fetch(random_string, 0, @@ -201,7 +201,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch nonexistent offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -227,7 +227,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch nonexistent partition", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -251,7 +251,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "earliest_offset retrieves offset of 0", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -273,7 +273,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "latest_offset retrieves offset of 0 for non-existing topic", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, produce_offset} = KafkaEx.produce( @@ -296,7 +296,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "latest_offset retrieves a non-zero offset for a topic published to", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -309,7 +309,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do ) [offset_response] = - TestHelper.wait_for_any(fn -> + KafkaEx.TestHelpers.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0, client) end) @@ -320,7 +320,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do # compression test "compresses / decompresses using gzip", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string 
= KafkaEx.TestHelpers.generate_random_string() message1 = %Proto.Produce.Message{value: "value 1"} message2 = %Proto.Produce.Message{key: "key 2", value: "value 2"} @@ -356,7 +356,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "compresses / decompresses using snappy", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() message1 = %Proto.Produce.Message{value: "value 1"} message2 = %Proto.Produce.Message{key: "key 2", value: "value 2"} @@ -447,7 +447,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do # stream test "streams kafka logs", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -486,7 +486,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "stream with small max_bytes makes multiple requests if necessary", %{ client: client } do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -526,7 +526,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "stream blocks until new messages are available", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -577,7 +577,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "stream is non-blocking with no_wait_at_logend", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -613,13 +613,13 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "doesn't error when re-creating an existing stream", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() 
KafkaEx.stream(random_string, 0, offset: 0, worker_name: client) KafkaEx.stream(random_string, 0, offset: 0, worker_name: client) end test "produce with the default partitioner works", %{client: client} do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() :ok = KafkaEx.produce(topic, nil, "hello", worker_name: client) end end diff --git a/test/integration/kayrock/offset_test.exs b/test/integration/kayrock/offset_test.exs index bfebd40e..e7cf07f1 100644 --- a/test/integration/kayrock/offset_test.exs +++ b/test/integration/kayrock/offset_test.exs @@ -20,7 +20,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v0", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v0" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -62,7 +62,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v1 and fetch v0", %{client: client} do topic = "food" consumer_group = "commit_v1_fetch_v0" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -107,7 +107,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v1 and fetch v1", %{client: client} do topic = "food" consumer_group = "commit_v1_fetch_v1" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -149,7 +149,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v1", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v1" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -192,7 +192,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v2", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v2" - msg = 
TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -235,7 +235,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v3", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v3" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -278,7 +278,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v2 and fetch v2", %{client: client} do topic = "food" consumer_group = "commit_v2_fetch_v2" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -320,7 +320,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v3 and fetch v3", %{client: client} do topic = "food" consumer_group = "commit_v3_fetch_v3" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index bd7cc722..1b81b275 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -16,7 +16,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "can specify protocol version for fetch - v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -45,7 +45,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "fetch empty message set - v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -72,7 +72,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do # v2 is the highest that will accept the MessageSet format test "can specify protocol version for produce - v2", 
%{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -102,7 +102,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "can specify protocol version for fetch - v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -131,7 +131,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "fetch empty message set - v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -158,7 +158,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do # v3 is the lowest that requires the RecordBatch format test "can specify protocol version for produce - v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -241,7 +241,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v0, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -272,7 +272,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v0, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -303,7 +303,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -334,7 +334,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch 
v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -365,7 +365,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -396,7 +396,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v0, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -427,7 +427,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v0, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -458,7 +458,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -489,7 +489,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -520,7 +520,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs index 0c66448e..c255c952 100644 --- 
a/test/integration/kayrock/timestamp_test.exs +++ b/test/integration/kayrock/timestamp_test.exs @@ -20,7 +20,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is nil by default on v0 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -50,7 +50,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is -1 by default on v3 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -80,7 +80,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is -1 by default on v5 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -112,12 +112,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = "test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -149,12 +149,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = "test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -187,12 +187,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = "test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = 
TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -224,7 +224,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp with v0 throws an error", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() Process.flag(:trap_exit, true) @@ -246,7 +246,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp with v1 throws an error", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() Process.flag(:trap_exit, true) @@ -267,7 +267,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -299,7 +299,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -331,7 +331,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index 9413b4b2..15ae7b32 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.New.Client.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers alias KafkaEx.New.Client @@ -197,7 +197,7 @@ defmodule KafkaEx.New.Client.Test do test "client can receive {:ssl_closed, _}", %{client: client} do send(client, {:ssl_closed, :unused}) 
- TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) @@ -208,7 +208,7 @@ defmodule KafkaEx.New.Client.Test do test "client can receive {:tcp_closed, _}", %{client: client} do send(client, {:tcp_closed, :unused}) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index bc30266d..2375f778 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :server_0_p_10_and_later @num_partitions 10 diff --git a/test/integration/server0_p_8_p_0_test.exs b/test/integration/server0_p_8_p_0_test.exs index 3019a0cf..579f9616 100644 --- a/test/integration/server0_p_8_p_0_test.exs +++ b/test/integration/server0_p_8_p_0_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P8P0.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :server_0_p_8_p_0 diff --git a/test/integration/server0_p_9_p_0_test.exs b/test/integration/server0_p_9_p_0_test.exs index b5da44bc..5e6f2bd0 100644 --- a/test/integration/server0_p_9_p_0_test.exs +++ b/test/integration/server0_p_9_p_0_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P9P0.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest diff --git a/test/kafka_ex/network_client_test.exs b/test/kafka_ex/network_client_test.exs index c9ee89cc..c9c24e8f 100644 --- a/test/kafka_ex/network_client_test.exs +++ b/test/kafka_ex/network_client_test.exs @@ -1,5 +1,6 @@ defmodule KafkaEx.NetworkClientTest do 
use ExUnit.Case, async: true + import KafkaEx.TestHelpers use Hammox.Protect, module: KafkaEx.NetworkClient, @@ -28,22 +29,26 @@ defmodule KafkaEx.NetworkClientTest do describe "create_socket/3" do setup do - pid = KafkaEx.TestSupport.Server.start(3040) + port = get_free_port(3040) + pid = KafkaEx.TestSupport.Server.start(port) on_exit(fn -> Process.exit(pid, :normal) end) + + {:ok, [port: port]} end - test "creates a socket" do - kafka_ex_socket = create_socket("localhost", 3040, [], false) + test "creates a socket", %{port: port} do + kafka_ex_socket = create_socket("localhost", port, [], false) assert kafka_ex_socket.socket assert kafka_ex_socket.ssl == false end test "returns nil if socket creation fails" do - assert nil == create_socket("localhost", 3002, [], true) + port = get_free_port(3040) + assert nil == create_socket("localhost", port, [], true) end end diff --git a/test/kafka_ex/new/client/request_builder_test.exs b/test/kafka_ex/new/client/request_builder_test.exs index a6f40d3b..0f06ca92 100644 --- a/test/kafka_ex/new/client/request_builder_test.exs +++ b/test/kafka_ex/new/client/request_builder_test.exs @@ -12,8 +12,7 @@ defmodule KafkaEx.New.Client.RequestBuilderTest do group_ids: group_names } - {:ok, request} = - RequestBuilder.describe_groups_request(group_names, state) + request = RequestBuilder.describe_groups_request(group_names, state) assert expected_request == request end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs index b11d456a..18a900c4 100644 --- a/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs @@ -1,7 +1,8 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.RequestTest do use ExUnit.Case, async: true - alias KafkaEx.New.Protocols.DescribeGroups + alias KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request, + as: 
DescribeGroupsRequest alias Kayrock.DescribeGroups.V0 alias Kayrock.DescribeGroups.V1 @@ -10,17 +11,19 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.RequestTest do test "for api version 0 - builds describe group request" do groups = ["group1", "group2"] - assert %V0.Request{ - group_ids: groups - } == DescribeGroups.Request.build_request(%V0.Request{}, groups) + expected_request = %V0.Request{group_ids: groups} + + assert expected_request == + DescribeGroupsRequest.build_request(%V0.Request{}, groups) end test "for api version 1 - builds describe group request" do groups = ["group1", "group2"] - assert %V1.Request{ - group_ids: groups - } == DescribeGroups.Request.build_request(%V1.Request{}, groups) + expected_request = %V1.Request{group_ids: groups} + + assert expected_request == + DescribeGroupsRequest.build_request(%V1.Request{}, groups) end end end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs index 1e179699..33acc7cd 100644 --- a/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs @@ -1,33 +1,35 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do use ExUnit.Case, async: true - alias KafkaEx.New.Protocols.DescribeGroups + alias KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response, + as: DescribeGroupsResponse alias Kayrock.DescribeGroups.V0 alias Kayrock.DescribeGroups.V1 describe "parse_response/1" do - @expected_group %KafkaEx.New.ConsumerGroup{ + @expected_group %KafkaEx.New.Structs.ConsumerGroup{ group_id: "succeeded", state: "stable", protocol_type: "protocol_type", protocol: "protocol", members: [ - %KafkaEx.New.ConsumerGroup.Member{ + %KafkaEx.New.Structs.ConsumerGroup.Member{ member_id: "member_id", client_id: "client_id", client_host: "client_host", member_metadata: "member_metadata", - member_assignment: 
%KafkaEx.New.ConsumerGroup.Member.MemberAssignment{ - version: 0, - user_data: "user_data", - partition_assignments: [ - %KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ - topic: "test-topic", - partitions: [1, 2, 3] - } - ] - } + member_assignment: + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } } ] } @@ -61,7 +63,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do } assert {:ok, [@expected_group]} == - DescribeGroups.Response.parse_response(response) + DescribeGroupsResponse.parse_response(response) end test "for api version 0 - returns error if any group failed" do @@ -73,7 +75,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do } assert {:error, [{"failed", :offset_out_of_range}]} == - DescribeGroups.Response.parse_response(response) + DescribeGroupsResponse.parse_response(response) end test "for api version 1 - returns response if all groups succeeded" do @@ -105,7 +107,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do } assert {:ok, [@expected_group]} == - DescribeGroups.Response.parse_response(response) + DescribeGroupsResponse.parse_response(response) end test "for api version 1 - returns error if any group failed" do @@ -117,7 +119,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do } assert {:error, [{"failed", :offset_out_of_range}]} == - DescribeGroups.Response.parse_response(response) + DescribeGroupsResponse.parse_response(response) end end end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs index 6b37693e..eb07b7a5 100644 --- a/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs +++ 
b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs @@ -7,12 +7,11 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do describe "build_request/2" do test "builds request for Describe Groups API" do - api_version = 0 consumer_group_names = ["test-group"] - expected_request = %V0.Request{group_ids: groups} + expected_request = %V0.Request{group_ids: consumer_group_names} - assert KayrockDescribeGroups.build_request( - api_version, + assert KayrockDescribeGroups.Request.build_request( + %V0.Request{}, consumer_group_names ) == expected_request end @@ -49,23 +48,23 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do assert {:ok, [ - %KafkaEx.New.ConsumerGroup{ + %KafkaEx.New.Structs.ConsumerGroup{ group_id: "succeeded", state: "stable", protocol_type: "protocol_type", protocol: "protocol", members: [ - %KafkaEx.New.ConsumerGroup.Member{ + %KafkaEx.New.Structs.ConsumerGroup.Member{ member_id: "member_id", client_id: "client_id", client_host: "client_host", member_metadata: "member_metadata", member_assignment: - %KafkaEx.New.ConsumerGroup.Member.MemberAssignment{ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ version: 0, user_data: "user_data", partition_assignments: [ - %KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ topic: "test-topic", partitions: [1, 2, 3] } @@ -75,7 +74,7 @@ defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do ] } ]} == - DescribeGroups.Response.parse_response(response) + KayrockDescribeGroups.Response.parse_response(response) end end end diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs index ca45a9d9..d0f1a15e 100644 --- a/test/kafka_ex/new/structs/broker_test.exs +++ b/test/kafka_ex/new/structs/broker_test.exs @@ -1,20 +1,22 @@ defmodule KafkaEx.New.Structs.BrokerTest do use ExUnit.Case, async: false + import KafkaEx.TestHelpers 
alias KafkaEx.New.Structs.Broker setup do - pid = KafkaEx.TestSupport.Server.start(3040) + port = get_free_port(3040) + pid = KafkaEx.TestSupport.Server.start(port) {:ok, socket} = - KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false) + KafkaEx.Socket.create('localhost', port, [:binary, {:packet, 0}], false) on_exit(fn -> KafkaEx.Socket.close(socket) Process.exit(pid, :normal) end) - {:ok, [socket: socket]} + {:ok, [socket: socket, port: port]} end describe "connect_broker/1" do @@ -66,9 +68,12 @@ defmodule KafkaEx.New.Structs.BrokerTest do refute Broker.has_socket?(broker, socket) end - test "returns false if broker has different socket", %{socket: socket_one} do + test "returns false if broker has different socket", %{ + socket: socket_one, + port: port + } do {:ok, socket_two} = - KafkaEx.Socket.create('localhost', 3040, [:binary, {:packet, 0}], false) + KafkaEx.Socket.create('localhost', port, [:binary, {:packet, 0}], false) broker = %Broker{socket: nil} |> Broker.put_socket(socket_one) diff --git a/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs b/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs index 7c552822..204ae9d2 100644 --- a/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs +++ b/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignmentTest do +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignmentTest do use ExUnit.Case, async: true - alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment describe "from_describe_group_response/1" do test "returns a MemberAssignment struct without partitions" do diff --git a/test/kafka_ex/new/structs/consumer_group/member_test.exs b/test/kafka_ex/new/structs/consumer_group/member_test.exs index 253dce70..ab02af3e 100644 --- 
a/test/kafka_ex/new/structs/consumer_group/member_test.exs +++ b/test/kafka_ex/new/structs/consumer_group/member_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.ConsumerGroup.MemberTest do +defmodule KafkaEx.New.Structs.ConsumerGroup.MemberTest do use ExUnit.Case, async: true - alias KafkaEx.New.ConsumerGroup.Member + alias KafkaEx.New.Structs.ConsumerGroup.Member describe "from_describe_group_response/1" do test "returns a Member struct" do diff --git a/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs b/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs index 5c302fc2..54e3b38b 100644 --- a/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs +++ b/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs @@ -1,7 +1,7 @@ -defmodule KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignmentTest do +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignmentTest do use ExUnit.Case, async: true - alias KafkaEx.New.ConsumerGroup.Member.MemberAssignment.PartitionAssignment + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment describe "from_describe_group_response/1" do test "builds a struct from a describe group response" do diff --git a/test/kafka_ex/new/structs/consumer_group_test.exs b/test/kafka_ex/new/structs/consumer_group_test.exs index 44ab3d4f..cee03acd 100644 --- a/test/kafka_ex/new/structs/consumer_group_test.exs +++ b/test/kafka_ex/new/structs/consumer_group_test.exs @@ -1,7 +1,7 @@ defmodule KafkaEx.New.ConsumerGroupTest do use ExUnit.Case, async: true - alias KafkaEx.New.ConsumerGroup + alias KafkaEx.New.Structs.ConsumerGroup describe "from_describe_group_response/1" do test "returns a consumer group struct" do diff --git a/test/kafka_ex/socket_test.exs b/test/kafka_ex/socket_test.exs index 6232a864..a245ae45 100644 --- a/test/kafka_ex/socket_test.exs +++ b/test/kafka_ex/socket_test.exs @@ -1,5 +1,6 @@ defmodule 
KafkaEx.Socket.Test do
   use ExUnit.Case, async: false
+  import KafkaEx.TestHelpers
 
   setup_all do
     :ssl.start()
@@ -7,8 +8,9 @@ defmodule KafkaEx.Socket.Test do
 
   describe "without SSL socket" do
     setup do
-      KafkaEx.TestSupport.Server.start(3040)
-      {:ok, [port: 3040]}
+      port = get_free_port(3040)
+      KafkaEx.TestSupport.Server.start(port)
+      {:ok, [port: port]}
     end
 
     test "create a non SSL socket", context do
diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex
new file mode 100644
index 00000000..7a913b34
--- /dev/null
+++ b/test/support/test_helpers.ex
@@ -0,0 +1,209 @@
+defmodule KafkaEx.TestHelpers do
+  alias KafkaEx.New.Client
+  alias KafkaEx.New.Structs.NodeSelector
+  require Logger
+
+  @doc """
+  Returns a random string of length string_length.
+  """
+  def generate_random_string(string_length \\ 20) do
+    1..string_length
+    |> Enum.map(fn _ -> round(:rand.uniform() * 25 + 65) end)
+    |> to_string
+  end
+
+  @doc """
+  Returns a random port number that is not in use.
+  """
+  def get_free_port(port) do
+    case :gen_tcp.listen(port, [:binary]) do
+      {:ok, socket} ->
+        :ok = :gen_tcp.close(socket)
+        port
+
+      {:error, :eaddrinuse} ->
+        get_free_port(port + 1)
+    end
+  end
+
+  @doc """
+  Wait for the return value of value_getter to pass the predicate condn
+  ~> If condn does not pass, sleep for dwell msec and try again
+  ~> If condn does not pass after max_tries attempts, raises an error
+  """
+  def wait_for_value(value_getter, condn, dwell \\ 500, max_tries \\ 200) do
+    wait_for_value(value_getter, condn, dwell, max_tries, 0)
+  end
+
+  @doc """
+  Wait until condn returns a truthy value; passes through to wait_for_value.
+  Returns :ok on success; raises after max_tries failed attempts.
+  """
+  def wait_for(condn, dwell \\ 500, max_tries \\ 200) do
+    wait_for_value(fn -> :ok end, fn :ok -> condn.() end, dwell, max_tries)
+  end
+
+  @doc """
+  Execute value_getter, which should return a list, and accumulate
+  the results until the accumulated results are at least min_length long
+  """
+  def 
wait_for_accum(value_getter, min_length, dwell \\ 500, max_tries \\ 200) do + wait_for_accum(value_getter, [], min_length, dwell, max_tries) + end + + @doc """ + passthrough to wait_for_accum with 1 as the min_length - i.e., + wait for any response + """ + def wait_for_any(value_getter, dwell \\ 500, max_tries \\ 200) do + wait_for_accum(value_getter, 1, dwell, max_tries) + end + + @doc """ + Returns a list of the brokers in the cluster + """ + def uris do + Application.get_env(:kafka_ex, :brokers) + end + + def utc_time do + {x, {a, b, c}} = + :calendar.local_time() + |> :calendar.local_time_to_universal_time_dst() + |> hd + + {x, {a, b, c + 60}} + end + + def latest_offset_number(topic, partition_id, worker \\ :kafka_ex) do + offset = + KafkaEx.latest_offset(topic, partition_id, worker) + |> first_partition_offset + + offset || 0 + end + + def latest_consumer_offset_number( + topic, + partition, + consumer_group, + worker \\ :kafka_ex, + api_version \\ 0 + ) do + request = %KafkaEx.Protocol.OffsetFetch.Request{ + topic: topic, + partition: partition, + consumer_group: consumer_group, + api_version: api_version + } + + resp = KafkaEx.offset_fetch(worker, request) + resp |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() + end + + def ensure_append_timestamp_topic(client, topic_name) do + resp = + Client.send_request( + client, + %Kayrock.CreateTopics.V0.Request{ + create_topic_requests: [ + %{ + topic: topic_name, + num_partitions: 4, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{ + config_name: "message.timestamp.type", + config_value: "LogAppendTime" + } + ] + } + ], + timeout: 1000 + }, + NodeSelector.controller() + ) + + {:ok, + %Kayrock.CreateTopics.V0.Response{ + topic_errors: [%{error_code: error_code}] + }} = resp + + wait_for_topic_to_appear(client, topic_name) + + if error_code in [0, 36] do + {:ok, topic_name} + else + Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") + {:error, topic_name} + end + 
end + + defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10) + + defp wait_for_topic_to_appear(_client, _topic_name, attempts) + when attempts <= 0 do + raise "Timeout while waiting for topic to appear" + end + + defp wait_for_topic_to_appear(client, topic_name, attempts) do + {:ok, %{topic_metadata: topic_metadata}} = + Client.send_request( + client, + %Kayrock.Metadata.V0.Request{}, + NodeSelector.topic_partition(topic_name, 0) + ) + + topics = topic_metadata |> Enum.map(& &1.topic) + + unless topic_name in topics do + wait_for_topic_to_appear(client, topic_name, attempts - 1) + end + end + + defp first_partition_offset(:topic_not_found) do + nil + end + + defp first_partition_offset(response) do + [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = + response + + first_partition = hd(partition_offsets) + first_partition.offset |> hd + end + + defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n) + when n >= max_tries do + raise "too many tries waiting for condition" + end + + defp wait_for_value(value_getter, condn, dwell, max_tries, n) do + value = value_getter.() + + if condn.(value) do + value + else + :timer.sleep(dwell) + wait_for_value(value_getter, condn, dwell, max_tries, n + 1) + end + end + + defp wait_for_accum(_value_getter, acc, min_length, _dwell, _max_tries) + when length(acc) >= min_length do + acc + end + + defp wait_for_accum(value_getter, acc, min_length, dwell, max_tries) do + value = + wait_for_value( + value_getter, + fn v -> length(v) > 0 end, + dwell, + max_tries + ) + + wait_for_accum(value_getter, acc ++ value, min_length, dwell, max_tries) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 9a5a1f04..4059a618 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -12,180 +12,3 @@ ExUnit.configure( server_0_p_8_p_0: true ] ) - -defmodule TestHelper do - alias KafkaEx.New.Client - alias KafkaEx.New.Structs.NodeSelector - require Logger - - def 
generate_random_string(string_length \\ 20) do - 1..string_length - |> Enum.map(fn _ -> round(:rand.uniform() * 25 + 65) end) - |> to_string - end - - # Wait for the return value of value_getter to pass the predicate condn - # If condn does not pass, sleep for dwell msec and try again - # If condn does not pass after max_tries attempts, raises an error - def wait_for_value(value_getter, condn, dwell \\ 500, max_tries \\ 200) do - wait_for_value(value_getter, condn, dwell, max_tries, 0) - end - - # Wait for condn to return false or nil; passes through to wait_for_value - # returns :ok on success - def wait_for(condn, dwell \\ 500, max_tries \\ 200) do - wait_for_value(fn -> :ok end, fn :ok -> condn.() end, dwell, max_tries) - end - - # execute value_getter, which should return a list, and accumulate - # the results until the accumulated results are at least min_length long - def wait_for_accum(value_getter, min_length, dwell \\ 500, max_tries \\ 200) do - wait_for_accum(value_getter, [], min_length, dwell, max_tries) - end - - # passthrough to wait_for_accum with 1 as the min_length - i.e., - # wait for any response - def wait_for_any(value_getter, dwell \\ 500, max_tries \\ 200) do - wait_for_accum(value_getter, 1, dwell, max_tries) - end - - def uris do - Application.get_env(:kafka_ex, :brokers) - end - - def utc_time do - {x, {a, b, c}} = - :calendar.local_time() - |> :calendar.local_time_to_universal_time_dst() - |> hd - - {x, {a, b, c + 60}} - end - - def latest_offset_number(topic, partition_id, worker \\ :kafka_ex) do - offset = - KafkaEx.latest_offset(topic, partition_id, worker) - |> first_partition_offset - - offset || 0 - end - - def latest_consumer_offset_number( - topic, - partition, - consumer_group, - worker \\ :kafka_ex, - api_version \\ 0 - ) do - request = %KafkaEx.Protocol.OffsetFetch.Request{ - topic: topic, - partition: partition, - consumer_group: consumer_group, - api_version: api_version - } - - resp = KafkaEx.offset_fetch(worker, request) - 
resp |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() - end - - def ensure_append_timestamp_topic(client, topic_name) do - resp = - Client.send_request( - client, - %Kayrock.CreateTopics.V0.Request{ - create_topic_requests: [ - %{ - topic: topic_name, - num_partitions: 4, - replication_factor: 1, - replica_assignment: [], - config_entries: [ - %{ - config_name: "message.timestamp.type", - config_value: "LogAppendTime" - } - ] - } - ], - timeout: 1000 - }, - NodeSelector.controller() - ) - - {:ok, - %Kayrock.CreateTopics.V0.Response{ - topic_errors: [%{error_code: error_code}] - }} = resp - - wait_for_topic_to_appear(client, topic_name) - - if error_code in [0, 36] do - {:ok, topic_name} - else - Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") - {:error, topic_name} - end - end - - defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10) - - defp wait_for_topic_to_appear(_client, _topic_name, attempts) - when attempts <= 0 do - raise "Timeout while waiting for topic to appear" - end - - defp wait_for_topic_to_appear(client, topic_name, attempts) do - {:ok, %{topic_metadata: topic_metadata}} = - Client.send_request( - client, - %Kayrock.Metadata.V0.Request{}, - NodeSelector.topic_partition(topic_name, 0) - ) - - topics = topic_metadata |> Enum.map(& &1.topic) - - unless topic_name in topics do - wait_for_topic_to_appear(client, topic_name, attempts - 1) - end - end - - defp first_partition_offset(:topic_not_found) do - nil - end - - defp first_partition_offset(response) do - [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = - response - - first_partition = hd(partition_offsets) - first_partition.offset |> hd - end - - defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n) - when n >= max_tries do - raise "too many tries waiting for condition" - end - - defp wait_for_value(value_getter, condn, dwell, max_tries, n) do - value = value_getter.() - - if condn.(value) do - value - else - 
:timer.sleep(dwell) - wait_for_value(value_getter, condn, dwell, max_tries, n + 1) - end - end - - defp wait_for_accum(_value_getter, acc, min_length, _dwell, _max_tries) - when length(acc) >= min_length do - acc - end - - defp wait_for_accum(value_getter, acc, min_length, dwell, max_tries) do - value = - wait_for_value(value_getter, fn v -> length(v) > 0 end, dwell, max_tries) - - wait_for_accum(value_getter, acc ++ value, min_length, dwell, max_tries) - end -end