Skip to content

Commit

Permalink
fix(compress): allow passing in compressor options
Browse files Browse the repository at this point in the history
Goals of this PR:
1. Allow passing in options to individual compressor
2. Do not change default behavior

Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha committed Oct 13, 2023
1 parent 312b2db commit a9860bf
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
34 changes: 31 additions & 3 deletions mutate/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,15 @@ type Compressor interface {
// indicate what compression type is used, e.g. "gzip", or "" for no
// compression.
MediaTypeSuffix() string

// WithOpt applies an option and can be chained.
WithOpt(CompressorOpt) Compressor
}

// CompressorOpt is a compressor option which can be used to configure a
// compressor.
type CompressorOpt interface{}

type noopCompressor struct{}

func (nc noopCompressor) Compress(r io.Reader) (io.ReadCloser, error) {
Expand All @@ -37,16 +44,24 @@ func (nc noopCompressor) MediaTypeSuffix() string {
// NoopCompressor provides no compression.
var NoopCompressor Compressor = noopCompressor{}

func (nc noopCompressor) WithOpt(CompressorOpt) Compressor {
return nc
}

// GzipCompressor provides gzip compression.
var GzipCompressor Compressor = gzipCompressor{}
var GzipCompressor Compressor = gzipCompressor{blockSize: 256 << 10}

type GzipBlockSize int

type gzipCompressor struct{}
type gzipCompressor struct {
blockSize int
}

func (gz gzipCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
pipeReader, pipeWriter := io.Pipe()

gzw := gzip.NewWriter(pipeWriter)
if err := gzw.SetConcurrency(256<<10, 2*runtime.NumCPU()); err != nil {
if err := gzw.SetConcurrency(gz.blockSize, 2*runtime.NumCPU()); err != nil {
return nil, errors.Wrapf(err, "set concurrency level to %v blocks", 2*runtime.NumCPU())
}
go func() {
Expand Down Expand Up @@ -76,6 +91,15 @@ func (gz gzipCompressor) MediaTypeSuffix() string {
return "gzip"
}

func (gz gzipCompressor) WithOpt(opt CompressorOpt) Compressor {
switch val := opt.(type) {
case GzipBlockSize:
gz.blockSize = int(val)
}

return gz
}

// ZstdCompressor provides zstd compression.
var ZstdCompressor Compressor = zstdCompressor{}

Expand Down Expand Up @@ -114,3 +138,7 @@ func (zs zstdCompressor) Compress(reader io.Reader) (io.ReadCloser, error) {
func (zs zstdCompressor) MediaTypeSuffix() string {
return "zstd"
}

func (zs zstdCompressor) WithOpt(CompressorOpt) Compressor {
return zs
}
15 changes: 15 additions & 0 deletions mutate/compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ func TestGzipCompressor(t *testing.T) {
assert.NoError(err)

assert.Equal(string(content), fact)

// with options
c = c.WithOpt(GzipBlockSize(256 << 12))

r, err = c.Compress(buf)
assert.NoError(err)
assert.Equal(c.MediaTypeSuffix(), "gzip")

r, err = gzip.NewReader(r)
assert.NoError(err)

content, err = ioutil.ReadAll(r)
assert.NoError(err)

assert.Equal(string(content), fact)
}

func TestZstdCompressor(t *testing.T) {
Expand Down

0 comments on commit a9860bf

Please sign in to comment.