Skip to content

Commit

Permalink
feat: read && write in a single loop (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored and Hchenn committed Sep 16, 2021
1 parent 337b646 commit f343971
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 68 deletions.
75 changes: 40 additions & 35 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,50 +124,55 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) {
if !operator.do() {
continue
}

evt := events[i].events
switch {
// check hup first
case events[i].events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
case evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
hups = append(hups, operator)
case events[i].events&syscall.EPOLLERR != 0:
case evt&syscall.EPOLLERR != 0:
// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
hups = append(hups, operator)
}
case events[i].events&syscall.EPOLLIN != 0:
// for non-connection
if operator.OnRead != nil {
operator.OnRead(p)
break
}
// only for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) == 0 {
break
}
var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
operator.InputAck(n)
if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
default:
if evt&syscall.EPOLLIN != 0 {
if operator.OnRead != nil {
// for non-connection
operator.OnRead(p)
} else {
// for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
operator.InputAck(n)
if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
break
}
}
}
}
case events[i].events&syscall.EPOLLOUT != 0:
// for non-connection
if operator.OnWrite != nil {
operator.OnWrite(p)
break
}
// only for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) == 0 {
break
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil && err != syscall.EAGAIN {
log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
if evt&syscall.EPOLLOUT != 0 {
if operator.OnWrite != nil {
// for non-connection
operator.OnWrite(p)
} else {
// for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil && err != syscall.EAGAIN {
log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
break
}
}
}
}
}
operator.done()
Expand Down
74 changes: 41 additions & 33 deletions poll_race_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package netpoll

import (
"log"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -122,48 +123,55 @@ func (p *defaultPoll) handler(events []syscall.EpollEvent) (closed bool) {
if !operator.do() {
continue
}

evt := events[i].Events
switch {
// check hup first
case events[i].Events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
case evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0:
hups = append(hups, operator)
case events[i].Events&syscall.EPOLLERR != 0:
case evt&syscall.EPOLLERR != 0:
// Under block-zerocopy, the kernel may give an error callback, which is not a real error, just an EAGAIN.
// So here we need to check this error, if it is EAGAIN then do nothing, otherwise still mark as hup.
if _, _, _, _, err := syscall.Recvmsg(operator.FD, nil, nil, syscall.MSG_ERRQUEUE); err != syscall.EAGAIN {
hups = append(hups, operator)
}
case events[i].Events&syscall.EPOLLIN != 0:
// for non-connection
if operator.OnRead != nil {
operator.OnRead(p)
break
}
// only for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) == 0 {
break
}
var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
operator.InputAck(n)
if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
hups = append(hups, operator)
default:
if evt&syscall.EPOLLIN != 0 {
if operator.OnRead != nil {
// for non-connection
operator.OnRead(p)
} else {
// for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
operator.InputAck(n)
if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
break
}
}
}
}
case events[i].Events&syscall.EPOLLOUT != 0:
// for non-connection
if operator.OnWrite != nil {
operator.OnWrite(p)
break
}
// only for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) == 0 {
break
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil && err != syscall.EAGAIN {
hups = append(hups, operator)
if evt&syscall.EPOLLOUT != 0 {
if operator.OnWrite != nil {
// for non-connection
operator.OnWrite(p)
} else {
// for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var n, err = sendmsg(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
operator.OutputAck(n)
if err != nil && err != syscall.EAGAIN {
log.Printf("sendmsg(fd=%d) failed: %s", operator.FD, err.Error())
hups = append(hups, operator)
break
}
}
}
}
}
operator.done()
Expand Down

0 comments on commit f343971

Please sign in to comment.