From 6242842ccf2c1910fb075dfe76467e26213a5065 Mon Sep 17 00:00:00 2001 From: alishakawaguchi Date: Thu, 31 Oct 2024 21:43:57 -0700 Subject: [PATCH] Fix mysql date and datetime in CLI sync (#2893) --- backend/pkg/dbconnect-config/mysql.go | 8 +- backend/pkg/dbconnect-config/mysql_test.go | 30 +++ backend/pkg/sqlconnect/mock_SqlConnector.go | 43 ++-- backend/pkg/sqlconnect/sql-connector.go | 36 +++- .../connection-data.go | 5 +- .../v1alpha1/connection-service/connection.go | 2 +- cli/internal/cmds/neosync/sync/job.go | 2 +- cli/internal/cmds/neosync/sync/sync.go | 6 +- .../neosync/sync/sync_integration_test.go | 17 +- .../benthos-builder/benthos-builder.go | 11 +- .../benthos-builder/builders/aws-s3.go | 26 ++- .../benthos-builder/builders/mongodb.go | 12 +- .../testdata/mysql/alltypes/create-schema.sql | 2 + .../testdata/mysql/alltypes/create-tables.sql | 136 ++++++++++++ .../testdata/mysql/alltypes/teardown.sql | 1 + .../mysql/humanresources/create-tables.sql | 201 +++++++++--------- .../postgres/alltypes/create-tables.sql | 12 +- worker/pkg/benthos/config.go | 7 +- worker/pkg/benthos/environment/environment.go | 6 + worker/pkg/benthos/json/sql_processor.go | 77 +++++++ 20 files changed, 486 insertions(+), 154 deletions(-) create mode 100644 internal/testutil/testdata/mysql/alltypes/create-schema.sql create mode 100644 internal/testutil/testdata/mysql/alltypes/create-tables.sql create mode 100644 internal/testutil/testdata/mysql/alltypes/teardown.sql create mode 100644 worker/pkg/benthos/json/sql_processor.go diff --git a/backend/pkg/dbconnect-config/mysql.go b/backend/pkg/dbconnect-config/mysql.go index 90a4befb95..9a43bd5192 100644 --- a/backend/pkg/dbconnect-config/mysql.go +++ b/backend/pkg/dbconnect-config/mysql.go @@ -30,7 +30,9 @@ func NewFromMysqlConnection( config *mgmtv1alpha1.ConnectionConfig_MysqlConfig, connectionTimeout *uint32, logger *slog.Logger, + mysqlDisableParseTime bool, ) (DbConnectConfig, error) { + parseTime := !mysqlDisableParseTime switch cc := config.MysqlConfig.GetConnectionConfig().(type) { case *mgmtv1alpha1.MysqlConnectionConfig_Connection: cfg := mysql.NewConfig() @@ -46,7 +48,7 @@ func NewFromMysqlConnection( } cfg.Net = cc.Connection.GetProtocol() cfg.MultiStatements = true - cfg.ParseTime = true + cfg.ParseTime = parseTime return &mysqlConnectConfig{dsn: cfg.FormatDSN(), user: cfg.User}, nil case *mgmtv1alpha1.MysqlConnectionConfig_Url: @@ -76,7 +78,7 @@ func NewFromMysqlConnection( cfg.Timeout = time.Duration(*connectionTimeout) * time.Second } cfg.MultiStatements = true - cfg.ParseTime = true + cfg.ParseTime = parseTime for k, values := range uriConfig.Query() { for _, value := range values { cfg.Params[k] = value @@ -89,7 +91,7 @@ func NewFromMysqlConnection( cfg.Timeout = time.Duration(*connectionTimeout) * time.Second } cfg.MultiStatements = true - cfg.ParseTime = true + cfg.ParseTime = parseTime return &mysqlConnectConfig{dsn: cfg.FormatDSN(), user: cfg.User}, nil default: return nil, fmt.Errorf("unsupported mysql connection config: %T", cc) diff --git a/backend/pkg/dbconnect-config/mysql_test.go b/backend/pkg/dbconnect-config/mysql_test.go index 27207c226a..02a0ceee41 100644 --- a/backend/pkg/dbconnect-config/mysql_test.go +++ b/backend/pkg/dbconnect-config/mysql_test.go @@ -35,6 +35,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, &testConnectionTimeout, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -45,6 +46,29 @@ func Test_NewFromMysqlConnection(t *testing.T) { ) assert.Equal(t, "test-user", actual.GetUser()) }) + + t.Run("ok_disable_parse_time", func(t *testing.T) { + actual, err := NewFromMysqlConnection( + &mgmtv1alpha1.ConnectionConfig_MysqlConfig{ + MysqlConfig: &mgmtv1alpha1.MysqlConnectionConfig{ + ConnectionConfig: &mgmtv1alpha1.MysqlConnectionConfig_Connection{ + Connection: mysqlconnectionFixture, + }, + }, + }, + &testConnectionTimeout, + discardLogger, + true, + ) + assert.NoError(t, err) + assert.NotNil(t, actual) + assert.Equal( + t, + "test-user:test-pass@tcp(localhost:3309)/mydb?multiStatements=true&timeout=5s", + actual.String(), + ) + assert.Equal(t, "test-user", actual.GetUser()) + }) t.Run("ok_no_timeout", func(t *testing.T) { actual, err := NewFromMysqlConnection( &mgmtv1alpha1.ConnectionConfig_MysqlConfig{ @@ -56,6 +80,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, nil, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -80,6 +105,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, &testConnectionTimeout, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -101,6 +127,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, nil, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -122,6 +149,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, nil, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -146,6 +174,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, &testConnectionTimeout, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) @@ -167,6 +196,7 @@ func Test_NewFromMysqlConnection(t *testing.T) { }, nil, discardLogger, + false, ) assert.NoError(t, err) assert.NotNil(t, actual) diff --git a/backend/pkg/sqlconnect/mock_SqlConnector.go b/backend/pkg/sqlconnect/mock_SqlConnector.go index 7c98534a68..5dc07bd2cc 100644 --- a/backend/pkg/sqlconnect/mock_SqlConnector.go +++ b/backend/pkg/sqlconnect/mock_SqlConnector.go @@ -22,9 +22,16 @@ func (_m *MockSqlConnector) EXPECT() *MockSqlConnector_Expecter { return &MockSqlConnector_Expecter{mock: &_m.Mock} } -// NewDbFromConnectionConfig provides a mock function with given fields: connectionConfig, connectionTimeout, logger -func (_m *MockSqlConnector) NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger) (SqlDbContainer, error) { - ret := _m.Called(connectionConfig, connectionTimeout, logger) +// NewDbFromConnectionConfig provides a mock function with given fields: connectionConfig, connectionTimeout, logger, opts +func (_m *MockSqlConnector) NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, connectionConfig, connectionTimeout, logger) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) if len(ret) == 0 { panic("no return value specified for NewDbFromConnectionConfig") @@ -32,19 +39,19 @@ func (_m *MockSqlConnector) NewDbFromConnectionConfig(connectionConfig *mgmtv1al var r0 SqlDbContainer var r1 error - if rf, ok := ret.Get(0).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger) (SqlDbContainer, error)); ok { - return rf(connectionConfig, connectionTimeout, logger) + if rf, ok := ret.Get(0).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger, ...SqlConnectorOption) (SqlDbContainer, error)); ok { + return rf(connectionConfig, connectionTimeout, logger, opts...) } - if rf, ok := ret.Get(0).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger) SqlDbContainer); ok { - r0 = rf(connectionConfig, connectionTimeout, logger) + if rf, ok := ret.Get(0).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger, ...SqlConnectorOption) SqlDbContainer); ok { + r0 = rf(connectionConfig, connectionTimeout, logger, opts...) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(SqlDbContainer) } } - if rf, ok := ret.Get(1).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger) error); ok { - r1 = rf(connectionConfig, connectionTimeout, logger) + if rf, ok := ret.Get(1).(func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger, ...SqlConnectorOption) error); ok { + r1 = rf(connectionConfig, connectionTimeout, logger, opts...) } else { r1 = ret.Error(1) } @@ -61,13 +68,21 @@ type MockSqlConnector_NewDbFromConnectionConfig_Call struct { // - connectionConfig *mgmtv1alpha1.ConnectionConfig // - connectionTimeout *uint32 // - logger *slog.Logger -func (_e *MockSqlConnector_Expecter) NewDbFromConnectionConfig(connectionConfig interface{}, connectionTimeout interface{}, logger interface{}) *MockSqlConnector_NewDbFromConnectionConfig_Call { - return &MockSqlConnector_NewDbFromConnectionConfig_Call{Call: _e.mock.On("NewDbFromConnectionConfig", connectionConfig, connectionTimeout, logger)} +// - opts ...SqlConnectorOption +func (_e *MockSqlConnector_Expecter) NewDbFromConnectionConfig(connectionConfig interface{}, connectionTimeout interface{}, logger interface{}, opts ...interface{}) *MockSqlConnector_NewDbFromConnectionConfig_Call { + return &MockSqlConnector_NewDbFromConnectionConfig_Call{Call: _e.mock.On("NewDbFromConnectionConfig", + append([]interface{}{connectionConfig, connectionTimeout, logger}, opts...)...)} } -func (_c *MockSqlConnector_NewDbFromConnectionConfig_Call) Run(run func(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger)) *MockSqlConnector_NewDbFromConnectionConfig_Call { +func (_c *MockSqlConnector_NewDbFromConnectionConfig_Call) Run(run func(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption)) *MockSqlConnector_NewDbFromConnectionConfig_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*mgmtv1alpha1.ConnectionConfig), args[1].(*uint32), args[2].(*slog.Logger)) + variadicArgs := make([]SqlConnectorOption, len(args)-3) + for i, a := range args[3:] { + if a != nil { + variadicArgs[i] = a.(SqlConnectorOption) + } + } + run(args[0].(*mgmtv1alpha1.ConnectionConfig), args[1].(*uint32), args[2].(*slog.Logger), variadicArgs...) }) return _c } @@ -77,7 +92,7 @@ func (_c *MockSqlConnector_NewDbFromConnectionConfig_Call) Return(_a0 SqlDbConta return _c } -func (_c *MockSqlConnector_NewDbFromConnectionConfig_Call) RunAndReturn(run func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger) (SqlDbContainer, error)) *MockSqlConnector_NewDbFromConnectionConfig_Call { +func (_c *MockSqlConnector_NewDbFromConnectionConfig_Call) RunAndReturn(run func(*mgmtv1alpha1.ConnectionConfig, *uint32, *slog.Logger, ...SqlConnectorOption) (SqlDbContainer, error)) *MockSqlConnector_NewDbFromConnectionConfig_Call { _c.Call.Return(run) return _c } diff --git a/backend/pkg/sqlconnect/sql-connector.go b/backend/pkg/sqlconnect/sql-connector.go index e1d7e0db7c..31273675f8 100644 --- a/backend/pkg/sqlconnect/sql-connector.go +++ b/backend/pkg/sqlconnect/sql-connector.go @@ -34,17 +34,45 @@ type SqlDBTX interface { BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) } +type SqlConnectorOption func(*sqlConnectorOptions) + +type sqlConnectorOptions struct { + mysqlDisableParseTime bool + postgresDriver string +} + +// WithMysqlParseTimeDisabled disables MySQL time parsing +func WithMysqlParseTimeDisabled() SqlConnectorOption { + return func(opts *sqlConnectorOptions) { + opts.mysqlDisableParseTime = true + } +} + +// WithPostgresDriver overrides default postgres driver +func WithDefaultPostgresDriver() SqlConnectorOption { + return func(opts *sqlConnectorOptions) { + opts.postgresDriver = "postgres" + } +} + type SqlConnector interface { - NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger) (SqlDbContainer, error) + NewDbFromConnectionConfig(connectionConfig *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) } type SqlOpenConnector struct{} -func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger) (SqlDbContainer, error) { +func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.ConnectionConfig, connectionTimeout *uint32, logger *slog.Logger, opts ...SqlConnectorOption) (SqlDbContainer, error) { if cc == nil { return nil, errors.New("connectionConfig was nil, expected *mgmtv1alpha1.ConnectionConfig") } + options := sqlConnectorOptions{ + postgresDriver: "pgx", + } + for _, opt := range opts { + opt(&options) + } + dbconnopts, err := getConnectionOptsFromConnectionConfig(cc) if err != nil { return nil, err @@ -76,10 +104,10 @@ func (rc *SqlOpenConnector) NewDbFromConnectionConfig(cc *mgmtv1alpha1.Connectio dbconnopts, ), nil } else { - return newStdlibContainer("pgx", dsn, dbconnopts), nil + return newStdlibContainer(options.postgresDriver, dsn, dbconnopts), nil } case *mgmtv1alpha1.ConnectionConfig_MysqlConfig: - connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, connectionTimeout, logger) + connDetails, err := dbconnectconfig.NewFromMysqlConnection(config, connectionTimeout, logger, options.mysqlDisableParseTime) if err != nil { return nil, err } diff --git a/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go b/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go index 00dc6b5d25..7d43b19e28 100644 --- a/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go +++ b/backend/services/mgmt/v1alpha1/connection-data-service/connection-data.go @@ -24,6 +24,7 @@ import ( nucleuserrors "github.com/nucleuscloud/neosync/backend/internal/errors" neosync_gcp "github.com/nucleuscloud/neosync/backend/internal/gcp" "github.com/nucleuscloud/neosync/backend/internal/neosyncdb" + "github.com/nucleuscloud/neosync/backend/pkg/sqlconnect" sqlmanager_mysql "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/mysql" sqlmanager_postgres "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/postgres" sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared" @@ -87,7 +88,7 @@ func (s *Service) GetConnectionDataStream( return err } - conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.ConnectionConfig, &connectionTimeout, logger) + conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.ConnectionConfig, &connectionTimeout, logger, sqlconnect.WithMysqlParseTimeDisabled()) if err != nil { return err } @@ -148,7 +149,7 @@ func (s *Service) GetConnectionDataStream( return err } - conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.GetConnectionConfig(), &connectionTimeout, logger) + conn, err := s.sqlConnector.NewDbFromConnectionConfig(connection.GetConnectionConfig(), &connectionTimeout, logger, sqlconnect.WithDefaultPostgresDriver()) if err != nil { return err } diff --git a/backend/services/mgmt/v1alpha1/connection-service/connection.go b/backend/services/mgmt/v1alpha1/connection-service/connection.go index 71949c4c4c..f2f6541c30 100644 --- a/backend/services/mgmt/v1alpha1/connection-service/connection.go +++ b/backend/services/mgmt/v1alpha1/connection-service/connection.go @@ -202,7 +202,7 @@ func getDbRoleFromConnectionConfig(cconfig *mgmtv1alpha1.ConnectionConfig, logge } return parsedCfg.GetUser(), nil case *mgmtv1alpha1.ConnectionConfig_MysqlConfig: - parsedCfg, err := dbconnectconfig.NewFromMysqlConnection(typedconfig, nil, logger) + parsedCfg, err := dbconnectconfig.NewFromMysqlConnection(typedconfig, nil, logger, false) if err != nil { return "", fmt.Errorf("unable to parse mysql connection: %w", err) } diff --git a/cli/internal/cmds/neosync/sync/job.go b/cli/internal/cmds/neosync/sync/job.go index 046e877fc2..9616ea12f4 100644 --- a/cli/internal/cmds/neosync/sync/job.go +++ b/cli/internal/cmds/neosync/sync/job.go @@ -7,7 +7,7 @@ import ( mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" ) -func createJob( +func toJob( cmd *cmdConfig, sourceConnection *mgmtv1alpha1.Connection, destinationConnection *mgmtv1alpha1.Connection, diff --git a/cli/internal/cmds/neosync/sync/sync.go b/cli/internal/cmds/neosync/sync/sync.go index 11661e4c6b..a97c1c60c4 100644 --- a/cli/internal/cmds/neosync/sync/sync.go +++ b/cli/internal/cmds/neosync/sync/sync.go @@ -297,7 +297,9 @@ func (c *clisync) configureAndRunSync() error { case <-ctx.Done(): return case <-stopChan: + c.logger.Error("Sync Failed.") cancel() + os.Exit(1) return } } @@ -369,6 +371,8 @@ func (c *clisync) configureSync() ([][]*benthosbuilder.BenthosConfigResponse, er if syncConfigs == nil { return nil, nil } + + // TODO move this after benthos builder c.logger.Info("Running table init statements...") err = c.runDestinationInitStatements(syncConfigs, schemaConfig) if err != nil { @@ -378,7 +382,7 @@ func (c *clisync) configureSync() ([][]*benthosbuilder.BenthosConfigResponse, er syncConfigCount := len(syncConfigs) c.logger.Info(fmt.Sprintf("Generating %d sync configs...", syncConfigCount)) - job, err := createJob(c.cmd, c.sourceConnection, c.destinationConnection, schemaConfig.Schemas) + job, err := toJob(c.cmd, c.sourceConnection, c.destinationConnection, schemaConfig.Schemas) if err != nil { c.logger.Error("unable to create job") return nil, err diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go index 888f6aa6da..eda3ddcdde 100644 --- a/cli/internal/cmds/neosync/sync/sync_integration_test.go +++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go @@ -42,12 +42,12 @@ func Test_Sync(t *testing.T) { panic(err) } - testdataFolder := "../../../../../internal/testutil/testdata/postgres/humanresources" - err = postgres.Source.RunSqlFiles(ctx, &testdataFolder, []string{"create-tables.sql"}) + testdataFolder := "../../../../../internal/testutil/testdata/postgres" + err = postgres.Source.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-tables.sql"}) if err != nil { panic(err) } - err = postgres.Target.RunSqlFiles(ctx, &testdataFolder, []string{"create-schema.sql"}) + err = postgres.Target.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-schema.sql"}) if err != nil { panic(err) } @@ -106,12 +106,12 @@ func Test_Sync(t *testing.T) { panic(err) } - testdataFolder := "../../../../../internal/testutil/testdata/mysql/humanresources" - err = mysql.Source.RunSqlFiles(ctx, &testdataFolder, []string{"create-tables.sql"}) + testdataFolder := "../../../../../internal/testutil/testdata/mysql" + err = mysql.Source.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-tables.sql", "alltypes/create-tables.sql"}) if err != nil { panic(err) } - err = mysql.Target.RunSqlFiles(ctx, &testdataFolder, []string{"create-schema.sql"}) + err = mysql.Target.RunSqlFiles(ctx, &testdataFolder, []string{"humanresources/create-schema.sql", "alltypes/create-schema.sql"}) if err != nil { panic(err) } @@ -152,6 +152,11 @@ func Test_Sync(t *testing.T) { err = rows.Scan(&rowCount) require.NoError(t, err) require.Greater(t, rowCount, 1) + + rows = mysql.Target.DB.QueryRowContext(ctx, "select count(*) from alltypes.all_data_types;") + err = rows.Scan(&rowCount) + require.NoError(t, err) + require.Greater(t, rowCount, 1) }) t.Cleanup(func() { diff --git a/internal/benthos/benthos-builder/benthos-builder.go b/internal/benthos/benthos-builder/benthos-builder.go index c3b30a6a24..660c1e36c1 100644 --- a/internal/benthos/benthos-builder/benthos-builder.go +++ b/internal/benthos/benthos-builder/benthos-builder.go @@ -3,6 +3,7 @@ package benthosbuilder import ( "fmt" "log/slog" + "sync" mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1" "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect" @@ -45,6 +46,7 @@ func (b *BuilderKey) String() string { // Manages and provides access to different Benthos builders based on connection and job types type BuilderProvider struct { + mu sync.RWMutex builders map[string]bb_internal.BenthosBuilder logger *slog.Logger } @@ -61,6 +63,10 @@ func NewBuilderProvider(logger *slog.Logger) *BuilderProvider { // Handles registering new builders func (r *BuilderProvider) Register(jobType bb_internal.JobType, connType bb_internal.ConnectionType, builder bb_internal.BenthosBuilder) { key := BuilderKey{ConnType: connType, JobType: jobType} + + r.mu.Lock() + defer r.mu.Unlock() + _, exists := r.builders[key.String()] if !exists { r.logger.Debug(fmt.Sprintf("registering benthos builder for job type %s and connection type %s", jobType, connType)) @@ -76,9 +82,12 @@ func (r *BuilderProvider) GetBuilder( connectionType := bb_internal.GetConnectionType(connection) jobType := bb_internal.GetJobType(job) key := BuilderKey{ConnType: connectionType, JobType: jobType} + + r.mu.RLock() builder, exists := r.builders[key.String()] + r.mu.RUnlock() if !exists { - return nil, fmt.Errorf("unsupported connection type: %s", connectionType) + return nil, fmt.Errorf("builder not registered for connection type (%s) and job type (%s)", connectionType, jobType) } return builder, nil } diff --git a/internal/benthos/benthos-builder/builders/aws-s3.go b/internal/benthos/benthos-builder/builders/aws-s3.go index 45d86bf599..8c0451c292 100644 --- a/internal/benthos/benthos-builder/builders/aws-s3.go +++ b/internal/benthos/benthos-builder/builders/aws-s3.go @@ -75,6 +75,17 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b storageClass = convertToS3StorageClass(destinationOpts.GetStorageClass()).String() } + processors := []*neosync_benthos.BatchProcessor{} + if isPooledSqlRawConfigured(benthosConfig.Config) { + processors = append(processors, &neosync_benthos.BatchProcessor{SqlToJson: &neosync_benthos.SqlToJsonConfig{}}) + } + + standardProcessors := []*neosync_benthos.BatchProcessor{ + {Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}}, + {Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}}, + } + processors = append(processors, standardProcessors...) + config.Outputs = append(config.Outputs, neosync_benthos.Outputs{ Fallback: []neosync_benthos.Outputs{ { @@ -86,12 +97,9 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b Path: strings.Join(s3pathpieces, "/"), ContentType: "application/gzip", Batching: &neosync_benthos.Batching{ - Count: batchingConfig.BatchCount, - Period: batchingConfig.BatchPeriod, - Processors: []*neosync_benthos.BatchProcessor{ - {Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}}, - {Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}}, - }, + Count: batchingConfig.BatchCount, + Period: batchingConfig.BatchPeriod, + Processors: processors, }, Credentials: buildBenthosS3Credentials(connAwsS3Config.Credentials), Region: connAwsS3Config.GetRegion(), @@ -112,6 +120,12 @@ func (b *awsS3SyncBuilder) BuildDestinationConfig(ctx context.Context, params *b return config, nil } +func isPooledSqlRawConfigured(cfg *neosync_benthos.BenthosConfig) bool { + return cfg != nil && + cfg.StreamConfig.Input != nil && + cfg.StreamConfig.Input.Inputs.PooledSqlRaw != nil +} + type S3StorageClass int const ( diff --git a/internal/benthos/benthos-builder/builders/mongodb.go b/internal/benthos/benthos-builder/builders/mongodb.go index 5cecfaaa86..dc563f4573 100644 --- a/internal/benthos/benthos-builder/builders/mongodb.go +++ b/internal/benthos/benthos-builder/builders/mongodb.go @@ -121,13 +121,13 @@ func (b *mongodbSyncBuilder) BuildDestinationConfig(ctx context.Context, params Operation: "update-one", Upsert: true, DocumentMap: ` - root = { - "$set": this - } - `, + root = { + "$set": this + } + `, FilterMap: ` - root._id = this._id - `, + root._id = this._id + `, WriteConcern: &neosync_benthos.MongoWriteConcern{ W: "1", }, diff --git a/internal/testutil/testdata/mysql/alltypes/create-schema.sql b/internal/testutil/testdata/mysql/alltypes/create-schema.sql new file mode 100644 index 0000000000..7fdccc4d07 --- /dev/null +++ b/internal/testutil/testdata/mysql/alltypes/create-schema.sql @@ -0,0 +1,2 @@ +CREATE DATABASE IF NOT EXISTS alltypes; + diff --git a/internal/testutil/testdata/mysql/alltypes/create-tables.sql b/internal/testutil/testdata/mysql/alltypes/create-tables.sql new file mode 100644 index 0000000000..ecbdfdae22 --- /dev/null +++ b/internal/testutil/testdata/mysql/alltypes/create-tables.sql @@ -0,0 +1,136 @@ +CREATE DATABASE IF NOT EXISTS alltypes; + +USE alltypes; +CREATE TABLE IF NOT EXISTS all_data_types ( + -- Auto-incrementing primary key + id INT AUTO_INCREMENT PRIMARY KEY, + + -- Numeric Types + tinyint_col TINYINT, + smallint_col SMALLINT, + mediumint_col MEDIUMINT, + int_col INT, + bigint_col BIGINT, + decimal_col DECIMAL(10, 2), + float_col FLOAT(7, 4), + double_col DOUBLE(15, 8), + bit_col BIT(8), + + -- Date and Time Types + date_col DATE, + time_col TIME, + datetime_col DATETIME, + timestamp_col TIMESTAMP, + year_col YEAR, + + -- String Types + char_col CHAR(10), + varchar_col VARCHAR(255), + binary_col BINARY(3), + varbinary_col VARBINARY(255), + tinyblob_col TINYBLOB, + tinytext_col TINYTEXT, + blob_col BLOB, + text_col TEXT, + mediumblob_col MEDIUMBLOB, + mediumtext_col MEDIUMTEXT, + longblob_col LONGBLOB, + longtext_col LONGTEXT, + enum_col ENUM('value1', 'value2', 'value3'), + set_col SET('option1', 'option2', 'option3'), + + -- Spatial Data Types BROKEN + -- geometry_col GEOMETRY, + -- point_col POINT, + -- linestring_col LINESTRING, + -- polygon_col POLYGON, + -- multipoint_col MULTIPOINT, + -- multilinestring_col MULTILINESTRING, + -- multipolygon_col MULTIPOLYGON, + -- geometrycollection_col GEOMETRYCOLLECTION, + + -- JSON Data Type + json_col JSON, + + -- Array-like representations + set_as_array SET('value1', 'value2', 'value3', 'value4', 'value5') +); + +CREATE TABLE json_data ( + id INT AUTO_INCREMENT PRIMARY KEY, + data JSON +); + +INSERT INTO all_data_types ( + tinyint_col, smallint_col, mediumint_col, int_col, bigint_col, + decimal_col, float_col, double_col, + bit_col, + date_col, + time_col, datetime_col, year_col, + char_col, varchar_col,binary_col, varbinary_col, + tinyblob_col, tinytext_col, blob_col, text_col, + mediumblob_col, mediumtext_col, longblob_col, longtext_col, + enum_col, set_col, + -- geometry_col, point_col, linestring_col, polygon_col, + -- multipoint_col, multilinestring_col, multipolygon_col, geometrycollection_col, + json_col, + set_as_array +) VALUES ( + 127, 32767, 8388607, 2147483647, 9223372036854775807, + 1234.56, 3.1415, 3.14159265359, + b'10101010', + '2023-09-12', '14:30:00', '2023-09-12 14:30:00', 2023, + 'Fixed Char', 'Variable Char', 'Bin', 'VarBinary', + 'Tiny BLOB', 'Tiny Text', 'Regular BLOB', 'Regular Text', + 'Medium BLOB', 'Medium Text', 'Long BLOB', 'Long Text', + 'value2', 'option1,option3', + -- ST_GeomFromText('POINT(1 1)'), + -- ST_PointFromText('POINT(1 1)'), + -- ST_LineFromText('LINESTRING(0 0,1 1,2 2)'), + -- ST_PolygonFromText('POLYGON((0 0,10 0,10 10,0 10,0 0),(5 5,7 5,7 7,5 7,5 5))'), + -- ST_MultiPointFromText('MULTIPOINT(1 1, 2 2)'), + -- ST_MultiLineStringFromText('MULTILINESTRING((0 0,1 1,2 2),(2 2,3 3,4 4))'), + -- ST_MultiPolygonFromText('MULTIPOLYGON(((0 0,10 0,10 10,0 10,0 0)),((5 5,7 5,7 7,5 7,5 5)))'), + -- ST_GeomCollFromText('GEOMETRYCOLLECTION(POINT(1 1),LINESTRING(0 0,1 1,2 2))'), + '{"key": "value", "array": [1, 2, 3]}', + 'value1,value3,value5' +); + +INSERT INTO all_data_types (id) VALUES (DEFAULT); + + +INSERT INTO json_data (data) VALUES ('"Hello, world!"'); +INSERT INTO json_data (data) VALUES ('42'); +INSERT INTO json_data (data) VALUES ('3.14'); +INSERT INTO json_data (data) VALUES ('true'); +INSERT INTO json_data (data) VALUES ('false'); +INSERT INTO json_data (data) VALUES ('null'); + +INSERT INTO json_data (data) VALUES ('{"name": "John", "age": 30}'); +INSERT INTO json_data (data) VALUES ('{"coords": {"x": 10, "y": 20}}'); + +INSERT INTO json_data (data) VALUES ('[1, 2, 3, 4]'); +INSERT INTO json_data (data) VALUES ('["apple", "banana", "cherry"]'); + +INSERT INTO json_data (data) VALUES ('{"items": ["book", "pen"], "count": 2, "in_stock": true}'); + +INSERT INTO json_data (data) VALUES ( + '{ + "user": { + "name": "Alice", + "age": 28, + "contacts": [ + {"type": "email", "value": "alice@example.com"}, + {"type": "phone", "value": "123-456-7890"} + ] + }, + "orders": [ + {"id": 1001, "total": 59.99}, + {"id": 1002, "total": 24.50} + ], + "preferences": { + "notifications": true, + "theme": "dark" + } + }' +); diff --git a/internal/testutil/testdata/mysql/alltypes/teardown.sql b/internal/testutil/testdata/mysql/alltypes/teardown.sql new file mode 100644 index 0000000000..0ce3909abc --- /dev/null +++ b/internal/testutil/testdata/mysql/alltypes/teardown.sql @@ -0,0 +1 @@ +DROP DATABASE IF EXISTS alltypes; diff --git a/internal/testutil/testdata/mysql/humanresources/create-tables.sql b/internal/testutil/testdata/mysql/humanresources/create-tables.sql index da82ae4c15..9664def324 100644 --- a/internal/testutil/testdata/mysql/humanresources/create-tables.sql +++ b/internal/testutil/testdata/mysql/humanresources/create-tables.sql @@ -36,31 +36,30 @@ CREATE TABLE departments ( FOREIGN KEY (location_id) REFERENCES locations (location_id) ON DELETE CASCADE ON UPDATE CASCADE ); --- DATE broken in CLI SYNC --- CREATE TABLE employees ( --- employee_id INT (11) AUTO_INCREMENT PRIMARY KEY, --- first_name VARCHAR (20) DEFAULT NULL, --- last_name VARCHAR (25) NOT NULL, --- email VARCHAR (100) NOT NULL, --- phone_number VARCHAR (20) DEFAULT NULL, --- hire_date DATE NOT NULL, --- job_id INT (11) NOT NULL, --- salary DECIMAL (8, 2) NOT NULL, --- manager_id INT (11) DEFAULT NULL, --- department_id INT (11) DEFAULT NULL, --- FOREIGN KEY (job_id) REFERENCES jobs (job_id) ON DELETE CASCADE ON UPDATE CASCADE, --- FOREIGN KEY (department_id) REFERENCES departments (department_id) ON DELETE CASCADE ON UPDATE CASCADE, --- FOREIGN KEY (manager_id) REFERENCES employees (employee_id) --- ); - --- CREATE TABLE dependents ( --- dependent_id INT (11) AUTO_INCREMENT PRIMARY KEY, --- first_name VARCHAR (50) NOT NULL, --- last_name VARCHAR (50) NOT NULL, --- relationship VARCHAR (25) NOT NULL, --- employee_id INT (11) NOT NULL, --- FOREIGN KEY (employee_id) REFERENCES employees (employee_id) ON DELETE CASCADE ON UPDATE CASCADE --- ); +CREATE TABLE employees ( + employee_id INT (11) AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR (20) DEFAULT NULL, + last_name VARCHAR (25) NOT NULL, + email VARCHAR (100) NOT NULL, + phone_number VARCHAR (20) DEFAULT NULL, + hire_date DATE NOT NULL, + job_id INT (11) NOT NULL, + salary DECIMAL (8, 2) NOT NULL, + manager_id INT (11) DEFAULT NULL, + department_id INT (11) DEFAULT NULL, + FOREIGN KEY (job_id) REFERENCES jobs (job_id) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (department_id) REFERENCES departments (department_id) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (manager_id) REFERENCES employees (employee_id) +); + +CREATE TABLE dependents ( + dependent_id INT (11) AUTO_INCREMENT PRIMARY KEY, + first_name VARCHAR (50) NOT NULL, + last_name VARCHAR (50) NOT NULL, + relationship VARCHAR (25) NOT NULL, + employee_id INT (11) NOT NULL, + FOREIGN KEY (employee_id) REFERENCES employees (employee_id) ON DELETE CASCADE ON UPDATE CASCADE +); /*Data for the table regions */ @@ -147,82 +146,82 @@ INSERT INTO departments(department_id,department_name,location_id) VALUES (11,'A --- /*Data for the table employees */ - --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (100,'Steven','King','steven.king@sqltutorial.org','515.123.4567','1987-06-17',4,24000.00,NULL,9); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (101,'Neena','Kochhar','neena.kochhar@sqltutorial.org','515.123.4568','1989-09-21',5,17000.00,100,9); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (102,'Lex','De Haan','lex.de haan@sqltutorial.org','515.123.4569','1993-01-13',5,17000.00,100,9); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (103,'Alexander','Hunold','alexander.hunold@sqltutorial.org','590.423.4567','1990-01-03',9,9000.00,102,6); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (104,'Bruce','Ernst','bruce.ernst@sqltutorial.org','590.423.4568','1991-05-21',9,6000.00,103,6); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (105,'David','Austin','david.austin@sqltutorial.org','590.423.4569','1997-06-25',9,4800.00,103,6); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (106,'Valli','Pataballa','valli.pataballa@sqltutorial.org','590.423.4560','1998-02-05',9,4800.00,103,6); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (107,'Diana','Lorentz','diana.lorentz@sqltutorial.org','590.423.5567','1999-02-07',9,4200.00,103,6); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (108,'Nancy','Greenberg','nancy.greenberg@sqltutorial.org','515.124.4569','1994-08-17',7,12000.00,101,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (109,'Daniel','Faviet','daniel.faviet@sqltutorial.org','515.124.4169','1994-08-16',6,9000.00,108,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (110,'John','Chen','john.chen@sqltutorial.org','515.124.4269','1997-09-28',6,8200.00,108,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (111,'Ismael','Sciarra','ismael.sciarra@sqltutorial.org','515.124.4369','1997-09-30',6,7700.00,108,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (112,'Jose Manuel','Urman','jose manuel.urman@sqltutorial.org','515.124.4469','1998-03-07',6,7800.00,108,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (113,'Luis','Popp','luis.popp@sqltutorial.org','515.124.4567','1999-12-07',6,6900.00,108,10); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (114,'Den','Raphaely','den.raphaely@sqltutorial.org','515.127.4561','1994-12-07',14,11000.00,100,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (115,'Alexander','Khoo','alexander.khoo@sqltutorial.org','515.127.4562','1995-05-18',13,3100.00,114,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (116,'Shelli','Baida','shelli.baida@sqltutorial.org','515.127.4563','1997-12-24',13,2900.00,114,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (117,'Sigal','Tobias','sigal.tobias@sqltutorial.org','515.127.4564','1997-07-24',13,2800.00,114,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (118,'Guy','Himuro','guy.himuro@sqltutorial.org','515.127.4565','1998-11-15',13,2600.00,114,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (119,'Karen','Colmenares','karen.colmenares@sqltutorial.org','515.127.4566','1999-08-10',13,2500.00,114,3); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (120,'Matthew','Weiss','matthew.weiss@sqltutorial.org','650.123.1234','1996-07-18',19,8000.00,100,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (121,'Adam','Fripp','adam.fripp@sqltutorial.org','650.123.2234','1997-04-10',19,8200.00,100,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (122,'Payam','Kaufling','payam.kaufling@sqltutorial.org','650.123.3234','1995-05-01',19,7900.00,100,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (123,'Shanta','Vollman','shanta.vollman@sqltutorial.org','650.123.4234','1997-10-10',19,6500.00,100,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (126,'Irene','Mikkilineni','irene.mikkilineni@sqltutorial.org','650.124.1224','1998-09-28',18,2700.00,120,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (145,'John','Russell','john.russell@sqltutorial.org',NULL,'1996-10-01',15,14000.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (146,'Karen','Partners','karen.partners@sqltutorial.org',NULL,'1997-01-05',15,13500.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (176,'Jonathon','Taylor','jonathon.taylor@sqltutorial.org',NULL,'1998-03-24',16,8600.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (177,'Jack','Livingston','jack.livingston@sqltutorial.org',NULL,'1998-04-23',16,8400.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (178,'Kimberely','Grant','kimberely.grant@sqltutorial.org',NULL,'1999-05-24',16,7000.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (179,'Charles','Johnson','charles.johnson@sqltutorial.org',NULL,'2000-01-04',16,6200.00,100,8); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (192,'Sarah','Bell','sarah.bell@sqltutorial.org','650.501.1876','1996-02-04',17,4000.00,123,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (193,'Britney','Everett','britney.everett@sqltutorial.org','650.501.2876','1997-03-03',17,3900.00,123,5); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (200,'Jennifer','Whalen','jennifer.whalen@sqltutorial.org','515.123.4444','1987-09-17',3,4400.00,101,1); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (201,'Michael','Hartstein','michael.hartstein@sqltutorial.org','515.123.5555','1996-02-17',10,13000.00,100,2); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (202,'Pat','Fay','pat.fay@sqltutorial.org','603.123.6666','1997-08-17',11,6000.00,201,2); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (203,'Susan','Mavris','susan.mavris@sqltutorial.org','515.123.7777','1994-06-07',8,6500.00,101,4); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (204,'Hermann','Baer','hermann.baer@sqltutorial.org','515.123.8888','1994-06-07',12,10000.00,101,7); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (205,'Shelley','Higgins','shelley.higgins@sqltutorial.org','515.123.8080','1994-06-07',2,12000.00,101,11); --- INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (206,'William','Gietz','william.gietz@sqltutorial.org','515.123.8181','1994-06-07',1,8300.00,205,11); - - --- /*Data for the table dependents */ - --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (1,'Penelope','Gietz','Child',206); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (2,'Nick','Higgins','Child',205); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (3,'Ed','Whalen','Child',200); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (4,'Jennifer','King','Child',100); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (5,'Johnny','Kochhar','Child',101); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (6,'Bette','De Haan','Child',102); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (7,'Grace','Faviet','Child',109); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (8,'Matthew','Chen','Child',110); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (9,'Joe','Sciarra','Child',111); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (10,'Christian','Urman','Child',112); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (11,'Zero','Popp','Child',113); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (12,'Karl','Greenberg','Child',108); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (13,'Uma','Mavris','Child',203); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (14,'Vivien','Hunold','Child',103); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (15,'Cuba','Ernst','Child',104); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (16,'Fred','Austin','Child',105); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (17,'Helen','Pataballa','Child',106); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (18,'Dan','Lorentz','Child',107); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (19,'Bob','Hartstein','Child',201); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (20,'Lucille','Fay','Child',202); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (21,'Kirsten','Baer','Child',204); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (22,'Elvis','Khoo','Child',115); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (23,'Sandra','Baida','Child',116); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (24,'Cameron','Tobias','Child',117); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (25,'Kevin','Himuro','Child',118); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (26,'Rip','Colmenares','Child',119); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (27,'Julia','Raphaely','Child',114); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (28,'Woody','Russell','Child',145); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (29,'Alec','Partners','Child',146); --- INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (30,'Sandra','Taylor','Child',176); +/*Data for the table employees */ + +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (100,'Steven','King','steven.king@sqltutorial.org','515.123.4567','1987-06-17',4,24000.00,NULL,9); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (101,'Neena','Kochhar','neena.kochhar@sqltutorial.org','515.123.4568','1989-09-21',5,17000.00,100,9); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (102,'Lex','De Haan','lex.de haan@sqltutorial.org','515.123.4569','1993-01-13',5,17000.00,100,9); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (103,'Alexander','Hunold','alexander.hunold@sqltutorial.org','590.423.4567','1990-01-03',9,9000.00,102,6); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (104,'Bruce','Ernst','bruce.ernst@sqltutorial.org','590.423.4568','1991-05-21',9,6000.00,103,6); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (105,'David','Austin','david.austin@sqltutorial.org','590.423.4569','1997-06-25',9,4800.00,103,6); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (106,'Valli','Pataballa','valli.pataballa@sqltutorial.org','590.423.4560','1998-02-05',9,4800.00,103,6); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (107,'Diana','Lorentz','diana.lorentz@sqltutorial.org','590.423.5567','1999-02-07',9,4200.00,103,6); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (108,'Nancy','Greenberg','nancy.greenberg@sqltutorial.org','515.124.4569','1994-08-17',7,12000.00,101,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (109,'Daniel','Faviet','daniel.faviet@sqltutorial.org','515.124.4169','1994-08-16',6,9000.00,108,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (110,'John','Chen','john.chen@sqltutorial.org','515.124.4269','1997-09-28',6,8200.00,108,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (111,'Ismael','Sciarra','ismael.sciarra@sqltutorial.org','515.124.4369','1997-09-30',6,7700.00,108,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (112,'Jose Manuel','Urman','jose manuel.urman@sqltutorial.org','515.124.4469','1998-03-07',6,7800.00,108,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (113,'Luis','Popp','luis.popp@sqltutorial.org','515.124.4567','1999-12-07',6,6900.00,108,10); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (114,'Den','Raphaely','den.raphaely@sqltutorial.org','515.127.4561','1994-12-07',14,11000.00,100,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (115,'Alexander','Khoo','alexander.khoo@sqltutorial.org','515.127.4562','1995-05-18',13,3100.00,114,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (116,'Shelli','Baida','shelli.baida@sqltutorial.org','515.127.4563','1997-12-24',13,2900.00,114,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (117,'Sigal','Tobias','sigal.tobias@sqltutorial.org','515.127.4564','1997-07-24',13,2800.00,114,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (118,'Guy','Himuro','guy.himuro@sqltutorial.org','515.127.4565','1998-11-15',13,2600.00,114,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (119,'Karen','Colmenares','karen.colmenares@sqltutorial.org','515.127.4566','1999-08-10',13,2500.00,114,3); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (120,'Matthew','Weiss','matthew.weiss@sqltutorial.org','650.123.1234','1996-07-18',19,8000.00,100,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (121,'Adam','Fripp','adam.fripp@sqltutorial.org','650.123.2234','1997-04-10',19,8200.00,100,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (122,'Payam','Kaufling','payam.kaufling@sqltutorial.org','650.123.3234','1995-05-01',19,7900.00,100,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (123,'Shanta','Vollman','shanta.vollman@sqltutorial.org','650.123.4234','1997-10-10',19,6500.00,100,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (126,'Irene','Mikkilineni','irene.mikkilineni@sqltutorial.org','650.124.1224','1998-09-28',18,2700.00,120,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (145,'John','Russell','john.russell@sqltutorial.org',NULL,'1996-10-01',15,14000.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (146,'Karen','Partners','karen.partners@sqltutorial.org',NULL,'1997-01-05',15,13500.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (176,'Jonathon','Taylor','jonathon.taylor@sqltutorial.org',NULL,'1998-03-24',16,8600.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (177,'Jack','Livingston','jack.livingston@sqltutorial.org',NULL,'1998-04-23',16,8400.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (178,'Kimberely','Grant','kimberely.grant@sqltutorial.org',NULL,'1999-05-24',16,7000.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (179,'Charles','Johnson','charles.johnson@sqltutorial.org',NULL,'2000-01-04',16,6200.00,100,8); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (192,'Sarah','Bell','sarah.bell@sqltutorial.org','650.501.1876','1996-02-04',17,4000.00,123,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (193,'Britney','Everett','britney.everett@sqltutorial.org','650.501.2876','1997-03-03',17,3900.00,123,5); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (200,'Jennifer','Whalen','jennifer.whalen@sqltutorial.org','515.123.4444','1987-09-17',3,4400.00,101,1); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (201,'Michael','Hartstein','michael.hartstein@sqltutorial.org','515.123.5555','1996-02-17',10,13000.00,100,2); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (202,'Pat','Fay','pat.fay@sqltutorial.org','603.123.6666','1997-08-17',11,6000.00,201,2); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (203,'Susan','Mavris','susan.mavris@sqltutorial.org','515.123.7777','1994-06-07',8,6500.00,101,4); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (204,'Hermann','Baer','hermann.baer@sqltutorial.org','515.123.8888','1994-06-07',12,10000.00,101,7); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (205,'Shelley','Higgins','shelley.higgins@sqltutorial.org','515.123.8080','1994-06-07',2,12000.00,101,11); +INSERT INTO employees(employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id) VALUES (206,'William','Gietz','william.gietz@sqltutorial.org','515.123.8181','1994-06-07',1,8300.00,205,11); + + +/*Data for the table dependents */ + +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (1,'Penelope','Gietz','Child',206); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (2,'Nick','Higgins','Child',205); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (3,'Ed','Whalen','Child',200); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (4,'Jennifer','King','Child',100); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (5,'Johnny','Kochhar','Child',101); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (6,'Bette','De Haan','Child',102); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (7,'Grace','Faviet','Child',109); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (8,'Matthew','Chen','Child',110); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (9,'Joe','Sciarra','Child',111); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (10,'Christian','Urman','Child',112); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (11,'Zero','Popp','Child',113); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (12,'Karl','Greenberg','Child',108); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (13,'Uma','Mavris','Child',203); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (14,'Vivien','Hunold','Child',103); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (15,'Cuba','Ernst','Child',104); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (16,'Fred','Austin','Child',105); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (17,'Helen','Pataballa','Child',106); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (18,'Dan','Lorentz','Child',107); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (19,'Bob','Hartstein','Child',201); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (20,'Lucille','Fay','Child',202); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (21,'Kirsten','Baer','Child',204); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (22,'Elvis','Khoo','Child',115); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (23,'Sandra','Baida','Child',116); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (24,'Cameron','Tobias','Child',117); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (25,'Kevin','Himuro','Child',118); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (26,'Rip','Colmenares','Child',119); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (27,'Julia','Raphaely','Child',114); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (28,'Woody','Russell','Child',145); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (29,'Alec','Partners','Child',146); +INSERT INTO dependents(dependent_id,first_name,last_name,relationship,employee_id) VALUES (30,'Sandra','Taylor','Child',176); -- table with generated columns diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql index 9403da5d35..864bd7adbf 100644 --- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql +++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS alltypes.all_postgres_types ( bigserial_col BIGSERIAL, -- Monetary Types - money_col MONEY, + -- money_col MONEY, -- Character Types char_col CHAR(10), @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS alltypes.all_postgres_types ( timestamp_col TIMESTAMP, timestamptz_col TIMESTAMPTZ, date_col DATE, - time_col TIME, + -- time_col TIME, timetz_col TIMETZ, interval_col INTERVAL, @@ -93,7 +93,7 @@ INSERT INTO alltypes.all_postgres_types ( double_precision_col, serial_col, bigserial_col, - money_col, + -- money_col, char_col, varchar_col, text_col, @@ -101,7 +101,7 @@ INSERT INTO alltypes.all_postgres_types ( timestamp_col, timestamptz_col, date_col, - time_col, + -- time_col, timetz_col, interval_col, boolean_col, @@ -142,7 +142,7 @@ INSERT INTO alltypes.all_postgres_types ( 123456789.123456789, -- double_precision_col 1, -- serial_col (auto-incremented, will be generated) 1, -- bigserial_col (auto-incremented, will be generated) - '$100.00', -- money_col + -- '$100.00', -- money_col 'A', -- char_col 'DEFAULT', -- varchar_col 'default', -- text_col @@ -150,7 +150,7 @@ INSERT INTO alltypes.all_postgres_types ( '2024-01-01 12:34:56', -- timestamp_col '2024-01-01 12:34:56+00', -- timestamptz_col '2024-01-01', -- date_col - '12:34:56', -- time_col + -- '12:34:56', -- time_col '12:34:56+00', -- timetz_col '1 day', -- interval_col TRUE, -- boolean_col diff --git a/worker/pkg/benthos/config.go b/worker/pkg/benthos/config.go index 6b94f5aba9..24e031f639 100644 --- a/worker/pkg/benthos/config.go +++ b/worker/pkg/benthos/config.go @@ -473,10 +473,13 @@ type Batching struct { } type BatchProcessor struct { - Archive *ArchiveProcessor `json:"archive,omitempty" yaml:"archive,omitempty"` - Compress *CompressProcessor `json:"compress,omitempty" yaml:"compress,omitempty"` + Archive *ArchiveProcessor `json:"archive,omitempty" yaml:"archive,omitempty"` + Compress *CompressProcessor `json:"compress,omitempty" yaml:"compress,omitempty"` + SqlToJson *SqlToJsonConfig `json:"sql_to_json,omitempty" yaml:"sql_to_json,omitempty"` } +type SqlToJsonConfig struct{} + type ArchiveProcessor struct { Format string `json:"format" yaml:"format"` Path *string `json:"path,omitempty" yaml:"path,omitempty"` diff --git a/worker/pkg/benthos/environment/environment.go b/worker/pkg/benthos/environment/environment.go index a9547af794..f0a030b7e9 100644 --- a/worker/pkg/benthos/environment/environment.go +++ b/worker/pkg/benthos/environment/environment.go @@ -9,6 +9,7 @@ import ( neosync_benthos_defaulttransform "github.com/nucleuscloud/neosync/worker/pkg/benthos/default_transform" neosync_benthos_dynamodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/dynamodb" neosync_benthos_error "github.com/nucleuscloud/neosync/worker/pkg/benthos/error" + neosync_benthos_json "github.com/nucleuscloud/neosync/worker/pkg/benthos/json" benthos_metrics "github.com/nucleuscloud/neosync/worker/pkg/benthos/metrics" neosync_benthos_mongodb "github.com/nucleuscloud/neosync/worker/pkg/benthos/mongodb" neosync_benthos_connectiondata "github.com/nucleuscloud/neosync/worker/pkg/benthos/neosync_connection_data" @@ -168,6 +169,11 @@ func NewWithEnvironment(env *service.Environment, logger *slog.Logger, opts ...O return nil, fmt.Errorf("unable to register default mapping processor to benthos instance: %w", err) } + err = neosync_benthos_json.RegisterSqlToJsonProcessor(env) + if err != nil { + return nil, fmt.Errorf("unable to register SQL to JSON processor to benthos instance: %w", err) + } + if config.blobEnv != nil { env.UseBloblangEnvironment(config.blobEnv) } diff --git a/worker/pkg/benthos/json/sql_processor.go b/worker/pkg/benthos/json/sql_processor.go new file mode 100644 index 0000000000..53b8385ed5 --- /dev/null +++ b/worker/pkg/benthos/json/sql_processor.go @@ -0,0 +1,77 @@ +package neosync_benthos_json + +import ( + "context" + "time" + + "github.com/warpstreamlabs/bento/public/service" +) + +func sqlToJsonProcessorConfig() *service.ConfigSpec { + return service.NewConfigSpec() +} + +func RegisterSqlToJsonProcessor(env *service.Environment) error { + return env.RegisterBatchProcessor( + "sql_to_json", + sqlToJsonProcessorConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { + proc := newMysqlToJsonProcessor(conf, mgr) + return proc, nil + }) +} + +type sqlToJsonProcessor struct { + logger *service.Logger +} + +func newMysqlToJsonProcessor(_ *service.ParsedConfig, mgr *service.Resources) *sqlToJsonProcessor { + return &sqlToJsonProcessor{ + logger: mgr.Logger(), + } +} + +func (m *sqlToJsonProcessor) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) { + newBatch := make(service.MessageBatch, 0, len(batch)) + for _, msg := range batch { + root, err := msg.AsStructuredMut() + if err != nil { + return nil, err + } + newRoot := transform(root) + newMsg := msg.Copy() + newMsg.SetStructured(newRoot) + newBatch = append(newBatch, newMsg) + } + + if len(newBatch) == 0 { + return nil, nil + } + return []service.MessageBatch{newBatch}, nil +} + +func (m *sqlToJsonProcessor) Close(context.Context) error { + return nil +} + +func transform(root any) any { + switch v := root.(type) { + case map[string]any: + newMap := make(map[string]any) + for k, v2 := range v { + newValue := transform(v2) + newMap[k] = newValue + } + return newMap + case []any: + newSlice := make([]any, len(v)) + for i, v2 := range v { + newSlice[i] = transform(v2) + } + return newSlice + case time.Time: + return v.Format(time.DateTime) + default: + return v + } +}