Skip to content

Commit

Permalink
Add List Offsets API code as Kayrock & refactor describe groups
Browse files Browse the repository at this point in the history
No breaking changes to existing api, yet we are extending it with possibility of adding opts to request

s
  • Loading branch information
Argonus committed Apr 19, 2024
1 parent 35cb309 commit 3af1829
Show file tree
Hide file tree
Showing 55 changed files with 1,034 additions and 336 deletions.
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Used by "mix format"
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"],
line_length: 120
]
3 changes: 1 addition & 2 deletions lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ defmodule KafkaEx.ConsumerGroup do
opts = Keyword.put(opts, :supervisor_pid, self())

children = [
{KafkaEx.ConsumerGroup.Manager,
{{gen_consumer_module, consumer_module}, group_name, topics, opts}}
{KafkaEx.ConsumerGroup.Manager, {{gen_consumer_module, consumer_module}, group_name, topics, opts}}
]

Supervisor.init(children,
Expand Down
4 changes: 1 addition & 3 deletions lib/kafka_ex/gen_consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ defmodule KafkaEx.GenConsumer.Supervisor do
child_spec_builder = fn topic, partition ->
%{
id: gen_consumer_module,
start:
{gen_consumer_module, :start_link,
[consumer_module, group_name, topic, partition, opts]}
start: {gen_consumer_module, :start_link, [consumer_module, group_name, topic, partition, opts]}
}
end

Expand Down
135 changes: 58 additions & 77 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,8 @@ defmodule KafkaEx.New.Client do
KafkaEx.New.Structs.NodeSelector.t(),
pos_integer | nil
) :: {:ok, term} | {:error, term}
def send_request(
server,
request,
node_selector,
timeout \\ nil
) do
GenServer.call(
server,
{:kayrock_request, request, node_selector},
timeout_val(timeout)
)
def send_request(server, request, node_selector, timeout \\ nil) do
GenServer.call(server, {:kayrock_request, request, node_selector}, timeout_val(timeout))
end

require Logger
Expand Down Expand Up @@ -165,11 +156,19 @@ defmodule KafkaEx.New.Client do
{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
{response, updated_state} = describe_group_request(consumer_group_name, state)
def handle_call({:list_offsets, [{topic, partition}], opts}, _from, state) do
case list_offset_request({topic, partition}, opts, state) do
{:error, error} -> {:reply, {:error, error}, state}
{result, updated_state} -> {:reply, result, updated_state}
end
end

{:reply, response, updated_state}
def handle_call({:describe_groups, [consumer_group_name], opts}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
case describe_group_request(consumer_group_name, opts, state) do
{:error, error} -> {:reply, {:error, error}, state}
{result, updated_state} -> {:reply, result, updated_state}
end
else
{:reply, {:error, :invalid_consumer_group}, state}
end
Expand Down Expand Up @@ -257,66 +256,65 @@ defmodule KafkaEx.New.Client do
end
end

defp describe_group_request(consumer_group_name, state) do
# ----------------------------------------------------------------------------------------------------
defp describe_group_request(consumer_group_name, opts, state) do
node_selector = NodeSelector.consumer_group(consumer_group_name)
req_data = [{:group_names, [consumer_group_name]} | opts]

case RequestBuilder.describe_groups_request(req_data, state) do
{:ok, request} -> handle_describe_group_request(request, node_selector, state)
{:error, error} -> {:error, error}
end
end

defp list_offset_request({topic, partition}, opts, state) do
node_selector = NodeSelector.topic_partition(topic, partition)
req_data = [{:topics, [{topic, [partition]}]} | opts]

case RequestBuilder.lists_offset_request(req_data, state) do
{:ok, request} -> handle_lists_offsets_request(request, node_selector, state)
{:error, error} -> {:error, error}
end
end

[consumer_group_name]
|> RequestBuilder.describe_groups_request(state)
|> handle_describe_group_request(node_selector, state)
# ----------------------------------------------------------------------------------------------------
defp handle_describe_group_request(request, node_selector, state) do
handle_request_with_retry(request, &ResponseParser.describe_groups_response/1, node_selector, state)
end

defp handle_describe_group_request(
_,
_,
_,
retry_count \\ @retry_count,
_last_error \\ nil
)
defp handle_lists_offsets_request(request, node_selector, state) do
handle_request_with_retry(request, &ResponseParser.list_offsets_response/1, node_selector, state)
end

defp handle_describe_group_request(_, _, state, 0, last_error) do
# ----------------------------------------------------------------------------------------------------
defp handle_request_with_retry(_, _, _, _, retry_count \\ @retry_count, _last_error \\ nil)

defp handle_request_with_retry(_, _, _, state, 0, last_error) do
{{:error, last_error}, state}
end

defp handle_describe_group_request(
request,
node_selector,
state,
retry_count,
_last_error
) do
defp handle_request_with_retry(request, parser_fn, node_selector, state, retry_count, _last_error) do
case kayrock_network_request(request, node_selector, state) do
{{:ok, response}, state_out} ->
case ResponseParser.describe_groups_response(response) do
{:ok, consumer_groups} ->
{{:ok, consumer_groups}, state_out}
case parser_fn.(response) do
{:ok, result} ->
{{:ok, result}, state_out}

{:error, [error | _]} ->
Logger.warn(
"Unable to fetch consumer group metadata for #{inspect(request.group_ids)}"
)

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
error
)
request_name = request.__struct__
Logger.warning("Unable to send request #{inspect(request_name)}, failed with error #{inspect(error)}")
handle_request_with_retry(request, parser_fn, node_selector, state, retry_count - 1, error)
end

{_, _state_out} ->
Logger.warn("Unable to fetch consumer group metadata for #{inspect(request.group_ids)}")

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
:unknown
)
request_name = request.__struct__
Logger.warning("Unable to send request #{inspect(request_name)}, failed with error unknown")
error = KafkaEx.New.Structs.Error.build(:unknown, %{})
handle_request_with_retry(request, parser_fn, node_selector, state, retry_count - 1, error)
end
end

