Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typed error to handle the retry / skip of events #41

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,34 @@ import (
"github.com/opentracing/opentracing-go"
)

var (
ErrEventUnretriable = errors.New("the event will not be retried")
ErrEventOmitted = errors.New("the event will be omitted")
)
// EventProcessingError is to be used to indicate that an error occurred during the processing of an event.
// It can be used to indicate that the event should not be retried, or that it should be omitted, in replacement
// of the deprecated errors ErrEventUnretriable and ErrEventOmitted.
type EventProcessingError struct {
msg string
isUnretriable bool
isToBeSkipped bool
}

func (e *EventProcessingError) Error() string {
return e.msg
}

func (e *EventProcessingError) IsUnretriable() bool {
return e.isUnretriable
}

func (e *EventProcessingError) IsToBeSkipped() bool {
return e.isToBeSkipped
}

func NewEventProcessingError(error error, isUnretriable bool, isToBeSkipped bool) *EventProcessingError {
return &EventProcessingError{
msg: error.Error(),
isUnretriable: isUnretriable,
isToBeSkipped: isToBeSkipped,
}
}

type HandlerConfig struct {
ConsumerMaxRetries *int
Expand Down Expand Up @@ -247,9 +271,11 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons
}

func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *sarama.ConsumerMessage) {
if errors.Is(initialError, ErrEventOmitted) {
l.handleOmittedMessage(initialError, msg)
return
if err, ok := initialError.(*EventProcessingError); ok {
if err.IsToBeSkipped() {
l.handleOmittedMessage(initialError, msg)
return
}
}

// Log
Expand Down Expand Up @@ -323,7 +349,14 @@ func forwardToTopic(l *listener, msg *sarama.ConsumerMessage, topicName string)
}

func isRetriableError(initialError error) bool {
return !errors.Is(initialError, ErrEventUnretriable) && !errors.Is(initialError, ErrEventOmitted)
if eperr, ok := initialError.(*EventProcessingError); ok {
if eperr.IsUnretriable() {
return false
} else if eperr.IsToBeSkipped() {
return false
}
}
return true
}

func (l *listener) handleOmittedMessage(initialError error, msg *sarama.ConsumerMessage) {
Expand Down Expand Up @@ -362,9 +395,8 @@ func shouldRetry(retries int, err error) bool {
return false
}

if errors.Is(err, ErrEventUnretriable) || errors.Is(err, ErrEventOmitted) {
if !isRetriableError(err) {
return false
}

return true
}
8 changes: 4 additions & 4 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func Test_ConsumeClaim_Message_Error_WithHandlerSpecificRetryTopic(t *testing.T)

func Test_handleErrorMessage_OmittedError(t *testing.T) {

omittedError := errors.New("This error should be omitted")
omittedError := errors.New("this error should be omitted")

l := listener{}

Expand All @@ -384,7 +384,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) {
}).Once()
ErrorLogger = mockLogger

l.handleErrorMessage(fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), Handler{}, nil)
l.handleErrorMessage(NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", omittedError), false, true), Handler{}, nil)

assert.True(t, errorLogged)
}
Expand All @@ -397,7 +397,7 @@ func Test_handleMessageWithRetry(t *testing.T) {
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return err
return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), false, false)
}
handler := Handler{
Processor: handlerProcessor,
Expand All @@ -415,7 +415,7 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) {
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return fmt.Errorf("%w: %w", err, ErrEventUnretriable)
return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), true, false)
}
handler := Handler{
Processor: handlerProcessor,
Expand Down
Loading