Skip to content

Commit

Permalink
Updates CLI and Worker to use shared benthos builder (#2882)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Oct 31, 2024
1 parent 8628960 commit 9a06f7e
Show file tree
Hide file tree
Showing 44 changed files with 5,814 additions and 4,989 deletions.
2 changes: 1 addition & 1 deletion backend/pkg/sqlmanager/sql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (s *SqlManager) NewSqlDbFromUrl(

func GetColumnOverrideAndResetProperties(driver string, cInfo *sqlmanager_shared.ColumnInfo) (needsOverride, needsReset bool, err error) {
switch driver {
case sqlmanager_shared.PostgresDriver:
case sqlmanager_shared.PostgresDriver, "postgres":
needsOverride, needsReset := sqlmanager_postgres.GetPostgresColumnOverrideAndResetProperties(cInfo)
return needsOverride, needsReset, nil
case sqlmanager_shared.MysqlDriver:
Expand Down
6 changes: 5 additions & 1 deletion cli/internal/cmds/neosync/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func buildCmdConfig(cmd *cobra.Command) (*cmdConfig, error) {
if err != nil {
return nil, err
}
config.Debug = debug

config.Debug = debug
if cmd.Flags().Changed("destination-open-limit") {
openLimit, err := cmd.Flags().GetInt32("destination-open-limit")
if err != nil {
Expand Down Expand Up @@ -196,6 +196,10 @@ func isConfigValid(cmd *cmdConfig, logger *slog.Logger, sourceConnection *mgmtv1
return errors.New("GCP Cloud Storage source connection type requires job-id or job-run-id")
}

if (sourceConnectionType == awsS3Connection || sourceConnectionType == gcpCloudStorageConnection) && cmd.Destination.InitSchema {
return errors.New("init schema is only supported when source is a SQL Database")
}

if cmd.Destination.TruncateCascade && cmd.Destination.Driver == mysqlDriver {
return fmt.Errorf("truncate cascade is only supported in postgres")
}
Expand Down
86 changes: 0 additions & 86 deletions cli/internal/cmds/neosync/sync/dynamodb.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package sync_cmd

import (
tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -59,87 +57,3 @@ func buildAwsCredConfig(cmd *cobra.Command, config *cmdConfig) (*cmdConfig, erro
}
return config, nil
}

// generateDynamoDbBenthosConfig builds the benthos stream config that copies a
// single DynamoDB table from the Neosync connection-data input into the
// destination DynamoDB table of the same name.
func generateDynamoDbBenthosConfig(
	cmd *cmdConfig,
	table string,
) *benthosConfigResponse {
	// AwsCredConfig and its Endpoint are optional pointers; the original code
	// dereferenced Endpoint unconditionally, which panics when the user did
	// not supply an AWS destination config. Resolve both defensively first.
	var region, endpoint string
	if cmd.AwsDynamoDbDestination != nil && cmd.AwsDynamoDbDestination.AwsCredConfig != nil {
		cc := cmd.AwsDynamoDbDestination.AwsCredConfig
		region = cc.Region
		if cc.Endpoint != nil {
			endpoint = *cc.Endpoint
		}
	}
	bc := &neosync_benthos.BenthosConfig{
		StreamConfig: neosync_benthos.StreamConfig{
			Logger: &neosync_benthos.LoggerConfig{
				Level:        "ERROR",
				AddTimestamp: true,
			},
			Input: &neosync_benthos.InputConfig{
				Inputs: neosync_benthos.Inputs{
					NeosyncConnectionData: &neosync_benthos.NeosyncConnectionData{
						// ApiKey: authToken,
						// ApiUrl: apiUrl,
						ConnectionId:   cmd.Source.ConnectionId,
						ConnectionType: string(awsDynamoDBConnection),
						Schema:         "dynamodb",
						Table:          table,
					},
				},
			},
			Pipeline: &neosync_benthos.PipelineConfig{},
			Output: &neosync_benthos.OutputConfig{
				Outputs: neosync_benthos.Outputs{
					AwsDynamoDB: &neosync_benthos.OutputAwsDynamoDB{
						Table: table,
						// Map the entire document to the item root.
						JsonMapColumns: map[string]string{
							"": ".",
						},

						Batching: &neosync_benthos.Batching{
							// https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
							// A single call to BatchWriteItem can transmit up to 16MB of data over the network, consisting of up to 25 item put or delete operations
							// Specifying the count here may not be enough if the overall data is above 16MB.
							// Benthos will fall back on error to single writes however
							Period: "5s",
							Count:  25,
						},

						Region:      region,
						Endpoint:    endpoint,
						Credentials: buildBenthosAwsCredentials(cmd),
					},
				},
			},
		},
	}
	return &benthosConfigResponse{
		Name:      table,
		Config:    bc,
		DependsOn: []*tabledependency.DependsOn{},
		Table:     table,
		Columns:   []string{},
	}
}

// buildBenthosAwsCredentials translates the CLI's optional AWS credential
// overrides into the benthos credentials block. It returns nil when no
// DynamoDB destination credential config was provided, so callers can pass
// the result straight through.
func buildBenthosAwsCredentials(cmd *cmdConfig) *neosync_benthos.AwsCredentials {
	dest := cmd.AwsDynamoDbDestination
	if dest == nil || dest.AwsCredConfig == nil {
		return nil
	}
	cfg := dest.AwsCredConfig
	out := &neosync_benthos.AwsCredentials{}
	// Each field is an optional pointer; copy only the ones that were set.
	if v := cfg.Profile; v != nil {
		out.Profile = *v
	}
	if v := cfg.AccessKeyID; v != nil {
		out.Id = *v
	}
	if v := cfg.SecretAccessKey; v != nil {
		out.Secret = *v
	}
	if v := cfg.SessionToken; v != nil {
		out.Token = *v
	}
	if v := cfg.RoleARN; v != nil {
		out.Role = *v
	}
	if v := cfg.RoleExternalID; v != nil {
		out.RoleExternalId = *v
	}
	return out
}
108 changes: 108 additions & 0 deletions cli/internal/cmds/neosync/sync/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sync_cmd

import (
"fmt"

"github.com/google/uuid"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
)

// createJob assembles an in-memory Job definition for the CLI sync run:
// source options derived from the source connection type, a single
// destination, and passthrough/generate-default mappings for every source
// column. The job id defaults to a fresh uuid but is overridden by the
// job id supplied via the source connection options, when present.
func createJob(
	cmd *cmdConfig,
	sourceConnection *mgmtv1alpha1.Connection,
	destinationConnection *mgmtv1alpha1.Connection,
	sourceSchema []*mgmtv1alpha1.DatabaseColumn,
) (*mgmtv1alpha1.Job, error) {
	// AccountId is a pointer on cmdConfig; guard it so a missing account id
	// surfaces as an error instead of a nil-pointer panic below.
	if cmd.AccountId == nil {
		return nil, fmt.Errorf("account id is required to build a job")
	}
	sourceConnOpts, err := toJobSourceOption(sourceConnection)
	if err != nil {
		return nil, err
	}
	jobId := uuid.NewString()
	if cmd.Source.ConnectionOpts != nil && cmd.Source.ConnectionOpts.JobId != nil && *cmd.Source.ConnectionOpts.JobId != "" {
		jobId = *cmd.Source.ConnectionOpts.JobId
	}
	return &mgmtv1alpha1.Job{
		Id:        jobId,
		Name:      "cli-sync",
		AccountId: *cmd.AccountId,
		Source: &mgmtv1alpha1.JobSource{
			Options: sourceConnOpts,
		},
		Destinations: []*mgmtv1alpha1.JobDestination{toJobDestination(cmd, destinationConnection)},
		Mappings:     toJobMappings(sourceSchema),
	}, nil
}

// toJobDestination wraps the destination connection and the CLI's destination
// options into a JobDestination with a freshly generated id.
func toJobDestination(cmd *cmdConfig, destinationConnection *mgmtv1alpha1.Connection) *mgmtv1alpha1.JobDestination {
	dest := &mgmtv1alpha1.JobDestination{
		Id:           uuid.NewString(),
		ConnectionId: destinationConnection.Id,
	}
	dest.Options = cmdConfigToDestinationConnectionOptions(cmd)
	return dest
}

// toJobSourceOption maps the source connection's config type (postgres, mysql,
// or AWS S3) to the matching JobSourceOptions variant. Any other connection
// type is rejected with an error naming the unexpected config type.
func toJobSourceOption(sourceConnection *mgmtv1alpha1.Connection) (*mgmtv1alpha1.JobSourceOptions, error) {
	// Guard against a missing config so the type switch below cannot panic.
	if sourceConnection == nil || sourceConnection.ConnectionConfig == nil {
		return nil, fmt.Errorf("source connection is missing its connection config")
	}
	switch sourceConnection.ConnectionConfig.Config.(type) {
	case *mgmtv1alpha1.ConnectionConfig_PgConfig:
		return &mgmtv1alpha1.JobSourceOptions{
			Config: &mgmtv1alpha1.JobSourceOptions_Postgres{
				Postgres: &mgmtv1alpha1.PostgresSourceConnectionOptions{
					ConnectionId: sourceConnection.Id,
				},
			},
		}, nil
	case *mgmtv1alpha1.ConnectionConfig_MysqlConfig:
		return &mgmtv1alpha1.JobSourceOptions{
			Config: &mgmtv1alpha1.JobSourceOptions_Mysql{
				Mysql: &mgmtv1alpha1.MysqlSourceConnectionOptions{
					ConnectionId: sourceConnection.Id,
				},
			},
		}, nil
	case *mgmtv1alpha1.ConnectionConfig_AwsS3Config:
		return &mgmtv1alpha1.JobSourceOptions{
			Config: &mgmtv1alpha1.JobSourceOptions_AwsS3{
				AwsS3: &mgmtv1alpha1.AwsS3SourceConnectionOptions{
					ConnectionId: sourceConnection.Id,
				},
			},
		}, nil
	default:
		// Include the concrete type so unsupported-connection errors are
		// actionable (the original constant string gave no context).
		return nil, fmt.Errorf("unsupported connection type: %T", sourceConnection.ConnectionConfig.Config)
	}
}

// If a column is generated (and not an identity column) its mapping uses the
// generate-default transformer; all other columns are passed through as-is.
// toJobMappings produces one JobMapping per source column, delegating the
// transformer choice to toTransformer.
func toJobMappings(sourceSchema []*mgmtv1alpha1.DatabaseColumn) []*mgmtv1alpha1.JobMapping {
	mappings := make([]*mgmtv1alpha1.JobMapping, 0, len(sourceSchema))
	for _, col := range sourceSchema {
		mapping := &mgmtv1alpha1.JobMapping{
			Schema:      col.Schema,
			Table:       col.Table,
			Column:      col.Column,
			Transformer: toTransformer(col),
		}
		mappings = append(mappings, mapping)
	}
	return mappings
}

// toTransformer selects the mapping transformer for a column: columns with a
// non-empty generated type get the generate-default transformer so the
// database fills them in; every other column is passed through unchanged.
func toTransformer(colInfo *mgmtv1alpha1.DatabaseColumn) *mgmtv1alpha1.JobMappingTransformer {
	isGenerated := colInfo.GeneratedType != nil && colInfo.GetGeneratedType() != ""
	if !isGenerated {
		return &mgmtv1alpha1.JobMappingTransformer{
			Config: &mgmtv1alpha1.TransformerConfig{
				Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{
					PassthroughConfig: &mgmtv1alpha1.Passthrough{},
				},
			},
		}
	}
	return &mgmtv1alpha1.JobMappingTransformer{
		Config: &mgmtv1alpha1.TransformerConfig{
			Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{
				GenerateDefaultConfig: &mgmtv1alpha1.GenerateDefault{},
			},
		},
	}
}
Loading

0 comments on commit 9a06f7e

Please sign in to comment.