Skip to content

Commit

Permalink
Test with Kafka 3.6
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Jun 16, 2024
1 parent adda7c3 commit fcad1a1
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 137 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ on:
branches:
- master
env:
OTP_VERSION: "24.1"
REBAR_VERSION: "3.17.0"
OTP_VERSION: "26"
REBAR_VERSION: "3.20.0"

jobs:
lint:
Expand Down Expand Up @@ -44,8 +44,8 @@ jobs:
strategy:
fail-fast: false
matrix:
otp: ["24.1", "23.3.4.7", "22.3.4.21"]
kafka: ["2.4", "1.1", "0.11"]
otp: ["26"]
kafka: ["0.9", "0.10", "0.11", "2.8", "1.1", "3.6"]
steps:
- name: Checkout
uses: actions/checkout@v2
Expand All @@ -69,7 +69,8 @@ jobs:
run: |
export KAFKA_VERSION=${{ matrix.kafka }}
echo "Running Kafka ${KAFKA_VERSION}"
scripts/setup-test-env.sh && rebar3 do ct,eunit
make test-env
make t
- name: Store test logs
uses: actions/upload-artifact@v1
if: always()
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ relx
docker/
TAGS
.vscode/
test/data/ssl/*.pem
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
KAFKA_VERSION ?= 3.6
export KAFKA_VERSION
all: compile

compile:
Expand All @@ -8,6 +10,10 @@ lint:

test-env:
@./scripts/setup-test-env.sh
@mkdir -p ./test/data/ssl
@docker cp kafka-1:/localhost-ca-crt.pem ./test/data/ssl/ca.pem
@docker cp kafka-1:/localhost-client-key.pem ./test/data/ssl/client-key.pem
@docker cp kafka-1:/localhost-client-crt.pem ./test/data/ssl/client-crt.pem

ut:
@rebar3 eunit -v --cover_export_name ut-$(KAFKA_VERSION)
Expand Down
6 changes: 3 additions & 3 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: "2"

services:
zookeeper:
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: zookeeper
command: run zookeeper
network_mode: host
kafka_1:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-1"
network_mode: host
environment:
Expand All @@ -23,7 +23,7 @@ services:
kafka_2:
depends_on:
- zookeeper
image: "zmstone/kafka:${KAFKA_VERSION}"
image: "zmstone/kafka:${KAFKA_IMAGE_VERSION}"
container_name: "kafka-2"
network_mode: host
environment:
Expand Down
77 changes: 56 additions & 21 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash -eu

if [ -n "${DEBUG:-}" ]; then
set -x
fi

docker ps > /dev/null || {
echo "You must be a member of docker group to run this script"
exit 1
Expand All @@ -18,46 +22,77 @@ function docker_compose {
fi
}

VERSION=${KAFKA_VERSION:-2.4}
if [ -z $VERSION ]; then VERSION=$1; fi
KAFKA_VERSION=${KAFKA_VERSION:-3.6}
if [ -z $KAFKA_VERSION ]; then KAFKA_VERSION=$1; fi

case $VERSION in
case $KAFKA_VERSION in
0.9*)
KAFKA_VERSION="0.9";;
0.10*)
VERSION="0.10";;
KAFKA_VERSION="0.10";;
0.11*)
VERSION="0.11";;
KAFKA_VERSION="0.11";;
1.*)
VERSION="1.1";;
KAFKA_VERSION="1.1";;
2.*)
VERSION="2.4";;
KAFKA_VERSION="2.8";;
3.*)
KAFKA_VERSION="3.6";;
*)
VERSION="2.4";;
KAFKA_VERSION="3.6";;
esac

echo "Using KAFKA_VERSION=$VERSION"
export KAFKA_VERSION=$VERSION
export KAFKA_IMAGE_VERSION="1.1-${KAFKA_VERSION}"
echo "env KAFKA_IMAGE_VERSION=$KAFKA_IMAGE_VERSION"

TD="$(cd "$(dirname "$0")" && pwd)"

docker_compose -f $TD/docker-compose.yml down || true
docker_compose -f $TD/docker-compose.yml up -d

if [[ "$KAFKA_VERSION" == 2* ]] || [[ "$KAFKA_VERSION" == 3* ]]; then
MAYBE_ZOOKEEPER="--bootstrap-server localhost:9092"
else
MAYBE_ZOOKEEPER="--zookeeper localhost:2181"
fi

n=0
while [ "$(docker exec kafka-1 bash -c '/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --list')" != '' ]; do
if [ $n -gt 4 ]; then
echo "timeout waiting for kakfa_1"
exit 1
TOPIC_LIST_CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --list"
MAX_WAIT_SEC=10

function wait_for_kafka {
local which_kafka="$1"
local n=0
local port=':9092'
local topic_list listener
if [ "$which_kafka" = 'kafka-2' ]; then
port=':9192'
fi
n=$(( n + 1 ))
sleep 1
done
while true; do
listener="$(netstat -tnlp 2>&1 | grep $port || true)"
if [ "$listener" != '' ]; then
topic_list="$(docker exec $which_kafka $TOPIC_LIST_CMD 2>&1)"
if [ "${topic_list-}" = '' ]; then
break
fi
fi
if [ $n -gt $MAX_WAIT_SEC ]; then
echo "timeout waiting for kafka-1"
echo "last print: ${topic_list:-}"
exit 1
fi
n=$(( n + 1 ))
sleep 1
done
}

wait_for_kafka kafka-1
wait_for_kafka kafka-2

function create_topic {
TOPIC_NAME="$1"
PARTITIONS="${2:-1}"
REPLICAS="${3:-1}"
CMD="/opt/kafka/bin/kafka-topics.sh --zookeeper localhost --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
CMD="/opt/kafka/bin/kafka-topics.sh $MAYBE_ZOOKEEPER --create --partitions $PARTITIONS --replication-factor $REPLICAS --topic $TOPIC_NAME --config min.insync.replicas=1"
docker exec kafka-1 bash -c "$CMD"
}

Expand All @@ -80,7 +115,7 @@ create_topic "brod_compression_SUITE"
create_topic "lz4-test"
create_topic "test-topic"

if [[ "$KAFKA_VERSION" = 2* ]]; then
if [[ "$KAFKA_VERSION" = 2* ]] || [[ "$KAFKA_VERSION" = 3* ]]; then
MAYBE_NEW_CONSUMER=""
else
MAYBE_NEW_CONSUMER="--new-consumer"
Expand All @@ -90,5 +125,5 @@ docker exec kafka-1 /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server l

# for kafka 0.11 or later, add sasl-scram test credentials
if [[ "$KAFKA_VERSION" != 0.9* ]] && [[ "$KAFKA_VERSION" != 0.10* ]]; then
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
docker exec kafka-1 /opt/kafka/bin/kafka-configs.sh $MAYBE_ZOOKEEPER --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=ecila],SCRAM-SHA-512=[password=ecila]' --entity-type users --entity-name alice
fi
2 changes: 1 addition & 1 deletion src/brod_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ handle_batches(Header, [], #state{begin_offset = BeginOffset} = State0) ->
State =
case BeginOffset < StableOffset of
true ->
%% There are chances that kafka may return empty message set
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
Expand Down
21 changes: 17 additions & 4 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,25 @@ fetch(Conn, ReqFun, Offset, MaxBytes) ->
fetch(Conn, ReqFun, Offset, Size);
{ok, #{header := Header, batches := Batches}} ->
StableOffset = get_stable_offset(Header),
{NewBeginOffset, Msgs} = flatten_batches(Offset, Header, Batches),
{NewBeginOffset0, Msgs} = flatten_batches(Offset, Header, Batches),
case Offset < StableOffset andalso Msgs =:= [] of
true ->
%% Not reached the latest stable offset yet,
%% but received an empty batch-set (all messages are dropped).
%% try again with new begin-offset
NewBeginOffset =
case NewBeginOffset0 > Offset of
true ->
%% Not reached the latest stable offset yet,
%% but resulted in an empty batch-set,
%% i.e. all messages are dropped due to they are before
%% the last fetch Offset.
%% try again with new begin-offset.
NewBeginOffset0;
false when NewBeginOffset0 =:= Offset ->
%% There are chances that Kafka may return empty message set
%% when messages are deleted from a compacted topic.
%% Since there is no way to know how big the 'hole' is
%% we can only bump begin_offset with +1 and try again.
NewBeginOffset0 + 1
end,
fetch(Conn, ReqFun, NewBeginOffset, MaxBytes);
false ->
{ok, {StableOffset, Msgs}}
Expand Down
Loading

0 comments on commit fcad1a1

Please sign in to comment.