Skip to content

Commit

Permalink
fix: user-provided kafka properties were not taken into account when …
Browse files Browse the repository at this point in the history
…creating the kafka client config
  • Loading branch information
Mcdostone committed Jan 16, 2025
1 parent dff41e2 commit 526bb22
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/command/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
None => self.kafka_client_config()?,
Some(c) => c,
};
let command = self.default_command.clone().with_client(config_client);
let command = self.default_command.clone().with_client(config_client)?;
command.execute().await
}
}
Expand Down
39 changes: 30 additions & 9 deletions crates/command/src/command/main_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
)]
pub topics: Vec<String>,
/// Override kafka consumer properties, see <https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html>
#[clap(short, long, use_value_delimiter = true, value_delimiter = ',')]
#[clap(short, long)]
pub properties: Vec<String>,
#[clap(long)]
/// Disable the TUI, print results in stdout instead.
Expand Down Expand Up @@ -111,8 +111,14 @@ where
<T as FromStr>::Err: Display,
{
/// Create a new `MainCommandWithClient` with a `ClientConfig`.
pub fn with_client(self, client_config: ClientConfig) -> MainCommandWithClient<T> {
MainCommandWithClient::new(self, client_config)
pub fn with_client(
self,
client_config: ClientConfig,
) -> Result<MainCommandWithClient<T>, Error> {
let kafka_properties = client_config.config_map().clone();
let kafka_properties = self.override_kafka_config_properties(kafka_properties)?;
let client_config = Self::kafka_client_config_from_properties(kafka_properties)?;
Ok(MainCommandWithClient::new(self, client_config))
}

/// Returns the search query to use.
Expand Down Expand Up @@ -318,21 +324,29 @@ where
}
}

/// Returns the kafka client config from the configuration file
pub(crate) fn kafka_client_config(&self) -> Result<ClientConfig, Error> {
let config = self.config()?;
let mut kafka_properties = config.kafka_config_of(&self.cluster().to_string())?;

/// Overrides the kafka properties with the properties provided by the user
fn override_kafka_config_properties(
&self,
mut config: HashMap<String, String>,
) -> Result<HashMap<String, String>, Error> {
for property in &self.properties {
match property.split_once('=') {
Some((key, value)) => {
kafka_properties.insert(key.trim().into(), value.into());
config.insert(key.trim().into(), value.into());
}
None => {
return Err(Error::Error(format!("Invalid kafka property '{}', expected a '=' symbol to separate the property and its value.", property)));
}
}
}
Ok(config)
}

/// Returns the kafka client config from the configuration file
pub(crate) fn kafka_client_config(&self) -> Result<ClientConfig, Error> {
let config = self.config()?;
let mut kafka_properties = config.kafka_config_of(&self.cluster().to_string())?;
kafka_properties = self.override_kafka_config_properties(kafka_properties)?;

// Default properties
for (key, value) in [
Expand All @@ -344,6 +358,13 @@ where
}
}

Self::kafka_client_config_from_properties(kafka_properties)
}

/// Returns the kafka client config from kafka properties
pub fn kafka_client_config_from_properties(
kafka_properties: HashMap<String, String>,
) -> Result<ClientConfig, Error> {
let mut config = ClientConfig::new();
config.set_log_level(rdkafka::config::RDKafkaLogLevel::Emerg);
debug!(
Expand Down
2 changes: 1 addition & 1 deletion docs/try-it.sh
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ docker compose -f "${repo}/compose.yml" exec -T kafka \
/usr/bin/kafka-topics \
--create --if-not-exists \
--bootstrap-server localhost:9092 \
--partitions 1 \
--partitions 3 \
--topic "${topic}"

if jbang --version &> /dev/null; then
Expand Down

0 comments on commit 526bb22

Please sign in to comment.