From 526bb226ee599943658b888ac3fab77f30f37b53 Mon Sep 17 00:00:00 2001 From: Yann Prono Date: Thu, 16 Jan 2025 23:12:37 +0100 Subject: [PATCH] fix: user-provided kafka properties were not taken into account when creating the kafka client config --- crates/command/src/cli.rs | 2 +- crates/command/src/command/main_command.rs | 39 +++++++++++++++++----- docs/try-it.sh | 2 +- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/crates/command/src/cli.rs b/crates/command/src/cli.rs index 16221f4..3fd09f5 100644 --- a/crates/command/src/cli.rs +++ b/crates/command/src/cli.rs @@ -69,7 +69,7 @@ where None => self.kafka_client_config()?, Some(c) => c, }; - let command = self.default_command.clone().with_client(config_client); + let command = self.default_command.clone().with_client(config_client)?; command.execute().await } } diff --git a/crates/command/src/command/main_command.rs b/crates/command/src/command/main_command.rs index 70d75ad..ecbcc31 100644 --- a/crates/command/src/command/main_command.rs +++ b/crates/command/src/command/main_command.rs @@ -67,7 +67,7 @@ where )] pub topics: Vec, /// Override kafka consumer properties, see - #[clap(short, long, use_value_delimiter = true, value_delimiter = ',')] + #[clap(short, long)] pub properties: Vec, #[clap(long)] /// Disable the TUI, print results in stdout instead. @@ -111,8 +111,14 @@ where ::Err: Display, { /// Create a new `MainCommandWithClient` with a `ClientConfig`. - pub fn with_client(self, client_config: ClientConfig) -> MainCommandWithClient { - MainCommandWithClient::new(self, client_config) + pub fn with_client( + self, + client_config: ClientConfig, + ) -> Result, Error> { + let kafka_properties = client_config.config_map().clone(); + let kafka_properties = self.override_kafka_config_properties(kafka_properties)?; + let client_config = Self::kafka_client_config_from_properties(kafka_properties)?; + Ok(MainCommandWithClient::new(self, client_config)) } /// Returns the search query to use. @@ -318,21 +324,29 @@ where } } - /// Returns the kafka client config from the configuration file - pub(crate) fn kafka_client_config(&self) -> Result { - let config = self.config()?; - let mut kafka_properties = config.kafka_config_of(&self.cluster().to_string())?; - + /// Overrides the kafka properties with the properties provided by the user + fn override_kafka_config_properties( + &self, + mut config: HashMap, + ) -> Result, Error> { for property in &self.properties { match property.split_once('=') { Some((key, value)) => { - kafka_properties.insert(key.trim().into(), value.into()); + config.insert(key.trim().into(), value.into()); } None => { return Err(Error::Error(format!("Invalid kafka property '{}', expected a '=' symbol to separate the property and its value.", property))); } } } + Ok(config) + } + + /// Returns the kafka client config from the configuration file + pub(crate) fn kafka_client_config(&self) -> Result { + let config = self.config()?; + let mut kafka_properties = config.kafka_config_of(&self.cluster().to_string())?; + kafka_properties = self.override_kafka_config_properties(kafka_properties)?; // Default properties for (key, value) in [ @@ -344,6 +358,13 @@ where } } + Self::kafka_client_config_from_properties(kafka_properties) + } + + /// Returns the kafka client config from kafka properties + pub fn kafka_client_config_from_properties( + kafka_properties: HashMap, + ) -> Result { let mut config = ClientConfig::new(); config.set_log_level(rdkafka::config::RDKafkaLogLevel::Emerg); debug!( diff --git a/docs/try-it.sh b/docs/try-it.sh index fba1d8a..5efba38 100644 --- a/docs/try-it.sh +++ b/docs/try-it.sh @@ -89,7 +89,7 @@ docker compose -f "${repo}/compose.yml" exec -T kafka \ /usr/bin/kafka-topics \ --create --if-not-exists \ --bootstrap-server localhost:9092 \ - --partitions 1 \ + --partitions 3 \ --topic "${topic}" if jbang --version &> /dev/null; then