Skip to content

Commit

Permalink
Fix generate jobs critical error check (#3111)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Jan 9, 2025
1 parent 3062145 commit 17f4177
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 39 deletions.
37 changes: 19 additions & 18 deletions internal/benthos/benthos-builder/builders/generate-ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,15 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
// retry processor and output several times
Retry: &neosync_benthos.RetryConfig{
InlineRetryConfig: neosync_benthos.InlineRetryConfig{
MaxRetries: 10,
},
Output: neosync_benthos.OutputConfig{
Outputs: neosync_benthos.Outputs{
// retry processor and output several times
Retry: &neosync_benthos.RetryConfig{
InlineRetryConfig: neosync_benthos.InlineRetryConfig{
MaxRetries: 10,
},
Output: neosync_benthos.OutputConfig{
Outputs: neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
PooledSqlInsert: &neosync_benthos.PooledSqlInsert{
ConnectionId: params.DestConnection.GetId(),
Schema: benthosConfig.TableSchema,
Expand All @@ -236,18 +236,19 @@ func (b *generateAIBuilder) BuildDestinationConfig(ctx context.Context, params *
MaxInFlight: int(destOpts.MaxInFlight),
},
},
Processors: processorConfigs,
{ // kills activity depending on error
Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
IsGenerateJob: true,
}},
},
},
Processors: processorConfigs,
},
// kills activity depending on error
{Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
}},
},
})

Expand Down
37 changes: 19 additions & 18 deletions internal/benthos/benthos-builder/builders/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,15 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
// retry processor and output several times
Retry: &neosync_benthos.RetryConfig{
InlineRetryConfig: neosync_benthos.InlineRetryConfig{
MaxRetries: 10,
},
Output: neosync_benthos.OutputConfig{
Outputs: neosync_benthos.Outputs{
// retry processor and output several times
Retry: &neosync_benthos.RetryConfig{
InlineRetryConfig: neosync_benthos.InlineRetryConfig{
MaxRetries: 10,
},
Output: neosync_benthos.OutputConfig{
Outputs: neosync_benthos.Outputs{
Fallback: []neosync_benthos.Outputs{
{
PooledSqlInsert: &neosync_benthos.PooledSqlInsert{
ConnectionId: params.DestConnection.GetId(),

Expand All @@ -213,18 +213,19 @@ func (b *generateBuilder) BuildDestinationConfig(ctx context.Context, params *bb
MaxInFlight: int(destOpts.MaxInFlight),
},
},
Processors: processorConfigs,
{ // kills activity depending on error
Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
IsGenerateJob: true,
}},
},
},
Processors: processorConfigs,
},
// kills activity depending on error
{Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
}},
},
})

Expand Down
5 changes: 3 additions & 2 deletions worker/pkg/benthos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ type SwitchOutputCase struct {
Output Outputs `json:"output,omitempty" yaml:"output,omitempty"`
}
type ErrorOutputConfig struct {
ErrorMsg string `json:"error_msg" yaml:"error_msg"`
Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"`
ErrorMsg string `json:"error_msg" yaml:"error_msg"`
IsGenerateJob bool `json:"is_generate_job" yaml:"is_generate_job"`
Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"`
}

type RedisHashOutputConfig struct {
Expand Down
16 changes: 15 additions & 1 deletion worker/pkg/benthos/error/output_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func errorOutputSpec() *service.ConfigSpec {
Summary(`Sends stop Activity signal`).
Field(service.NewStringField("error_msg")).
Field(service.NewIntField("max_in_flight").Default(64)).
Field(service.NewBoolField("is_generate_job").Default(false)).
Field(service.NewBatchPolicyField("batching"))
}

Expand Down Expand Up @@ -44,17 +45,23 @@ func newErrorOutput(conf *service.ParsedConfig, mgr *service.Resources, channel
if err != nil {
return nil, err
}
isGenerateJob, err := conf.FieldBool("is_generate_job")
if err != nil {
return nil, err
}
return &errorOutput{
logger: mgr.Logger(),
stopActivityChannel: channel,
errorMsg: errMsg,
isGenerateJob: isGenerateJob,
}, nil
}

type errorOutput struct {
logger *service.Logger
stopActivityChannel chan<- error
errorMsg *service.InterpolatedString
isGenerateJob bool
}

func (e *errorOutput) Connect(ctx context.Context) error {
Expand All @@ -67,7 +74,7 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch
if err != nil {
return fmt.Errorf("error message interpolation error: %w", err)
}
if !neosync_benthos.IsCriticalError(errMsg) {
if !e.isCriticalError(errMsg) {
// throw error so that benthos retries
return errors.New(errMsg)
}
Expand All @@ -81,3 +88,10 @@ func (e *errorOutput) WriteBatch(ctx context.Context, batch service.MessageBatch
func (e *errorOutput) Close(ctx context.Context) error {
return nil
}

func (e *errorOutput) isCriticalError(errMsg string) bool {
if e.isGenerateJob {
return neosync_benthos.IsGenerateJobCriticalError(errMsg)
}
return neosync_benthos.IsCriticalError(errMsg)
}
22 changes: 22 additions & 0 deletions worker/pkg/benthos/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ func IsCriticalError(errMsg string) bool {
return false
}

// checks if the error message is critical for the generate job
func IsGenerateJobCriticalError(errMsg string) bool {
criticalErrors := []string{
"violates foreign key constraint",
"cannot add or update a child row",
"a foreign key constraint fails",
"could not identify an equality operator",
"violates not-null constraint",
"invalid input syntax",
"incorrect datetime value",
"incorrect date value",
"incorrect time value",
}

for _, errStr := range criticalErrors {
if containsIgnoreCase(errMsg, errStr) {
return true
}
}
return false
}

func containsIgnoreCase(s, substr string) bool {
return strings.Contains(strings.ToLower(s), strings.ToLower(substr))
}
Expand Down

0 comments on commit 17f4177

Please sign in to comment.