diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go index 1c37511c68..bad167d6a8 100644 --- a/cli/internal/cmds/neosync/sync/sync_integration_test.go +++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go @@ -31,7 +31,6 @@ func Test_Sync(t *testing.T) { conndataclient := neosyncApi.UnauthdClients.ConnectionData sqlmanagerclient := tcneosyncapi.NewTestSqlManagerClient() - discardLogger := testutil.GetTestCharmSlogger() accountId := tcneosyncapi.CreatePersonalAccount(ctx, t, neosyncApi.UnauthdClients.Users) outputType := output.PlainOutput @@ -54,6 +53,7 @@ func Test_Sync(t *testing.T) { sourceConn := tcneosyncapi.CreatePostgresConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "postgres-source", postgres.Source.URL) t.Run("sync", func(t *testing.T) { + discardLogger := testutil.GetTestLogger(t) cmdConfig := &cmdConfig{ Source: &sourceConfig{ ConnectionId: sourceConn.Id, @@ -123,6 +123,7 @@ func Test_Sync(t *testing.T) { sourceConn := tcneosyncapi.CreateMysqlConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "mysql-source", mysql.Source.URL) t.Run("sync", func(t *testing.T) { + discardLogger := testutil.GetTestLogger(t) cmdConfig := &cmdConfig{ Source: &sourceConfig{ ConnectionId: sourceConn.Id, diff --git a/internal/benthos/benthos-builder/builders/sql.go b/internal/benthos/benthos-builder/builders/sql.go index c149af63d1..c0b23d3aae 100644 --- a/internal/benthos/benthos-builder/builders/sql.go +++ b/internal/benthos/benthos-builder/builders/sql.go @@ -374,16 +374,23 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ } } - columnTypes := []string{} + columnTypes := []string{} // use map going forward + columnDataTypes := map[string]string{} for _, c := range benthosConfig.Columns { colType, ok := colInfoMap[c] if ok { + columnDataTypes[c] = colType.DataType columnTypes = append(columnTypes, colType.DataType) } else { 
columnTypes = append(columnTypes, "") } } + batchProcessors := []*neosync_benthos.BatchProcessor{} + if benthosConfig.Config.Input.Inputs.NeosyncConnectionData != nil { + batchProcessors = append(batchProcessors, &neosync_benthos.BatchProcessor{JsonToSql: &neosync_benthos.JsonToSqlConfig{ColumnDataTypes: columnDataTypes}}) + } + prefix, suffix := getInsertPrefixAndSuffix(b.driver, benthosConfig.TableSchema, benthosConfig.TableName, columnDefaultProperties) config.Outputs = append(config.Outputs, neosync_benthos.Outputs{ Fallback: []neosync_benthos.Outputs{ @@ -406,8 +413,9 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_ Suffix: suffix, Batching: &neosync_benthos.Batching{ - Period: destOpts.BatchPeriod, - Count: destOpts.BatchCount, + Period: destOpts.BatchPeriod, + Count: destOpts.BatchCount, + Processors: batchProcessors, }, }, }, diff --git a/internal/testutil/utils.go b/internal/testutil/utils.go index e10fb0b615..3225004a23 100644 --- a/internal/testutil/utils.go +++ b/internal/testutil/utils.go @@ -4,9 +4,8 @@ import ( "fmt" "log/slog" "os" + "strings" "testing" - - charmlog "github.com/charmbracelet/log" ) func ShouldRunIntegrationTest() bool { @@ -31,13 +30,8 @@ type testWriter struct { } func (tw testWriter) Write(p []byte) (n int, err error) { - tw.t.Log(string(p)) + // removes extra line between log statements + msg := strings.TrimSuffix(string(p), "\n") + tw.t.Log(msg) return len(p), nil } - -func GetTestCharmSlogger() *slog.Logger { - charmlogger := charmlog.NewWithOptions(os.Stdout, charmlog.Options{ - Level: charmlog.DebugLevel, - }) - return slog.New(charmlogger) -} diff --git a/worker/pkg/benthos/config.go b/worker/pkg/benthos/config.go index bfb15c7834..7a4af2f90e 100644 --- a/worker/pkg/benthos/config.go +++ b/worker/pkg/benthos/config.go @@ -477,6 +477,11 @@ type BatchProcessor struct { Archive *ArchiveProcessor `json:"archive,omitempty" yaml:"archive,omitempty"` Compress *CompressProcessor 
`json:"compress,omitempty" yaml:"compress,omitempty"` SqlToJson *SqlToJsonConfig `json:"sql_to_json,omitempty" yaml:"sql_to_json,omitempty"` + JsonToSql *JsonToSqlConfig `json:"json_to_sql,omitempty" yaml:"json_to_sql,omitempty"` +} + +type JsonToSqlConfig struct { + ColumnDataTypes map[string]string `json:"column_data_types" yaml:"column_data_types"` } type SqlToJsonConfig struct{} diff --git a/worker/pkg/benthos/environment/environment.go b/worker/pkg/benthos/environment/environment.go index f0a030b7e9..cff8529f41 100644 --- a/worker/pkg/benthos/environment/environment.go +++ b/worker/pkg/benthos/environment/environment.go @@ -174,6 +174,11 @@ func NewWithEnvironment(env *service.Environment, logger *slog.Logger, opts ...O return nil, fmt.Errorf("unable to register SQL to JSON processor to benthos instance: %w", err) } + err = neosync_benthos_sql.RegisterJsonToSqlProcessor(env) + if err != nil { + return nil, fmt.Errorf("unable to register JSON to SQL processor to benthos instance: %w", err) + } + if config.blobEnv != nil { env.UseBloblangEnvironment(config.blobEnv) } diff --git a/worker/pkg/benthos/sql/json_processor.go b/worker/pkg/benthos/sql/json_processor.go new file mode 100644 index 0000000000..b5cbeef7fc --- /dev/null +++ b/worker/pkg/benthos/sql/json_processor.go @@ -0,0 +1,134 @@ +package neosync_benthos_sql + +import ( + "context" + "encoding/binary" + "encoding/json" + "strconv" + "strings" + + "github.com/warpstreamlabs/bento/public/service" +) + +func jsonToSqlProcessorConfig() *service.ConfigSpec { + return service.NewConfigSpec().Field(service.NewStringMapField("column_data_types")) +} + +func RegisterJsonToSqlProcessor(env *service.Environment) error { + return env.RegisterBatchProcessor( + "json_to_sql", + jsonToSqlProcessorConfig(), + func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) { + proc, err := newJsonToSqlProcessor(conf, mgr) + if err != nil { + return nil, err + } + return proc, nil + }) +} + 
+type jsonToSqlProcessor struct { + logger *service.Logger + columnDataTypes map[string]string // column name to datatype +} + +func newJsonToSqlProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*jsonToSqlProcessor, error) { + columnDataTypes, err := conf.FieldStringMap("column_data_types") + if err != nil { + return nil, err + } + return &jsonToSqlProcessor{ + logger: mgr.Logger(), + columnDataTypes: columnDataTypes, + }, nil +} + +func (p *jsonToSqlProcessor) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) { + newBatch := make(service.MessageBatch, 0, len(batch)) + for _, msg := range batch { + root, err := msg.AsStructuredMut() + if err != nil { + return nil, err + } + newRoot := p.transform("", root) + newMsg := msg.Copy() + newMsg.SetStructured(newRoot) + newBatch = append(newBatch, newMsg) + } + + if len(newBatch) == 0 { + return nil, nil + } + return []service.MessageBatch{newBatch}, nil +} + +func (m *jsonToSqlProcessor) Close(context.Context) error { + return nil +} + +// [bigint binary blob char date datetime decimal double enum float int json longblob longtext mediumblob mediumint mediumtext set smallint text time timestamp tinyblob tinyint tinytext varbinary varchar year] +func (p *jsonToSqlProcessor) transform(path string, root any) any { + switch v := root.(type) { + case map[string]any: + newMap := make(map[string]any) + for k, v2 := range v { + newValue := p.transform(k, v2) + newMap[k] = newValue + } + return newMap + case nil: + return v + case []byte: + datatype, ok := p.columnDataTypes[path] + if !ok { + return v + } + if strings.EqualFold(datatype, "bit") { + bit, err := convertStringToBit(string(v)) + if err != nil { + p.logger.Errorf("unable to convert bit string to SQL bit []byte: %v", err) + return v + } + return bit + } else if strings.EqualFold(datatype, "json") { + validJson, err := getValidJson(v) + if err != nil { + p.logger.Errorf("unable to get valid json: %v", err) + 
return v + } + return validJson + } + return v + default: + return v + } +} + +// handles case where json strings are not quoted +func getValidJson(jsonData []byte) ([]byte, error) { + isValidJson := json.Valid(jsonData) + if isValidJson { + return jsonData, nil + } + + quotedData, err := json.Marshal(string(jsonData)) + if err != nil { + return nil, err + } + return quotedData, nil +} + +func convertStringToBit(bitString string) ([]byte, error) { + val, err := strconv.ParseUint(bitString, 2, len(bitString)) + if err != nil { + return nil, err + } + + // Always allocate 8 bytes for PutUint64 + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, val) + + // Calculate actual needed bytes and return only those + neededBytes := (len(bitString) + 7) / 8 + return bytes[len(bytes)-neededBytes:], nil +} diff --git a/worker/pkg/benthos/sql/json_processor_test.go b/worker/pkg/benthos/sql/json_processor_test.go new file mode 100644 index 0000000000..7de52328fc --- /dev/null +++ b/worker/pkg/benthos/sql/json_processor_test.go @@ -0,0 +1,47 @@ +package neosync_benthos_sql + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_convertStringToBit(t *testing.T) { + t.Run("8 bits", func(t *testing.T) { + got, err := convertStringToBit("10101010") + require.NoError(t, err) + expected := []byte{170} + require.Equalf(t, expected, got, "got %v, want %v", got, expected) + }) + + t.Run("1 bit", func(t *testing.T) { + got, err := convertStringToBit("1") + require.NoError(t, err) + expected := []byte{1} + require.Equalf(t, expected, got, "got %v, want %v", got, expected) + }) + + t.Run("16 bits", func(t *testing.T) { + got, err := convertStringToBit("1010101010101010") + require.NoError(t, err) + expected := []byte{170, 170} + require.Equalf(t, expected, got, "got %v, want %v", got, expected) + }) + + t.Run("24 bits", func(t *testing.T) { + got, err := convertStringToBit("101010101111111100000000") + require.NoError(t, err) + expected := []byte{170, 255, 
0} + require.Equalf(t, expected, got, "got %v, want %v", got, expected) + }) + + t.Run("invalid binary string", func(t *testing.T) { + _, err := convertStringToBit("102") + require.Error(t, err) + }) + + t.Run("empty string", func(t *testing.T) { + _, err := convertStringToBit("") + require.Error(t, err) + }) +}