Skip to content

Commit

Permalink
fix s3 multi p2
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaxim committed Aug 15, 2024
1 parent d2e862c commit 3212f60
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
43 changes: 28 additions & 15 deletions go/chat/s3/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ func (b *Bucket) ListMulti(ctx context.Context, prefix, delim string) (multis []
"prefix": {prefix},
"delimiter": {delim},
}
headers := map[string][]string{}
b.addTokenHeader(headers)
for attempt := b.S3.AttemptStrategy.Start(); attempt.Next(); {
req := &request{
method: "GET",
bucket: b.Name,
params: params,
method: "GET",
bucket: b.Name,
params: params,
headers: headers,
}
var resp listMultiResp
err := b.S3.query(ctx, req, &resp)
Expand Down Expand Up @@ -165,7 +168,7 @@ func (m *Multi) putPart(ctx context.Context, n int, r io.ReadSeeker, partSize in
"Content-Length": {strconv.FormatInt(partSize, 10)},
"Content-MD5": {md5b64},
}
b.addTokenHeader(headers)
m.Bucket.addTokenHeader(headers)
params := map[string][]string{
"uploadId": {m.UploadID},
"partNumber": {strconv.FormatInt(int64(n), 10)},
Expand Down Expand Up @@ -249,13 +252,17 @@ func (m *Multi) ListParts(ctx context.Context) ([]Part, error) {
"uploadId": {m.UploadID},
"max-parts": {strconv.FormatInt(int64(listPartsMax), 10)},
}
headers := map[string][]string{}
m.Bucket.addTokenHeader(headers)

var parts partSlice
for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
req := &request{
method: "GET",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
method: "GET",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
headers: headers,
}
var resp listPartsResp
err := m.Bucket.S3.query(ctx, req, &resp)
Expand Down Expand Up @@ -385,15 +392,17 @@ func (m *Multi) Complete(ctx context.Context, parts []Part) error {

// Setting Content-Length prevents breakage on DreamObjects
for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
headers := map[string][]string{
"Content-Length": {strconv.Itoa(len(data))},
}
m.Bucket.addTokenHeader(headers)
req := &request{
method: "POST",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
payload: bytes.NewReader(data),
headers: map[string][]string{
"Content-Length": {strconv.Itoa(len(data))},
},
headers: headers,
}

resp := &completeResponse{}
Expand Down Expand Up @@ -434,12 +443,16 @@ func (m *Multi) Abort(ctx context.Context) error {
params := map[string][]string{
"uploadId": {m.UploadID},
}
headers := map[string][]string{}
m.Bucket.addTokenHeader(headers)

for attempt := m.Bucket.S3.AttemptStrategy.Start(); attempt.Next(); {
req := &request{
method: "DELETE",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
method: "DELETE",
bucket: m.Bucket.Name,
path: m.Key,
params: params,
headers: headers,
}
err := m.Bucket.S3.query(ctx, req, nil)
if shouldRetry(err) && attempt.HasNext() {
Expand Down
34 changes: 26 additions & 8 deletions go/chat/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,14 @@ func (b *Bucket) PutBucket(ctx context.Context, perm ACL) error {
//
// See http://goo.gl/GoBrY for details.
func (b *Bucket) DelBucket() (err error) {
headers := map[string][]string{}
b.addTokenHeader(headers)

req := &request{
method: "DELETE",
bucket: b.Name,
path: "/",
method: "DELETE",
bucket: b.Name,
path: "/",
headers: headers,
}
for attempt := b.S3.AttemptStrategy.Start(); attempt.Next(); {
err = b.S3.query(context.Background(), req, nil)
Expand Down Expand Up @@ -321,10 +325,14 @@ func (b *Bucket) GetResponseWithHeaders(ctx context.Context, path string, header

// Exists checks whether or not an object exists on an S3 bucket using a HEAD request.
func (b *Bucket) Exists(path string) (exists bool, err error) {
headers := map[string][]string{}
b.addTokenHeader(headers)

req := &request{
method: "HEAD",
bucket: b.Name,
path: path,
method: "HEAD",
bucket: b.Name,
path: path,
headers: headers,
}
err = b.S3.prepare(req)
if err != nil {
Expand Down Expand Up @@ -543,6 +551,7 @@ func (b *Bucket) PutBucketSubresource(subresource string, r io.Reader, length in
headers := map[string][]string{
"Content-Length": {strconv.FormatInt(length, 10)},
}
b.addTokenHeader(headers)
req := &request{
path: "/",
method: "PUT",
Expand All @@ -559,6 +568,9 @@ func (b *Bucket) PutBucketSubresource(subresource string, r io.Reader, length in
//
// See http://goo.gl/APeTt for details.
func (b *Bucket) Del(ctx context.Context, path string) error {
headers := map[string][]string{}
b.addTokenHeader(headers)

req := &request{
method: "DELETE",
bucket: b.Name,
Expand Down Expand Up @@ -598,6 +610,8 @@ func (b *Bucket) DelMulti(objects Delete) error {
"Content-MD5": {base64.StdEncoding.EncodeToString(digest.Sum(nil))},
"Content-Type": {"text/xml"},
}
b.addTokenHeader(headers)

req := &request{
path: "/",
method: "POST",
Expand Down Expand Up @@ -705,9 +719,13 @@ func (b *Bucket) List(prefix, delim, marker string, max int) (result *ListResp,
if max != 0 {
params["max-keys"] = []string{strconv.FormatInt(int64(max), 10)}
}
headers := map[string][]string{}
b.addTokenHeader(headers)

req := &request{
bucket: b.Name,
params: params,
bucket: b.Name,
params: params,
headers: headers,
}
result = &ListResp{}
for attempt := b.S3.AttemptStrategy.Start(); attempt.Next(); {
Expand Down

0 comments on commit 3212f60

Please sign in to comment.