Add configurable retries for target writes
Currently, retry [configuration](https://github.com/snowplow/snowbridge/blob/c8c94f8a13367260f9a1a280cdd5d86098f66521/cmd/cli/cli.go#L213) is hardcoded to 5 attempts with an exponential delay (initial 1 second) for every type of error.

This commit brings a couple of improvements to retrying:

* Make retry settings configurable from HCL. Max attempts and delay can now be customized.
* Split retry strategies into 2 categories: transient and setup. A target can now signal what kind of error has happened and which retry strategy should be applied (see the sketch after this list).
* The HTTP target can now return setup errors via the new response rules configuration. These rules also allow the target to match invalid data. Previously a message was either a failure (retried) or oversized.
* Eventually, setup errors will also trigger other actions, like toggling the application health status and sending monitoring alerts. For now the behaviour for setup errors is basically the same as for transient errors, but extending it should be relatively easy with this structure.
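
To make the transient/setup split concrete, here is a minimal, self-contained Go sketch of the idea: a target classifies its own failures so the caller can pick a retry strategy. The `setupError` type and `write` function below are hypothetical stand-ins, not the actual snowbridge `models.SetupWriteError` or target code.

```go
package main

import (
	"errors"
	"fmt"
)

// setupError is a hypothetical marker type for failures that quick retries
// will not fix (bad credentials, missing permissions, wrong configuration).
type setupError struct{ err error }

func (s setupError) Error() string { return "setup error: " + s.err.Error() }

// write simulates a target write that classifies its failures.
func write(statusCode int) error {
	switch {
	case statusCode == 401 || statusCode == 403:
		// Misconfiguration: signal a setup error so the caller applies a long delay.
		return setupError{errors.New("authentication failed")}
	case statusCode >= 500:
		// Transient failure: short delay, limited attempts.
		return errors.New("server unavailable")
	default:
		return nil
	}
}

func main() {
	for _, code := range []int{200, 401, 503} {
		err := write(code)
		var s setupError
		fmt.Printf("status=%d, setup=%t, err=%v\n", code, errors.As(err, &s), err)
	}
}
```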
pondzix committed Aug 9, 2024
1 parent 80eacaa commit 32006eb
Showing 16 changed files with 836 additions and 65 deletions.
11 changes: 11 additions & 0 deletions assets/docs/configuration/overview-full-example.hcl
@@ -93,6 +93,17 @@ stats_receiver {
// log level configuration (default: "info")
log_level = "info"

// Specifies how failed writes to the target should be retried, depending on the error type
retry {
  transient {
    delay_sec = 1
    max_attempts = 5
  }
  setup {
    delay_sec = 20
  }
}

license {
  accept = true
}
9 changes: 9 additions & 0 deletions assets/docs/configuration/retry-example.hcl
@@ -0,0 +1,9 @@
retry {
  transient {
    delay_sec = 5
    max_attempts = 10
  }
  setup {
    delay_sec = 30
  }
}
22 changes: 22 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
@@ -62,5 +62,27 @@ target {

    # Optional path to the file containing a template which is used to build the HTTP request based on a batch of input data
    template_file = "myTemplate.file"

    # 2 invalid rules + 1 setup error rule
    response_rules {
      # This one is a match when...
      invalid {
        # ...HTTP statuses match...
        http_codes = [400]
        # AND this string exists in the response body
        body = "Invalid value for 'purchase' field"
      }
      # If no match yet, we can check the next one...
      invalid {
        # again a 400 status...
        http_codes = [400]
        # BUT we expect a different error message in the response body
        body = "Invalid value for 'attributes' field"
      }
      # Same for 'setup' rules...
      setup {
        http_codes = [401, 403]
      }
    }
  }
}
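
As a rough illustration of how rules like these could be evaluated (first matching rule wins; a rule matches when the status code is listed and, if configured, the body substring is found), here is a hypothetical sketch. The types and matching function are illustrative only, not the snowbridge implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// responseRule is a hypothetical mirror of one invalid/setup block above.
type responseRule struct {
	httpCodes []int
	body      string // optional substring to look for in the response body
}

// matches returns true when the status code is listed in the rule AND,
// if a body substring is configured, that substring occurs in the body.
func (r responseRule) matches(status int, body string) bool {
	codeMatch := false
	for _, c := range r.httpCodes {
		if c == status {
			codeMatch = true
			break
		}
	}
	if !codeMatch {
		return false
	}
	return r.body == "" || strings.Contains(body, r.body)
}

func main() {
	invalidRules := []responseRule{
		{httpCodes: []int{400}, body: "Invalid value for 'purchase' field"},
		{httpCodes: []int{400}, body: "Invalid value for 'attributes' field"},
	}
	setupRules := []responseRule{
		{httpCodes: []int{401, 403}},
	}

	status, body := 400, `{"error": "Invalid value for 'attributes' field"}`

	for _, r := range invalidRules {
		if r.matches(status, body) {
			fmt.Println("treat as invalid data (not retried)")
			return
		}
	}
	for _, r := range setupRules {
		if r.matches(status, body) {
			fmt.Println("treat as setup error (long-delay retry)")
			return
		}
	}
	fmt.Println("treat as transient failure (default retry)")
}
```

In the real target the status and body would come from the HTTP response; per the commit description, a matching invalid rule routes the message to the failure target rather than retrying it.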
127 changes: 90 additions & 37 deletions cmd/cli/cli.go
@@ -19,13 +19,13 @@ import (

"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
retry "github.com/snowplow-devops/go-retry"
"github.com/urfave/cli"

"net/http"
// pprof imported for the side effect of registering its HTTP handlers
_ "net/http/pprof"

retry "github.com/avast/retry-go/v4"
"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/failure/failureiface"
@@ -171,7 +171,7 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, o, cfg),
}

// Read is a long running process and will only return when the source
@@ -195,7 +195,7 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer, cfg *config.Config) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
@@ -215,65 +215,118 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform

// Send message buffer
messagesToSend := transformed.Result
invalid := transformed.Invalid
var oversized []*models.Message

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)
write := func() error {
result, err := t.Write(messagesToSend)

o.TargetWrite(result)
messagesToSend = result.Failed
oversized = append(oversized, result.Oversized...)
invalid = append(invalid, result.Invalid...)
return err
}

err := handleWrite(cfg, write)

o.TargetWrite(res)
messagesToSend = res.Failed
return res, err
})
if err != nil {
return err
}
resCast := res.(*models.TargetWriteResult)

// Send oversized message buffer
messagesToSend = resCast.Oversized
if len(messagesToSend) > 0 {
err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
if len(oversized) > 0 {
messagesToSend = oversized
writeOversized := func() error {
result, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
}

o.TargetWriteOversized(res)
messagesToSend = res.Failed
o.TargetWriteOversized(result)
messagesToSend = result.Failed
return err
}

err := handleWrite(cfg, writeOversized)

if err != nil {
return err
})
if err2 != nil {
return err2
}
}

// Send invalid message buffer
messagesToSend = append(resCast.Invalid, transformed.Invalid...)
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
if len(invalid) > 0 {
messagesToSend = invalid
writeInvalid := func() error {
result, err := ft.WriteInvalid(messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}

o.TargetWriteInvalid(res)
messagesToSend = res.Failed
o.TargetWriteInvalid(result)
messagesToSend = result.Failed
return err
})
if err3 != nil {
return err3
}
}

err := handleWrite(cfg, writeInvalid)

if err != nil {
return err
}
}
return nil
}
}

// Wrap each target write operation with 2 kinds of retries:
// - setup errors: long delay, unlimited attempts, unhealthy state + alerts
// - transient errors: short delay, limited attempts
// Whether it's a setup or transient error is decided based on the response returned by the target.
func handleWrite(cfg *config.Config, write func() error) error {
retryOnlySetupErrors := retry.RetryIf(func(err error) bool {
_, isSetup := err.(models.SetupWriteError)
return isSetup
})

onSetupError := retry.OnRetry(func(attempt uint, err error) {
log.Infof("Retrying setup target write - attempt: %d, error: %s\n", attempt, err)
// Here we can set unhealthy status + send monitoring alerts in the future. Nothing happens here now.
})

// First, try to handle the error as a setup error...
err := retry.Do(
write,
retryOnlySetupErrors,
onSetupError,
retry.Delay(time.Duration(cfg.Data.Retry.Setup.Delay)*time.Second),
// For now let's limit attempts to 5 for setup errors, because we don't have a health check which would allow the app to be killed externally. Unlimited attempts don't make sense right now.
retry.Attempts(5),
// Enable when health check + monitoring are implemented
// retry.Attempts(0), //unlimited
)

if err == nil {
return err
}

log.Infof("Retrying transient target write - attempt: %d, error: %s\n", 0, err)

onTransientError := retry.OnRetry(func(attempt uint, err error) {
log.Infof("Retrying transient target write - attempt: %d, error: %s\n", attempt+1, err)
})

// If it's not a setup error, handle it as transient
err = retry.Do(
write,
onTransientError,
retry.Delay(time.Duration(cfg.Data.Retry.Transient.Delay)*time.Second),
retry.Attempts(uint(cfg.Data.Retry.Transient.MaxAttempts)),
retry.LastErrorOnly(true),
)
return err
}
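
handleWrite pulls its delays and attempt counts from cfg.Data.Retry. Below is a hypothetical sketch of how the retry HCL block might decode into Go structs; the struct names, field names, and HCL tags are assumptions for illustration, not the actual snowbridge config definitions.

```go
// RetryConfig mirrors the `retry` HCL block (hypothetical names and tags).
type RetryConfig struct {
	Transient TransientRetryConfig `hcl:"transient,block"`
	Setup     SetupRetryConfig     `hcl:"setup,block"`
}

// TransientRetryConfig: short delay, limited attempts.
type TransientRetryConfig struct {
	Delay       int `hcl:"delay_sec,optional"`
	MaxAttempts int `hcl:"max_attempts,optional"`
}

// SetupRetryConfig: long delay; attempts are currently capped in handleWrite.
type SetupRetryConfig struct {
	Delay int `hcl:"delay_sec,optional"`
}
```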

// exitWithError will ensure we log the error and leave time for Sentry to flush
func exitWithError(err error, flushSentry bool) {
log.WithFields(log.Fields{"error": err}).Error(err)