Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Context for cancellation and timeouts #9

Merged
merged 4 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ load a zip file you want to extract later.
curl "http://localhost:8090/slurp?key=myfile.zip&url=http://leafo.net/file.zip"
```

## GCS authentication and permissions

The key file in your config should be the PEM-encoded private key for a
service account that has permission to view, create, and delete objects in
your chosen GCS bucket (files already uploaded are deleted again when an
upload is aborted).

The bucket also needs the following access settings:

- Public access must be enabled, not prevented.
- Access control should be set to fine-grained ("legacy ACL"), not uniform.
15 changes: 9 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -45,11 +46,6 @@ func must(err error) {
func main() {
flag.Parse()

if serve != "" {
must(zipserver.ServeZip(serve))
return
}

config, err := zipserver.LoadConfig(configFname)
must(err)

Expand All @@ -58,6 +54,11 @@ func main() {
return
}

if serve != "" {
must(zipserver.ServeZip(config, serve))
return
}

if extract != "" {
archiver := zipserver.NewArchiver(config)
limits := zipserver.DefaultExtractLimits(config)
Expand All @@ -74,8 +75,10 @@ func main() {
randChars[i] = letters[rand.Intn(len(letters))]
}

files, err := archiver.UploadZipFromFile(extract, string(randChars), limits)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.JobTimeout))
defer cancel()

files, err := archiver.UploadZipFromFile(ctx, extract, string(randChars), limits)
if err != nil {
log.Fatal(err.Error())
return
Expand Down
69 changes: 50 additions & 19 deletions zipserver/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zipserver

import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"fmt"
Expand All @@ -12,6 +13,7 @@ import (
"os"
"path"
"strings"
"time"

"archive/zip"

Expand Down Expand Up @@ -53,15 +55,15 @@ func NewArchiver(config *Config) *Archiver {
return &Archiver{storage, config}
}

func (a *Archiver) fetchZip(key string) (string, error) {
func (a *Archiver) fetchZip(ctx context.Context, key string) (string, error) {
os.MkdirAll(tmpDir, os.ModeDir|0777)

hasher := md5.New()
hasher.Write([]byte(key))
fname := a.Bucket + "_" + hex.EncodeToString(hasher.Sum(nil)) + ".zip"
fname = path.Join(tmpDir, fname)

src, err := a.Storage.GetFile(a.Bucket, key)
src, err := a.Storage.GetFile(ctx, a.Bucket, key)

if err != nil {
return "", errors.Wrap(err, 0)
Expand Down Expand Up @@ -89,7 +91,8 @@ func (a *Archiver) fetchZip(key string) (string, error) {
func (a *Archiver) abortUpload(files []ExtractedFile) error {
for _, file := range files {
// FIXME: code quality - what if we fail here? any retry strategies?
a.Storage.DeleteFile(a.Bucket, file.Key)
ctx := context.Background()
a.Storage.DeleteFile(ctx, a.Bucket, file.Key)
}

return nil
Expand Down Expand Up @@ -133,14 +136,22 @@ type UploadFileResult struct {
Size uint64
}

func uploadWorker(a *Archiver, limits *ExtractLimits, tasks <-chan UploadFileTask, results chan<- UploadFileResult, cancel chan struct{}, done chan struct{}) {
func uploadWorker(
ctx context.Context,
a *Archiver,
tasks <-chan UploadFileTask,
results chan<- UploadFileResult,
done chan struct{},
) {
defer func() { done <- struct{}{} }()

for task := range tasks {
leafo marked this conversation as resolved.
Show resolved Hide resolved
file := task.File
key := task.Key

resource, err := a.extractAndUploadOne(key, file, limits)
ctx, cancel := context.WithTimeout(ctx, time.Duration(a.Config.FilePutTimeout))
resource, err := a.extractAndUploadOne(ctx, key, file)
cancel() // Free resources now instead of deferring till func returns

if err != nil {
log.Print("Failed sending " + key + ": " + err.Error())
Expand All @@ -153,12 +164,18 @@ func uploadWorker(a *Archiver, limits *ExtractLimits, tasks <-chan UploadFileTas
}

// sendZipExtracted extracts the zip at fname and uploads its files (except ignored ones) under prefix.
func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits) ([]ExtractedFile, error) {
func (a *Archiver) sendZipExtracted(
ctx context.Context,
prefix, fname string,
limits *ExtractLimits,
) ([]ExtractedFile, error) {
zipReader, err := zip.OpenReader(fname)
if err != nil {
return nil, errors.Wrap(err, 0)
}

defer zipReader.Close()

if len(zipReader.File) > limits.MaxNumFiles {
err := fmt.Errorf("Too many files in zip (%v > %v)",
len(zipReader.File), limits.MaxNumFiles)
Expand All @@ -167,15 +184,14 @@ func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits)

extractedFiles := []ExtractedFile{}

defer zipReader.Close()

fileCount := 0
var byteCount uint64

fileList := []*zip.File{}

for _, file := range zipReader.File {
if shouldIgnoreFile(file.Name) {
log.Printf("Ignoring file %s", file.Name)
continue
}

Expand All @@ -201,11 +217,14 @@ func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits)

tasks := make(chan UploadFileTask)
results := make(chan UploadFileResult)
cancel := make(chan struct{})
done := make(chan struct{}, limits.ExtractionThreads)

// Context can be canceled by caller or when an individual task fails.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for i := 0; i < limits.ExtractionThreads; i++ {
go uploadWorker(a, limits, tasks, results, cancel, done)
go uploadWorker(ctx, a, tasks, results, done)
}

activeWorkers := limits.ExtractionThreads
Expand All @@ -217,8 +236,9 @@ func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits)
task := UploadFileTask{file, key}
select {
case tasks <- task:
case <-cancel:
case <-ctx.Done():
// Something went wrong!
log.Println("Remaining tasks were canceled")
return
}
}
Expand All @@ -231,7 +251,7 @@ func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits)
case result := <-results:
if result.Error != nil {
extractError = result.Error
close(cancel)
cancel()
} else {
extractedFiles = append(extractedFiles, ExtractedFile{result.Key, result.Size})
fileCount++
Expand All @@ -254,7 +274,8 @@ func (a *Archiver) sendZipExtracted(prefix, fname string, limits *ExtractLimits)
}

// sends an individual file from a zip
func (a *Archiver) extractAndUploadOne(key string, file *zip.File, limits *ExtractLimits) (*ResourceSpec, error) {
// ctx should carry the per-file upload timeout; uploadWorker derives it from the job context using Config.FilePutTimeout.
func (a *Archiver) extractAndUploadOne(ctx context.Context, key string, file *zip.File) (*ResourceSpec, error) {
readerCloser, err := file.Open()
if err != nil {
return nil, err
Expand Down Expand Up @@ -319,7 +340,7 @@ func (a *Archiver) extractAndUploadOne(key string, file *zip.File, limits *Extra

limited := limitedReader(reader, file.UncompressedSize64, &resource.size)

err = a.Storage.PutFileWithSetup(a.Bucket, resource.key, limited, resource.setupRequest)
err = a.Storage.PutFileWithSetup(ctx, a.Bucket, resource.key, limited, resource.setupRequest)
if err != nil {
return resource, errors.Wrap(err, 0)
}
Expand All @@ -329,18 +350,28 @@ func (a *Archiver) extractAndUploadOne(key string, file *zip.File, limits *Extra

// ExtractZip downloads the zip at `key` to a temporary file,
// then extracts its contents and uploads each item to `prefix`
func (a *Archiver) ExtractZip(key, prefix string, limits *ExtractLimits) ([]ExtractedFile, error) {
fname, err := a.fetchZip(key)
// Caller should set the job timeout in ctx.
func (a *Archiver) ExtractZip(
ctx context.Context,
key, prefix string,
limits *ExtractLimits,
) ([]ExtractedFile, error) {
fname, err := a.fetchZip(ctx, key)
if err != nil {
return nil, err
}

defer os.Remove(fname)
prefix = path.Join(a.ExtractPrefix, prefix)
return a.sendZipExtracted(prefix, fname, limits)
return a.sendZipExtracted(ctx, prefix, fname, limits)
}

func (a *Archiver) UploadZipFromFile(fname string, prefix string, limits *ExtractLimits) ([]ExtractedFile, error) {
// Caller should set the job timeout in ctx.
func (a *Archiver) UploadZipFromFile(
ctx context.Context,
fname, prefix string,
limits *ExtractLimits,
) ([]ExtractedFile, error) {
prefix = path.Join("_zipserver", prefix)
return a.sendZipExtracted(prefix, fname, limits)
return a.sendZipExtracted(ctx, prefix, fname, limits)
}
28 changes: 17 additions & 11 deletions zipserver/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zipserver
import (
"archive/zip"
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -32,16 +33,17 @@ func emptyConfig() *Config {

func Test_ExtractOnGCS(t *testing.T) {
withGoogleCloudStorage(t, func(storage Storage, config *Config) {
ctx := context.Background()
archiver := &Archiver{storage, config}

r, err := os.Open("/home/leafo/code/go/etlua.zip")
assert.NoError(t, err)
defer r.Close()

err = storage.PutFile(config.Bucket, "zipserver_test/test.zip", r, "application/zip")
err = storage.PutFile(ctx, config.Bucket, "zipserver_test/test.zip", r, "application/zip")
assert.NoError(t, err)

_, err = archiver.ExtractZip("zipserver_test/test.zip", "zipserver_test/extract", testLimits())
_, err = archiver.ExtractZip(ctx, "zipserver_test/test.zip", "zipserver_test/extract", testLimits())
assert.NoError(t, err)
})
}
Expand Down Expand Up @@ -73,6 +75,8 @@ func (zl *zipLayout) Write(t *testing.T, zw *zip.Writer) {
}

func (zl *zipLayout) Check(t *testing.T, storage *MemStorage, bucket, prefix string) {
ctx := context.Background()

for _, entry := range zl.entries {
func() {
name := entry.name
Expand All @@ -81,7 +85,7 @@ func (zl *zipLayout) Check(t *testing.T, storage *MemStorage, bucket, prefix str
}

path := fmt.Sprintf("%s/%s", prefix, name)
reader, err := storage.GetFile(bucket, path)
reader, err := storage.GetFile(ctx, bucket, path)
if entry.ignored {
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "object not found"))
Expand Down Expand Up @@ -111,14 +115,16 @@ func (zl *zipLayout) Check(t *testing.T, storage *MemStorage, bucket, prefix str
func Test_ExtractInMemory(t *testing.T) {
config := emptyConfig()

ctx := context.Background()

storage, err := NewMemStorage()
assert.NoError(t, err)

archiver := &Archiver{storage, config}
prefix := "zipserver_test/mem_test_extracted"
zipPath := "mem_test.zip"

_, err = archiver.ExtractZip(zipPath, prefix, testLimits())
_, err = archiver.ExtractZip(ctx, zipPath, prefix, testLimits())
assert.Error(t, err)

withZip := func(zl *zipLayout, cb func(zl *zipLayout)) {
Expand All @@ -131,7 +137,7 @@ func Test_ExtractInMemory(t *testing.T) {
err = zw.Close()
assert.NoError(t, err)

err = storage.PutFile(config.Bucket, zipPath, bytes.NewReader(buf.Bytes()), "application/octet-stream")
err = storage.PutFile(ctx, config.Bucket, zipPath, bytes.NewReader(buf.Bytes()), "application/octet-stream")
assert.NoError(t, err)

cb(zl)
Expand Down Expand Up @@ -210,7 +216,7 @@ func Test_ExtractInMemory(t *testing.T) {
},
},
}, func(zl *zipLayout) {
_, err := archiver.ExtractZip(zipPath, prefix, testLimits())
_, err := archiver.ExtractZip(ctx, zipPath, prefix, testLimits())
assert.NoError(t, err)

zl.Check(t, storage, config.Bucket, prefix)
Expand All @@ -228,7 +234,7 @@ func Test_ExtractInMemory(t *testing.T) {
limits := testLimits()
limits.MaxFileNameLength = 100

_, err := archiver.ExtractZip(zipPath, prefix, limits)
_, err := archiver.ExtractZip(ctx, zipPath, prefix, limits)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "paths that are too long"))
})
Expand All @@ -245,7 +251,7 @@ func Test_ExtractInMemory(t *testing.T) {
limits := testLimits()
limits.MaxFileSize = 499

_, err := archiver.ExtractZip(zipPath, prefix, limits)
_, err := archiver.ExtractZip(ctx, zipPath, prefix, limits)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "file that is too large"))
})
Expand Down Expand Up @@ -277,7 +283,7 @@ func Test_ExtractInMemory(t *testing.T) {
limits := testLimits()
limits.MaxNumFiles = 3

_, err := archiver.ExtractZip(zipPath, prefix, limits)
_, err := archiver.ExtractZip(ctx, zipPath, prefix, limits)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "Too many files"))
})
Expand Down Expand Up @@ -309,7 +315,7 @@ func Test_ExtractInMemory(t *testing.T) {
limits := testLimits()
limits.MaxTotalSize = 6

_, err := archiver.ExtractZip(zipPath, prefix, limits)
_, err := archiver.ExtractZip(ctx, zipPath, prefix, limits)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "zip too large"))
})
Expand Down Expand Up @@ -347,7 +353,7 @@ func Test_ExtractInMemory(t *testing.T) {
}, func(zl *zipLayout) {
limits := testLimits()

_, err := archiver.ExtractZip(zipPath, prefix, limits)
_, err := archiver.ExtractZip(ctx, zipPath, prefix, limits)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "intentional failure"))

Expand Down
Loading