Skip to content

Commit

Permalink
Enable using AWS Secrets from Secret Manager for SQL DSN URLs (#135)
Browse files Browse the repository at this point in the history
Enable using AWS Secrets from Secret Manager for SQL DSN URLs 
---------
Signed-off-by: Jem Davies <[email protected]>
  • Loading branch information
jem-davies authored Oct 21, 2024
1 parent bd5c585 commit e704503
Show file tree
Hide file tree
Showing 25 changed files with 1,352 additions and 30 deletions.
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/PaesslerAG/jsonpath v0.1.1
github.com/apache/pulsar-client-go v0.12.0
github.com/aws/aws-lambda-go v1.46.0
github.com/aws/aws-sdk-go-v2 v1.25.0
github.com/aws/aws-sdk-go-v2 v1.32.2
github.com/aws/aws-sdk-go-v2/config v1.26.6
github.com/aws/aws-sdk-go-v2/credentials v1.16.16
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.6.16
Expand Down Expand Up @@ -195,8 +195,8 @@ require (
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.16 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.18.7 // indirect
Expand All @@ -205,9 +205,10 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.8.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2
github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect
github.com/aws/smithy-go v1.20.0 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -769,8 +769,8 @@ github.com/aws/aws-lambda-go v1.46.0/go.mod h1:dpMpZgvWx5vuQJfBt0zqBha60q7Dd7Rfg
github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.42.37/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc=
github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250=
github.com/aws/aws-sdk-go-v2 v1.25.0 h1:sv7+1JVJxOu/dD/sz/csHX7jFqmP001TIY7aytBWDSQ=
github.com/aws/aws-sdk-go-v2 v1.25.0/go.mod h1:G104G1Aho5WqF+SR3mDIobTABQzpYV0WxMsKxlMggOA=
github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI=
github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0 h1:2UO6/nT1lCZq1LqM67Oa4tdgP1CvL1sLSxvuD+VrOeE=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.0/go.mod h1:5zGj2eA85ClyedTDK+Whsu+w9yimnVIZvhvBKrDquM8=
github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA=
Expand All @@ -789,10 +789,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11/go.mod h1:cRrYDYAMUohBJUt
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.3.2/go.mod h1:qaqQiHSrOUVOfKe6fhgQ6UzhxjwqVW8aHNegd6Ws4w4=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.15 h1:2MUXyGW6dVaQz6aqycpbdLIH1NMcUI6kW6vQ0RabGYg=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.15/go.mod h1:aHbhbR6WEQgHAiRj41EQ2W47yOYwNtIkWTXmcAtYqj8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0 h1:NPs/EqVO+ajwOoq56EfcGKa3L3ruWuazkIw1BqxwOPw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.0/go.mod h1:D+duLy2ylgatV+yTlQ8JTuLfDD0BnFvnQRc+o6tbZ4M=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0 h1:ks7KGMVUMoDzcxNWUlEdI+/lokMFD136EL6DWmUOV80=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.0/go.mod h1:hL6BWM/d/qz113fVitZjbXR0E+RCTU1+x+1Idyn5NgE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3 h1:n3GDfwqF2tzEkXlv5cuy4iy7LpKDtqDMcNLfZDu9rls=
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3/go.mod h1:6fQQgfuGmw8Al/3M2IgIllycxV7ZW7WCdVSqfBeUiCY=
Expand Down Expand Up @@ -826,6 +826,8 @@ github.com/aws/aws-sdk-go-v2/service/lambda v1.50.0/go.mod h1:yEO3Ejj0qBhdIDlRYQ
github.com/aws/aws-sdk-go-v2/service/s3 v1.11.1/go.mod h1:XLAGFrEjbvMCLvAtWLLP32yTv8GpBquCApZEycDLunI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 h1:5XNlsBsEvBZBMO6p82y+sqpWg8j5aBCe+5C2GBFgqBQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2 h1:Rrqru2wYkKQCS2IM5/JrgKUQIoNTqA6y/iuxkjzxC6M=
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2/go.mod h1:QuCURO98Sqee2AXmqDNxKXYFm2OEDAVAPApMqO0Vqnc=
github.com/aws/aws-sdk-go-v2/service/sns v1.27.0 h1:Qa8B9/cgLWNt5zNogF81CuT+Nh+XkzW+hkfO784u1bs=
github.com/aws/aws-sdk-go-v2/service/sns v1.27.0/go.mod h1:uaz2BGV8LQxQPlNmuUcqFS9Bf6n+OY3y8cNukcQSTRw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.29.7 h1:tRNrFDGRm81e6nTX5Q4CFblea99eAfm0dxXazGpLceU=
Expand All @@ -839,8 +841,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BV
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 h1:NzO4Vrau795RkUdSHKEwiR01FaGzGOH1EETJ+5QHnm0=
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7/go.mod h1:6h2YuIoxaMSCFf5fi1EgZAwdfkGMgDY+DVfa61uLe4U=
github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.20.0 h1:6+kZsCXZwKxZS9RfISnPc4EXlHoyAkm2hPuM8X2BrrQ=
github.com/aws/smithy-go v1.20.0/go.mod h1:uo5RKksAl4PzhqaAbjd4rLgFoq5koTsQKYuGe7dklGc=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA=
Expand Down
1 change: 1 addition & 0 deletions internal/impl/aws/integration_s3_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:staticcheck // Ignore SA1019
package aws

import (
Expand Down
3 changes: 0 additions & 3 deletions internal/impl/gcp/output_bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,6 @@ func newGCPBigQueryOutput(
return g, nil
}

// _, isStatic := g.conf.TableID.Static()
// if

var err error
if g.fieldDelimiterBytes, err = convertToIso(g.fieldDelimiterBytes); err != nil {
return nil, fmt.Errorf("error parsing csv.field_delimiter field: %w", err)
Expand Down
12 changes: 11 additions & 1 deletion internal/impl/sql/cache_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/Jeffail/shutdown"

"github.com/warpstreamlabs/bento/public/service"

"github.com/aws/aws-sdk-go-v2/aws"
bento_aws "github.com/warpstreamlabs/bento/internal/impl/aws"
)

const (
Expand Down Expand Up @@ -97,6 +100,8 @@ type sqlCache struct {
upsertBuilder squirrel.InsertBuilder
deleteBuilder squirrel.DeleteBuilder

awsConf aws.Config

logger *service.Logger
shutSig *shutdown.Signaller
}
Expand Down Expand Up @@ -161,7 +166,12 @@ func newSQLCacheFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (
return nil, err
}

if s.db, err = sqlOpenWithReworks(context.Background(), s.logger, s.driver, s.dsn, connSettings.initVerifyConn); err != nil {
s.awsConf, err = bento_aws.GetSession(context.Background(), conf)
if err != nil {
return nil, err
}

if s.db, err = sqlOpenWithReworks(context.Background(), s.logger, s.driver, s.dsn, connSettings, s.awsConf); err != nil {
return nil, err
}
connSettings.apply(context.Background(), s.db, s.logger)
Expand Down
97 changes: 94 additions & 3 deletions internal/impl/sql/conn_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package sql
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
"github.com/warpstreamlabs/bento/internal/impl/aws/config"
"github.com/warpstreamlabs/bento/public/service"
)

Expand Down Expand Up @@ -46,7 +51,8 @@ The ` + "[`gocosmos`](https://pkg.go.dev/github.com/microsoft/gocosmos)" + ` dri
Example("oracle://foouser:foopass@localhost:1521/service_name")

func connFields() []*service.ConfigField {
return []*service.ConfigField{

connFields := []*service.ConfigField{
service.NewStringListField("init_files").
Description(`
An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).
Expand Down Expand Up @@ -102,7 +108,16 @@ CREATE TABLE IF NOT EXISTS some_table (
Description("An optional maximum number of open connections to the database. If conn_max_idle is greater than 0 and the new conn_max_open is less than conn_max_idle, then conn_max_idle will be reduced to match the new conn_max_open limit. If `value <= 0`, then there is no limit on the number of open connections. The default is 0 (unlimited).").
Optional().
Advanced(),
service.NewStringField("secret_name").
Description("An optional field that can be used to get the Username + Password from AWS Secrets Manager. This will overwrite the Username + Password in the DSN with the values from the Secret only if the driver is set to postgres.").
Optional().
Advanced(),
}

connFields = append(connFields, config.SessionFields()...)

return connFields

}

func rawQueryField() *service.ConfigField {
Expand Down Expand Up @@ -134,6 +149,7 @@ type connSettings struct {
initFileStatements [][2]string // (path,statement)
initStatement string
initVerifyConn bool
secretName string
}

func (c *connSettings) apply(ctx context.Context, db *sql.DB, log *service.Logger) {
Expand Down Expand Up @@ -222,6 +238,12 @@ func connSettingsFromParsed(
}
}

if conf.Contains("secret_name") {
if c.secretName, err = conf.FieldString("secret_name"); err != nil {
return
}
}

return
}

Expand Down Expand Up @@ -257,7 +279,67 @@ func reworkDSN(driver, dsn string) (string, error) {
return dsn, nil
}

func sqlOpenWithReworks(ctx context.Context, logger *service.Logger, driver, dsn string, shouldPing bool) (*sql.DB, error) {
func getSecretFromAWSSecretManager(secretName string, awsConf aws.Config) (secretString string, err error) {
svc := secretsmanager.NewFromConfig(awsConf)

input := &secretsmanager.GetSecretValueInput{
SecretId: aws.String(secretName),
}
result, err := svc.GetSecretValue(context.TODO(), input)
if err != nil {
return "", err
}

return *result.SecretString, nil
}

func BuildAwsDsn(dsn string, driver string, secretName string, awsConf aws.Config, getSecretFunc func(secretName string, awsConf aws.Config) (awsSecret string, err error)) (awsSecretDsn string, err error) {
if secretName != "" && driver == "postgres" {

parsedDSN, err := url.Parse(dsn)
if err != nil {
return "", fmt.Errorf("error parsing DSN URL: %w", err)
}

username := parsedDSN.User.Username()
password, _ := parsedDSN.User.Password()
host := parsedDSN.Hostname()
port := parsedDSN.Port()
path := parsedDSN.Path
rawQuery := parsedDSN.RawQuery

secretString, err := getSecretFunc(secretName, awsConf)
if err != nil {
return "", fmt.Errorf("error retrieving secret: %w", err)
}

var secrets map[string]interface{}
if err := json.Unmarshal([]byte(secretString), &secrets); err != nil {
return "", fmt.Errorf("error unmarshalling secret: %w", err)
}

if val, ok := secrets["username"].(string); ok && val != "" {
username = val
}
if val, ok := secrets["password"].(string); ok && val != "" {
password = val
}

newDSN := fmt.Sprintf("postgres://%s:%s@%s:%s%s", url.QueryEscape(username), url.QueryEscape(password), host, port, path)
if rawQuery != "" {
newDSN = fmt.Sprintf("%s?%s", newDSN, rawQuery)
}

return newDSN, nil

} else if secretName != "" && driver != "postgres" {
return "", errors.New("secret_name with DSN info currently only works for postgres DSNs")
}

return dsn, nil
}

func sqlOpenWithReworks(ctx context.Context, logger *service.Logger, driver, dsn string, connSettings *connSettings, awsConf aws.Config) (*sql.DB, error) {
updatedDSN, err := reworkDSN(driver, dsn)
if err != nil {
return nil, err
Expand All @@ -267,12 +349,21 @@ func sqlOpenWithReworks(ctx context.Context, logger *service.Logger, driver, dsn
logger.Warnf("Detected old-style Clickhouse Data Source Name: '%v', replacing with new style: '%v'", dsn, updatedDSN)
}

updatedDSN, err = BuildAwsDsn(dsn, driver, connSettings.secretName, awsConf, getSecretFromAWSSecretManager)
if err != nil {
return nil, err
}

if updatedDSN != dsn {
logger.Infof("Updated dsn with info from AWS Secret '%v'", connSettings.secretName)
}

db, err := sql.Open(driver, updatedDSN)
if err != nil {
return nil, err
}

if shouldPing {
if connSettings.initVerifyConn {
if err := db.PingContext(ctx); err != nil {
_ = db.Close()
return nil, fmt.Errorf("could not establish connection to database: %w", err)
Expand Down
96 changes: 96 additions & 0 deletions internal/impl/sql/conn_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package sql_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

bento_sql "github.com/warpstreamlabs/bento/internal/impl/sql"
"github.com/warpstreamlabs/bento/public/service"

_ "github.com/warpstreamlabs/bento/public/components/pure"
Expand Down Expand Up @@ -184,3 +188,95 @@ sql_select:
`{"bar":"third bar","baz":"third baz","foo":"third"}`,
}, msgs)
}

func mockGetSecretFromAWS(secretName string, awsConf aws.Config) (secretString string, err error) {
var secret map[string]interface{}
if secretName == "validFullSecret" {
secret = map[string]interface{}{
"username": "testUser",
"password": "testPassword",
"host": "testHost",
"port": 5432,
"dbName": "testDB",
}
} else if secretName == "validUserPassSecret" {
secret = map[string]interface{}{
"username": "testUser",
"password": "testPassword",
}
} else if secretName == "SecretDoesNotExist" {
return "", errors.New("ResourceNotFoundException: Secrets Manager can't find the specified secret.")
}
secretBytes, _ := json.Marshal(secret)
return string(secretBytes), nil
}

func TestBuildAwsDsn(t *testing.T) {
awsConf := aws.Config{}

tests := []struct {
name string
dsn string
driver string
secretName string
expectedDSN string
expectedError bool
errorValue string
}{
{
name: "validFullSecretTest",
dsn: "postgres://user:password@host:5432/dbname?param1=value1&param2=value2",
driver: "postgres",
secretName: "validFullSecret",
expectedDSN: "postgres://testUser:testPassword@host:5432/dbname?param1=value1&param2=value2",
expectedError: false,
},
{
name: "validUserPassSecretTest",
dsn: "postgres://user:password@host:5432/dbname?param1=value1&param2=value2",
driver: "postgres",
secretName: "validUserPassSecret",
expectedDSN: "postgres://testUser:testPassword@host:5432/dbname?param1=value1&param2=value2",
expectedError: false,
},
{
name: "SecretNotFoundTest",
dsn: "postgres://user:password@host:5432/dbname?param1=value1&param2=value2",
driver: "postgres",
secretName: "SecretDoesNotExist",
expectedDSN: "postgres://testUser:testPassword@host:5432/dbname?param1=value1&param2=value2",
expectedError: true,
errorValue: "error retrieving secret: ResourceNotFoundException: Secrets Manager can't find the specified secret.",
},
{
name: "DriverNotPostgresTest",
dsn: "mysql://root@localhost/username",
driver: "mysql",
secretName: "validFullSecret",
expectedDSN: "",
expectedError: true,
errorValue: "secret_name with DSN info currently only works for postgres DSNs",
},
{
name: "NoSecretName",
dsn: "postgres://user:password@host:5432/dbname?param1=value1&param2=value2",
driver: "postgres",
secretName: "",
expectedDSN: "postgres://user:password@host:5432/dbname?param1=value1&param2=value2",
expectedError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
awsSecretDsn, err := bento_sql.BuildAwsDsn(tt.dsn, tt.driver, tt.secretName, awsConf, mockGetSecretFromAWS)
if tt.expectedError {
assert.Error(t, err)
assert.Equal(t, tt.errorValue, err.Error())
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expectedDSN, awsSecretDsn)
}
})
}
}
Loading

0 comments on commit e704503

Please sign in to comment.