Skip to content

Commit

Permalink
NEOS-1712: Adds support for JS in Anon Api (#3163)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Jan 21, 2025
1 parent a312e37 commit 1de0f68
Show file tree
Hide file tree
Showing 39 changed files with 1,983 additions and 1,383 deletions.
2 changes: 1 addition & 1 deletion .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,6 @@ packages:
github.com/nucleuscloud/neosync/internal/ee/transformers/functions:
interfaces:
NeosyncOperatorApi:
github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers:
github.com/nucleuscloud/neosync/worker/pkg/benthos/transformer_executor:
interfaces:
UserDefinedTransformerResolver:
Original file line number Diff line number Diff line change
Expand Up @@ -198,22 +198,74 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle() {
},
}),
)
requireNoErrResp(s.T(), resp, err)
require.NotEmpty(s.T(), resp.Msg.OutputData)
requireNoErrResp(t, resp, err)
require.NotEmpty(t, resp.Msg.OutputData)

var inputObject map[string]any
err = json.Unmarshal([]byte(jsonStr), &inputObject)
require.NoError(s.T(), err)
require.NoError(t, err)

output := resp.Msg.OutputData
var result map[string]any
err = json.Unmarshal([]byte(output), &result)
require.NoError(s.T(), err)
require.NoError(t, err)
for _, sport := range result["sports"].([]any) {
require.Equal(t, "A", sport)
}
})

t.Run("javascript-transformers", func(t *testing.T) {
jsonStr := `{
"sports": ["basketball", "golf", "swimming"],
"name": "bill"
}`

accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users())

resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeSingle(
s.ctx,
connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{
AccountId: accountId,
InputData: jsonStr,
TransformerMappings: []*mgmtv1alpha1.TransformerMapping{
{
Expression: ".sports",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_TransformJavascriptConfig{
TransformJavascriptConfig: &mgmtv1alpha1.TransformJavascript{
Code: "return value.map(v => v + ' updated');",
},
},
},
},
{
Expression: ".name",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateJavascriptConfig{
GenerateJavascriptConfig: &mgmtv1alpha1.GenerateJavascript{
Code: "return 'jim';",
},
},
},
},
},
}),
)
requireNoErrResp(t, resp, err)
require.NotEmpty(t, resp.Msg.OutputData)

var inputObject map[string]any
err = json.Unmarshal([]byte(jsonStr), &inputObject)
require.NoError(t, err)

output := resp.Msg.OutputData
var result map[string]any
err = json.Unmarshal([]byte(output), &result)
require.NoError(t, err)
require.Equal(t, "jim", result["name"])
require.Equal(t, []any{"basketball updated", "golf updated", "swimming updated"}, result["sports"])
})

