Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: linter #14

Merged
merged 6 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Integration Testing
name: Integration Testing with RabbitMQ

on:
push:
Expand Down
55 changes: 32 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
"github.com/bxcodec/goqueue/publisher"
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
Expand All @@ -62,9 +64,14 @@ func main() {
panic(err)
}

rmqPub := rmqPublisher.NewPublisher(rmqConn,
publisher.WithPublisherID("publisher_id"),
publisher.WithMiddlewares(
rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: rmqConn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("publisher_id"),
publisherOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
),
Expand All @@ -83,35 +90,36 @@ func main() {
panic(err)
}
defer consumerChannel.Close()

rmqConsumer := rmqConsumer.NewConsumer(
publisherChannel,
consumerChannel,
consumer.WithMiddlewares(
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
}),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumer.WithQueueName("consumer_queue"),
consumer.WithConsumerID("consumer_id"),
consumer.WithBatchMessageSize(1),
consumer.WithMaxRetryFailedMessage(3),
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumer.WithTopicName("goqueue"),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumerOpts.WithTopicName("goqueue"),
consumerOpts.WithQueueName("consumer_queue"),
)

queueSvc := goqueue.NewQueueService(
goqueue.WithPublisher(rmqPub),
goqueue.WithConsumer(rmqConsumer),
goqueue.WithMessageHandler(handler()),
options.WithConsumer(rmqConsumer),
options.WithPublisher(rmqPub),
options.WithMessageHandler(handler()),
)

