Skip to content

Commit

Permalink
add S3 minimum part size defined by the user
Browse files Browse the repository at this point in the history
Signed-off-by: Renan Rangel <[email protected]>
  • Loading branch information
rvrangel committed Nov 7, 2024
1 parent 9946ce8 commit 15898f4
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 9 deletions.
47 changes: 38 additions & 9 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3/types"
transport "github.com/aws/smithy-go/endpoints"
"github.com/aws/smithy-go/middleware"
"github.com/dustin/go-humanize"
"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/concurrency"
Expand Down Expand Up @@ -84,6 +85,11 @@ var (

// path component delimiter
delimiter = "/"

// minimum part size
minimumPartSize int64

ErrPartSize = errors.New("minimum S3 part size must be between 5MiB and 5GiB")
)

func registerFlags(fs *pflag.FlagSet) {
Expand All @@ -96,6 +102,7 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&tlsSkipVerifyCert, "s3_backup_tls_skip_verify_cert", false, "skip the 'certificate is valid' check for SSL connections.")
fs.StringVar(&requiredLogLevel, "s3_backup_log_level", "LogOff", "determine the S3 loglevel to use from LogOff, LogDebug, LogDebugWithSigning, LogDebugWithHTTPBody, LogDebugWithRequestRetries, LogDebugWithRequestErrors.")
fs.StringVar(&sse, "s3_backup_server_side_encryption", "", "server-side encryption algorithm (e.g., AES256, aws:kms, sse_c:/path/to/key/file).")
fs.Int64Var(&minimumPartSize, "s3_backup_aws_minimum_partsize", 1024*1024*5, "Minimum part size to use")
}

func init() {
Expand Down Expand Up @@ -179,17 +186,13 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

// Calculate s3 upload part size using the source filesize
partSizeBytes := manager.DefaultUploadPartSize
if filesize > 0 {
minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts)
// Round up to ensure large enough partsize
calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize))
if calculatedPartSizeBytes > partSizeBytes {
partSizeBytes = calculatedPartSizeBytes
}
partSizeBytes, err := getPartSize(filesize)
if err != nil {
return nil, err
}

bh.bs.params.Logger.Infof("Using S3 upload part size: %s", humanize.IBytes(uint64(partSizeBytes)))

reader, writer := io.Pipe()
bh.waitGroup.Add(1)

Expand Down Expand Up @@ -537,6 +540,32 @@ func objName(parts ...string) string {
return res
}

// getPartSize returns the S3 multipart upload part size, in bytes, to use for
// a file of the given size.
//
// The size starts at manager.DefaultUploadPartSize and, when filesize is
// positive, is raised to ceil(filesize / manager.MaxUploadParts) so that the
// whole file fits within the maximum number of parts.
//
// If the operator-supplied minimumPartSize (--s3_backup_aws_minimum_partsize)
// is non-zero and larger than that computed size, it is used instead, provided
// it lies between 5MiB and 5GiB; otherwise a zero size and an error wrapping
// ErrPartSize are returned. A minimumPartSize that does not exceed the computed
// size is ignored and is not validated.
func getPartSize(filesize int64) (int64, error) {
	const (
		minAllowedPartSize = 1024 * 1024 * 5        // 5MiB
		maxAllowedPartSize = 1024 * 1024 * 1024 * 5 // 5GiB
	)

	// Calculate s3 upload part size using the source filesize.
	partSizeBytes := manager.DefaultUploadPartSize
	if filesize > 0 {
		// Round up to ensure the part size is large enough for the file to fit
		// in MaxUploadParts parts.
		calculatedPartSizeBytes := int64(math.Ceil(float64(filesize) / float64(manager.MaxUploadParts)))
		if calculatedPartSizeBytes > partSizeBytes {
			partSizeBytes = calculatedPartSizeBytes
		}
	}

	// No minimum configured, or the computed size already satisfies it.
	if minimumPartSize == 0 || partSizeBytes >= minimumPartSize {
		return partSizeBytes, nil
	}

	// NOTE(review): partSizeBytes is never below manager.DefaultUploadPartSize,
	// so the lower-bound comparison can only trigger if that default is smaller
	// than 5MiB - confirm against the SDK before relying on it.
	if minimumPartSize < minAllowedPartSize || minimumPartSize > maxAllowedPartSize {
		return 0, fmt.Errorf("%w, currently set to %s",
			ErrPartSize, humanize.IBytes(uint64(minimumPartSize)),
		)
	}

	return minimumPartSize, nil
}

func init() {
backupstorage.BackupStorageMap["s3"] = newS3BackupStorage()

Expand Down
65 changes: 65 additions & 0 deletions go/vt/mysqlctl/s3backupstorage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,68 @@ func TestWithParams(t *testing.T) {
assert.NotNil(t, s3.transport.DialContext)
assert.NotNil(t, s3.transport.Proxy)
}

// TestGetPartSize runs getPartSize over a table of file sizes and configured
// minimum part sizes and checks both the returned part size and the returned
// error for each combination.
func TestGetPartSize(t *testing.T) {
	const (
		mib int64 = 1024 * 1024
		gib       = 1024 * mib
		tib       = 1024 * gib
	)

	// getPartSize reads the package-level minimumPartSize, so put the original
	// value back once this test (and its subtests) are done.
	saved := minimumPartSize
	t.Cleanup(func() { minimumPartSize = saved })

	cases := []struct {
		name     string
		filesize int64
		minimum  int64
		want     int64
		err      error
	}{
		{
			name:     "minimum - 10 MiB",
			filesize: 10 * mib,
			minimum:  5 * mib,
			want:     5 * mib,
		},
		{
			name:     "below minimum - 10 MiB",
			filesize: 10 * mib,
			minimum:  8 * mib,
			want:     8 * mib,
		},
		{
			name:     "above minimum - 1 TiB",
			filesize: tib,
			minimum:  5 * mib,
			want:     109951163, // ~104.9 MiB
		},
		{
			name:     "below minimum - 1 TiB",
			filesize: tib,
			minimum:  200 * mib,
			want:     200 * mib,
		},
		{
			name:     "below S3 limits - 5 MiB",
			filesize: 3 * mib,
			minimum:  4 * mib,
			want:     5 * mib, // should always return the minimum
		},
		{
			name:     "above S3 limits - 5 GiB",
			filesize: tib,
			minimum:  6 * gib,
			want:     0,
			err:      ErrPartSize,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			minimumPartSize = tc.minimum

			got, err := getPartSize(tc.filesize)

			require.ErrorIs(t, err, tc.err)
			require.Equal(t, tc.want, got)
		})
	}
}

0 comments on commit 15898f4

Please sign in to comment.