Skip to content

Commit

Permalink
wires up sql dest opts, fixes driver specific opts
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Oct 31, 2024
1 parent 7f53fb5 commit d5179cd
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
31 changes: 18 additions & 13 deletions internal/benthos/benthos-builder/builders/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,11 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
}
params.SourceConfig.ColumnDefaultProperties = columnDefaultProperties

destOpts := params.DestinationOpts
destOpts, err := getDestinationOptions(params.DestinationOpts)
if err != nil {
return nil, fmt.Errorf("unable to parse destination options: %w", err)
}

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{EnvVarKey: params.DestEnvVarKey, ConnectionId: params.DestConnection.Id})
if benthosConfig.RunType == tabledependency.RunTypeUpdate {
args := benthosConfig.Columns
Expand All @@ -283,22 +287,23 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: benthosConfig.Columns,
SkipForeignKeyViolations: destOpts.GetPostgresOptions().GetSkipForeignKeyViolations(),
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
MaxInFlight: int(destOpts.MaxInFlight),
WhereColumns: benthosConfig.PrimaryKeys,
ArgsMapping: buildPlainInsertArgs(args),

Batching: &neosync_benthos.Batching{
Period: "5s",
Count: 100,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
},
},
// kills activity depending on error
{Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: "5s",
Count: 100,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
}},
},
Expand Down Expand Up @@ -356,25 +361,25 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
Columns: benthosConfig.Columns,
ColumnsDataTypes: columnTypes,
ColumnDefaultProperties: columnDefaultProperties,
OnConflictDoNothing: destOpts.GetPostgresOptions().GetOnConflict().GetDoNothing(),
SkipForeignKeyViolations: destOpts.GetPostgresOptions().GetSkipForeignKeyViolations(),
TruncateOnRetry: destOpts.GetPostgresOptions().GetTruncateTable().GetTruncateBeforeInsert(),
OnConflictDoNothing: destOpts.OnConflictDoNothing,
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
TruncateOnRetry: destOpts.Truncate,
ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns),
Prefix: prefix,
Suffix: suffix,

Batching: &neosync_benthos.Batching{
Period: "5s",
Count: 100,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
},
},
// kills activity depending on error
{Error: &neosync_benthos.ErrorOutputConfig{
ErrorMsg: `${! meta("fallback_error")}`,
Batching: &neosync_benthos.Batching{
Period: "5s",
Count: 100,
Period: destOpts.BatchPeriod,
Count: destOpts.BatchCount,
},
}},
},
Expand Down
4 changes: 2 additions & 2 deletions worker/pkg/benthos/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ type PooledSqlUpdate struct {
Batching *Batching `json:"batching,omitempty" yaml:"batching,omitempty"`
MaxRetryAttempts *uint `json:"max_retry_attempts,omitempty" yaml:"max_retry_attempts,omitempty"`
RetryAttemptDelay *string `json:"retry_attempt_delay,omitempty" yaml:"retry_attempt_delay,omitempty"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
MaxInFlight int `json:"max_in_flight,omitempty" yaml:"max_in_flight,omitempty"`
}

type ColumnDefaultProperties struct {
Expand All @@ -409,7 +409,7 @@ type PooledSqlInsert struct {
Suffix *string `json:"suffix,omitempty" yaml:"suffix,omitempty"`
MaxRetryAttempts *uint `json:"max_retry_attempts,omitempty" yaml:"max_retry_attempts,omitempty"`
RetryAttemptDelay *string `json:"retry_attempt_delay,omitempty" yaml:"retry_attempt_delay,omitempty"`
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
MaxInFlight int `json:"max_in_flight,omitempty" yaml:"max_in_flight,omitempty"`
}

type SqlInsert struct {
Expand Down

0 comments on commit d5179cd

Please sign in to comment.