Skip to content

Commit

Permalink
Switch to AWS SDK v2
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanBaulch authored and funkyshu committed Jan 8, 2025
1 parent 7a10f16 commit 338e573
Show file tree
Hide file tree
Showing 17 changed files with 1,322 additions and 20,770 deletions.
19 changes: 19 additions & 0 deletions backend/s3/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package s3

import (
"context"

"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type Client interface {
manager.DownloadAPIClient
manager.UploadAPIClient
CopyObject(ctx context.Context, in *s3.CopyObjectInput, opts ...func(*s3.Options)) (*s3.CopyObjectOutput, error)
DeleteObject(ctx context.Context, in *s3.DeleteObjectInput, opts ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
HeadBucket(ctx context.Context, in *s3.HeadBucketInput, opts ...func(*s3.Options)) (*s3.HeadBucketOutput, error)
HeadObject(ctx context.Context, in *s3.HeadObjectInput, opts ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
ListObjects(ctx context.Context, in *s3.ListObjectsInput, opts ...func(*s3.Options)) (*s3.ListObjectsOutput, error)
ListObjectVersions(ctx context.Context, in *s3.ListObjectVersionsInput, opts ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error)
}
8 changes: 4 additions & 4 deletions backend/s3/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ would have to be cast as s3.FileSystem to use the following:
)
// to pass specific client, for instance a mock client
s3apiMock := &mocks.S3API{}
s3apiMock.On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).
s3cliMock := &mocks.Client{}
s3cliMock.On("GetObject", matchContext, mock.AnythingOfType("*s3.GetObjectInput")).
Return(&s3.GetObjectOutput{
Body: nopCloser{bytes.NewBufferString("Hello world!")},
}, nil)
fs = fs.WithClient(s3apiMock)
fs = fs.WithClient(s3cliMock)
}
# Object ACL
Expand Down Expand Up @@ -86,6 +86,6 @@ and https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.ht
# See Also
See: https://github.com/aws/aws-sdk-go/tree/master/service/s3
See: https://github.com/aws/aws-sdk-go-v2/tree/main/service/s3
*/
package s3
125 changes: 56 additions & 69 deletions backend/s3/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"strings"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"

"github.com/c2fo/vfs/v6"
"github.com/c2fo/vfs/v6/mocks"
Expand Down Expand Up @@ -132,17 +131,14 @@ func (f *File) CopyToFile(file vfs.File) (err error) {

// if target is S3
if tf, ok := file.(*File); ok {
input, err := f.getCopyObjectInput(tf)
if err != nil {
return err
}
input := f.getCopyObjectInput(tf)
// if input is not nil, use it to natively copy object
if input != nil {
client, err := f.fileSystem.Client()
if err != nil {
return err
}
_, err = client.CopyObject(input)
_, err = client.CopyObject(context.Background(), input)
return err
}
}
Expand Down Expand Up @@ -225,7 +221,7 @@ func (f *File) Delete(opts ...options.DeleteOption) error {
}
}

_, err = client.DeleteObject(&s3.DeleteObjectInput{
_, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Key: &f.key,
Bucket: &f.bucket,
})
Expand All @@ -240,7 +236,7 @@ func (f *File) Delete(opts ...options.DeleteOption) error {
}

