Skip to content

Commit

Permalink
fix: set leftover wait read size (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored and Hchenn committed Sep 16, 2021
1 parent 8224c3e commit 3304c59
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 4 deletions.
5 changes: 3 additions & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,11 @@ func (c *connection) triggerWrite(err error) {

// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
if c.inputBuffer.Len() >= n {
leftover := n - c.inputBuffer.Len()
if leftover <= 0 {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(n))
atomic.StoreInt32(&c.waitReadSize, int32(leftover))
defer atomic.StoreInt32(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
Expand Down
4 changes: 2 additions & 2 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (c *connection) inputAck(n int) (err error) {
if n < 0 {
n = 0
}
lack := atomic.AddInt32(&c.waitReadSize, int32(-n))
err = c.inputBuffer.BookAck(n, lack <= 0)
leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
err = c.inputBuffer.BookAck(n, leftover <= 0)
c.triggerRead()
c.onRequest()
return err
Expand Down
34 changes: 34 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,40 @@ func TestConnectionReadAfterClosed(t *testing.T) {
wg.Wait()
}

func TestConnectionWaitReadHalfPacket(t *testing.T) {
r, w := GetSysFdPairs()
var rconn = &connection{}
rconn.init(&netFD{fd: r}, nil)
var size = pagesize * 2
var msg = make([]byte, size)

// write half packet
syscall.Write(w, msg[:size/2])
// wait poller reads buffer
for rconn.inputBuffer.Len() <= 0 {
runtime.Gosched()
}

// wait read full packet
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var buf, err = rconn.Reader().Next(size)
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(0))
MustNil(t, err)
Equal(t, len(buf), size)
}()

// write left half packet
for atomic.LoadInt32(&rconn.waitReadSize) <= 0 {
runtime.Gosched()
}
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size/2))
syscall.Write(w, msg[size/2:])
wg.Wait()
}

func TestReadTimer(t *testing.T) {
read := time.NewTimer(time.Second)
MustTrue(t, read.Stop())
Expand Down
1 change: 1 addition & 0 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
for node := b.flush.next; node != nil; node = node.next {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}

// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if isEnd && cap(b.flush.buf) > pagesize {
if b.flush.next == nil {
Expand Down

0 comments on commit 3304c59

Please sign in to comment.