From 96b3ccb601257937c873e45c5549095dde5c9349 Mon Sep 17 00:00:00 2001 From: Recep Alaca Date: Thu, 23 Sep 2021 19:26:52 +0300 Subject: [PATCH] update for project structure --- constants/constants.go | 3 +++ consumer.go => consumer/consumer.go | 14 +++++++------- consumer_test.go => consumer/consumer_test.go | 19 +++++++++++-------- model/message.go | 3 +++ producer.go => producer/producer.go | 7 ++++--- producer_test.go => producer/producer_test.go | 19 +++++++++++-------- 6 files changed, 39 insertions(+), 26 deletions(-) create mode 100644 constants/constants.go rename consumer.go => consumer/consumer.go (77%) rename consumer_test.go => consumer/consumer_test.go (58%) create mode 100644 model/message.go rename producer.go => producer/producer.go (69%) rename producer_test.go => producer/producer_test.go (58%) diff --git a/constants/constants.go b/constants/constants.go new file mode 100644 index 0000000..64442a7 --- /dev/null +++ b/constants/constants.go @@ -0,0 +1,3 @@ +package constants + +const GroupProtocol = "roundrobin" diff --git a/consumer.go b/consumer/consumer.go similarity index 77% rename from consumer.go rename to consumer/consumer.go index 8c83913..6b24148 100644 --- a/consumer.go +++ b/consumer/consumer.go @@ -1,26 +1,26 @@ -package kafka +package consumer import ( "context" "os" "time" + "github.com/teamseodo/kafka-do/constants" + "github.com/teamseodo/kafka-do/model" "github.com/twmb/franz-go/pkg/kgo" ) -type Message []byte - type Consumer struct { client *kgo.Client } -func NewConsumer(groupName string, topics []string, brokers []string, logger bool) (*Consumer, error) { +func New(groupName string, topics []string, brokers []string, logger bool) (*Consumer, error) { opts := []kgo.Opt{ kgo.SeedBrokers(brokers...), kgo.ConsumerGroup(groupName), kgo.ConsumeTopics(topics...), kgo.DisableAutoCommit(), - kgo.GroupProtocol("roundrobin"), + kgo.GroupProtocol(constants.GroupProtocol), } if logger { @@ -37,8 +37,8 @@ func NewConsumer(groupName string, topics []string, brokers []string, logger boo }, nil } -func (c *Consumer) ConsumeBatch(ctx context.Context, batchSize int) []Message { - var messages []Message +func (c *Consumer) ConsumeBatch(ctx context.Context, batchSize int) []model.Message { + var messages []model.Message for batchSize > 0 { timeout, cancel := context.WithTimeout(ctx, time.Minute*1) diff --git a/consumer_test.go b/consumer/consumer_test.go similarity index 58% rename from consumer_test.go rename to consumer/consumer_test.go index 72fa5f8..70248ae 100644 --- a/consumer_test.go +++ b/consumer/consumer_test.go @@ -1,39 +1,42 @@ -package kafka +package consumer import ( "context" "testing" + + "github.com/teamseodo/kafka-do/model" + "github.com/teamseodo/kafka-do/producer" ) func TestConsumeBatch(t *testing.T) { tests := []struct { name string - messages []Message + messages []model.Message }{ { name: "should-got-message", - messages: []Message{ - Message("message 1"), + messages: []model.Message{ + model.Message("message 1"), }, }, { name: "should-got-all-messages", - messages: []Message{ - Message("message 1"), Message("message 2"), Message("message 3"), Message("message 4"), Message("message 5"), + messages: []model.Message{ + model.Message("message 1"), model.Message("message 2"), model.Message("message 3"), model.Message("message 4"), model.Message("message 5"), }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - producer, err := NewProducer("127.0.0.1:9092") + producer, err := producer.New("127.0.0.1:9092") if err != nil { t.Fatal(err) } defer producer.Close() producer.Produce(context.Background(), test.messages, "kafka_do_test") - consumer, err := NewConsumer("kafka_do", []string{"kafka_do_test"}, []string{"127.0.0.1:9092"}, false) + consumer, err := New("kafka_do", []string{"kafka_do_test"}, []string{"127.0.0.1:9092"}, false) if err != nil { t.Fatal(err) } diff --git a/model/message.go b/model/message.go new file mode 100644 index 0000000..3640ad3 --- /dev/null +++ b/model/message.go @@ -0,0 +1,3 @@ +package model + +type Message []byte diff --git a/producer.go b/producer/producer.go similarity index 69% rename from producer.go rename to producer/producer.go index a3ac461..104a6b9 100644 --- a/producer.go +++ b/producer/producer.go @@ -1,8 +1,9 @@ -package kafka +package producer import ( "context" + "github.com/teamseodo/kafka-do/model" "github.com/twmb/franz-go/pkg/kgo" ) @@ -10,7 +11,7 @@ type Producer struct { client *kgo.Client } -func NewProducer(brokers ...string) (*Producer, error) { +func New(brokers ...string) (*Producer, error) { cl, err := kgo.NewClient( kgo.SeedBrokers(brokers...), ) @@ -23,7 +24,7 @@ func NewProducer(brokers ...string) (*Producer, error) { }, nil } -func (p *Producer) Produce(ctx context.Context, messages []Message, topic string) kgo.ProduceResults { +func (p *Producer) Produce(ctx context.Context, messages []model.Message, topic string) kgo.ProduceResults { var records []*kgo.Record for _, message := range messages { diff --git a/producer_test.go b/producer/producer_test.go similarity index 58% rename from producer_test.go rename to producer/producer_test.go index 725fefc..1df001e 100644 --- a/producer_test.go +++ b/producer/producer_test.go @@ -1,32 +1,35 @@ -package kafka +package producer import ( "context" "testing" + + "github.com/teamseodo/kafka-do/consumer" + "github.com/teamseodo/kafka-do/model" ) func TestProduce(t *testing.T) { tests := []struct { name string - messages []Message + messages []model.Message }{ { name: "should-got-message", - messages: []Message{ - Message("message 1"), + messages: []model.Message{ + model.Message("message 1"), }, }, { name: "should-got-all-messages", - messages: []Message{ - Message("message 1"), Message("message 2"), Message("message 3"), Message("message 4"), Message("message 5"), + messages: []model.Message{ + model.Message("message 1"), model.Message("message 2"), model.Message("message 3"), model.Message("message 4"), model.Message("message 5"), }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - producer, err := NewProducer("127.0.0.1:9092") + producer, err := New("127.0.0.1:9092") if err != nil { t.Fatal(err) } @@ -34,7 +37,7 @@ func TestProduce(t *testing.T) { producer.Produce(context.Background(), test.messages, "kafka_do_test") - consumer, err := NewConsumer("kafka_do", []string{"kafka_do_test"}, []string{"127.0.0.1:9092"}, false) + consumer, err := consumer.New("kafka_do", []string{"kafka_do_test"}, []string{"127.0.0.1:9092"}, false) if err != nil { t.Fatal(err) }