From 17f41774fbf2f31deb38fb1a5b62aff52598affb Mon Sep 17 00:00:00 2001 From: alishakawaguchi Date: Thu, 9 Jan 2025 12:01:58 -0800 Subject: [PATCH] Fix generate jobs critical error check (#3111) --- .../benthos-builder/builders/generate-ai.go | 37 ++++++++++--------- .../benthos-builder/builders/generate.go | 37 ++++++++++--------- worker/pkg/benthos/config.go | 5 ++- worker/pkg/benthos/error/output_error.go | 16 +++++++- worker/pkg/benthos/utils.go | 22 +++++++++++ 5 files changed, 78 insertions(+), 39 deletions(-) diff --git a/internal/benthos/benthos-builder/builders/generate-ai.go b/internal/benthos/benthos-builder/builders/generate-ai.go index 9570f0f10f..70b4c6dbe7 100644 --- a/internal/benthos/benthos-builder/builders/generate-ai.go +++ b/internal/benthos/benthos-builder/builders/generate-ai.go @@ -213,15 +213,15 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params * config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id}) config.Outputs = append(config.Outputs, neosync_benthos.Outputs{ - Fallback: []neosync_benthos.Outputs{ - { - // retry processor and output several times - Retry: &neosync_benthos.RetryConfig{ - InlineRetryConfig: neosync_benthos.InlineRetryConfig{ - MaxRetries: 10, - }, - Output: neosync_benthos.OutputConfig{ - Outputs: neosync_benthos.Outputs{ + // retry processor and output several times + Retry: &neosync_benthos.RetryConfig{ + InlineRetryConfig: neosync_benthos.InlineRetryConfig{ + MaxRetries: 10, + }, + Output: neosync_benthos.OutputConfig{ + Outputs: neosync_benthos.Outputs{ + Fallback: []neosync_benthos.Outputs{ + { PooledSqlInsert: &neosync_benthos.PooledSqlInsert{ ConnectionId: params.DestConnection.GetId(), Schema: benthosConfig.TableSchema, @@ -236,18 +236,19 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params * MaxInFlight: int(destOpts.MaxInFlight), }, }, - Processors: processorConfigs, + { // kills activity depending on error + Error: &neosync_benthos.ErrorOutputConfig{ + ErrorMsg: `${! meta("fallback_error")}`, + Batching: &neosync_benthos.Batching{ + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, + }, + IsGenerateJob: true, + }}, }, }, + Processors: processorConfigs, }, - // kills activity depending on error - {Error: &neosync_benthos.ErrorOutputConfig{ - ErrorMsg: `${! meta("fallback_error")}`, - Batching: &neosync_benthos.Batching{ - Period: destOpts.BatchPeriod, - Count: destOpts.BatchCount, - }, - }}, }, }) diff --git a/internal/benthos/benthos-builder/builders/generate.go b/internal/benthos/benthos-builder/builders/generate.go index b9c6ea1a3e..0f4efe5edb 100644 --- a/internal/benthos/benthos-builder/builders/generate.go +++ b/internal/benthos/benthos-builder/builders/generate.go @@ -188,15 +188,15 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id}) config.Outputs = append(config.Outputs, neosync_benthos.Outputs{ - Fallback: []neosync_benthos.Outputs{ - { - // retry processor and output several times - Retry: &neosync_benthos.RetryConfig{ - InlineRetryConfig: neosync_benthos.InlineRetryConfig{ - MaxRetries: 10, - }, - Output: neosync_benthos.OutputConfig{ - Outputs: neosync_benthos.Outputs{ + // retry processor and output several times + Retry: &neosync_benthos.RetryConfig{ + InlineRetryConfig: neosync_benthos.InlineRetryConfig{ + MaxRetries: 10, + }, + Output: neosync_benthos.OutputConfig{ + Outputs: neosync_benthos.Outputs{ + Fallback: []neosync_benthos.Outputs{ + { PooledSqlInsert: &neosync_benthos.PooledSqlInsert{ ConnectionId: params.DestConnection.GetId(), @@ -213,18 +213,19 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb MaxInFlight: int(destOpts.MaxInFlight), }, }, - Processors: processorConfigs, + { // kills activity depending on error + Error: &neosync_benthos.ErrorOutputConfig{ + ErrorMsg: `${! meta("fallback_error")}`, + Batching: &neosync_benthos.Batching{ + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, + }, + IsGenerateJob: true, + }}, }, }, + Processors: processorConfigs, }, - // kills activity depending on error - {Error: &neosync_benthos.ErrorOutputConfig{ - ErrorMsg: `${! meta("fallback_error")}`, - Batching: &neosync_benthos.Batching{ - Period: destOpts.BatchPeriod, - Count: destOpts.BatchCount, - }, - }}, }, }) diff --git a/worker/pkg/benthos/config.go b/worker/pkg/benthos/config.go index e5dcfc6b73..2d1b9160f2 100644 --- a/worker/pkg/benthos/config.go +++ b/worker/pkg/benthos/config.go @@ -289,8 +289,9 @@ type SwitchOutputCase struct { Output Outputs `json:"output,omitempty" yaml:"output,omitempty"` } type ErrorOutputConfig struct { - ErrorMsg string `json:"error_msg" yaml:"error_msg"` - Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"` + ErrorMsg string `json:"error_msg" yaml:"error_msg"` + IsGenerateJob bool `json:"is_generate_job" yaml:"is_generate_job"` + Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"` } type RedisHashOutputConfig struct { diff --git a/worker/pkg/benthos/error/output_error.go b/worker/pkg/benthos/error/output_error.go index 2aa5ea58e5..fb0f7be34a 100644 --- a/worker/pkg/benthos/error/output_error.go +++ b/worker/pkg/benthos/error/output_error.go @@ -14,6 +14,7 @@ func errorOutputSpec() *service.ConfigSpec { Summary(`Sends stop Activity signal`). Field(service.NewStringField("error_msg")). Field(service.NewIntField("max_in_flight").Default(64)). + Field(service.NewBoolField("is_generate_job").Default(false)). Field(service.NewBatchPolicyField("batching")) } @@ -44,10 +45,15 @@ func newErrorOutput(conf *service.ParsedConfig, mgr *service.Resources, channel if err != nil { return nil, err } + isGenerateJob, err := conf.FieldBool("is_generate_job") + if err != nil { + return nil, err + } return &errorOutput{ logger: mgr.Logger(), stopActivityChannel: channel, errorMsg: errMsg, + isGenerateJob: isGenerateJob, }, nil } @@ -55,6 +61,7 @@ type errorOutput struct { logger *service.Logger stopActivityChannel chan<- error errorMsg *service.InterpolatedString + isGenerateJob bool } func (e *errorOutput) Connect(ctx context.Context) error { @@ -67,7 +74,7 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch if err != nil { return fmt.Errorf("error message interpolation error: %w", err) } - if !neosync_benthos.IsCriticalError(errMsg) { + if !e.isCriticalError(errMsg) { // throw error so that benthos retries return errors.New(errMsg) } @@ -81,3 +88,10 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch func (e *errorOutput) Close(ctx context.Context) error { return nil } + +func (e *errorOutput) isCriticalError(errMsg string) bool { + if e.isGenerateJob { + return neosync_benthos.IsGenerateJobCriticalError(errMsg) + } + return neosync_benthos.IsCriticalError(errMsg) +} diff --git a/worker/pkg/benthos/utils.go b/worker/pkg/benthos/utils.go index 736c0a369d..11324d5a8c 100644 --- a/worker/pkg/benthos/utils.go +++ b/worker/pkg/benthos/utils.go @@ -48,6 +48,28 @@ func IsCriticalError(errMsg string) bool { return false } +// checks if the error message is critical for the generate job +func IsGenerateJobCriticalError(errMsg string) bool { + criticalErrors := []string{ + "violates foreign key constraint", + "cannot add or update a child row", + "a foreign key constraint fails", + "could not identify an equality operator", + "violates not-null constraint", + "invalid input syntax", + "incorrect datetime value", + "incorrect date value", + "incorrect time value", + } + + for _, errStr := range criticalErrors { + if containsIgnoreCase(errMsg, errStr) { + return true + } + } + return false +} + func containsIgnoreCase(s, substr string) bool { return strings.Contains(strings.ToLower(s), strings.ToLower(substr)) }