diff --git a/pkg/mq/msgstream/msgstream_util.go b/pkg/mq/msgstream/msgstream_util.go index 7544c4d129e77..fc3fe84babc21 100644 --- a/pkg/mq/msgstream/msgstream_util.go +++ b/pkg/mq/msgstream/msgstream_util.go @@ -109,6 +109,11 @@ func PulsarHealthCheck(clusterStatus *pcommon.MQClusterStatus) { // KafkaHealthCheck Perform a health check by retrieving cluster metadata func KafkaHealthCheck(clusterStatus *pcommon.MQClusterStatus) { config := kafkamqwrapper.GetBasicConfig(¶mtable.Get().KafkaCfg) + // Set extra config for producer + pConfig := (¶mtable.Get().KafkaCfg).ProducerExtraConfig.GetValue() + for k, v := range pConfig { + config.SetKey(k, v) + } producer, err := kafka.NewProducer(&config) if err != nil { clusterStatus.Reason = fmt.Sprintf("failed to create Kafka producer: %v", err)