Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a function call for listing objects under a prefix #122

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions list_objects.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package s3gof3r

import (
"encoding/xml"
"math"
"net/http"
"strconv"
"sync"
"time"
)

// newObjectLister builds an ObjectLister for the given prefixes and starts its
// background goroutines: one worker per unit of configured concurrency plus a
// feeder that hands the prefixes to the workers.
//
// The Config and Bucket are copied so later changes made by the caller do not
// affect the lister. NTry and Concurrency are clamped to a minimum of 1. The
// returned error is currently always nil.
func newObjectLister(c *Config, b *Bucket, prefixes []string, maxKeys int) (*ObjectLister, error) {
	cfg, bucket := *c, *b
	cfg.NTry = max(c.NTry, 1)
	cfg.Concurrency = max(c.Concurrency, 1)

	l := &ObjectLister{
		b:        &bucket,
		c:        &cfg,
		prefixes: prefixes,
		maxKeys:  maxKeys,
		getCh:    make(chan string),
		putCh:    make(chan []string, 1),
		quit:     make(chan struct{}),
	}

	for remaining := l.c.Concurrency; remaining > 0; remaining-- {
		l.wg.Add(1)
		go l.worker()
	}
	go l.initPrefixes()

	return l, nil
}

// ObjectLister iterates over the keys found under a set of prefixes. Batches
// of keys are produced by background workers and consumed through Next and
// Value; Error reports a listing failure and Close stops the workers.
type ObjectLister struct {
	// Request parameters, fixed at construction time.
	b        *Bucket
	c        *Config
	prefixes []string
	maxKeys  int

	// Iterator state exposed through Value and Error.
	next []string
	err  error

	// Plumbing between the feeder, the workers and the consumer.
	getCh chan string   // prefixes waiting to be listed
	putCh chan []string // batches of keys ready for the consumer
	wg    sync.WaitGroup

	// quit is closed (at most once, guarded by quitOnce) to stop all work.
	quit     chan struct{}
	quitOnce sync.Once
}

// closeQuit closes the quit channel. It may be called any number of times and
// from any goroutine; only the first call has an effect.
func (l *ObjectLister) closeQuit() {
	signalStop := func() {
		close(l.quit)
	}
	l.quitOnce.Do(signalStop)
}

// initPrefixes feeds every requested prefix to the workers, then waits for the
// workers to finish and closes putCh so the consumer sees the end of the
// results.
//
// Feeding stops as soon as quit is closed. Without that check the send on the
// unbuffered getCh would block forever once the workers have exited because of
// an error or a call to Close, leaking this goroutine.
func (l *ObjectLister) initPrefixes() {
feed:
	for _, p := range l.prefixes {
		select {
		case l.getCh <- p:
		case <-l.quit:
			break feed
		}
	}
	// Closing getCh lets idle workers fall out of their range loop.
	close(l.getCh)

	l.wg.Wait()
	close(l.putCh)
}

// worker lists the prefixes it receives on getCh. For each prefix it follows
// continuation tokens until the listing is exhausted, publishing every page of
// keys on putCh.
//
// The worker stops when getCh is closed, when quit is closed, or when a
// listing request fails after all retries. In the failure case it records the
// error and closes quit so the remaining goroutines stop as well.
func (l *ObjectLister) worker() {
	// Done must run on every exit path; otherwise the feeder's wg.Wait never
	// returns and putCh is never closed.
	defer l.wg.Done()

	for p := range l.getCh {
		var continuation string
		for {
			res, err := l.retryListObjects(p, continuation)
			if err != nil {
				// Record the error and close quit as a single once-guarded
				// step: only the first failure is kept, concurrent failing
				// workers cannot race on l.err, and the write is ordered
				// before the close that consumers observe. If quit was
				// already closed (Close or an earlier failure) this is a
				// no-op, matching the previous behaviour.
				l.quitOnce.Do(func() {
					l.err = err
					close(l.quit)
				})
				return
			}

			keys := make([]string, 0, len(res.Contents))
			for _, c := range res.Contents {
				keys = append(keys, c.Key)
			}

			select {
			case <-l.quit:
				return
			case l.putCh <- keys:
			}

			continuation = res.NextContinuationToken
			if continuation == "" {
				// This prefix is exhausted; grab the next one.
				break
			}
		}
	}
}

