Skip to content

Commit

Permalink
NEOS-1712: Adds ability to specify user defined transformers in anony…
Browse files Browse the repository at this point in the history
…mization api (#3160)
  • Loading branch information
nickzelei authored Jan 17, 2025
1 parent 5918dae commit 0f57e26
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 67 deletions.
3 changes: 3 additions & 0 deletions .mockery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,6 @@ packages:
github.com/nucleuscloud/neosync/internal/ee/transformers/functions:
interfaces:
NeosyncOperatorApi:
github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers:
interfaces:
UserDefinedTransformerResolver:
2 changes: 1 addition & 1 deletion backend/internal/cmds/mgmt/serve/connect/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ func serve(ctx context.Context) error {
PresidioDefaultLanguage: getPresidioDefaultLanguage(),
IsAuthEnabled: isAuthEnabled,
IsNeosyncCloud: ncloudlicense.IsValid(),
}, anonymizerMeter, userdataclient, useraccountService, presAnalyzeClient, presAnonClient, db)
}, anonymizerMeter, userdataclient, useraccountService, transformerService, presAnalyzeClient, presAnonClient, db)
api.Handle(
mgmtv1alpha1connect.NewAnonymizationServiceHandler(
anonymizationService,
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/integration-test/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (s *NeosyncApiTestClient) setupMux(
nil, // meter
userclient,
userService,
transformerService,
presAnalyzeClient, presAnonClient,
neosyncDb,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (s *Service) AnonymizeMany(
jsonanonymizer.WithDefaultTransformers(req.Msg.DefaultTransformers),
jsonanonymizer.WithHaltOnFailure(req.Msg.HaltOnFailure),
jsonanonymizer.WithConditionalAnonymizeConfig(s.cfg.IsPresidioEnabled, s.analyze, s.anonymize, s.cfg.PresidioDefaultLanguage),
jsonanonymizer.WithTransformerClient(s.transformerClient),
jsonanonymizer.WithLogger(logger),
)
if err != nil {
Expand Down Expand Up @@ -201,6 +202,7 @@ func (s *Service) AnonymizeSingle(
jsonanonymizer.WithTransformerMappings(req.Msg.TransformerMappings),
jsonanonymizer.WithDefaultTransformers(req.Msg.DefaultTransformers),
jsonanonymizer.WithConditionalAnonymizeConfig(s.cfg.IsPresidioEnabled, s.analyze, s.anonymize, s.cfg.PresidioDefaultLanguage),
jsonanonymizer.WithTransformerClient(s.transformerClient),
jsonanonymizer.WithLogger(logger),
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Service struct {
meter metric.Meter // optional
userdataclient userdata.Interface
useraccountService mgmtv1alpha1connect.UserAccountServiceClient
transformerClient mgmtv1alpha1connect.TransformersServiceClient
analyze presidioapi.AnalyzeInterface
anonymize presidioapi.AnonymizeInterface
db *neosyncdb.NeosyncDb
Expand All @@ -30,6 +31,7 @@ func New(
meter metric.Meter,
userdataclient userdata.Interface,
useraccountService mgmtv1alpha1connect.UserAccountServiceClient,
transformerClient mgmtv1alpha1connect.TransformersServiceClient,
analyzeclient presidioapi.AnalyzeInterface,
anonymizeclient presidioapi.AnonymizeInterface,
db *neosyncdb.NeosyncDb,
Expand All @@ -39,6 +41,7 @@ func New(
meter: meter,
userdataclient: userdataclient,
useraccountService: useraccountService,
transformerClient: transformerClient,
analyze: analyzeclient,
anonymize: anonymizeclient,
db: db,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,69 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeMany() {
}

func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle() {
jsonStr :=
`{
t := s.T()

t.Run("user-defined-transformer", func(t *testing.T) {
jsonStr := `{
"sports": ["basketball", "golf", "swimming"]
}`

accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users())
categories := "A"
createTransformerResp, err := s.OSSUnauthenticatedLicensedClients.Transformers().CreateUserDefinedTransformer(
s.ctx,
connect.NewRequest(&mgmtv1alpha1.CreateUserDefinedTransformerRequest{
AccountId: accountId,
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_CATEGORICAL,
TransformerConfig: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateCategoricalConfig{
GenerateCategoricalConfig: &mgmtv1alpha1.GenerateCategorical{
Categories: &categories,
},
},
},
}),
)
requireNoErrResp(t, createTransformerResp, err)
transformerId := createTransformerResp.Msg.GetTransformer().GetId()
resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeSingle(
s.ctx,
connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{
AccountId: accountId,
InputData: jsonStr,
TransformerMappings: []*mgmtv1alpha1.TransformerMapping{
{
Expression: ".sports[]",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_UserDefinedTransformerConfig{
UserDefinedTransformerConfig: &mgmtv1alpha1.UserDefinedTransformerConfig{
Id: transformerId,
},
},
},
},
},
}),
)
requireNoErrResp(s.T(), resp, err)
require.NotEmpty(s.T(), resp.Msg.OutputData)

var inputObject map[string]any
err = json.Unmarshal([]byte(jsonStr), &inputObject)
require.NoError(s.T(), err)

output := resp.Msg.OutputData
var result map[string]any
err = json.Unmarshal([]byte(output), &result)
require.NoError(s.T(), err)
for _, sport := range result["sports"].([]any) {
require.Equal(t, "A", sport)
}
})

t.Run("ok", func(t *testing.T) {
jsonStr :=
`{
"user": {
"name": "Jane Doe",
"age": 42,
Expand All @@ -171,58 +232,59 @@ func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle() {
"sports": ["basketball", "golf", "swimming"]
}`

accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users())
resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeSingle(
s.ctx,
connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{
AccountId: accountId,
InputData: jsonStr,
DefaultTransformers: &mgmtv1alpha1.DefaultTransformersConfig{
N: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{
GenerateInt64Config: &mgmtv1alpha1.GenerateInt64{
Min: gotypeutil.ToPtr(int64(18)),
Max: gotypeutil.ToPtr(int64(30)),
accountId := s.createPersonalAccount(s.ctx, s.OSSUnauthenticatedLicensedClients.Users())
resp, err := s.OSSUnauthenticatedLicensedClients.Anonymize().AnonymizeSingle(
s.ctx,
connect.NewRequest(&mgmtv1alpha1.AnonymizeSingleRequest{
AccountId: accountId,
InputData: jsonStr,
DefaultTransformers: &mgmtv1alpha1.DefaultTransformersConfig{
N: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateInt64Config{
GenerateInt64Config: &mgmtv1alpha1.GenerateInt64{
Min: gotypeutil.ToPtr(int64(18)),
Max: gotypeutil.ToPtr(int64(30)),
},
},
},
},
},
TransformerMappings: []*mgmtv1alpha1.TransformerMapping{
{
Expression: ".details.name",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig{
TransformFirstNameConfig: &mgmtv1alpha1.TransformFirstName{},
TransformerMappings: []*mgmtv1alpha1.TransformerMapping{
{
Expression: ".details.name",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_TransformFirstNameConfig{
TransformFirstNameConfig: &mgmtv1alpha1.TransformFirstName{},
},
},
},
},
{
Expression: ".sports[]",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateCityConfig{
GenerateCityConfig: &mgmtv1alpha1.GenerateCity{},
{
Expression: ".sports[]",
Transformer: &mgmtv1alpha1.TransformerConfig{
Config: &mgmtv1alpha1.TransformerConfig_GenerateCityConfig{
GenerateCityConfig: &mgmtv1alpha1.GenerateCity{},
},
},
},
},
},
}),
)
requireNoErrResp(s.T(), resp, err)
require.NotEmpty(s.T(), resp.Msg.OutputData)

var inputObject map[string]any
err = json.Unmarshal([]byte(jsonStr), &inputObject)
require.NoError(s.T(), err)

output := resp.Msg.OutputData
var result map[string]any
err = json.Unmarshal([]byte(output), &result)
require.NoError(s.T(), err)
require.NotEqual(s.T(), inputObject["details"].(map[string]any)["name"], result["details"].(map[string]any)["name"])
require.NotEqual(s.T(), inputObject["user"].(map[string]any)["age"], result["user"].(map[string]any)["age"])
for j, sport := range result["sports"].([]any) {
require.NotEqual(s.T(), inputObject["sports"].([]any)[j], sport)
}
}),
)
requireNoErrResp(s.T(), resp, err)
require.NotEmpty(s.T(), resp.Msg.OutputData)

var inputObject map[string]any
err = json.Unmarshal([]byte(jsonStr), &inputObject)
require.NoError(s.T(), err)

output := resp.Msg.OutputData
var result map[string]any
err = json.Unmarshal([]byte(output), &result)
require.NoError(s.T(), err)
require.NotEqual(s.T(), inputObject["details"].(map[string]any)["name"], result["details"].(map[string]any)["name"])
require.NotEqual(s.T(), inputObject["user"].(map[string]any)["age"], result["user"].(map[string]any)["age"])
for j, sport := range result["sports"].([]any) {
require.NotEqual(s.T(), inputObject["sports"].([]any)[j], sport)
}
})
}

func (s *IntegrationTestSuite) Test_AnonymizeService_AnonymizeSingle_InvalidTransformerConfig() {
Expand Down
22 changes: 17 additions & 5 deletions internal/json-anonymizer/json-anonymizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/itchyny/gojq"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
presidioapi "github.com/nucleuscloud/neosync/internal/ee/presidio"
transformer "github.com/nucleuscloud/neosync/worker/pkg/benthos/transformers"
)
Expand All @@ -29,7 +30,8 @@ type JsonAnonymizer struct {
skipPaths map[string]struct{}
anonymizeConfig *anonymizeConfig

logger *slog.Logger
logger *slog.Logger
transformerClient mgmtv1alpha1connect.TransformersServiceClient
}

type anonymizeConfig struct {
Expand Down Expand Up @@ -57,14 +59,14 @@ func NewAnonymizer(opts ...Option) (*JsonAnonymizer, error) {

// Initialize transformerExecutors
var err error
a.transformerExecutors, err = initTransformerExecutors(a.transformerMappings, a.anonymizeConfig, a.logger)
a.transformerExecutors, err = initTransformerExecutors(a.transformerMappings, a.anonymizeConfig, a.transformerClient, a.logger)
if err != nil {
return nil, err
}

// Initialize defaultTransformerExecutor if needed
if a.defaultTransformers != nil {
a.defaultTransformerExecutor, err = initDefaultTransformerExecutors(a.defaultTransformers, a.anonymizeConfig, a.logger)
a.defaultTransformerExecutor, err = initDefaultTransformerExecutors(a.defaultTransformers, a.anonymizeConfig, a.transformerClient, a.logger)
if err != nil {
return nil, err
}
Expand All @@ -83,6 +85,12 @@ func WithLogger(logger *slog.Logger) Option {
}
}

// WithTransformerClient returns an Option that stores the given transformers
// service client on the JsonAnonymizer. NewAnonymizer forwards this client to
// the executor initializers, which wrap it in a user-defined transformer
// resolver.
func WithTransformerClient(transformerClient mgmtv1alpha1connect.TransformersServiceClient) Option {
	applyClient := func(anonymizer *JsonAnonymizer) {
		anonymizer.transformerClient = transformerClient
	}
	return applyClient
}

// WithConditionalAnonymizeConfig sets the analyze and anonymize clients for use by the presidio transformers only if isEnabled is true
func WithConditionalAnonymizeConfig(isEnabled bool, analyze presidioapi.AnalyzeInterface, anonymize presidioapi.AnonymizeInterface, defaultLanguage *string) Option {
return func(ja *JsonAnonymizer) {
Expand Down Expand Up @@ -349,16 +357,18 @@ func (a *JsonAnonymizer) AnonymizeJSONObject(jsonStr string) (string, error) {
func initTransformerExecutors(
transformerMappings []*mgmtv1alpha1.TransformerMapping,
anonymizeConfig *anonymizeConfig,
transformerClient mgmtv1alpha1connect.TransformersServiceClient,
logger *slog.Logger,
) ([]*transformer.TransformerExecutor, error) {
executors := []*transformer.TransformerExecutor{}
execOpts := []transformer.TransformerExecutorOption{
transformer.WithLogger(logger),
transformer.WithUserDefinedTransformerResolver(newUdtResolver(transformerClient)),
}
if anonymizeConfig != nil && anonymizeConfig.analyze != nil && anonymizeConfig.anonymize != nil {
execOpts = append(
execOpts,
transformer.WithTransformPiiTextConfig(anonymizeConfig.analyze, anonymizeConfig.anonymize, newNeosyncOperatorApi(logger), anonymizeConfig.defaultLanguage),
transformer.WithTransformPiiTextConfig(anonymizeConfig.analyze, anonymizeConfig.anonymize, newNeosyncOperatorApi(execOpts), anonymizeConfig.defaultLanguage),
)
}

Expand All @@ -382,13 +392,15 @@ type DefaultExecutors struct {
func initDefaultTransformerExecutors(
defaultTransformer *mgmtv1alpha1.DefaultTransformersConfig,
anonymizeConfig *anonymizeConfig,
transformerClient mgmtv1alpha1connect.TransformersServiceClient,
logger *slog.Logger,
) (*DefaultExecutors, error) {
execOpts := []transformer.TransformerExecutorOption{
transformer.WithLogger(logger),
transformer.WithUserDefinedTransformerResolver(newUdtResolver(transformerClient)),
}
if anonymizeConfig != nil && anonymizeConfig.analyze != nil && anonymizeConfig.anonymize != nil {
execOpts = append(execOpts, transformer.WithTransformPiiTextConfig(anonymizeConfig.analyze, anonymizeConfig.anonymize, newNeosyncOperatorApi(logger), anonymizeConfig.defaultLanguage))
execOpts = append(execOpts, transformer.WithTransformPiiTextConfig(anonymizeConfig.analyze, anonymizeConfig.anonymize, newNeosyncOperatorApi(execOpts), anonymizeConfig.defaultLanguage))
}

var stringExecutor, numberExecutor, booleanExecutor *transformer.TransformerExecutor
Expand Down
8 changes: 4 additions & 4 deletions internal/json-anonymizer/json-anonymizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func Test_InitTransformerExecutors(t *testing.T) {
},
},
}
executors, err := initTransformerExecutors(mappings, nil, testutil.GetTestLogger(t))
executors, err := initTransformerExecutors(mappings, nil, nil, testutil.GetTestLogger(t))
require.NoError(t, err)
require.Len(t, executors, 1)
})
Expand All @@ -239,7 +239,7 @@ func Test_InitTransformerExecutors(t *testing.T) {
Transformer: nil,
},
}
_, err := initTransformerExecutors(mappings, nil, testutil.GetTestLogger(t))
_, err := initTransformerExecutors(mappings, nil, nil, testutil.GetTestLogger(t))
require.Error(t, err)
})
}
Expand All @@ -261,7 +261,7 @@ func Test_InitDefaultTransformerExecutors(t *testing.T) {
Config: &mgmtv1alpha1.TransformerConfig_GenerateBoolConfig{},
},
}
executors, err := initDefaultTransformerExecutors(defaults, nil, testutil.GetTestLogger(t))
executors, err := initDefaultTransformerExecutors(defaults, nil, nil, testutil.GetTestLogger(t))
require.NoError(t, err)
require.NotNil(t, executors.S)
require.NotNil(t, executors.N)
Expand All @@ -276,7 +276,7 @@ func Test_InitDefaultTransformerExecutors(t *testing.T) {
},
},
}
executors, err := initDefaultTransformerExecutors(defaults, nil, testutil.GetTestLogger(t))
executors, err := initDefaultTransformerExecutors(defaults, nil, nil, testutil.GetTestLogger(t))
require.NoError(t, err)
require.NotNil(t, executors.S)
require.Nil(t, executors.N)
Expand Down
Loading

0 comments on commit 0f57e26

Please sign in to comment.