Skip to content

Commit

Permalink
Add consume forever
Browse files Browse the repository at this point in the history
  • Loading branch information
benoit-touron committed Nov 9, 2021
1 parent 1d68804 commit cdaae83
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
30 changes: 30 additions & 0 deletions rocktest/rocktest/scenario/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,36 @@ func (module *Module) Kafka_check(params map[string]interface{}, scenario *Scena

}

func (module *Module) Kafka_consumeForever(params map[string]interface{}, scenario *Scenario) error {

paramsEx, err := scenario.ExpandMap(params)
if err != nil {
return err
}

name, _ := scenario.GetString(paramsEx, "name", "default")
data := scenario.GetStore("kafka." + name).(kafkaData)

topic, err := scenario.GetString(paramsEx, "topic", nil)
if err != nil {
return err
}

data.consumer.SubscribeTopics([]string{topic}, nil)

for {
msg, err := data.consumer.ReadMessage(-1)
if err == nil {
log.Infof("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
// If timeout : nothing to read more yet
log.Errorf("Consumer error: %v (%v)\n", err, msg)
return err
}
}

}

func (module *Module) Kafka_produce(params map[string]interface{}, scenario *Scenario) error {

paramsEx, err := scenario.ExpandMap(params)
Expand Down
11 changes: 11 additions & 0 deletions rocktest/rocktest/test/yaml/scen/kafka/kafka_consume.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
- display: Kafka Test

- kafka.connect:
params:
server: "myrancher:30190"
group: group # optional
offset: earliest # optional

- kafka.consumeForever:
params:
topic: rockers

0 comments on commit cdaae83

Please sign in to comment.