Fix S3 to SQL database types (#2911)
alishakawaguchi authored Nov 5, 2024
1 parent db1c628 commit b063df0
Showing 7 changed files with 208 additions and 14 deletions.
3 changes: 2 additions & 1 deletion cli/internal/cmds/neosync/sync/sync_integration_test.go
@@ -31,7 +31,6 @@ func Test_Sync(t *testing.T) {
 	conndataclient := neosyncApi.UnauthdClients.ConnectionData
 	sqlmanagerclient := tcneosyncapi.NewTestSqlManagerClient()
 
-	discardLogger := testutil.GetTestCharmSlogger()
 	accountId := tcneosyncapi.CreatePersonalAccount(ctx, t, neosyncApi.UnauthdClients.Users)
 	outputType := output.PlainOutput
@@ -54,6 +53,7 @@ func Test_Sync(t *testing.T) {
 	sourceConn := tcneosyncapi.CreatePostgresConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "postgres-source", postgres.Source.URL)
 
 	t.Run("sync", func(t *testing.T) {
+		discardLogger := testutil.GetTestLogger(t)
 		cmdConfig := &cmdConfig{
 			Source: &sourceConfig{
 				ConnectionId: sourceConn.Id,
@@ -123,6 +123,7 @@ func Test_Sync(t *testing.T) {
 	sourceConn := tcneosyncapi.CreateMysqlConnection(ctx, t, neosyncApi.UnauthdClients.Connections, accountId, "mysql-source", mysql.Source.URL)
 
 	t.Run("sync", func(t *testing.T) {
+		discardLogger := testutil.GetTestLogger(t)
 		cmdConfig := &cmdConfig{
 			Source: &sourceConfig{
 				ConnectionId: sourceConn.Id,
14 changes: 11 additions & 3 deletions internal/benthos/benthos-builder/builders/sql.go
@@ -374,16 +374,23 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
 		}
 	}
 
-	columnTypes := []string{}
+	columnTypes := []string{} // use map going forward
+	columnDataTypes := map[string]string{}
 	for _, c := range benthosConfig.Columns {
 		colType, ok := colInfoMap[c]
 		if ok {
+			columnDataTypes[c] = colType.DataType
 			columnTypes = append(columnTypes, colType.DataType)
 		} else {
 			columnTypes = append(columnTypes, "")
 		}
 	}
 
+	batchProcessors := []*neosync_benthos.BatchProcessor{}
+	if benthosConfig.Config.Input.Inputs.NeosyncConnectionData != nil {
+		batchProcessors = append(batchProcessors, &neosync_benthos.BatchProcessor{JsonToSql: &neosync_benthos.JsonToSqlConfig{ColumnDataTypes: columnDataTypes}})
+	}
+
 	prefix, suffix := getInsertPrefixAndSuffix(b.driver, benthosConfig.TableSchema, benthosConfig.TableName, columnDefaultProperties)
 	config.Outputs = append(config.Outputs, neosync_benthos.Outputs{
 		Fallback: []neosync_benthos.Outputs{
@@ -406,8 +413,9 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
 					Suffix: suffix,
 
 					Batching: &neosync_benthos.Batching{
-						Period: destOpts.BatchPeriod,
-						Count:  destOpts.BatchCount,
+						Period:     destOpts.BatchPeriod,
+						Count:      destOpts.BatchCount,
+						Processors: batchProcessors,
 					},
 				},
 			},
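
Note: the json_to_sql batch processor is only attached when the input is NeosyncConnectionData, which, going by the commit title, is the path that replays records out of S3; there, values arrive as JSON and driver-level type information is lost. A sketch of the kind of map handed to the processor; the column names and types below are invented for illustration:

    // Hypothetical example: column name -> SQL data type, as collected from
    // colInfoMap in the loop above. Only "bit" and "json" receive special
    // handling in the processor; every other type passes through unchanged.
    columnDataTypes := map[string]string{
        "id":          "int",
        "preferences": "json",
        "flags":       "bit",
    }
    batchProcessors := []*neosync_benthos.BatchProcessor{
        {JsonToSql: &neosync_benthos.JsonToSqlConfig{ColumnDataTypes: columnDataTypes}},
    }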
14 changes: 4 additions & 10 deletions internal/testutil/utils.go
@@ -4,9 +4,8 @@ import (
 	"fmt"
 	"log/slog"
-	"os"
+	"strings"
 	"testing"
 
-	charmlog "github.com/charmbracelet/log"
 )
 
 func ShouldRunIntegrationTest() bool {
@@ -31,13 +30,8 @@ type testWriter struct {
 }
 
 func (tw testWriter) Write(p []byte) (n int, err error) {
-	tw.t.Log(string(p))
+	// removes extra line between log statements
+	msg := strings.TrimSuffix(string(p), "\n")
+	tw.t.Log(msg)
 	return len(p), nil
 }
-
-func GetTestCharmSlogger() *slog.Logger {
-	charmlogger := charmlog.NewWithOptions(os.Stdout, charmlog.Options{
-		Level: charmlog.DebugLevel,
-	})
-	return slog.New(charmlogger)
-}
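
The tests above now call testutil.GetTestLogger(t), whose definition is not part of this diff. A minimal sketch of what a per-test slog logger built on testWriter could look like, assuming a text handler at debug level:

    // Hypothetical sketch (not shown in this commit): route slog output
    // through testWriter so each line is attributed to the running test.
    func GetTestLogger(t *testing.T) *slog.Logger {
        return slog.New(slog.NewTextHandler(
            testWriter{t: t},
            &slog.HandlerOptions{Level: slog.LevelDebug},
        ))
    }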
5 changes: 5 additions & 0 deletions worker/pkg/benthos/config.go
@@ -477,6 +477,11 @@ type BatchProcessor struct {
 	Archive   *ArchiveProcessor  `json:"archive,omitempty" yaml:"archive,omitempty"`
 	Compress  *CompressProcessor `json:"compress,omitempty" yaml:"compress,omitempty"`
 	SqlToJson *SqlToJsonConfig   `json:"sql_to_json,omitempty" yaml:"sql_to_json,omitempty"`
+	JsonToSql *JsonToSqlConfig   `json:"json_to_sql,omitempty" yaml:"json_to_sql,omitempty"`
 }
+
+type JsonToSqlConfig struct {
+	ColumnDataTypes map[string]string `json:"column_data_types" yaml:"column_data_types"`
+}
 
 type SqlToJsonConfig struct{}
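
For reference, a self-contained sketch of how the new field serializes; the structs are trimmed copies of the ones above, and gopkg.in/yaml.v3 is assumed as the marshaller:

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v3"
    )

    type JsonToSqlConfig struct {
        ColumnDataTypes map[string]string `yaml:"column_data_types"`
    }

    type BatchProcessor struct {
        JsonToSql *JsonToSqlConfig `yaml:"json_to_sql,omitempty"`
    }

    func main() {
        bp := BatchProcessor{JsonToSql: &JsonToSqlConfig{
            ColumnDataTypes: map[string]string{"preferences": "json"},
        }}
        out, _ := yaml.Marshal(bp)
        fmt.Print(string(out))
        // Output:
        // json_to_sql:
        //     column_data_types:
        //         preferences: json
    }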
5 changes: 5 additions & 0 deletions worker/pkg/benthos/environment/environment.go
@@ -174,6 +174,11 @@ func NewWithEnvironment(env *service.Environment, logger *slog.Logger, opts ...O
 		return nil, fmt.Errorf("unable to register SQL to JSON processor to benthos instance: %w", err)
 	}
 
+	err = neosync_benthos_sql.RegisterJsonToSqlProcessor(env)
+	if err != nil {
+		return nil, fmt.Errorf("unable to register JSON to SQL processor to benthos instance: %w", err)
+	}
+
 	if config.blobEnv != nil {
 		env.UseBloblangEnvironment(config.blobEnv)
 	}
134 changes: 134 additions & 0 deletions worker/pkg/benthos/sql/json_processor.go
@@ -0,0 +1,134 @@
package neosync_benthos_sql

import (
"context"
"encoding/binary"
"encoding/json"
"strconv"
"strings"

"github.com/warpstreamlabs/bento/public/service"
)

func jsonToSqlProcessorConfig() *service.ConfigSpec {
return service.NewConfigSpec().Field(service.NewStringMapField("column_data_types"))
}

func RegisterJsonToSqlProcessor(env *service.Environment) error {
return env.RegisterBatchProcessor(
"json_to_sql",
jsonToSqlProcessorConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchProcessor, error) {
proc, err := newJsonToSqlProcessor(conf, mgr)
if err != nil {
return nil, err
}
return proc, nil
})
}

type jsonToSqlProcessor struct {
logger *service.Logger
columnDataTypes map[string]string // column name to datatype
}

func newJsonToSqlProcessor(conf *service.ParsedConfig, mgr *service.Resources) (*jsonToSqlProcessor, error) {
columnDataTypes, err := conf.FieldStringMap("column_data_types")
if err != nil {
return nil, err
}
return &jsonToSqlProcessor{
logger: mgr.Logger(),
columnDataTypes: columnDataTypes,
}, nil
}

func (p *jsonToSqlProcessor) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
newBatch := make(service.MessageBatch, 0, len(batch))
for _, msg := range batch {
root, err := msg.AsStructuredMut()
if err != nil {
return nil, err
}
newRoot := p.transform("", root)
newMsg := msg.Copy()
newMsg.SetStructured(newRoot)
newBatch = append(newBatch, newMsg)
}

if len(newBatch) == 0 {
return nil, nil
}
return []service.MessageBatch{newBatch}, nil
}

func (p *jsonToSqlProcessor) Close(context.Context) error {
return nil
}

// [bigint binary blob char date datetime decimal double enum float int json longblob longtext mediumblob mediumint mediumtext set smallint text time timestamp tinyblob tinyint tinytext varbinary varchar year]
func (p *jsonToSqlProcessor) transform(path string, root any) any {
switch v := root.(type) {
case map[string]any:
newMap := make(map[string]any)
for k, v2 := range v {
newValue := p.transform(k, v2)
newMap[k] = newValue
}
return newMap
case nil:
return v
case []byte:
datatype, ok := p.columnDataTypes[path]
if !ok {
return v
}
if strings.EqualFold(datatype, "bit") {
bit, err := convertStringToBit(string(v))
if err != nil {
p.logger.Errorf("unable to convert bit string to SQL bit []byte: %w", err)
return v
}
return bit
} else if strings.EqualFold(datatype, "json") {
validJson, err := getValidJson(v)
if err != nil {
p.logger.Errorf("unable to get valid json: %w", err)
return v
}
return validJson
}
return v
default:
return v
}
}

// handles case where json strings are not quoted
func getValidJson(jsonData []byte) ([]byte, error) {
isValidJson := json.Valid(jsonData)
if isValidJson {
return jsonData, nil
}

quotedData, err := json.Marshal(string(jsonData))
if err != nil {
return nil, err
}
return quotedData, nil
}

func convertStringToBit(bitString string) ([]byte, error) {
val, err := strconv.ParseUint(bitString, 2, len(bitString))
if err != nil {
return nil, err
}

// Always allocate 8 bytes for PutUint64
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, val)

// Calculate actual needed bytes and return only those
neededBytes := (len(bitString) + 7) / 8
return bytes[len(bytes)-neededBytes:], nil
}
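
A worked example of the two helpers above, written as an example-style function in the same package (assumes an added "fmt" import). The expected values follow directly from the code: json.Marshal quotes the raw string, and "10101010" parses to 170, which fits in the single byte that (8+7)/8 yields:

    func Example_conversions() {
        // An unquoted string headed for a json column becomes valid JSON.
        quoted, _ := getValidJson([]byte(`hello`))
        fmt.Println(string(quoted)) // "hello"

        // A bit string headed for a bit column is packed into bytes.
        bits, _ := convertStringToBit("10101010")
        fmt.Println(bits) // [170]
    }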
47 changes: 47 additions & 0 deletions worker/pkg/benthos/sql/json_processor_test.go
@@ -0,0 +1,47 @@
package neosync_benthos_sql

import (
"testing"

"github.com/stretchr/testify/require"
)

func Test_convertStringToBit(t *testing.T) {
t.Run("8 bits", func(t *testing.T) {
got, err := convertStringToBit("10101010")
require.NoError(t, err)
expected := []byte{170}
require.Equalf(t, expected, got, "got %v, want %v", got, expected)
})

t.Run("1 bit", func(t *testing.T) {
got, err := convertStringToBit("1")
require.NoError(t, err)
expected := []byte{1}
require.Equalf(t, expected, got, "got %v, want %v", got, expected)
})

t.Run("16 bits", func(t *testing.T) {
got, err := convertStringToBit("1010101010101010")
require.NoError(t, err)
expected := []byte{170, 170}
require.Equalf(t, expected, got, "got %v, want %v", got, expected)
})

t.Run("24 bits", func(t *testing.T) {
got, err := convertStringToBit("101010101111111100000000")
require.NoError(t, err)
expected := []byte{170, 255, 0}
require.Equalf(t, expected, got, "got %v, want %v", got, expected)
})

t.Run("invalid binary string", func(t *testing.T) {
_, err := convertStringToBit("102")
require.Error(t, err)
})

t.Run("empty string", func(t *testing.T) {
_, err := convertStringToBit("")
require.Error(t, err)
})
}