# ----------------------------------------------------------------------------------------------------
defp maybe_connect_broker(broker, state) do
case Broker.connected?(broker) do
true ->
Expand Down Expand Up @@ -575,12 +573,7 @@ defmodule KafkaEx.New.Client do
{ok_or_error, response, state_out}
end

defp kayrock_network_request(
request,
node_selector,
state,
network_timeout \\ nil
) do
defp kayrock_network_request(request, node_selector, state, network_timeout \\ nil) do
# produce request have an acks field and if this is 0 then we do not want to
# wait for a response from the broker
synchronous =
Expand All @@ -590,14 +583,7 @@ defmodule KafkaEx.New.Client do
end

network_timeout = config_sync_timeout(network_timeout)

{send_request, updated_state} =
get_send_request_function(
node_selector,
state,
network_timeout,
synchronous
)
{send_request, updated_state} = get_send_request_function(node_selector, state, network_timeout, synchronous)

case send_request do
:no_broker ->
Expand Down Expand Up @@ -661,12 +647,7 @@ defmodule KafkaEx.New.Client do
network_timeout,
synchronous
) do
{broker, updated_state} =
broker_for_partition_with_update(
state,
topic,
partition
)
{broker, updated_state} = broker_for_partition_with_update(state, topic, partition)

if broker do
if synchronous do
Expand Down
44 changes: 37 additions & 7 deletions lib/kafka_ex/new/client/request_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,54 @@ defmodule KafkaEx.New.Client.RequestBuilder do
)

@default_api_version %{
describe_groups: 1
describe_groups: 1,
list_offsets: 1
}

alias KafkaEx.New.Client.State

@doc """
Builds request for Describe Groups API
"""
@spec describe_groups_request([binary], State.t()) :: term
def describe_groups_request(group_names, state) do
api_version = get_api_version(state, :describe_groups)
@spec describe_groups_request(Keyword.t(), State.t()) :: {:ok, term} | {:error, :api_version_no_supported}
def describe_groups_request(request_opts, state) do
case get_api_version(state, :describe_groups, request_opts) do
{:ok, api_version} ->
group_names = Keyword.fetch!(request_opts, :group_names)
req = @protocol.build_request(:describe_groups, api_version, group_names: group_names)
{:ok, req}

