Skip to content

Commit

Permalink
Merge pull request #34 from rlmcpherson/slice-pool
Browse files Browse the repository at this point in the history
Slice pool
  • Loading branch information
rlmcpherson committed Oct 8, 2014
2 parents 0706fb0 + 5e9bfe6 commit 974e759
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 105 deletions.
69 changes: 37 additions & 32 deletions getter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package s3gof3r

import (
"bytes"
"crypto/md5"
"fmt"
"hash"
Expand Down Expand Up @@ -37,21 +36,21 @@ type getter struct {
quit chan struct{}
qWait map[int]*chunk

bp *bp
sp *bp

closed bool
c *Config

md5 hash.Hash
md5 hash.Hash
cIdx int64
}

// chunk describes one contiguous byte range of the object being
// downloaded. Its buffer b is leased from the getter's slice pool
// and is handed back to the pool once the chunk has been consumed.
type chunk struct {
	id     int         // sequential chunk index, used to deliver chunks in order
	header http.Header // per-chunk request headers
	start  int64       // byte offset of this chunk within the object
	size   int64       // number of bytes expected in this chunk
	b      []byte      // pooled buffer holding the chunk's data
}

func newGetter(getURL url.URL, c *Config, b *Bucket) (io.ReadCloser, http.Header, error) {
Expand Down Expand Up @@ -82,7 +81,7 @@ func newGetter(getURL url.URL, c *Config, b *Bucket) (io.ReadCloser, http.Header
g.chunkTotal = int((g.contentLen + g.bufsz - 1) / g.bufsz) // round up, integer division
logger.debugPrintf("object size: %3.2g MB", float64(g.contentLen)/float64((1*mb)))

g.bp = newBufferPool(g.bufsz)
g.sp = bufferPool(g.bufsz)

for i := 0; i < g.c.Concurrency; i++ {
go g.worker()
Expand Down Expand Up @@ -130,7 +129,7 @@ func (g *getter) initChunks() {
start: i,
size: size,
b: nil,
len: 0}
}
i += size
id++
g.wg.Add(1)
Expand All @@ -149,22 +148,21 @@ func (g *getter) worker() {
// retryGetChunk downloads chunk c, retrying up to g.c.NTry times with
// exponential back-off. On success it returns immediately; after
// exhausting all attempts it records the last error on the getter and
// closes g.quit so the remaining workers shut down.
func (g *getter) retryGetChunk(c *chunk) {
	defer g.wg.Done()
	var err error
	// lease a buffer from the slice pool to receive the chunk's data
	c.b = <-g.sp.get
	for i := 0; i < g.c.NTry; i++ {
		// back-off sleeps before every attempt, including the first:
		// 100ms, 200ms, 400ms, ...
		time.Sleep(time.Duration(math.Exp2(float64(i))) * 100 * time.Millisecond)
		err = g.getChunk(c)
		if err == nil {
			return
		}
		logger.debugPrintf("error on attempt %d: retrying chunk: %v, error: %s", i, c.id, err)
	}
	g.err = err
	close(g.quit) // out of tries, ensure quit by closing channel
}
func (g *getter) getChunk(c *chunk) error {
// ensure buffer is empty
c.b.Reset()

r, err := http.NewRequest("GET", g.url.String(), nil)
if err != nil {
Expand All @@ -180,11 +178,11 @@ func (g *getter) getChunk(c *chunk) error {
if resp.StatusCode != 206 {
return newRespError(resp)
}
n, err := c.b.ReadFrom(resp.Body)
n, err := io.ReadAtLeast(resp.Body, c.b, int(c.size))
if err != nil {
return err
}
if n != c.size {
if int64(n) != c.size {
return fmt.Errorf("chunk %d: Expected %d bytes, received %d",
c.id, c.size, n)
}
Expand All @@ -200,30 +198,32 @@ func (g *getter) Read(p []byte) (int, error) {
if g.err != nil {
return 0, g.err
}
if g.rChunk == nil {
g.rChunk, err = g.nextChunk()
if err != nil {
return 0, err
nw := 0
for nw < len(p) {
if g.rChunk == nil {
g.rChunk, err = g.nextChunk()
if err != nil {
return 0, err
}
g.cIdx = 0
}
}

n, err := g.rChunk.b.Read(p)
if g.c.Md5Check {
g.md5.Write(p[0:n])
}
n := copy(p[nw:], g.rChunk.b[g.cIdx:g.rChunk.size])
g.cIdx += int64(n)
nw += n
g.bytesRead += int64(n)

// Empty buffer, move on to next
if err == io.EOF {
// Do not send EOF for each chunk.
if !(g.rChunk.id == g.chunkTotal-1 && g.rChunk.b.Len() == 0) {
err = nil
if g.bytesRead == g.contentLen {
return nw, io.EOF
}
if g.cIdx >= g.rChunk.size-1 { // chunk complete
g.sp.give <- g.rChunk.b
g.chunkID++
g.rChunk = nil
}
g.bp.give <- g.rChunk.b // recycle buffer
g.rChunk = nil
g.chunkID++
}
g.bytesRead = g.bytesRead + int64(n)
return n, err
return nw, nil

}

func (g *getter) nextChunk() (*chunk, error) {
Expand All @@ -233,6 +233,11 @@ func (g *getter) nextChunk() (*chunk, error) {
c := g.qWait[g.chunkID]
if c != nil {
delete(g.qWait, g.chunkID)
if g.c.Md5Check {
if _, err := g.md5.Write(c.b[:c.size]); err != nil {
return nil, err
}
}
return c, nil
}
// if next chunk not in qWait, read from channel
Expand All @@ -254,7 +259,7 @@ func (g *getter) Close() error {
}
g.wg.Wait()
g.closed = true
close(g.bp.quit)
close(g.sp.quit)
if g.bytesRead != g.contentLen {
return fmt.Errorf("read error: %d bytes read. expected: %d", g.bytesRead, g.contentLen)
}
Expand Down
67 changes: 30 additions & 37 deletions pool.go
Original file line number Diff line number Diff line change
@@ -1,77 +1,70 @@
package s3gof3r

import (
"bytes"
"container/list"
"time"
)

// qb is a pooled byte slice stamped with the time it was returned to
// the pool, so that slices idle longer than the pool timeout can be
// released to the garbage collector.
type qb struct {
	when time.Time // time the slice entered the pool
	s    []byte    // the pooled slice
}

// bp is a self-managing pool of equally sized byte slices. Slices are
// handed out on get and recycled on give by a background goroutine;
// slices idle longer than timeout are freed.
type bp struct {
	makes   int           // total number of slices allocated so far
	get     chan []byte   // receive a pooled (or freshly allocated) slice
	give    chan []byte   // return a slice to the pool
	quit    chan struct{} // closed to shut the pool goroutine down
	timeout time.Duration // idle time after which pooled slices are freed
	bufsz   int64         // allocation size — NOTE(review): goroutine tracks its own copy; confirm field use at callers
	sizech  chan int64    // updates the size used for future allocations
}

// makeBuffer returns an empty byte slice whose backing array already
// has room for size bytes, so growth up to size needs no reallocation.
func makeBuffer(size int64) []byte {
	buf := make([]byte, 0, size)
	return buf
}

func newBufferPool(bufsz int64) (np *bp) {
np = &bp{
get: make(chan *bytes.Buffer),
give: make(chan *bytes.Buffer),
quit: make(chan struct{}),
timeout: time.Minute,
makeSize: bufsz,
func bufferPool(bufsz int64) (sp *bp) {
sp = &bp{
get: make(chan []byte),
give: make(chan []byte),
quit: make(chan struct{}),
timeout: time.Minute,
sizech: make(chan int64),
}
go func() {
q := new(list.List)
for {
if q.Len() == 0 {
size := np.makeSize + 100*kb // allocate overhead to avoid slice growth
q.PushFront(qBuf{when: time.Now(), buffer: bytes.NewBuffer(makeBuffer(size))})
np.makes++
logger.debugPrintf("buffer %d of %d MB allocated", np.makes, np.makeSize/(1*mb))
q.PushFront(qb{when: time.Now(), s: make([]byte, bufsz)})
sp.makes++
}

e := q.Front()

timeout := time.NewTimer(np.timeout)
timeout := time.NewTimer(sp.timeout)
select {
case b := <-np.give:
case b := <-sp.give:
timeout.Stop()
b.Reset()
q.PushFront(qBuf{when: time.Now(), buffer: b})

case np.get <- e.Value.(qBuf).buffer:
q.PushFront(qb{when: time.Now(), s: b})
case sp.get <- e.Value.(qb).s:
timeout.Stop()
q.Remove(e)

case <-timeout.C:
// free unused buffers
// free unused slices older than timeout
e := q.Front()
for e != nil {
n := e.Next()
if time.Since(e.Value.(qBuf).when) > np.timeout {
if time.Since(e.Value.(qb).when) > sp.timeout {
q.Remove(e)
e.Value = nil
}
e = n
}
case <-np.quit:
logger.debugPrintf("%d buffers of %d MB allocated", np.makes, np.makeSize/(1*mb))
case sz := <-sp.sizech: // update buffer size, free buffers
bufsz = sz
case <-sp.quit:
logger.debugPrintf("%d buffers of %d MB allocated", sp.makes, bufsz/(1*mb))
return
}
}

}()
return np
return sp
}
6 changes: 3 additions & 3 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ func TestBP(t *testing.T) {
lf := *bytes.NewBuffer(nil)
SetLogger(&lf, "", log.LstdFlags, true)

bp := newBufferPool(mb)
bp := bufferPool(mb)
bp.timeout = 1 * time.Millisecond
b := <-bp.get
if cap(b.Bytes()) != int(mb+100*kb) {
t.Errorf("Expected buffer capacity: %d. Actual: %d", kb, b.Len())
if cap(b) != int(mb) {
t.Errorf("Expected buffer capacity: %d. Actual: %d", kb, cap(b))
}
bp.give <- b
if bp.makes != 2 {
Expand Down
Loading

0 comments on commit 974e759

Please sign in to comment.