// retryListObjects requests one page of keys under prefix p, starting at the
// given continuation token (empty for the first page). A failed request is
// retried with exponential back-off (100ms, 200ms, 400ms, ...) until l.c.NTry
// attempts have been made.
//
// At least one attempt is always made, so a nil result is only ever returned
// together with a non-nil error. No back-off is taken after the final attempt,
// and a pending back-off is abandoned as soon as quit is closed; in both cases
// the most recent request error is returned.
func (l *ObjectLister) retryListObjects(p, continuation string) (*listBucketResult, error) {
	opts := listObjectsOptions{MaxKeys: l.maxKeys, Prefix: p, ContinuationToken: continuation}

	for i := 0; ; i++ {
		res, err := listObjects(l.c, l.b, opts)
		if err == nil {
			return res, nil
		}
		if i+1 >= l.c.NTry {
			// Out of attempts: report the failure without sleeping first.
			return nil, err
		}

		backoff := time.Duration(math.Exp2(float64(i))) * 100 * time.Millisecond
		select {
		case <-time.After(backoff):
		case <-l.quit:
			// The lister is shutting down; do not keep retrying.
			return nil, err
		}
	}
}

// Next moves the iterator to the next set of results. It returns true if there
// are more results, or false if there are no more results, there was an
// error, or the lister has been closed. After Next returns false, Error
// reports whether the iteration stopped because of a failure.
func (l *ObjectLister) Next() bool {
	// Give shutdown priority over buffered results. Observing the closed quit
	// channel (rather than reading l.err directly) avoids racing with the
	// worker that records the error, and makes Next reliably return false
	// after Close or a failure even if a batch is still buffered.
	select {
	case <-l.quit:
		return false
	default:
	}

	select {
	case n, ok := <-l.putCh:
		if !ok {
			// putCh is closed once every worker has finished.
			return false
		}

		l.next = n
		return true
	case <-l.quit:
		return false
	}
}

// Value returns the batch of keys stored by the most recent call to Next that
// returned true. It is nil before the first such call.
func (l *ObjectLister) Value() []string {
	batch := l.next
	return batch
}

// Error returns the error recorded by a failed listing request, or nil if no
// failure has been recorded.
func (l *ObjectLister) Error() error {
	recorded := l.err
	return recorded
}

// Close signals the background goroutines to stop by closing the quit
// channel. It is safe to call more than once.
func (l *ObjectLister) Close() {
	l.quitOnce.Do(func() {
		close(l.quit)
	})
}

// listObjectsOptions specifies the options for a single list-objects request
// against an S3 bucket. Zero-valued fields are left out of the request.
type listObjectsOptions struct {
	// MaxKeys is the maximum number of keys to return per request.
	MaxKeys int
	// Prefix restricts the listing to keys that start with this string.
	Prefix string
	// ContinuationToken is the token returned by the previous request, used
	// to fetch the next page of results.
	ContinuationToken string
}

// listBucketResult mirrors the XML document returned by a list-objects
// (list-type=2) request.
type listBucketResult struct {
	Name     string `xml:"Name"`
	Prefix   string `xml:"Prefix"`
	KeyCount int    `xml:"KeyCount"`
	MaxKeys  int    `xml:"MaxKeys"`
	// IsTruncated reports whether more results are available. The tag
	// previously read "IsTrucated", so the field was never populated.
	IsTruncated bool `xml:"IsTruncated"`
	// NextContinuationToken is passed to the next request to fetch the
	// following page; it is empty on the last page.
	NextContinuationToken string               `xml:"NextContinuationToken"`
	Contents              []listBucketContents `xml:"Contents"`
}

// listBucketContents describes a single object in a listing.
type listBucketContents struct {
	Key          string    `xml:"Key"`
	LastModified time.Time `xml:"LastModified"`
	ETag         string    `xml:"ETag"`
	Size         int64     `xml:"Size"`
	StorageClass string    `xml:"StorageClass"`
	// NOTE(review): S3 documents CommonPrefixes as a child of
	// ListBucketResult rather than of Contents, so this field is presumably
	// never populated here — confirm before relying on it.
	CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
}

// CommonPrefix is a key prefix that groups several keys in a listing.
type CommonPrefix struct {
	Prefix string `xml:"Prefix"`
}

// ListObjectsResult wraps a decoded list-objects response.
//
// NOTE(review): nothing in the visible code constructs or returns this type —
// confirm whether it is still needed.
type ListObjectsResult struct {
	result *listBucketResult
}

