Skip to content

Commit

Permalink
Kafka and Zookeeper services (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaraj-bh authored Aug 29, 2023
1 parent 6e7c208 commit b93a612
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 1 deletion.
155 changes: 155 additions & 0 deletions nix/apache-kafka.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix
{ config, lib, pkgs, name, ... }:

with lib;
{
options = {
enable = mkOption {
description = lib.mdDoc "Whether to enable Apache Kafka.";
default = false;
type = types.bool;
};

brokerId = mkOption {
description = lib.mdDoc "Broker ID.";
default = -1;
type = types.int;
};

port = mkOption {
description = lib.mdDoc "Port number the broker should listen on.";
default = 9092;
type = types.port;
};

hostname = mkOption {
description = lib.mdDoc "Hostname the broker should bind to.";
default = "localhost";
type = types.str;
};

dataDir = lib.mkOption {
type = types.str;
default = "./data/${name}";
description = "The apache-kafka data directory";
};

logDirs = mkOption {
description = lib.mdDoc "Log file directories inside the data directory.";
default = [ "/kafka-logs" ];
type = types.listOf types.path;
};

zookeeper = mkOption {
description = lib.mdDoc "Zookeeper connection string";
default = "localhost:2181";
type = types.str;
};

extraProperties = mkOption {
description = lib.mdDoc "Extra properties for server.properties.";
type = types.nullOr types.lines;
default = null;
};

serverProperties = mkOption {
description = lib.mdDoc ''
Complete server.properties content. Other server.properties config
options will be ignored if this option is used.
'';
type = types.nullOr types.lines;
default = null;
};

log4jProperties = mkOption {
description = lib.mdDoc "Kafka log4j property configuration.";
default = ''
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
'';
type = types.lines;
};

jvmOptions = mkOption {
description = lib.mdDoc "Extra command line options for the JVM running Kafka.";
default = [ ];
type = types.listOf types.str;
example = [
"-Djava.net.preferIPv4Stack=true"
"-Dcom.sun.management.jmxremote"
"-Dcom.sun.management.jmxremote.local.only=true"
];
};

package = mkOption {
description = lib.mdDoc "The kafka package to use";
default = pkgs.apacheKafka;
defaultText = literalExpression "pkgs.apacheKafka";
type = types.package;
};

jre = mkOption {
description = lib.mdDoc "The JRE with which to run Kafka";
default = config.package.passthru.jre;
defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
type = types.package;
};

outputs.settings = lib.mkOption {
type = types.deferredModule;
internal = true;
readOnly = true;
default = {
processes = {
"${name}" =
let
serverProperties =
if config.serverProperties != null then
config.serverProperties
else
''
# Generated by services-flake
broker.id=${toString config.brokerId}
port=${toString config.port}
host.name=${config.hostname}
log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)}
zookeeper.connect=${config.zookeeper}
${toString config.extraProperties}
'';

serverConfig = pkgs.writeText "server.properties" serverProperties;
logConfig = pkgs.writeText "log4j.properties" config.log4jProperties;

startScript = pkgs.writeShellScriptBin "start-kafka" ''
${config.jre}/bin/java \
-cp "${config.package}/libs/*" \
-Dlog4j.configuration=file:${logConfig} \
${toString config.jvmOptions} \
kafka.Kafka \
${serverConfig}
'';
in
{
command = "${startScript}/bin/start-kafka";

readiness_probe = {
# TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin?
exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}";
initial_delay_seconds = 2;
period_seconds = 10;
timeout_seconds = 4;
success_threshold = 1;
failure_threshold = 5;
};

availability.restart = "on_failure";
};
};
};
};

};
}
15 changes: 15 additions & 0 deletions nix/apache-kafka_test.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{ pkgs, config, ... }: {
services.zookeeper."z1".enable = true;
services.apache-kafka."k1".enable = true;
settings.processes.test =
{
command = pkgs.writeShellApplication {
runtimeInputs = [ pkgs.bash config.package ];
text = ''
bash kafka-topics.sh --list --bootstrap-server localhost:9092
'';
name = "kafka-test";
};
depends_on."k1".condition = "process_healthy";
};
}
2 changes: 2 additions & 0 deletions nix/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ let
in
{
imports = builtins.map multiService [
./apache-kafka.nix
./postgres.nix
./redis.nix
./redis-cluster.nix
./elasticsearch.nix
./zookeeper.nix
];
}
2 changes: 1 addition & 1 deletion nix/elasticsearch_test.nix
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
command = pkgs.writeShellApplication {
runtimeInputs = [ pkgs.curl ];
text = ''
curl 127.0.0.1:9200/_cat/health
curl 127.0.0.1:9200/_cat/health
'';
name = "elasticsearch-test";
};
Expand Down
155 changes: 155 additions & 0 deletions nix/zookeeper.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/zookeeper.nix
{ config, lib, pkgs, name, ... }:

with lib;
{
options = {
enable = mkEnableOption (lib.mdDoc "Zookeeper");

port = mkOption {
description = lib.mdDoc "Zookeeper Client port.";
default = 2181;
type = types.port;
};

dataDir = mkOption {
type = types.str;
default = "./data/${name}";
description = lib.mdDoc ''
Data directory for Zookeeper
'';
};

id = mkOption {
description = lib.mdDoc "Zookeeper ID.";
default = 0;
type = types.int;
};

purgeInterval = mkOption {
description = lib.mdDoc ''
The time interval in hours for which the purge task has to be triggered. Set to a positive integer (1 and above) to enable the auto purging.
'';
default = 1;
type = types.int;
};

extraConf = mkOption {
description = lib.mdDoc "Extra configuration for Zookeeper.";
type = types.lines;
default = ''
initLimit=5
syncLimit=2
tickTime=2000
'';
};

servers = mkOption {
description = lib.mdDoc "All Zookeeper Servers.";
default = "";
type = types.lines;
example = ''
server.0=host0:2888:3888
server.1=host1:2888:3888
server.2=host2:2888:3888
'';
};

logging = mkOption {
description = lib.mdDoc "Zookeeper logging configuration.";
default = ''
zookeeper.root.logger=INFO, CONSOLE
log4j.rootLogger=INFO, CONSOLE
log4j.logger.org.apache.zookeeper.audit.Log4jAuditLogger=INFO, CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
'';
type = types.lines;
};

extraCmdLineOptions = mkOption {
description = lib.mdDoc "Extra command line options for the Zookeeper launcher.";
default = [ "-Dcom.sun.management.jmxremote" "-Dcom.sun.management.jmxremote.local.only=true" ];
type = types.listOf types.str;
example = [ "-Djava.net.preferIPv4Stack=true" "-Dcom.sun.management.jmxremote" "-Dcom.sun.management.jmxremote.local.only=true" ];
};

preferIPv4 = mkOption {
type = types.bool;
default = true;
description = lib.mdDoc ''
Add the -Djava.net.preferIPv4Stack=true flag to the Zookeeper server.
'';
};

package = mkOption {
description = lib.mdDoc "The zookeeper package to use";
default = pkgs.zookeeper;
defaultText = literalExpression "pkgs.zookeeper";
type = types.package;
};

jre = mkOption {
description = lib.mdDoc "The JRE with which to run Zookeeper";
default = config.package.jre;
defaultText = literalExpression "pkgs.zookeeper.jre";
example = literalExpression "pkgs.jre";
type = types.package;
};
outputs.settings = lib.mkOption {
type = types.deferredModule;
internal = true;
readOnly = true;
default = {
processes = {
"${name}" =
let
zookeeperConfig = ''
dataDir=${config.dataDir}
clientPort=${toString config.port}
autopurge.purgeInterval=${toString config.purgeInterval}
${config.extraConf}
${config.servers}
admin.enableServer=false
'';

configDir = pkgs.buildEnv {
name = "zookeeper-conf";
paths = [
(pkgs.writeTextDir "zoo.cfg" zookeeperConfig)
(pkgs.writeTextDir "log4j.properties" config.logging)
];
};

startScript = pkgs.writeShellScriptBin "start-zookeeper" ''
${config.jre}/bin/java \
-cp "${config.package}/lib/*:${configDir}" \
${escapeShellArgs config.extraCmdLineOptions} \
-Dzookeeper.datadir.autocreate=true \
${optionalString config.preferIPv4 "-Djava.net.preferIPv4Stack=true"} \
org.apache.zookeeper.server.quorum.QuorumPeerMain \
${configDir}/zoo.cfg
'';
in
{
command = "${startScript}/bin/start-zookeeper";

readiness_probe = {
# TODO: need to find a better way to check if zookeeper is ready, maybe `zkCli.sh`?
exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${toString config.port}";
initial_delay_seconds = 2;
period_seconds = 10;
timeout_seconds = 4;
success_threshold = 1;
failure_threshold = 5;
};

availability.restart = "on_failure";
};
};
};
};

};
}
14 changes: 14 additions & 0 deletions nix/zookeeper_test.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{ pkgs, config, ... }: {
services.zookeeper."z1".enable = true;
settings.processes.test =
{
command = pkgs.writeShellApplication {
runtimeInputs = [ pkgs.bash config.package ];
text = ''
bash zkCli.sh -server localhost:2181 get /
'';
name = "zookeeper-test";
};
depends_on."z1".condition = "process_healthy";
};
}
12 changes: 12 additions & 0 deletions test/flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@
../nix/elasticsearch_test.nix
];
};
apache-kafka = {
imports = [
inputs.services-flake.processComposeModules.default
../nix/apache-kafka_test.nix
];
};
zookeeper = {
imports = [
inputs.services-flake.processComposeModules.default
../nix/zookeeper_test.nix
];
};
};
};
};
Expand Down

0 comments on commit b93a612

Please sign in to comment.