t.Run("ok", func(t *testing.T) {
jsonStr :=
`{
Expand Down
75 changes: 6 additions & 69 deletions internal/benthos/benthos-builder/builders/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"fmt"
"slices"
"strings"
"unicode"

"connectrpc.com/connect"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency"
bb_internal "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/internal"
javascript_userland "github.com/nucleuscloud/neosync/internal/javascript/userland"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
"github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers"
transformer_utils "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers/utils"
Expand Down Expand Up @@ -242,7 +242,7 @@ func extractJsFunctionsAndOutputs(ctx context.Context, transformerclient mgmtv1a
}

if len(jsFunctions) > 0 {
return constructBenthosJsProcessor(jsFunctions, benthosOutputs), nil
return javascript_userland.GetFunction(jsFunctions, benthosOutputs), nil
} else {
return "", nil
}
Expand Down Expand Up @@ -385,83 +385,25 @@ func buildRedisGetBranchConfig(
func constructJsFunction(jsCode, col string, source mgmtv1alpha1.TransformerSource) string {
switch source {
case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT:
return fmt.Sprintf(`
function fn_%s(value, input){
%s
};
`, sanitizeJsFunctionName(col), jsCode)
return javascript_userland.GetTransformJavascriptFunction(jsCode, col, true)
case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT:
return fmt.Sprintf(`
function fn_%s(){
%s
};
`, sanitizeJsFunctionName(col), jsCode)
return javascript_userland.GetGenerateJavascriptFunction(jsCode, col)
default:
return ""
}
}

func sanitizeJsFunctionName(input string) string {
var result strings.Builder

for i, r := range input {
if unicode.IsLetter(r) || r == '_' || r == '$' || (unicode.IsDigit(r) && i > 0) {
result.WriteRune(r)
} else if unicode.IsDigit(r) && i == 0 {
result.WriteRune('_')
result.WriteRune(r)
} else {
result.WriteRune('_')
}
}

return result.String()
}

func constructBenthosJsProcessor(jsFunctions, benthosOutputs []string) string {
jsFunctionStrings := strings.Join(jsFunctions, "\n")

benthosOutputString := strings.Join(benthosOutputs, "\n")

jsCode := fmt.Sprintf(`
(() => {
%s
const input = benthos.v0_msg_as_structured();
const updatedValues = {}
%s
neosync.patchStructuredMessage(updatedValues)
})();`, jsFunctionStrings, benthosOutputString)
return jsCode
}

func constructBenthosJavascriptObject(col string, source mgmtv1alpha1.TransformerSource) string {
switch source {
case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_TRANSFORM_JAVASCRIPT:
return fmt.Sprintf(
`updatedValues[%q] = fn_%s(%s, input)`,
col,
sanitizeJsFunctionName(col),
convertJsObjPathToOptionalChain(fmt.Sprintf("input.%s", col)),
)
return javascript_userland.BuildOutputSetter(col, true, true)
case mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_JAVASCRIPT:
return fmt.Sprintf(
`updatedValues[%q] = fn_%s()`,
col,
sanitizeJsFunctionName(col),
)
return javascript_userland.BuildOutputSetter(col, false, false)
default:
return ""
}
}

func convertJsObjPathToOptionalChain(inputPath string) string {
parts := strings.Split(inputPath, ".")
for i := 1; i < len(parts); i++ {
parts[i] = fmt.Sprintf("['%s']", parts[i])
}
return strings.Join(parts, "?.")
}

// takes in an user defined config with just an id field and return the right transformer config for that user defined function id
func convertUserDefinedFunctionConfig(
ctx context.Context,
Expand All @@ -479,11 +421,6 @@ func convertUserDefinedFunctionConfig(
}, nil
}

/*
function transformers
root.{destination_col} = transformerfunction(args)
*/

func computeMutationFunction(col *mgmtv1alpha1.JobMapping, colInfo *sqlmanager_shared.DatabaseSchemaRow, splitColumnPath bool) (string, error) {
var maxLen int64 = 10000
if colInfo != nil && colInfo.CharacterMaximumLength > 0 {
Expand Down
29 changes: 0 additions & 29 deletions internal/benthos/benthos-builder/builders/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,6 @@ import (
"github.com/stretchr/testify/require"
)

func Test_sanitizeFunctionName(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"123my Function!", "_123my_Function_"},
{"validName", "validName"},
{"name_with_underscores", "name_with_underscores"},
{"$dollarSign", "$dollarSign"},
{"invalid-char$", "invalid_char$"},
{"spaces in name", "spaces_in_name"},
{"!@#$%^&*()_+=", "___$_________"},
{"_leadingUnderscore", "_leadingUnderscore"},
{"$startingDollarSign", "$startingDollarSign"},
{"endingWithNumber1", "endingWithNumber1"},
{"functionName123", "functionName123"},
{"中文字符", "中文字符"},
}

for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
actual := sanitizeJsFunctionName(tt.input)
if actual != tt.expected {
t.Errorf("sanitizeJsFunctionName(%q) = %q; expected %q", tt.input, actual, tt.expected)
}
})
}
}

func Test_buildProcessorConfigsJavascript(t *testing.T) {
mockTransformerClient := mgmtv1alpha1connect.NewMockTransformersServiceClient(t)

Expand Down
93 changes: 93 additions & 0 deletions internal/benthos_slogger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package benthos_slogger

import (
"context"
"log/slog"

"github.com/warpstreamlabs/bento/public/service"
)

var _ slog.Handler = (*benthosLogHandler)(nil)

type benthosLogHandler struct {
logger *service.Logger
attrs []slog.Attr
groups []string
}

func (h *benthosLogHandler) Enabled(ctx context.Context, level slog.Level) bool {
// We defer to the benthos logger and let it handle what leveling it wants to output
return true
}

func (h *benthosLogHandler) Handle(ctx context.Context, r slog.Record) error { //nolint:gocritic // Needs to conform to the slog.Handler interface
// Combine pre-defined attrs with record attrs
allAttrs := make([]slog.Attr, 0, len(h.attrs)+r.NumAttrs())
allAttrs = append(allAttrs, h.attrs...)

r.Attrs(func(attr slog.Attr) bool {
if !attr.Equal(slog.Attr{}) {
// Handle groups
if len(h.groups) > 0 {
last := h.groups[len(h.groups)-1]
if last != "" {
attr.Key = last + "." + attr.Key
}
}
allAttrs = append(allAttrs, attr)
}
return true
})

// Convert to key-value pairs for temporal logger
keyvals := make([]any, 0, len(allAttrs)*2)
for _, attr := range allAttrs {
keyvals = append(keyvals, attr.Key, attr.Value.Any())
}

switch r.Level {
case slog.LevelDebug:
h.logger.With(keyvals...).Debug(r.Message)
case slog.LevelInfo:
h.logger.With(keyvals...).Info(r.Message)
case slog.LevelWarn:
h.logger.With(keyvals...).Warn(r.Message)
case slog.LevelError:
h.logger.With(keyvals...).Error(r.Message)
}
return nil
}

func (h *benthosLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
newAttrs := []slog.Attr{}
newAttrs = append(newAttrs, h.attrs...)
newAttrs = append(newAttrs, attrs...)
return &benthosLogHandler{
logger: h.logger,
attrs: newAttrs,
groups: h.groups,
}
}

func (h *benthosLogHandler) WithGroup(name string) slog.Handler {
if name == "" {
return h
}
newGroups := []string{}
newGroups = append(newGroups, h.groups...)
newGroups = append(newGroups, name)
return &benthosLogHandler{
logger: h.logger,
attrs: h.attrs,
groups: newGroups,
}
}

func newBenthosLogHandler(logger *service.Logger) *benthosLogHandler {
return &benthosLogHandler{logger: logger}
}

// Returns a benthos logger wrapped as a slog.Logger to ease plugging in to the rest of the system
func NewSlogger(logger *service.Logger) *slog.Logger {
return slog.New(newBenthosLogHandler(logger))
}
Loading

0 comments on commit 1de0f68

Please sign in to comment.