Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support extraction of a subset of zip contents #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/itchio/zipserver
go 1.17

require (
github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086
github.com/go-errors/errors v1.4.2
github.com/stretchr/testify v1.7.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dhowden/itl v0.0.0-20170329215456-9fbe21093131/go.mod h1:eVWQJVQ67aMvYhpkDwaH2Goy2vo6v8JCMfGXfQ9sPtw=
github.com/dhowden/plist v0.0.0-20141002110153-5db6e0d9931a/go.mod h1:sLjdR6uwx3L6/Py8F+QgAfeiuY87xuYGwCDqRFrvCzw=
github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086 h1:ORubSQoKnncsBnR4zD9CuYFJCPOCuSNEpWEZrDdBXkc=
github.com/dhowden/tag v0.0.0-20220618230019-adf36e896086/go.mod h1:Z3Lomva4pyMWYezjMAU5QWRh0p1VvO4199OHlFnyKkM=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
25 changes: 25 additions & 0 deletions zipserver/analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package zipserver

import (
"errors"
"io"
)

// ErrSkipped is non-critical and indicates that analysis
// chose to ignore a file. The file should not be uploaded.
var ErrSkipped = errors.New("skipped file")

// Analyzer analyzes individual files in a zip archive.
// Behavior may change based on the intended workload.
type Analyzer interface {
// Analyze should return info about the contained file.
// It should return ErrSkipped if a file was ignored.
Analyze(r io.Reader, filename string) (AnalyzeResult, error)
}

type AnalyzeResult struct {
RenameTo string // If non-empty, file should be renamed before uploading
Metadata interface{}
ContentType string
ContentEncoding string
}
143 changes: 66 additions & 77 deletions zipserver/archive.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package zipserver

import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
Expand Down Expand Up @@ -40,8 +39,9 @@ type Archiver struct {

// ExtractedFile represents a file extracted from a .zip into a GCS bucket
type ExtractedFile struct {
Key string
Size uint64
Key string
Size uint64
Metadata interface{} `json:",omitempty"`
}

// NewArchiver creates a new archiver from the given config
Expand Down Expand Up @@ -133,42 +133,44 @@ func shouldIgnoreFile(fname string) bool {

// UploadFileTask contains the information needed to extract a single file from a .zip
type UploadFileTask struct {
File *zip.File
Key string
DestPathPrefix string
LocalFile *zip.File
}

// UploadFileResult is successful is Error is nil - in that case, it contains the
// GCS key the file was uploaded under, and the number of bytes written for that file.
// An error causes the job to abort processing all further files in the archive.
type UploadFileResult struct {
Error error
Key string
Size uint64
ExtractedFile
}

func uploadWorker(
ctx context.Context,
a *Archiver,
analyzer Analyzer,
tasks <-chan UploadFileTask,
results chan<- UploadFileResult,
done chan struct{},
) {
defer func() { done <- struct{}{} }()

for task := range tasks {
file := task.File
key := task.Key

ctx, cancel := context.WithTimeout(ctx, time.Duration(a.Config.FilePutTimeout))
resource, err := a.extractAndUploadOne(ctx, key, file)
info, err := a.extractAndUploadOne(ctx, task, analyzer)
cancel() // Free resources now instead of deferring till func returns

if err != nil {
log.Print("Failed sending " + key + ": " + err.Error())
results <- UploadFileResult{err, key, 0}
if errors.Is(err, ErrSkipped) {
log.Printf("Skipping file: %s", task.LocalFile.Name)
continue
}
log.Print("Failed sending " + task.LocalFile.Name + ": " + err.Error())
results <- UploadFileResult{Error: err}
return
}

results <- UploadFileResult{nil, resource.key, resource.size}
results <- UploadFileResult{ExtractedFile: info}
}
}

Expand All @@ -177,6 +179,7 @@ func (a *Archiver) sendZipExtracted(
ctx context.Context,
prefix, fname string,
limits *ExtractLimits,
analyzer Analyzer,
) ([]ExtractedFile, error) {
zipReader, err := zip.OpenReader(fname)
if err != nil {
Expand Down Expand Up @@ -233,16 +236,18 @@ func (a *Archiver) sendZipExtracted(
defer cancel()

for i := 0; i < limits.ExtractionThreads; i++ {
go uploadWorker(ctx, a, tasks, results, done)
go uploadWorker(ctx, a, analyzer, tasks, results, done)
}

activeWorkers := limits.ExtractionThreads

go func() {
defer func() { close(tasks) }()
for _, file := range fileList {
key := path.Join(prefix, file.Name)
task := UploadFileTask{file, key}
task := UploadFileTask{
DestPathPrefix: prefix,
LocalFile: file,
}
select {
case tasks <- task:
case <-ctx.Done():
Expand All @@ -262,7 +267,7 @@ func (a *Archiver) sendZipExtracted(
extractError = result.Error
cancel()
} else {
extractedFiles = append(extractedFiles, ExtractedFile{result.Key, result.Size})
extractedFiles = append(extractedFiles, result.ExtractedFile)
fileCount++
}
case <-done:
Expand All @@ -284,77 +289,59 @@ func (a *Archiver) sendZipExtracted(

// sends an individual file from a zip
// Caller should set the job timeout in ctx.
func (a *Archiver) extractAndUploadOne(ctx context.Context, key string, file *zip.File) (*ResourceSpec, error) {
readerCloser, err := file.Open()
if err != nil {
return nil, err
}
defer readerCloser.Close()

var reader io.Reader = readerCloser
func (a *Archiver) extractAndUploadOne(
ctx context.Context,
task UploadFileTask,
analyzer Analyzer,
) (ExtractedFile, error) {
none := ExtractedFile{}
file := task.LocalFile

resource := &ResourceSpec{
key: key,
analyzerReader, err := file.Open()
if err != nil {
return none, err
}
defer analyzerReader.Close()

// try determining MIME by extension
mimeType := mime.TypeByExtension(path.Ext(key))

var buffer bytes.Buffer
_, err = io.Copy(&buffer, io.LimitReader(reader, 512))

info, err := analyzer.Analyze(analyzerReader, file.Name)
if err != nil {
return nil, errors.Wrap(err, 0)
return none, err
}

contentMimeType := http.DetectContentType(buffer.Bytes())
// join the bytes read and the original reader
reader = io.MultiReader(&buffer, reader)

if contentMimeType == "application/x-gzip" || contentMimeType == "application/gzip" {
resource.contentEncoding = "gzip"

// try to see if there's a real extension hidden beneath
if strings.HasSuffix(key, ".gz") {
realMimeType := mime.TypeByExtension(path.Ext(strings.TrimSuffix(key, ".gz")))

if realMimeType != "" {
mimeType = realMimeType
}
}

} else if strings.HasSuffix(key, ".br") {
// there is no way to detect a brotli stream by content, so we assume if it ends if .br then it's brotli
// this path is used for Unity 2020 webgl games built with brotli compression
resource.contentEncoding = "br"
realMimeType := mime.TypeByExtension(path.Ext(strings.TrimSuffix(key, ".br")))

if realMimeType != "" {
mimeType = realMimeType
}
} else if mimeType == "" {
// fall back to the extension detected from content, eg. someone uploaded a .png with wrong extension
mimeType = contentMimeType
// Analysis may have called Read() but we cannot seek back, so open a new Reader with initialized cursor.
uploadReader, err := file.Open()
if err != nil {
return none, err
}
defer uploadReader.Close()

if mimeType == "" {
// default mime type
mimeType = "application/octet-stream"
sendName := file.Name
if info.RenameTo != "" {
sendName = info.RenameTo
}
resource.contentType = mimeType
destKey := path.Join(task.DestPathPrefix, sendName)
log.Printf("Sending key=%q mime=%q encoding=%q", destKey, info.ContentType, info.ContentEncoding)

resource.applyRewriteRules()
var size uint64 // Written to by limitedReader
limited := limitedReader(uploadReader, file.UncompressedSize64, &size)

log.Printf("Sending: %s", resource)

limited := limitedReader(reader, file.UncompressedSize64, &resource.size)

err = a.Storage.PutFileWithSetup(ctx, a.Bucket, resource.key, limited, resource.setupRequest)
err = a.Storage.PutFileWithSetup(ctx, a.Bucket, destKey, limited, func(r *http.Request) error {
r.Header.Set("X-Goog-Acl", "public-read")
r.Header.Set("Content-Type", info.ContentType)
if info.ContentEncoding != "" {
r.Header.Set("Content-Encoding", info.ContentEncoding)
}
return nil
})
if err != nil {
return resource, errors.Wrap(err, 0)
return none, errors.Wrap(err, 0)
}

return resource, nil
return ExtractedFile{
Key: destKey,
Size: size,
Metadata: info.Metadata,
}, nil
}

// ExtractZip downloads the zip at `key` to a temporary file,
Expand All @@ -364,6 +351,7 @@ func (a *Archiver) ExtractZip(
ctx context.Context,
key, prefix string,
limits *ExtractLimits,
analyzer Analyzer,
) ([]ExtractedFile, error) {
fname, err := a.fetchZip(ctx, key)
if err != nil {
Expand All @@ -372,7 +360,7 @@ func (a *Archiver) ExtractZip(

defer os.Remove(fname)
prefix = path.Join(a.ExtractPrefix, prefix)
return a.sendZipExtracted(ctx, prefix, fname, limits)
return a.sendZipExtracted(ctx, prefix, fname, limits, analyzer)
}

// Caller should set the job timeout in ctx.
Expand All @@ -382,5 +370,6 @@ func (a *Archiver) UploadZipFromFile(
limits *ExtractLimits,
) ([]ExtractedFile, error) {
prefix = path.Join("_zipserver", prefix)
return a.sendZipExtracted(ctx, prefix, fname, limits)
// TODO: Add CLI option to choose game or music content.
return a.sendZipExtracted(ctx, prefix, fname, limits, &GameAnalyzer{})
}
Loading