Skip to content

Commit

Permalink
fix: linter
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec committed Jun 16, 2024
1 parent 90f5e49 commit 149df5d
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 89 deletions.
3 changes: 3 additions & 0 deletions consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

// NewConsumer creates a new consumer based on the specified platform.
// It accepts a platform option and additional consumer option functions.
// It returns a consumer.Consumer interface implementation.
func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer {
switch platform {
case consumerOpts.ConsumerPlatformRabbitMQ:
Expand Down
15 changes: 14 additions & 1 deletion errors/error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package errors

const (
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
EncodingFormatNotSupported = "ENCODING_FORMAT_NOT_SUPPORTED"
UnKnownError = "UNKNOWN_ERROR"
)

// Error represents an error with a code and a message.
type Error struct {
Code string `json:"code"`
Message string `json:"message"`
Expand All @@ -14,7 +17,17 @@ func (e Error) Error() string {
}

var (
// ErrInvalidMessageFormat is an error that occurs when attempting to unmarshal a message with an invalid format.
ErrInvalidMessageFormat = Error{
Code: InvalidMessageFormatCode,
Message: "failed to unmarshal the message, removing the message due to wrong message format"}
// ErrEncodingFormatNotSupported is an error that indicates the encoding format is not supported.
ErrEncodingFormatNotSupported = Error{
Code: EncodingFormatNotSupported,
Message: "encoding format not supported. Please register the encoding format before using it"}

// ErrUnknownError is an error that indicates an unknown error occurred.
ErrUnknownError = Error{
Code: UnKnownError,
Message: "an unknown error occurred"}
)
19 changes: 14 additions & 5 deletions internal/consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,20 @@ func (r *rabbitMQ) initConsumer() {
r.msgReceiver = receiver
}

// Consume consumes messages from a RabbitMQ queue.
// It takes a context, an inbound message handler, and metadata as input parameters.
// The method continuously listens for messages from the queue and handles them using the provided handler.
// If the context is canceled, the method stops consuming messages and returns.
// The method returns an error if there was an issue consuming messages.
// Consume consumes messages from a RabbitMQ queue and handles them using the provided message handler.
// It takes a context, an inbound message handler, and a map of metadata as input parameters.
// The function continuously listens for messages from the queue and processes them until the context is canceled.
// If the context is canceled, the function stops consuming messages and returns.
// For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached.
// If the maximum retry count has been reached, the message is moved to the dead letter queue.
// Otherwise, the message is passed to the message handler for processing.
// The message handler is responsible for handling the message and returning an error if any.
// If an error occurs while handling the message, it is logged.
// The function provides methods for acknowledging, rejecting, and moving messages to the dead letter queue.
// These methods can be used by the message handler to control the message processing flow.
// The function also logs information about the received message, such as the message ID, topic, action, and timestamp.
// It applies any configured middlewares to the message handler before calling it.
// The function returns an error if any occurred during message handling or if the context was canceled.
func (r *rabbitMQ) Consume(ctx context.Context,
h interfaces.InboundMessageHandler,
meta map[string]interface{}) (err error) {
Expand Down
11 changes: 11 additions & 0 deletions internal/publisher/rabbitmq/channel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"go.uber.org/multierr"
)

// ChannelPool represents a pool of AMQP channels used for publishing messages.
// It provides a way to manage and reuse AMQP channels efficiently.
type ChannelPool struct {
conn *amqp.Connection
mutex sync.Mutex
pool chan *amqp.Channel
maxSize int
}

// NewChannelPool creates a new ChannelPool instance.
// It takes an AMQP connection and the maximum size of the pool as parameters.
// It returns a pointer to the newly created ChannelPool.
func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
return &ChannelPool{
conn: conn,
Expand All @@ -23,6 +28,8 @@ func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
}
}

// Get returns a channel from the pool. If there are available channels in the pool, it returns one of them.
// Otherwise, it creates a new channel from the underlying connection and returns it.
func (cp *ChannelPool) Get() (*amqp.Channel, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -35,6 +42,8 @@ func (cp *ChannelPool) Get() (*amqp.Channel, error) {
}
}

// Return returns a channel back to the channel pool.
// If the pool is full, the channel is closed.
func (cp *ChannelPool) Return(ch *amqp.Channel) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -51,6 +60,8 @@ func (cp *ChannelPool) Return(ch *amqp.Channel) {
}
}

// Close closes the ChannelPool and all its associated channels.
// It returns an error if there was an error closing any of the channels.
func (cp *ChannelPool) Close() (err error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand Down
32 changes: 31 additions & 1 deletion internal/publisher/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/errors"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/interfaces"
Expand All @@ -25,6 +26,32 @@ type rabbitMQ struct {
option *publisherOpts.PublisherOption
}

// NewPublisher creates a new instance of the publisher.Publisher interface
// using the provided options. It returns a publisher.Publisher implementation
// that utilizes RabbitMQ as the underlying message broker.
//
// The function accepts a variadic parameter `opts` of type
// `publisherOpts.PublisherOptionFunc`, which allows the caller to provide
// custom configuration options for the publisher.
//
// Example usage:
//
// publisher := NewPublisher(
// publisherOpts.PublisherPlatformRabbitMQ,
// publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
// Conn: rmqConn,
// PublisherChannelPoolSize: 5,
// }),
// publisherOpts.WithPublisherID("publisher_id"),
// publisherOpts.WithMiddlewares(
// middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
// middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
// ),
//
// )
//
// The returned publisher can be used to publish messages to the configured
// RabbitMQ exchange and routing key.
func NewPublisher(
opts ...publisherOpts.PublisherOptionFunc,
) publisher.Publisher {
Expand All @@ -47,6 +74,9 @@ func NewPublisher(
}
}

// Publish sends a message to the RabbitMQ exchange.
// It applies the default content type if not specified in the message.
// It also applies any registered middlewares before publishing the message.
func (r *rabbitMQ) Publish(ctx context.Context, m interfaces.Message) (err error) {
if m.ContentType == "" {
m.ContentType = publisherOpts.DefaultContentType
Expand Down Expand Up @@ -94,7 +124,7 @@ func (r *rabbitMQ) buildPublisher() interfaces.PublisherFunc {
m.ID = id
encoder, ok := goqueue.GetGoQueueEncoding(m.ContentType)
if !ok {
encoder = goqueue.DefaultEncoding
return errors.ErrEncodingFormatNotSupported
}

data, err := encoder.Encode(ctx, m)
Expand Down
61 changes: 61 additions & 0 deletions middleware/default_errormapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package middleware

import (
"context"

"github.com/bxcodec/goqueue/errors"
"github.com/bxcodec/goqueue/interfaces"
)

// PublisherDefaultErrorMapper returns a middleware function that maps publisher errors to specific error types.
// It takes a next PublisherFunc as input and returns a new PublisherFunc that performs error mapping.
// If an error occurs during publishing, it will be mapped to a specific error type based on the error code.
// The mapped error will be returned, or nil if no error occurred.
func PublisherDefaultErrorMapper() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, e interfaces.Message) (err error) {
err = next(ctx, e)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}

// InboundMessageHandlerDefaultErrorMapper returns a middleware function that maps specific errors to predefined error types.
// It takes the next inbound message handler function as input and returns a new inbound message handler function.
// The returned function checks if an error occurred during the execution of the next handler function.
// If an error is found, it maps the error to a predefined error type and returns it.
// If no error is found, it returns nil.
func InboundMessageHandlerDefaultErrorMapper() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}
65 changes: 65 additions & 0 deletions middleware/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package middleware

import (
"context"

"github.com/bxcodec/goqueue/interfaces"
"github.com/sirupsen/logrus"
)

// HelloWorldMiddlewareExecuteAfterInboundMessageHandler returns an inbound message handler middleware function.
// This middleware function executes after the inbound message handler and performs additional tasks.
// It logs any errors that occur during the execution of the next handler and provides an opportunity to handle them.
// You can customize the error handling logic by adding your own error handler, such as sending errors to Sentry or other error tracking tools.
// After error handling, it logs a message indicating that the hello-world-last-middleware has been executed.
// The function signature follows the `interfaces.InboundMessageHandlerMiddlewareFunc` type.
func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("Error: ", err, "add your custom error handler here, eg send to Sentry or other error tracking tools")
}
logrus.Info("hello-world-last-middleware executed")
return err
}
}
}