@protocol.build_request(:describe_groups, api_version, group_names: group_names)
{:error, error_code} ->
{:error, error_code}
end
end

@doc """
Builds request for List Offsets API
"""
@spec lists_offset_request(Keyword.t(), State.t()) :: {:ok, term} | {:error, :api_version_no_supported}
def lists_offset_request(request_opts, state) do
case get_api_version(state, :list_offsets, request_opts) do
{:ok, api_version} ->
topics = Keyword.fetch!(request_opts, :topics)
req = @protocol.build_request(:list_offsets, api_version, topics: topics)
{:ok, req}

{:error, error_code} ->
{:error, error_code}
end
end

# -----------------------------------------------------------------------------
defp get_api_version(state, request_type) do
defp get_api_version(state, request_type, request_opts) do
default = Map.fetch!(@default_api_version, request_type)
State.max_supported_api_version(state, request_type, default)
requested_version = Keyword.get(request_opts, :api_version, default)
max_supported = State.max_supported_api_version(state, request_type, default)

if requested_version > max_supported do
{:error, :api_version_no_supported}
else
{:ok, requested_version}
end
end
end
19 changes: 12 additions & 7 deletions lib/kafka_ex/new/client/response_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@ defmodule KafkaEx.New.Client.ResponseParser do
It's main decision point which protocol to use for parsing response
"""
alias KafkaEx.New.Structs.ConsumerGroup
alias KafkaEx.New.Structs.Error
alias KafkaEx.New.Structs.Offset

@protocol Application.compile_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)
@protocol Application.compile_env(:kafka_ex, :protocol, KafkaEx.New.Protocols.KayrockProtocol)

@doc """
Parses response for Describe Groups API
"""
@spec describe_groups_response(term) ::
{:ok, [ConsumerGroup.t()]} | {:error, term}
@spec describe_groups_response(term) :: {:ok, [ConsumerGroup.t()]} | {:error, term}
def describe_groups_response(response) do
@protocol.parse_response(:describe_groups, response)
end

@doc """
Parses response for List Groups API
"""
@spec list_offsets_response(term) :: {:ok, [Offset.t()]} | {:error, Error.t()}
def list_offsets_response(response) do
@protocol.parse_response(:list_offsets, response)
end
end
19 changes: 7 additions & 12 deletions lib/kafka_ex/new/client/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@ defmodule KafkaEx.New.Client.State do
%{state | correlation_id: cid + 1}
end

def select_broker(
%__MODULE__{cluster_metadata: cluster_metadata},
selector
) do
with {:ok, node_id} <-
ClusterMetadata.select_node(cluster_metadata, selector),
def select_broker(%__MODULE__{cluster_metadata: cluster_metadata}, selector) do
with {:ok, node_id} <- ClusterMetadata.select_node(cluster_metadata, selector),
broker <- ClusterMetadata.broker_by_node_id(cluster_metadata, node_id) do
{:ok, broker}
else
Expand Down Expand Up @@ -130,12 +126,11 @@ defmodule KafkaEx.New.Client.State do
%{state | api_versions: api_versions}
end

def max_supported_api_version(
%__MODULE__{api_versions: api_versions},
api,
default
)
when is_atom(api) do
@doc """
Returns max supported api version for request based on cached values in state.
Currently supports Kayrock metadata schema only.
"""
def max_supported_api_version(%__MODULE__{api_versions: api_versions}, api, default) when is_atom(api) do
api_key = Kayrock.KafkaSchemaMetadata.api_key(api)
{_, max_kayrock_version} = Kayrock.KafkaSchemaMetadata.version_range(api)

Expand Down
3 changes: 1 addition & 2 deletions lib/kafka_ex/new/client_compatibility.ex
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,7 @@ defmodule KafkaEx.New.ClientCompatibility do

case response do
{:ok, resp} ->
{:reply, Adapter.delete_topics_response(resp),
State.remove_topics(updated_state, topics)}
{:reply, Adapter.delete_topics_response(resp), State.remove_topics(updated_state, topics)}

_ ->
{:reply, response, updated_state}
Expand Down
Loading

0 comments on commit 3af1829

Please sign in to comment.