From b93a612aa7057fbb395c79a915672f9b6567ffea Mon Sep 17 00:00:00 2001 From: Shivaraj B H Date: Tue, 29 Aug 2023 17:04:04 +0530 Subject: [PATCH] Kafka and Zookeeper services (#42) --- nix/apache-kafka.nix | 155 +++++++++++++++++++++++++++++++++++++ nix/apache-kafka_test.nix | 15 ++++ nix/default.nix | 2 + nix/elasticsearch_test.nix | 2 +- nix/zookeeper.nix | 155 +++++++++++++++++++++++++++++++++++++ nix/zookeeper_test.nix | 14 ++++ test/flake.nix | 12 +++ 7 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 nix/apache-kafka.nix create mode 100644 nix/apache-kafka_test.nix create mode 100644 nix/zookeeper.nix create mode 100644 nix/zookeeper_test.nix diff --git a/nix/apache-kafka.nix b/nix/apache-kafka.nix new file mode 100644 index 00000000..7f0f8720 --- /dev/null +++ b/nix/apache-kafka.nix @@ -0,0 +1,155 @@ +# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix +{ config, lib, pkgs, name, ... }: + +with lib; +{ + options = { + enable = mkOption { + description = lib.mdDoc "Whether to enable Apache Kafka."; + default = false; + type = types.bool; + }; + + brokerId = mkOption { + description = lib.mdDoc "Broker ID."; + default = -1; + type = types.int; + }; + + port = mkOption { + description = lib.mdDoc "Port number the broker should listen on."; + default = 9092; + type = types.port; + }; + + hostname = mkOption { + description = lib.mdDoc "Hostname the broker should bind to."; + default = "localhost"; + type = types.str; + }; + + dataDir = lib.mkOption { + type = types.str; + default = "./data/${name}"; + description = "The apache-kafka data directory"; + }; + + logDirs = mkOption { + description = lib.mdDoc "Log file directories inside the data directory."; + default = [ "/kafka-logs" ]; + type = types.listOf types.path; + }; + + zookeeper = mkOption { + description = lib.mdDoc "Zookeeper connection string"; + default = "localhost:2181"; + type = types.str; + }; + + extraProperties = mkOption { + description = lib.mdDoc "Extra properties for server.properties."; + type = types.nullOr types.lines; + default = null; + }; + + serverProperties = mkOption { + description = lib.mdDoc '' + Complete server.properties content. Other server.properties config + options will be ignored if this option is used. + ''; + type = types.nullOr types.lines; + default = null; + }; + + log4jProperties = mkOption { + description = lib.mdDoc "Kafka log4j property configuration."; + default = '' + log4j.rootLogger=INFO, stdout + + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + ''; + type = types.lines; + }; + + jvmOptions = mkOption { + description = lib.mdDoc "Extra command line options for the JVM running Kafka."; + default = [ ]; + type = types.listOf types.str; + example = [ + "-Djava.net.preferIPv4Stack=true" + "-Dcom.sun.management.jmxremote" + "-Dcom.sun.management.jmxremote.local.only=true" + ]; + }; + + package = mkOption { + description = lib.mdDoc "The kafka package to use"; + default = pkgs.apacheKafka; + defaultText = literalExpression "pkgs.apacheKafka"; + type = types.package; + }; + + jre = mkOption { + description = lib.mdDoc "The JRE with which to run Kafka"; + default = config.package.passthru.jre; + defaultText = literalExpression "pkgs.apacheKafka.passthru.jre"; + type = types.package; + }; + + outputs.settings = lib.mkOption { + type = types.deferredModule; + internal = true; + readOnly = true; + default = { + processes = { + "${name}" = + let + serverProperties = + if config.serverProperties != null then + config.serverProperties + else + '' + # Generated by services-flake + broker.id=${toString config.brokerId} + port=${toString config.port} + host.name=${config.hostname} + log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)} + zookeeper.connect=${config.zookeeper} + ${toString config.extraProperties} + ''; + + serverConfig = pkgs.writeText "server.properties" serverProperties; + logConfig = pkgs.writeText "log4j.properties" config.log4jProperties; + + startScript = pkgs.writeShellScriptBin "start-kafka" '' + ${config.jre}/bin/java \ + -cp "${config.package}/libs/*" \ + -Dlog4j.configuration=file:${logConfig} \ + ${toString config.jvmOptions} \ + kafka.Kafka \ + ${serverConfig} + ''; + in + { + command = "${startScript}/bin/start-kafka"; + + readiness_probe = { + # TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin? + exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}"; + initial_delay_seconds = 2; + period_seconds = 10; + timeout_seconds = 4; + success_threshold = 1; + failure_threshold = 5; + }; + + availability.restart = "on_failure"; + }; + }; + }; + }; + + }; +} diff --git a/nix/apache-kafka_test.nix b/nix/apache-kafka_test.nix new file mode 100644 index 00000000..5f07fb2d --- /dev/null +++ b/nix/apache-kafka_test.nix @@ -0,0 +1,15 @@ +{ pkgs, config, ... }: { + services.zookeeper."z1".enable = true; + services.apache-kafka."k1".enable = true; + settings.processes.test = + { + command = pkgs.writeShellApplication { + runtimeInputs = [ pkgs.bash config.package ]; + text = '' + bash kafka-topics.sh --list --bootstrap-server localhost:9092 + ''; + name = "kafka-test"; + }; + depends_on."k1".condition = "process_healthy"; + }; +} diff --git a/nix/default.nix b/nix/default.nix index 43522a48..a2210fb6 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -34,9 +34,11 @@ let in { imports = builtins.map multiService [ + ./apache-kafka.nix ./postgres.nix ./redis.nix ./redis-cluster.nix ./elasticsearch.nix + ./zookeeper.nix ]; } diff --git a/nix/elasticsearch_test.nix b/nix/elasticsearch_test.nix index 01c3f631..da512d6c 100644 --- a/nix/elasticsearch_test.nix +++ b/nix/elasticsearch_test.nix @@ -5,7 +5,7 @@ command = pkgs.writeShellApplication { runtimeInputs = [ pkgs.curl ]; text = '' - curl 127.0.0.1:9200/_cat/health + curl 127.0.0.1:9200/_cat/health ''; name = "elasticsearch-test"; }; diff --git a/nix/zookeeper.nix b/nix/zookeeper.nix new file mode 100644 index 00000000..f75dced8 --- /dev/null +++ b/nix/zookeeper.nix @@ -0,0 +1,155 @@ +# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/zookeeper.nix +{ config, lib, pkgs, name, ... }: + +with lib; +{ + options = { + enable = mkEnableOption (lib.mdDoc "Zookeeper"); + + port = mkOption { + description = lib.mdDoc "Zookeeper Client port."; + default = 2181; + type = types.port; + }; + + dataDir = mkOption { + type = types.str; + default = "./data/${name}"; + description = lib.mdDoc '' + Data directory for Zookeeper + ''; + }; + + id = mkOption { + description = lib.mdDoc "Zookeeper ID."; + default = 0; + type = types.int; + }; + + purgeInterval = mkOption { + description = lib.mdDoc '' + The time interval in hours for which the purge task has to be triggered. Set to a positive integer (1 and above) to enable the auto purging. + ''; + default = 1; + type = types.int; + }; + + extraConf = mkOption { + description = lib.mdDoc "Extra configuration for Zookeeper."; + type = types.lines; + default = '' + initLimit=5 + syncLimit=2 + tickTime=2000 + ''; + }; + + servers = mkOption { + description = lib.mdDoc "All Zookeeper Servers."; + default = ""; + type = types.lines; + example = '' + server.0=host0:2888:3888 + server.1=host1:2888:3888 + server.2=host2:2888:3888 + ''; + }; + + logging = mkOption { + description = lib.mdDoc "Zookeeper logging configuration."; + default = '' + zookeeper.root.logger=INFO, CONSOLE + log4j.rootLogger=INFO, CONSOLE + log4j.logger.org.apache.zookeeper.audit.Log4jAuditLogger=INFO, CONSOLE + log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout + log4j.appender.CONSOLE.layout.ConversionPattern=[myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n + ''; + type = types.lines; + }; + + extraCmdLineOptions = mkOption { + description = lib.mdDoc "Extra command line options for the Zookeeper launcher."; + default = [ "-Dcom.sun.management.jmxremote" "-Dcom.sun.management.jmxremote.local.only=true" ]; + type = types.listOf types.str; + example = [ "-Djava.net.preferIPv4Stack=true" "-Dcom.sun.management.jmxremote" "-Dcom.sun.management.jmxremote.local.only=true" ]; + }; + + preferIPv4 = mkOption { + type = types.bool; + default = true; + description = lib.mdDoc '' + Add the -Djava.net.preferIPv4Stack=true flag to the Zookeeper server. + ''; + }; + + package = mkOption { + description = lib.mdDoc "The zookeeper package to use"; + default = pkgs.zookeeper; + defaultText = literalExpression "pkgs.zookeeper"; + type = types.package; + }; + + jre = mkOption { + description = lib.mdDoc "The JRE with which to run Zookeeper"; + default = config.package.jre; + defaultText = literalExpression "pkgs.zookeeper.jre"; + example = literalExpression "pkgs.jre"; + type = types.package; + }; + outputs.settings = lib.mkOption { + type = types.deferredModule; + internal = true; + readOnly = true; + default = { + processes = { + "${name}" = + let + zookeeperConfig = '' + dataDir=${config.dataDir} + clientPort=${toString config.port} + autopurge.purgeInterval=${toString config.purgeInterval} + ${config.extraConf} + ${config.servers} + admin.enableServer=false + ''; + + configDir = pkgs.buildEnv { + name = "zookeeper-conf"; + paths = [ + (pkgs.writeTextDir "zoo.cfg" zookeeperConfig) + (pkgs.writeTextDir "log4j.properties" config.logging) + ]; + }; + + startScript = pkgs.writeShellScriptBin "start-zookeeper" '' + ${config.jre}/bin/java \ + -cp "${config.package}/lib/*:${configDir}" \ + ${escapeShellArgs config.extraCmdLineOptions} \ + -Dzookeeper.datadir.autocreate=true \ + ${optionalString config.preferIPv4 "-Djava.net.preferIPv4Stack=true"} \ + org.apache.zookeeper.server.quorum.QuorumPeerMain \ + ${configDir}/zoo.cfg + ''; + in + { + command = "${startScript}/bin/start-zookeeper"; + + readiness_probe = { + # TODO: need to find a better way to check if zookeeper is ready, maybe `zkCli.sh`? + exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${toString config.port}"; + initial_delay_seconds = 2; + period_seconds = 10; + timeout_seconds = 4; + success_threshold = 1; + failure_threshold = 5; + }; + + availability.restart = "on_failure"; + }; + }; + }; + }; + + }; +} diff --git a/nix/zookeeper_test.nix b/nix/zookeeper_test.nix new file mode 100644 index 00000000..22e9b825 --- /dev/null +++ b/nix/zookeeper_test.nix @@ -0,0 +1,14 @@ +{ pkgs, config, ... }: { + services.zookeeper."z1".enable = true; + settings.processes.test = + { + command = pkgs.writeShellApplication { + runtimeInputs = [ pkgs.bash config.package ]; + text = '' + bash zkCli.sh -server localhost:2181 get / + ''; + name = "zookeeper-test"; + }; + depends_on."z1".condition = "process_healthy"; + }; +} diff --git a/test/flake.nix b/test/flake.nix index 0023df57..a9ed2e0a 100644 --- a/test/flake.nix +++ b/test/flake.nix @@ -43,6 +43,18 @@ ../nix/elasticsearch_test.nix ]; }; + apache-kafka = { + imports = [ + inputs.services-flake.processComposeModules.default + ../nix/apache-kafka_test.nix + ]; + }; + zookeeper = { + imports = [ + inputs.services-flake.processComposeModules.default + ../nix/zookeeper_test.nix + ]; + }; }; }; };