for _, version := range objectVersions.Versions {
if _, err = client.DeleteObject(&s3.DeleteObjectInput{
if _, err = client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
Key: &f.key,
Bucket: &f.bucket,
VersionId: version.VersionId,
Expand Down Expand Up @@ -338,11 +334,11 @@ func (f *File) tempToS3() error {
return err
}

uploader := getUploader(client, withUploadPartitionSize(f.getDownloadPartitionSize()))
uploader := manager.NewUploader(client, withUploadPartitionSize(f.getUploadPartitionSize()))
uploadInput := uploadInput(f)
uploadInput.Body = f.tempFileWriter

_, err = uploader.UploadWithContext(context.Background(), uploadInput)
_, err = uploader.Upload(context.Background(), uploadInput)
if err != nil {
return err
}
Expand Down Expand Up @@ -544,32 +540,35 @@ func (f *File) String() string {
/*
Private helper functions
*/
func (f *File) getAllObjectVersions(client s3iface.S3API) (*s3.ListObjectVersionsOutput, error) {
func (f *File) getAllObjectVersions(client Client) (*s3.ListObjectVersionsOutput, error) {
prefix := utils.RemoveLeadingSlash(f.key)
objVers, err := client.ListObjectVersions(&s3.ListObjectVersionsInput{
objVers, err := client.ListObjectVersions(context.Background(), &s3.ListObjectVersionsInput{
Bucket: &f.bucket,
Prefix: &prefix,
})
return objVers, err
}

func (f *File) getHeadObject() (*s3.HeadObjectOutput, error) {
headObjectInput := new(s3.HeadObjectInput).SetKey(f.key).SetBucket(f.bucket)
headObjectInput := &s3.HeadObjectInput{
Key: aws.String(f.key),
Bucket: aws.String(f.bucket),
}
client, err := f.fileSystem.Client()
if err != nil {
return nil, err
}

head, err := client.HeadObject(headObjectInput)
head, err := client.HeadObject(context.Background(), headObjectInput)

return head, handleExistsError(err)
}

// For copy from S3-to-S3 when credentials are the same between source and target, return *s3.CopyObjectInput or error
func (f *File) getCopyObjectInput(targetFile *File) (*s3.CopyObjectInput, error) {
func (f *File) getCopyObjectInput(targetFile *File) *s3.CopyObjectInput {
// first we must determine if we're using the same s3 credentials for source and target before doing a native copy
isSameAccount := false
var ACL string
var ACL types.ObjectCannedACL

// get content type from source
var contentType string
Expand Down Expand Up @@ -612,32 +611,28 @@ func (f *File) getCopyObjectInput(targetFile *File) (*s3.CopyObjectInput, error)
// PathEscape ensures we url-encode as required by the API, including double-encoding literals
copySourceKey := url.PathEscape(path.Join(f.bucket, f.key))

copyInput := new(s3.CopyObjectInput).
SetServerSideEncryption("AES256").
SetACL(ACL).
SetKey(targetFile.key).
SetBucket(targetFile.bucket).
SetCopySource(copySourceKey)
copyInput := &s3.CopyObjectInput{
ServerSideEncryption: types.ServerSideEncryptionAes256,
ACL: ACL,
Key: aws.String(targetFile.key),
Bucket: aws.String(targetFile.bucket),
CopySource: aws.String(copySourceKey),
}

// set content type if it exists
if contentType != "" {
copyInput.SetContentType(contentType)

Check failure on line 624 in backend/s3/file.go

View workflow job for this annotation

GitHub Actions / lint

copyInput.SetContentType undefined (type *"github.com/aws/aws-sdk-go-v2/service/s3".CopyObjectInput has no field or method SetContentType)) (typecheck)

Check failure on line 624 in backend/s3/file.go

View workflow job for this annotation

GitHub Actions / lint

copyInput.SetContentType undefined (type *"github.com/aws/aws-sdk-go-v2/service/s3".CopyObjectInput has no field or method SetContentType)

Check failure on line 624 in backend/s3/file.go

View workflow job for this annotation

GitHub Actions / lint

copyInput.SetContentType undefined (type *"github.com/aws/aws-sdk-go-v2/service/s3".CopyObjectInput has no field or method SetContentType)) (typecheck)

Check failure on line 624 in backend/s3/file.go

View workflow job for this annotation

GitHub Actions / lint

copyInput.SetContentType undefined (type *"github.com/aws/aws-sdk-go-v2/service/s3".CopyObjectInput has no field or method SetContentType)) (typecheck)
}

if f.fileSystem.options != nil && f.fileSystem.options.(Options).DisableServerSideEncryption {
copyInput.ServerSideEncryption = nil
}

// validate copyInput
if err := copyInput.Validate(); err != nil {
return nil, err
copyInput.ServerSideEncryption = ""
}

return copyInput, nil
return copyInput
}

// return nil if credentials aren't the same
return nil, nil
return nil
}

func (f *File) copyS3ToLocalTempReader(tmpFile *os.File) error {
Expand All @@ -647,34 +642,36 @@ func (f *File) copyS3ToLocalTempReader(tmpFile *os.File) error {
}

// Download file
input := new(s3.GetObjectInput).SetBucket(f.bucket).SetKey(f.key)
input := &s3.GetObjectInput{
Bucket: aws.String(f.bucket),
Key: aws.String(f.key),
}
opt := withDownloadPartitionSize(f.getDownloadPartitionSize())
_, err = getDownloader(client, opt).
DownloadWithContext(context.Background(), tmpFile, input)
_, err = manager.NewDownloader(client, opt).
Download(context.Background(), tmpFile, input)

return err
}

// TODO: need to provide an implementation-agnostic container for providing config options such as SSE
func uploadInput(f *File) *s3manager.UploadInput {
sseType := "AES256"
input := &s3manager.UploadInput{
func uploadInput(f *File) *s3.PutObjectInput {
input := &s3.PutObjectInput{
Bucket: &f.bucket,
Key: &f.key,
ServerSideEncryption: &sseType,
ServerSideEncryption: types.ServerSideEncryptionAes256,
}

if f.fileSystem.options == nil {
f.fileSystem.options = Options{}
}

if f.fileSystem.options.(Options).DisableServerSideEncryption {
input.ServerSideEncryption = nil
input.ServerSideEncryption = ""
}

if opts, ok := f.fileSystem.options.(Options); ok {
if opts.ACL != "" {
input.ACL = &opts.ACL
input.ACL = opts.ACL
}
}

Expand Down Expand Up @@ -743,10 +740,11 @@ func (f *File) getReader() (io.ReadCloser, error) {
f.reader = io.NopCloser(strings.NewReader(""))
} else {
// Create the request to get the object
input := new(s3.GetObjectInput).
SetBucket(f.bucket).
SetKey(f.key).
SetRange(fmt.Sprintf("bytes=%d-", f.cursorPos))
input := &s3.GetObjectInput{
Bucket: aws.String(f.bucket),
Key: aws.String(f.key),
Range: aws.String(fmt.Sprintf("bytes=%d-", f.cursorPos)),
}

// Get the client
client, err := f.fileSystem.Client()
Expand All @@ -755,7 +753,7 @@ func (f *File) getReader() (io.ReadCloser, error) {
}

// Request the object
result, err := client.GetObject(input)
result, err := client.GetObject(context.Background(), input)
if err != nil {
return nil, err
}
Expand All @@ -770,12 +768,9 @@ func (f *File) getReader() (io.ReadCloser, error) {

func handleExistsError(err error) error {
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
switch awsErr.Code() {
case s3.ErrCodeNoSuchKey, s3.ErrCodeNoSuchBucket, "NotFound":
return vfs.ErrNotExist
}
var kerr *types.NoSuchKey
if errors.As(err, &kerr) {
return vfs.ErrNotExist
}
return err
}
Expand Down Expand Up @@ -828,15 +823,15 @@ func (f *File) getS3Writer() (*io.PipeWriter, error) {
if err != nil {
return nil, err
}
uploader := getUploader(client, withUploadPartitionSize(f.getUploadPartitionSize()))
uploader := manager.NewUploader(client, withUploadPartitionSize(f.getUploadPartitionSize()))
ctx, cancel := context.WithCancel(context.Background())
f.cancelFunc = cancel
uploadInput := uploadInput(f)
uploadInput.Body = pr

go func(input *s3manager.UploadInput) {
go func(input *s3.PutObjectInput) {
defer cancel()
_, err := uploader.UploadWithContext(ctx, input)
_, err := uploader.Upload(ctx, input)
if err != nil {
_ = pw.CloseWithError(err)
}
Expand Down Expand Up @@ -870,22 +865,14 @@ func (f *File) getDownloadPartitionSize() int64 {
return partSize
}

func withDownloadPartitionSize(partSize int64) func(*s3manager.Downloader) {
return func(d *s3manager.Downloader) {
func withDownloadPartitionSize(partSize int64) func(*manager.Downloader) {
return func(d *manager.Downloader) {
d.PartSize = partSize
}
}

func withUploadPartitionSize(partSize int64) func(*s3manager.Uploader) {
return func(u *s3manager.Uploader) {
func withUploadPartitionSize(partSize int64) func(*manager.Uploader) {
return func(u *manager.Uploader) {
u.PartSize = partSize
}
}

var getDownloader = func(client s3iface.S3API, opts ...func(d *s3manager.Downloader)) s3manageriface.DownloaderAPI {
return s3manager.NewDownloaderWithClient(client, opts...)
}

var getUploader = func(client s3iface.S3API, opts ...func(d *s3manager.Uploader)) s3manageriface.UploaderAPI {
return s3manager.NewUploaderWithClient(client, opts...)
}
14 changes: 5 additions & 9 deletions backend/s3/fileSystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"fmt"
"path"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"

"github.com/c2fo/vfs/v6"
"github.com/c2fo/vfs/v6/backend"
"github.com/c2fo/vfs/v6/options"
Expand All @@ -20,7 +17,7 @@ const name = "AWS S3"

// FileSystem implements vfs.FileSystem for the S3 file system.
type FileSystem struct {
client s3iface.S3API
client Client
options vfs.Options
}

Expand Down Expand Up @@ -81,7 +78,7 @@ func (fs *FileSystem) Scheme() string {

// Client returns the underlying aws s3 client, creating it, if necessary
// See Overview for authentication resolution
func (fs *FileSystem) Client() (s3iface.S3API, error) {
func (fs *FileSystem) Client() (Client, error) {
if fs.client == nil {
if fs.options == nil {
fs.options = Options{}
Expand Down Expand Up @@ -113,15 +110,14 @@ func (fs *FileSystem) WithOptions(opts vfs.Options) *FileSystem {

// WithClient passes in an s3 client and returns the file system (chainable)
func (fs *FileSystem) WithClient(client interface{}) *FileSystem {
switch client.(type) {
case *s3.S3, s3iface.S3API:
fs.client = client.(s3iface.S3API)
if c, ok := client.(Client); ok {
fs.client = c
fs.options = nil
}
return fs
}

// NewFileSystem initializer for FileSystem struct accepts aws-sdk S3API client and returns FileSystem or error.
// NewFileSystem initializer for FileSystem struct accepts aws-sdk client and returns Filesystem or error.
func NewFileSystem() *FileSystem {
return &FileSystem{}
}
Expand Down
Loading

0 comments on commit 338e573

Please sign in to comment.