// listObjects issues a single signed GET against the bucket root with
// list-type=2 (the S3 ListObjectsV2 API) and decodes the XML response.
//
// max-keys, prefix and continuation-token are only sent when the matching
// option is set. Any status other than 200 is converted to an error via
// newRespError. On failure the returned result is nil.
func listObjects(c *Config, b *Bucket, opts listObjectsOptions) (result *listBucketResult, err error) {
	endpoint, err := b.url("", c)
	if err != nil {
		return nil, err
	}

	params := endpoint.Query()
	params.Set("list-type", "2")
	if opts.MaxKeys > 0 {
		params.Set("max-keys", strconv.Itoa(opts.MaxKeys))
	}
	if opts.Prefix != "" {
		params.Set("prefix", opts.Prefix)
	}
	if opts.ContinuationToken != "" {
		params.Set("continuation-token", opts.ContinuationToken)
	}
	endpoint.RawQuery = params.Encode()

	req := &http.Request{
		Method: "GET",
		URL:    endpoint,
	}
	b.Sign(req)

	// NOTE(review): the request is sent with the bucket's own config rather
	// than c — presumably because c may not carry an HTTP client; confirm.
	resp, err := b.conf().Do(req)
	if err != nil {
		return nil, err
	}
	defer checkClose(resp.Body, err)

	if resp.StatusCode != http.StatusOK {
		return nil, newRespError(resp)
	}

	result = new(listBucketResult)
	if err := xml.NewDecoder(resp.Body).Decode(result); err != nil {
		return nil, err
	}

	return result, nil
}
85 changes: 85 additions & 0 deletions list_objects_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package s3gof3r

import (
"log"
"sort"
"sync"
"testing"
)

// keysForListing holds the object keys uploaded before the listing tests run
// and expected back from them.
var keysForListing = []string{
	"list/one/two/three",
	"list/one/two/four",
	"list/two/three/four",
	"list/two/three/five",
	"list/three/four/five",
	"list/three/four/six",
	"list/four/five/six",
	"list/four/five/seven",
}

// uploadListerFiles uploads a small random object for every key in
// keysForListing, in parallel, and returns once all uploads have finished.
// Any upload error terminates the test binary via log.Fatal.
func uploadListerFiles() {
	var pending sync.WaitGroup
	pending.Add(len(keysForListing))

	for _, key := range keysForListing {
		go func(path string) {
			defer pending.Done()
			if err := b.putReader(path, &randSrc{Size: 20}); err != nil {
				log.Fatal(err)
			}
		}(key)
	}

	pending.Wait()
}

// testListObjects lists the given prefixes with at most 5 keys per request and
// the requested concurrency. It checks that the lister yields the expected
// number of batches and exactly the keys in keysForListing.
func testListObjects(t *testing.T, prefixes []string, iterations, concurrency int) {
	config := Config{
		// Previously hard-coded to 1, which left the concurrency argument
		// unused and the concurrent path untested.
		Concurrency: concurrency,
		Scheme:      "https",
	}
	l, err := b.ListObjects(prefixes, 5, &config)
	if err != nil {
		// Nothing below can run without a lister.
		t.Fatal(err)
	}
	defer l.Close()

	actual := make([]string, 0, len(keysForListing))
	actualIterations := 0
	for l.Next() {
		actualIterations++
		actual = append(actual, l.Value()...)
	}

	if err := l.Error(); err != nil {
		t.Error(err)
	}

	if actualIterations != iterations {
		t.Errorf("expected %d iterations, got %d", iterations, actualIterations)
	}

	if len(actual) != len(keysForListing) {
		// Fatal: the element-wise comparison below would index out of range.
		t.Fatalf("expected %d keys, got %d", len(keysForListing), len(actual))
	}

	// Sort a copy so the shared fixture is not mutated.
	expected := append([]string(nil), keysForListing...)
	sort.Strings(expected)
	sort.Strings(actual)

	for i, want := range expected {
		if want != actual[i] {
			t.Errorf("result mismatch, expected '%s', got '%s'", want, actual[i])
		}
	}
}

// TestListObjects uploads the fixture objects and then lists them, both
// through a single shared prefix and through one prefix per key group, at two
// different concurrency settings.
func TestListObjects(t *testing.T) {
	t.Parallel()

	uploadListerFiles()

	single := []string{"list/"}
	split := []string{"list/one/", "list/two/", "list/three", "list/four"}

	cases := []struct {
		prefixes    []string
		iterations  int
		concurrency int
	}{
		{single, 2, 1},
		{single, 2, 5},
		{split, 4, 1},
		{split, 4, 5},
	}
	for _, tc := range cases {
		testListObjects(t, tc.prefixes, tc.iterations, tc.concurrency)
	}
}
12 changes: 12 additions & 0 deletions s3gof3r.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ func (b *Bucket) delete(path string) error {
return nil
}

// ListObjects returns an ObjectLister that iterates over the objects under the
// given prefixes, using parallel requests across prefixes.
//
// maxKeys indicates how many keys should be returned per request. If c is nil
// the bucket's own configuration is used.
func (b *Bucket) ListObjects(prefixes []string, maxKeys int, c *Config) (*ObjectLister, error) {
	cfg := c
	if cfg == nil {
		cfg = b.conf()
	}
	return newObjectLister(cfg, b, prefixes, maxKeys)
}

// SetLogger wraps the standard library log package.
//
// It allows the internal logging of s3gof3r to be set to a desired output and format.
Expand Down