Skip to content

Commit

Permalink
Merge pull request #93 from cloudwego/release/v0.1.1
Browse files Browse the repository at this point in the history
chore: release v0.1.1
  • Loading branch information
Hchenn authored Dec 9, 2021
2 parents 68a9c4c + 70120f1 commit 66a7ad1
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 60 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ large number of failed connections in the pool.

On the other hand, the open source community currently lacks Go network libraries that focus on RPC scenarios. Similar
repositories such as: [evio][evio], [gnet][gnet], etc., are all
focus on scenarios like [Redis][Redis], [Haproxy][Haproxy].
focus on scenarios like [Redis][Redis], [HAProxy][HAProxy].

But now, [Netpoll][Netpoll] was born and solved the above problems. It draws inspiration
from the design of [evio][evio] and [netty][netty], has
excellent [Performance](#performance), and is more suitable for microservice architecture.
Also [Netpoll][Netpoll] provides a number of [Features](#features), and it is recommended
to replace [net][net] in some RPC scenarios.

We developed the RPC framework [KiteX][KiteX] and HTTP
framework [Hertz][Hertz] (to be open sourced) based
We developed the RPC framework [Kitex][Kitex] and HTTP
framework [Hertz][Hertz] (coming soon) based
on [Netpoll][Netpoll], both with industry-leading performance.

[Examples][netpoll-benchmark] show how to build RPC client and server
Expand All @@ -41,7 +41,7 @@ For more information, please refer to [Document](#document).
- `Dialer` supports building clients
- `EventLoop` supports building a server
- TCP, Unix Domain Socket
- Linux, Mac OS (operating system)
- Linux, macOS (operating system)

* **Future**
- [multisyscall][multisyscall] supports batch system calls
Expand Down Expand Up @@ -77,7 +77,7 @@ More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmar
[evio]: https://github.com/tidwall/evio
[gnet]: https://github.com/panjf2000/gnet
[netty]: https://github.com/netty/netty
[KiteX]: https://github.com/cloudwego/kitex
[Kitex]: https://github.com/cloudwego/kitex
[Hertz]: https://github.com/cloudwego/hertz

[netpoll-benchmark]: https://github.com/cloudwego/netpoll-benchmark
Expand All @@ -86,7 +86,7 @@ More benchmarks reference [kitex-benchmark][kitex-benchmark] and [hertz-benchmar

[ByteDance]: https://www.bytedance.com
[Redis]: https://redis.io
[Haproxy]: http://www.haproxy.org
[HAProxy]: http://www.haproxy.org

[LinkBuffer]: nocopy_linkbuffer.go
[gopool]: https://github.com/bytedance/gopkg/tree/develop/util/gopool
Expand Down
10 changes: 5 additions & 5 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
框架很难设计出高效的连接池,池中的失效连接无法及时清理。

另一方面,开源社区目前缺少专注于 RPC 方案的 Go 网络库。类似的项目如:[evio][evio]
, [gnet][gnet] 等,均面向 [Redis][Redis], [Haproxy][Haproxy] 这样的场景。
, [gnet][gnet] 等,均面向 [Redis][Redis], [HAProxy][HAProxy] 这样的场景。

因此 [Netpoll][Netpoll] 应运而生,它借鉴了 [evio][evio]
[netty][netty] 的优秀设计,具有出色的 [性能](#性能),更适用于微服务架构。
同时,[Netpoll][Netpoll] 还提供了一些 [特性](#特性),推荐在 RPC 设计中替代
[net][net]

基于 [Netpoll][Netpoll] 开发的 RPC 框架 [KiteX][KiteX] 和 HTTP
基于 [Netpoll][Netpoll] 开发的 RPC 框架 [Kitex][Kitex] 和 HTTP
框架 [Hertz][Hertz] (即将开源),性能均业界领先。

[范例][netpoll-benchmark] 展示了如何使用 [Netpoll][Netpoll]
Expand All @@ -36,7 +36,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
- `Dialer` 支持构建 client
- `EventLoop` 支持构建 server
- 支持 TCP,Unix Domain Socket
- 支持 Linux,Mac OS(操作系统)
- 支持 Linux,macOS(操作系统)

* **即将开源**
- [multisyscall][multisyscall] 支持批量系统调用
Expand Down Expand Up @@ -70,7 +70,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提
[evio]: https://github.com/tidwall/evio
[gnet]: https://github.com/panjf2000/gnet
[netty]: https://github.com/netty/netty
[KiteX]: https://github.com/cloudwego/kitex
[Kitex]: https://github.com/cloudwego/kitex
[Hertz]: https://github.com/cloudwego/hertz

[netpoll-benchmark]: https://github.com/cloudwego/netpoll-benchmark
Expand All @@ -79,7 +79,7 @@ goroutine,大幅增加调度开销。此外,[net.Conn][net.Conn] 没有提

[ByteDance]: https://www.bytedance.com
[Redis]: https://redis.io
[Haproxy]: http://www.haproxy.org
[HAProxy]: http://www.haproxy.org

[LinkBuffer]: nocopy_linkbuffer.go
[gopool]: https://github.com/bytedance/gopkg/tree/develop/util/gopool
Expand Down
15 changes: 8 additions & 7 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ func (c *connection) Skip(n int) (err error) {
// Release implements Connection.
func (c *connection) Release() (err error) {
// Check inputBuffer length first to reduce contention in mux situation.
if c.inputBuffer.Len() == 0 && c.lock(reading) {
// Double check length to calculate the maxSize
// c.operator.do competes with c.inputs/c.inputAck
if c.inputBuffer.Len() == 0 && c.operator.do() {
maxSize := c.inputBuffer.calcMaxSize()
if maxSize > c.maxSize {
c.maxSize = maxSize
}
// Double check length to reset tail node
if c.inputBuffer.Len() == 0 {
maxSize := c.inputBuffer.calcMaxSize()
if maxSize > c.maxSize {
c.maxSize = maxSize
}
c.inputBuffer.resetTail(c.maxSize)
}
c.unlock(reading)
c.operator.done()
}
return c.inputBuffer.Release()
}
Expand Down
1 change: 0 additions & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const (
closing key = iota
processing
flushing
reading
// total must be at the bottom.
total
)
Expand Down
4 changes: 0 additions & 4 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func (c *connection) closeBuffer() {

// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
if !c.lock(reading) {
return rs
}
vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
return vs[:1]
}
Expand All @@ -88,7 +85,6 @@ func (c *connection) inputAck(n int) (err error) {
if c.maxSize < length {
c.maxSize = length
}
c.unlock(reading)

var needTrigger = true
if length == n {
Expand Down
76 changes: 50 additions & 26 deletions mux/shard_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package mux

import (
"runtime"
"sync"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"

"github.com/cloudwego/netpoll"
)

Expand All @@ -30,20 +33,25 @@ import (
* NewShardQueue: create a queue with netpoll.Connection.
* ShardSize: the recommended number of shards is 32.
*/
const ShardSize = 32
var ShardSize int

func init() {
ShardSize = runtime.GOMAXPROCS(0)
}

// NewShardQueue .
func NewShardQueue(size int32, conn netpoll.Connection) (queue *ShardQueue) {
func NewShardQueue(size int, conn netpoll.Connection) (queue *ShardQueue) {
queue = &ShardQueue{
conn: conn,
size: size,
size: int32(size),
getters: make([][]WriterGetter, size),
swap: make([]WriterGetter, 0, 64),
locks: make([]int32, size),
}
for i := range queue.getters {
queue.getters[i] = make([]WriterGetter, 0, 64)
}
queue.list = make([]int32, size)
return queue
}

Expand All @@ -55,12 +63,21 @@ type WriterGetter func() (buf netpoll.Writer, isNil bool)
// If there is an error in the data transmission, the connection will be closed.
// ShardQueue.Add: add the data to be sent.
type ShardQueue struct {
conn netpoll.Connection
idx, size int32
getters [][]WriterGetter // len(getters) = size
swap []WriterGetter // use for swap
locks []int32 // len(locks) = size
trigger, runNum int32
conn netpoll.Connection
idx, size int32
getters [][]WriterGetter // len(getters) = size
swap []WriterGetter // use for swap
locks []int32 // len(locks) = size
queueTrigger
}

// here for trigger
type queueTrigger struct {
trigger int32
runNum int32
list []int32 // record the triggered shard
w, r int32 // ptr of list
listLock sync.Mutex // list total lock
}

// Add adds to q.getters[shard]
Expand All @@ -77,44 +94,51 @@ func (q *ShardQueue) Add(gts ...WriterGetter) {

// triggering shard.
func (q *ShardQueue) triggering(shard int32) {
q.listLock.Lock()
q.w = (q.w + 1) % q.size
q.list[q.w] = shard
q.listLock.Unlock()

if atomic.AddInt32(&q.trigger, 1) > 1 {
return
}
q.foreach(shard)
q.foreach()
}

// foreach swap r & w. It's not concurrency safe.
func (q *ShardQueue) foreach(shard int32) {
func (q *ShardQueue) foreach() {
if atomic.AddInt32(&q.runNum, 1) > 1 {
return
}
go func() {
var tmp []WriterGetter
for ; atomic.LoadInt32(&q.trigger) > 0; shard = (shard + 1) % q.size {
gopool.CtxGo(nil, func() {
var negNum int32 // is negative number of triggerNum
for triggerNum := atomic.LoadInt32(&q.trigger); triggerNum > 0; {
q.r = (q.r + 1) % q.size
shared := q.list[q.r]

// lock & swap
q.lock(shard)
if len(q.getters[shard]) == 0 {
q.unlock(shard)
continue
}
// swap
tmp = q.getters[shard]
q.getters[shard] = q.swap[:0]
q.lock(shared)
tmp := q.getters[shared]
q.getters[shared] = q.swap[:0]
q.swap = tmp
q.unlock(shard)
atomic.AddInt32(&q.trigger, -1)
q.unlock(shared)

// deal
q.deal(q.swap)
negNum--
if triggerNum+negNum == 0 {
triggerNum = atomic.AddInt32(&q.trigger, negNum)
negNum = 0
}
}
q.flush()

// quit & check again
atomic.StoreInt32(&q.runNum, 0)
if atomic.LoadInt32(&q.trigger) > 0 {
q.foreach(shard)
q.foreach()
}
}()
})
}

// deal is used to get deal of netpoll.Writer.
Expand Down
27 changes: 16 additions & 11 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (b *LinkBuffer) Next(n int) (p []byte, err error) {
b.recalLen(-n) // re-cal length

// single node
l := b.read.Len()
l := b.firstReadLen()
if l >= n {
return b.read.Next(n), nil
}
Expand Down Expand Up @@ -118,11 +118,10 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
if b.Len() < n {
return p, fmt.Errorf("link buffer peek[%d] not enough", n)
}
var node = b.read
// single node
l := node.Len()
l := b.firstReadLen()
if l >= n {
return node.Peek(n), nil
return b.read.Peek(n), nil
}
// multiple nodes
var pIdx int
Expand All @@ -132,6 +131,7 @@ func (b *LinkBuffer) Peek(n int) (p []byte, err error) {
} else {
p = make([]byte, n)
}
var node = b.read
for ack := n; ack > 0; ack = ack - l {
l = node.Len()
if l >= ack {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (b *LinkBuffer) readBinary(n int) (p []byte) {

// single node
p = make([]byte, n)
l := b.read.Len()
l := b.firstReadLen()
if l >= n {
copy(p, b.read.Next(n))
return p
Expand Down Expand Up @@ -267,7 +267,7 @@ func (b *LinkBuffer) Slice(n int) (r Reader, err error) {
}()

// single node
l := b.read.Len()
l := b.firstReadLen()
if l >= n {
node := b.read.Refer(n)
p.head, p.read, p.flush = node, node, node
Expand Down Expand Up @@ -534,11 +534,6 @@ func (b *LinkBuffer) book(bookSize, maxSize int) (p []byte) {
l = maxSize
b.write.next = newLinkBufferNode(maxSize)
b.write = b.write.next

// If there is no data in read node, then point it to next one.
if b.Len() == 0 {
b.read, b.flush = b.write, b.write
}
}
if l > bookSize {
l = bookSize
Expand Down Expand Up @@ -736,6 +731,16 @@ func (b *LinkBuffer) growth(n int) {
}
}

// firstReadLen returns the length of the first node greater than zero.
func (b *LinkBuffer) firstReadLen() int {
l := b.read.Len()
for l == 0 {
b.read = b.read.next
l = b.read.Len()
}
return l
}

// zero-copy slice convert to string
func unsafeSliceToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
Expand Down

0 comments on commit 66a7ad1

Please sign in to comment.