Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apache-kafka] Fix the configuration to allow kafka to run on non-default port #76

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 99 additions & 61 deletions nix/apache-kafka.nix
Original file line number Diff line number Diff line change
@@ -1,64 +1,124 @@
# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix
{ config, lib, pkgs, name, ... }:
let
mkPropertyString =
let
render = {
bool = lib.boolToString;
int = toString;
list = lib.concatMapStringsSep "," mkPropertyString;
string = lib.id;
};
in
v: render.${lib.strings.typeOf v} v;

stringlySettings = lib.mapAttrs (_: mkPropertyString)
(lib.filterAttrs (_: v: v != null) config.settings);

generator = (pkgs.formats.javaProperties { }).generate;
in
with lib;
{
options = {
enable = mkOption {
description = lib.mdDoc "Whether to enable Apache Kafka.";
default = false;
type = types.bool;
};

brokerId = mkOption {
description = lib.mdDoc "Broker ID.";
default = -1;
type = types.int;
};
enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");

port = mkOption {
description = lib.mdDoc "Port number the broker should listen on.";
default = 9092;
type = types.port;
};

hostname = mkOption {
description = lib.mdDoc "Hostname the broker should bind to.";
default = "localhost";
type = types.str;
};

dataDir = lib.mkOption {
type = types.str;
default = "./data/${name}";
description = "The apache-kafka data directory";
description = lib.mdDoc "The apache-kafka data directory";
};

logDirs = mkOption {
description = lib.mdDoc "Log file directories inside the data directory.";
default = [ "/kafka-logs" ];
type = types.listOf types.path;
};
settings = mkOption {
description = lib.mdDoc ''
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
{file}`server.properties`.

zookeeper = mkOption {
description = lib.mdDoc "Zookeeper connection string";
default = "localhost:2181";
type = types.str;
Note that .properties files contain mappings from string to string.
Keys with dots are NOT represented by nested attrs in these settings,
but instead as quoted strings (i.e. `settings."broker.id"`, NOT
`settings.broker.id`).
'';
type = types.submodule {
freeformType = with types; let
primitive = oneOf [ bool int str ];
in
lazyAttrsOf (nullOr (either primitive (listOf primitive)));

options = {
"broker.id" = mkOption {
description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
default = null;
type = with types; nullOr int;
};

"log.dirs" = mkOption {
description = lib.mdDoc "Log file directories.";
# Deliberately leave out the old default and use the rewrite opportunity
# to have users choose a safer value -- /tmp might be volatile and is a
# slightly scary default choice.
# default = [ "/tmp/apache-kafka" ];
type = with types; listOf string;
default = [ (config.dataDir + "/logs") ];
};

"listeners" = mkOption {
description = lib.mdDoc ''
Kafka Listener List.
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
'';
type = types.listOf types.str;
default = [ "PLAINTEXT://localhost:${builtins.toString config.port}" ];
};
};
};
};

extraProperties = mkOption {
description = lib.mdDoc "Extra properties for server.properties.";
type = types.nullOr types.lines;
clusterId = mkOption {
description = lib.mdDoc ''
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
'';
type = with types; nullOr str;
default = null;
};

serverProperties = mkOption {
configFiles.serverProperties = mkOption {
description = lib.mdDoc ''
Complete server.properties content. Other server.properties config
options will be ignored if this option is used.
Kafka server.properties configuration file path.
Defaults to the rendered `settings`.
'';
type = types.nullOr types.lines;
default = null;
type = types.path;
default = generator "server.properties" stringlySettings;
};

configFiles.log4jProperties = mkOption {
description = lib.mdDoc "Kafka log4j property configuration file path";
type = types.path;
default = pkgs.writeText "log4j.properties" config.log4jProperties;
defaultText = ''pkgs.writeText "log4j.properties" config.log4jProperties'';
};

formatLogDirs = mkOption {
description = lib.mdDoc ''
Whether to format log dirs in KRaft mode if all log dirs are
unformatted, i.e. they contain no meta.properties.
'';
type = types.bool;
default = false;
};

formatLogDirsIgnoreFormatted = mkOption {
description = lib.mdDoc ''
Whether to ignore already formatted log dirs when formatting log dirs,
instead of failing. Useful when replacing or adding disks.
'';
type = types.bool;
default = false;
};

log4jProperties = mkOption {
Expand All @@ -84,12 +144,7 @@ with lib;
];
};

package = mkOption {
description = lib.mdDoc "The kafka package to use";
default = pkgs.apacheKafka;
defaultText = literalExpression "pkgs.apacheKafka";
type = types.package;
};
package = mkPackageOption pkgs "apacheKafka" { };

jre = mkOption {
description = lib.mdDoc "The JRE with which to run Kafka";
Expand All @@ -106,38 +161,21 @@ with lib;
processes = {
"${name}" =
let
serverProperties =
if config.serverProperties != null then
config.serverProperties
else
''
# Generated by services-flake
broker.id=${toString config.brokerId}
port=${toString config.port}
host.name=${config.hostname}
log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)}
zookeeper.connect=${config.zookeeper}
${toString config.extraProperties}
'';

serverConfig = pkgs.writeText "server.properties" serverProperties;
logConfig = pkgs.writeText "log4j.properties" config.log4jProperties;

startScript = pkgs.writeShellScriptBin "start-kafka" ''
${config.jre}/bin/java \
-cp "${config.package}/libs/*" \
-Dlog4j.configuration=file:${logConfig} \
-Dlog4j.configuration=file:${config.configFiles.log4jProperties} \
${toString config.jvmOptions} \
kafka.Kafka \
${serverConfig}
${config.configFiles.serverProperties}
'';
in
{
command = "${startScript}/bin/start-kafka";

readiness_probe = {
# TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin?
exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}";
exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${builtins.toString config.port}";
initial_delay_seconds = 2;
period_seconds = 10;
timeout_seconds = 4;
Expand Down
24 changes: 21 additions & 3 deletions nix/apache-kafka_test.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,32 @@
services.zookeeper."z1".enable = true;
# To avoid conflicting with `zookeeper_test.nix` in case the tests are run in parallel
services.zookeeper."z1".port = 2182;
services.apache-kafka."k1".enable = true;
services.apache-kafka."k1".zookeeper = "localhost:2182";
services.apache-kafka."k1" = {
enable = true;
port = 9094;
settings = {
# Since the available brokers are only 1
"offsets.topic.replication.factor" = 1;
"zookeeper.connect" = [ "localhost:2182" ];
};
};
# kafka should start only after zookeeper is healthy
settings.processes.k1.depends_on."z1".condition = "process_healthy";
settings.processes.test =
{
command = pkgs.writeShellApplication {
runtimeInputs = [ pkgs.bash config.services.apache-kafka.k1.package ];
text = ''
bash kafka-topics.sh --list --bootstrap-server localhost:9092
# Create a topic
kafka-topics.sh --create --bootstrap-server localhost:9094 --partitions 1 \
--replication-factor 1 --topic testtopic

# Producer
echo 'test 1' | kafka-console-producer.sh --broker-list localhost:9094 --topic testtopic

# Consumer
kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic testtopic \
--from-beginning --max-messages 1 | grep -q "test 1"
'';
name = "kafka-test";
};
Expand Down