go func() {
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"message": fmt.Sprintf("Hello World %d", i),
}
jbyt, _ := json.Marshal(data)
err := queueSvc.Publish(context.Background(), goqueue.Message{
err := queueSvc.Publish(context.Background(), interfaces.Message{
Data: data,
Action: "goqueue.payments.create",
Topic: "goqueue",
Expand All @@ -132,14 +140,15 @@ func main() {
}
}

func handler() goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
data := m.Data
jbyt, _ := json.Marshal(data)
fmt.Println("Message Received: ", string(jbyt))
return m.Ack(ctx)
}
}

```

## Contribution
Expand Down
3 changes: 3 additions & 0 deletions consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

// NewConsumer creates a new consumer based on the specified platform.
// It accepts a platform option and additional consumer option functions.
// It returns a consumer.Consumer interface implementation.
func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer {
switch platform {
case consumerOpts.ConsumerPlatformRabbitMQ:
Expand Down
4 changes: 4 additions & 0 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ var (
}
DefaultEncoding = JSONEncoding
)

func init() {
AddGoQueueEncoding(JSONEncoding.ContentType, JSONEncoding)
}
15 changes: 14 additions & 1 deletion errors/error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package errors

const (
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
EncodingFormatNotSupported = "ENCODING_FORMAT_NOT_SUPPORTED"
UnKnownError = "UNKNOWN_ERROR"
)

// Error represents an error with a code and a message.
type Error struct {
Code string `json:"code"`
Message string `json:"message"`
Expand All @@ -14,7 +17,17 @@ func (e Error) Error() string {
}

var (
// ErrInvalidMessageFormat is an error that occurs when attempting to unmarshal a message with an invalid format.
ErrInvalidMessageFormat = Error{
Code: InvalidMessageFormatCode,
Message: "failed to unmarshal the message, removing the message due to wrong message format"}
// ErrEncodingFormatNotSupported is an error that indicates the encoding format is not supported.
ErrEncodingFormatNotSupported = Error{
Code: EncodingFormatNotSupported,
Message: "encoding format not supported. Please register the encoding format before using it"}

// ErrUnknownError is an error that indicates an unknown error occurred.
ErrUnknownError = Error{
Code: UnKnownError,
Message: "an unknown error occurred"}
)
2 changes: 2 additions & 0 deletions internal/consumer/rabbitmq/blackbox_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestSuiteRabbitMQConsumer(t *testing.T) {
t.Skip("Skip the Test Suite for RabbitMQ Consumer")
}

time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready

rmqURL := os.Getenv("RABBITMQ_TEST_URL")
if rmqURL == "" {
rmqURL = "amqp://test:test@localhost:5672/test"
Expand Down
19 changes: 14 additions & 5 deletions internal/consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,20 @@ func (r *rabbitMQ) initConsumer() {
r.msgReceiver = receiver
}

// Consume consumes messages from a RabbitMQ queue.
// It takes a context, an inbound message handler, and metadata as input parameters.
// The method continuously listens for messages from the queue and handles them using the provided handler.
// If the context is canceled, the method stops consuming messages and returns.
// The method returns an error if there was an issue consuming messages.
// Consume consumes messages from a RabbitMQ queue and handles them using the provided message handler.
// It takes a context, an inbound message handler, and a map of metadata as input parameters.
// The function continuously listens for messages from the queue and processes them until the context is canceled.
// If the context is canceled, the function stops consuming messages and returns.
// For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached.
// If the maximum retry count has been reached, the message is moved to the dead letter queue.
// Otherwise, the message is passed to the message handler for processing.
// The message handler is responsible for handling the message and returning an error if any.
// If an error occurs while handling the message, it is logged.
// The function provides methods for acknowledging, rejecting, and moving messages to the dead letter queue.
// These methods can be used by the message handler to control the message processing flow.
// The function also logs information about the received message, such as the message ID, topic, action, and timestamp.
// It applies any configured middlewares to the message handler before calling it.
// The function returns an error if any occurred during message handling or if the context was canceled.
func (r *rabbitMQ) Consume(ctx context.Context,
h interfaces.InboundMessageHandler,
meta map[string]interface{}) (err error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/publisher/rabbitmq/blackbox_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestSuiteRabbitMQPublisher(t *testing.T) {
t.Skip("Skip the Test Suite for RabbitMQ Publisher")
}

time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready

rmqURL := os.Getenv("RABBITMQ_TEST_URL")
if rmqURL == "" {
rmqURL = "amqp://test:test@localhost:5672/test"
Expand Down
11 changes: 11 additions & 0 deletions internal/publisher/rabbitmq/channel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"go.uber.org/multierr"
)

// ChannelPool represents a pool of AMQP channels used for publishing messages.
// It provides a way to manage and reuse AMQP channels efficiently.
type ChannelPool struct {
conn *amqp.Connection
mutex sync.Mutex
pool chan *amqp.Channel
maxSize int
}

// NewChannelPool creates a new ChannelPool instance.
// It takes an AMQP connection and the maximum size of the pool as parameters.
// It returns a pointer to the newly created ChannelPool.
func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
return &ChannelPool{
conn: conn,
Expand All @@ -23,6 +28,8 @@ func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
}
}

// Get returns a channel from the pool. If there are available channels in the pool, it returns one of them.
// Otherwise, it creates a new channel from the underlying connection and returns it.
func (cp *ChannelPool) Get() (*amqp.Channel, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -35,6 +42,8 @@ func (cp *ChannelPool) Get() (*amqp.Channel, error) {
}
}

// Return returns a channel back to the channel pool.
// If the pool is full, the channel is closed.
func (cp *ChannelPool) Return(ch *amqp.Channel) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -51,6 +60,8 @@ func (cp *ChannelPool) Return(ch *amqp.Channel) {
}
}

// Close closes the ChannelPool and all its associated channels.
// It returns an error if there was an error closing any of the channels.
func (cp *ChannelPool) Close() (err error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand Down
32 changes: 31 additions & 1 deletion internal/publisher/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/errors"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/interfaces"
Expand All @@ -25,6 +26,32 @@ type rabbitMQ struct {
option *publisherOpts.PublisherOption
}

// NewPublisher creates a new instance of the publisher.Publisher interface
// using the provided options. It returns a publisher.Publisher implementation
// that utilizes RabbitMQ as the underlying message broker.
//
// The function accepts a variadic parameter `opts` of type
// `publisherOpts.PublisherOptionFunc`, which allows the caller to provide
// custom configuration options for the publisher.
//
// Example usage:
//
// publisher := NewPublisher(
// publisherOpts.PublisherPlatformRabbitMQ,
// publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
// Conn: rmqConn,
// PublisherChannelPoolSize: 5,
// }),
// publisherOpts.WithPublisherID("publisher_id"),
// publisherOpts.WithMiddlewares(
// middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
// middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
// ),
//
// )
//
// The returned publisher can be used to publish messages to the configured
// RabbitMQ exchange and routing key.
func NewPublisher(
opts ...publisherOpts.PublisherOptionFunc,
) publisher.Publisher {
Expand All @@ -47,6 +74,9 @@ func NewPublisher(
}
}

// Publish sends a message to the RabbitMQ exchange.
// It applies the default content type if not specified in the message.
// It also applies any registered middlewares before publishing the message.
func (r *rabbitMQ) Publish(ctx context.Context, m interfaces.Message) (err error) {
if m.ContentType == "" {
m.ContentType = publisherOpts.DefaultContentType
Expand Down Expand Up @@ -94,7 +124,7 @@ func (r *rabbitMQ) buildPublisher() interfaces.PublisherFunc {
m.ID = id
encoder, ok := goqueue.GetGoQueueEncoding(m.ContentType)
if !ok {
encoder = goqueue.DefaultEncoding
return errors.ErrEncodingFormatNotSupported
}

data, err := encoder.Encode(ctx, m)
Expand Down
61 changes: 61 additions & 0 deletions middleware/default_errormapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package middleware

import (
"context"

"github.com/bxcodec/goqueue/errors"
"github.com/bxcodec/goqueue/interfaces"
)

// PublisherDefaultErrorMapper returns a middleware function that maps publisher errors to specific error types.
// It takes a next PublisherFunc as input and returns a new PublisherFunc that performs error mapping.
// If an error occurs during publishing, it will be mapped to a specific error type based on the error code.
// The mapped error will be returned, or nil if no error occurred.
func PublisherDefaultErrorMapper() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, e interfaces.Message) (err error) {
err = next(ctx, e)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}

// InboundMessageHandlerDefaultErrorMapper returns a middleware function that maps specific errors to predefined error types.
// It takes the next inbound message handler function as input and returns a new inbound message handler function.
// The returned function checks if an error occurred during the execution of the next handler function.
// If an error is found, it maps the error to a predefined error type and returns it.
// If no error is found, it returns nil.
func InboundMessageHandlerDefaultErrorMapper() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}
Loading
Loading