// HelloWorldMiddlewareExecuteBeforeInboundMessageHandler is a function that returns an inbound message handler middleware.
// This middleware logs a message and then calls the next inbound message handler in the chain.
func HelloWorldMiddlewareExecuteBeforeInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, m)
}
}
}

// HelloWorldMiddlewareExecuteAfterPublisher is a function that returns a PublisherMiddlewareFunc.
// It wraps the provided PublisherFunc with additional functionality to be executed after publishing a message.
func HelloWorldMiddlewareExecuteAfterPublisher() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, m interfaces.Message) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("got error while publishing the message: ", err)
return err
}
logrus.Info("hello-world-last-middleware executed")
return nil
}
}
}

// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc.
// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next middleware or the actual publisher function.
func HelloWorldMiddlewareExecuteBeforePublisher() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, e interfaces.Message) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, e)
}
}
}
76 changes: 11 additions & 65 deletions middleware/middleware.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,29 @@
package middleware

import (
"context"

"github.com/bxcodec/goqueue/interfaces"
"github.com/sirupsen/logrus"
)

// ApplyHandlerMiddleware applies a series of middleware functions to an inbound message handler function.
// It takes an inbound message handler function `h` and a variadic list of middleware functions `middleware`.
// Each middleware function is applied to the handler function in the order they are provided.
// The resulting handler function with all the middleware applied is returned.
func ApplyHandlerMiddleware(h interfaces.InboundMessageHandlerFunc, middlewares ...interfaces.InboundMessageHandlerMiddlewareFunc) interfaces.InboundMessageHandlerFunc {
// ApplyHandlerMiddleware applies a series of middlewares to an inbound message handler function.
// It takes an inbound message handler function and a variadic list of middlewares as input.
// Each middleware is applied to the handler function in the order they are provided.
// The resulting function is returned as the final handler function with all the middlewares applied.
func ApplyHandlerMiddleware(h interfaces.InboundMessageHandlerFunc,
middlewares ...interfaces.InboundMessageHandlerMiddlewareFunc) interfaces.InboundMessageHandlerFunc {
for _, middleware := range middlewares {
h = middleware(h)
}
return h
}

// ApplyPublisherMiddleware applies the given publisher middleware functions to the provided publisher function.
// It iterates over the middleware functions and applies them in the order they are provided.
// ApplyPublisherMiddleware applies a series of middlewares to a publisher function.
// It takes a publisher function and a variadic list of publisher middleware functions as input.
// Each middleware function is applied to the publisher function in the order they are provided.
// The resulting publisher function is returned.
func ApplyPublisherMiddleware(p interfaces.PublisherFunc, middlewares ...interfaces.PublisherMiddlewareFunc) interfaces.PublisherFunc {
func ApplyPublisherMiddleware(p interfaces.PublisherFunc,
middlewares ...interfaces.PublisherMiddlewareFunc) interfaces.PublisherFunc {
for _, middleware := range middlewares {
p = middleware(p)
}
return p
}

// HelloWorldMiddlewareExecuteAfterInboundMessageHandler returns an inbound message handler middleware function.
// It wraps the provided `next` inbound message handler function and executes some additional logic after it.
// The additional logic includes logging any error that occurred during the execution of the `next` function
// and logging a message indicating that the middleware has been executed.
func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("Error: ", err, "add your custom error handler here, eg send to Sentry or other error tracking tools")
}
logrus.Info("hello-world-last-middleware executed")
return err
}
}
}

// HelloWorldMiddlewareExecuteBeforeInboundMessageHandler returns a middleware function that logs a message before executing the handler.
func HelloWorldMiddlewareExecuteBeforeInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, m)
}
}
}

// HelloWorldMiddlewareExecuteAfterPublisher returns a PublisherMiddlewareFunc that executes after the publisher function.
// It logs any error that occurs during publishing and logs a message indicating that the last middleware has been executed.
func HelloWorldMiddlewareExecuteAfterPublisher() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, m interfaces.Message) (err error) {
err = next(ctx, m)
if err != nil {
logrus.Error("got error while publishing the message: ", err)
return err
}
logrus.Info("hello-world-last-middleware executed")
return nil
}
}
}

// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc.
// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next function.
func HelloWorldMiddlewareExecuteBeforePublisher() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, e interfaces.Message) (err error) {
logrus.Info("hello-world-first-middleware executed")
return next(ctx, e)
}
}
}
Loading

0 comments on commit 149df5d

Please sign in to comment.