diff --git a/internal/benthos/benthos-builder/builders/sql.go b/internal/benthos/benthos-builder/builders/sql.go index 35ea52672d..6820cfcd7e 100644 --- a/internal/benthos/benthos-builder/builders/sql.go +++ b/internal/benthos/benthos-builder/builders/sql.go @@ -268,7 +268,11 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ } params.SourceConfig.ColumnDefaultProperties = columnDefaultProperties - destOpts := params.DestinationOpts + destOpts, err := getDestinationOptions(params.DestinationOpts) + if err != nil { + return nil, fmt.Errorf("unable to parse destination options: %w", err) + } + config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{EnvVarKey: params.DestEnvVarKey, ConnectionId: params.DestConnection.Id}) if benthosConfig.RunType == tabledependency.RunTypeUpdate { args := benthosConfig.Columns @@ -283,13 +287,14 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ Schema: benthosConfig.TableSchema, Table: benthosConfig.TableName, Columns: benthosConfig.Columns, - SkipForeignKeyViolations: destOpts.GetPostgresOptions().GetSkipForeignKeyViolations(), + SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations, + MaxInFlight: int(destOpts.MaxInFlight), WhereColumns: benthosConfig.PrimaryKeys, ArgsMapping: buildPlainInsertArgs(args), Batching: &neosync_benthos.Batching{ - Period: "5s", - Count: 100, + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, }, }, }, @@ -297,8 +302,8 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ {Error: &neosync_benthos.ErrorOutputConfig{ ErrorMsg: `${! meta("fallback_error")}`, Batching: &neosync_benthos.Batching{ - Period: "5s", - Count: 100, + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, }, }}, }, @@ -356,16 +361,16 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ Columns: benthosConfig.Columns, ColumnsDataTypes: columnTypes, ColumnDefaultProperties: columnDefaultProperties, - OnConflictDoNothing: destOpts.GetPostgresOptions().GetOnConflict().GetDoNothing(), - SkipForeignKeyViolations: destOpts.GetPostgresOptions().GetSkipForeignKeyViolations(), - TruncateOnRetry: destOpts.GetPostgresOptions().GetTruncateTable().GetTruncateBeforeInsert(), + OnConflictDoNothing: destOpts.OnConflictDoNothing, + SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations, + TruncateOnRetry: destOpts.Truncate, ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns), Prefix: prefix, Suffix: suffix, Batching: &neosync_benthos.Batching{ - Period: "5s", - Count: 100, + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, }, }, }, @@ -373,8 +378,8 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ {Error: &neosync_benthos.ErrorOutputConfig{ ErrorMsg: `${! meta("fallback_error")}`, Batching: &neosync_benthos.Batching{ - Period: "5s", - Count: 100, + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, }, }}, }, diff --git a/worker/pkg/benthos/config.go b/worker/pkg/benthos/config.go index 4551b41623..6b94f5aba9 100644 --- a/worker/pkg/benthos/config.go +++ b/worker/pkg/benthos/config.go @@ -383,7 +383,7 @@ type PooledSqlUpdate struct { Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"` MaxRetryAttempts *uint `json:"max_retry_attempts,omitempty" yaml:"max_retry_attempts,omitempty"` RetryAttemptDelay *string `json:"retry_attempt_delay,omitempty" yaml:"retry_attempt_delay,omitempty"` - MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` + MaxInFlight int `json:"max_in_flight,omitempty" yaml:"max_in_flight,omitempty"` } type ColumnDefaultProperties struct { @@ -409,7 +409,7 @@ type PooledSqlInsert struct { Suffix *string `json:"suffix,omitempty" yaml:"suffix,omitempty"` MaxRetryAttempts *uint `json:"max_retry_attempts,omitempty" yaml:"max_retry_attempts,omitempty"` RetryAttemptDelay *string `json:"retry_attempt_delay,omitempty" yaml:"retry_attempt_delay,omitempty"` - MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` + MaxInFlight int `json:"max_in_flight,omitempty" yaml:"max_in_flight,omitempty"` } type SqlInsert struct {