This tutorial gets you started with Netpoll through some simple examples, includes how to use Server, Client and nocopy APIs.
Here is a simple server demo, we will explain how it is constructed next.
First we need to get a Listener
, it can be net.Listener
or netpoll.Listener
, which is no difference for server
usage. Create a Listener
as shown below:
package main
import "net"
func main() {
listener, err := net.Listen(network, address)
if err != nil {
panic("create net listener failed")
}
...
}
or
package main
import "github.com/cloudwego/netpoll"
func main() {
listener, err := netpoll.CreateListener(network, address)
if err != nil {
panic("create netpoll listener failed")
}
...
}
EventLoop
is an event-driven scheduler, a real NIO Server, responsible for connection management, event scheduling,
etc.
params:
OnRequest
is an interface that users should implement by themselves to process business logic. Code Comment describes its behavior in detail.Option
is used to customize the configuration when creatingEventLoop
, and the following example shows its usage. For more details, please refer to options.
The creation process is as follows:
package main
import (
"time"
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithOnPrepare(prepare),
netpoll.WithReadTimeout(time.Second),
)
...
}
EventLoop
provides services by binding Listener
, as shown below.
Serve
function will block until an error occurs, such as a panic or the user actively calls Shutdown
.
package main
import (
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
...
// start listen loop ...
eventLoop.Serve(listener)
}
EventLoop
provides the Shutdown
function, which is used to stop the server gracefully. The usage is as follows.
package main
import (
"context"
"time"
"github.com/cloudwego/netpoll"
)
var eventLoop netpoll.EventLoop
func main() {
// stop server ...
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
eventLoop.Shutdown(ctx)
}
Netpoll also has the ability to be used on the Client side. It provides Dialer
, similar to net.Dialer
.
Again, here is a simple client demo, and then we introduce it in detail.
Similar to Net, Netpoll provides several public functions for directly dialing a connection. such as:
DialConnection(network, address string, timeout time.Duration) (connection Connection, err error)
DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error)
DialUnix(network string, laddr, raddr *UnixAddr) (*UnixConnection, error)
Netpoll also defines the Dialer
interface. The usage is as follows:
(of course, you can usually use the fast way)
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
// Dial a connection with Dialer.
dialer := netpoll.NewDialer()
conn, err := dialer.DialConnection(network, address, timeout)
if err != nil {
panic("dial netpoll connection failed")
}
...
}
Connection
provides Nocopy APIs - Reader
and Writer
, to avoid frequent copying. Let’s introduce their simple
usage.
package main
type Connection interface {
// Recommended nocopy APIs
Reader() Reader
Writer() Writer
... // see code comments for more details
}
Nocopy APIs is designed as a two-step operation.
On Reader
, after reading data through Next
, Peek
, ReadString
, etc., you still have to actively call Release
to
release the buffer(Nocopy
reads the original address of the buffer, so you must take the initiative to confirm that
the buffer is no longer used).
Similarly, on Writer
, you first need to allocate a buffer to write data, and then call Flush
to confirm that all
data has been written.
Writer
also provides rich APIs to allocate buffers, such as Malloc
, WriteString
and so on.
The following shows some simple examples of reading and writing data. For more details, please refer to the code comments.
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
var reader, writer = conn.Reader(), conn.Writer()
// reading
buf, _ := reader.Next(n)
... parse the read data ...
reader.Release()
// writing
var write_data []byte
... make the write data ...
alloc, _ := writer.Malloc(len(write_data))
copy(alloc, write_data) // write data
writer.Flush()
}
If you want to use the connection to send (or receive) multiple sets of data, then you will face the work of packing and unpacking the data.
On net, this kind of work is generally done by copying. An example is as follows:
package main
import (
"net"
)
func main() {
var conn net.Conn
var buf = make([]byte, 8192)
// reading
for {
n, _ := conn.Read(buf)
... unpacking & handling ...
var i int
for i = 0; i <= n-pkgsize; i += pkgsize {
pkg := append([]byte{}, buf[i:i+pkgsize]...)
go func() {
... handling pkg ...
}
}
buf = append(buf[:0], buf[i:n]...)
}
// writing
var write_datas <-chan []byte
... packing write ...
for {
pkg := <-write_datas
conn.Write(pkg)
}
}
But, this is not necessary in Netpoll, nocopy APIs supports operations on the original address of the buffer, and realizes automatic recycling and reuse of resources through reference counting.
Examples are as follows(use function Reader.Slice
and Writer.Append
):
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
// reading
reader := conn.Reader()
for {
... unpacking & handling ...
pkg, _ := reader.Slice(pkgsize)
go func() {
... handling pkg ...
pkg.Release()
}
}
// writing
var write_datas <-chan netpoll.Writer
... packing write ...
writer := conn.Writer()
for {
select {
case pkg := <-write_datas:
writer.Append(pkg)
default:
if writer.MallocLen() > 0 {
writer.Flush()
}
}
}
}
NumLoops
represents the number of epoll
created by Netpoll, which has been automatically adjusted
according to the number of P (runtime.GOMAXPROCS(0)
) by default, and users generally don't need to care.
But if your service has heavy I/O, you may need the following configuration:
package main
import (
"runtime"
"github.com/cloudwego/netpoll"
)
func init() {
netpoll.SetNumLoops(runtime.GOMAXPROCS(0))
}
When there are multiple pollers in Netpoll, the connections in the service process will be loadbalanced to each poller.
The following strategies are supported now:
- Random
- The new connection will be assigned to a randomly picked poller.
- RoundRobin
- The new connection will be assigned to the poller in order.
Netpoll uses RoundRobin
by default, and users can change it in the following ways:
package main
import (
"github.com/cloudwego/netpoll"
)
func init() {
netpoll.SetLoadBalance(netpoll.Random)
// or
netpoll.SetLoadBalance(netpoll.RoundRobin)
}
3. How to configure gopool ?
Netpoll uses gopool as the goroutine pool by default to optimize the stack growth
problem that
generally occurs in RPC services.
In the project gopool, it explains how to change its configuration, so won't repeat it here.
Of course, if your project does not have a stack growth
problem, it is best to close gopool as follows:
package main
import (
"github.com/cloudwego/netpoll"
)
func init() {
netpoll.DisableGopool()
}
There are different ways to prepare a new connection on the client and server.
- On the server side,
OnPrepare
is defined to prepare for the new connection, and it also supports returning acontext
, which can be reused in subsequent business processing.WithOnPrepare
provides this registration. When the server accepts a new connection, it will automatically execute the registeredOnPrepare
function to complete the preparation work. The example is as follows:
package main
import (
"context"
"github.com/cloudwego/netpoll"
)
func main() {
// register OnPrepare
var onPrepare netpoll.OnPrepare = prepare
evl, _ := netpoll.NewEventLoop(handler, netpoll.WithOnPrepare(onPrepare))
...
}
func prepare(connection netpoll.Connection) (ctx context.Context) {
... prepare connection ...
return
}
- On the client side, the connection preparation needs to be completed by the user. Generally speaking, the connection
created by
Dialer
can be controlled by the user, which is different from passively accepting the connection on the server side. Therefore, the user not relying on the trigger, just prepare a new connection like this:
package main
import (
"context"
"github.com/cloudwego/netpoll"
)
func main() {
conn, err := netpoll.DialConnection(network, address, timeout)
if err != nil {
panic("dial netpoll connection failed")
}
... prepare here directly ...
prepare(conn)
...
}
func prepare(connection netpoll.Connection) (ctx context.Context) {
... prepare connection ...
return
}
Netpoll now supports two timeout configurations:
Read Timeout
- In order to maintain the same operating style as
net.Conn
,Connection.Reader
is also designed to block reading. So provideRead Timeout
. Read Timeout
has no default value(wait infinitely), it can be configured viaConnection
orEventLoop.Option
, for example:
- In order to maintain the same operating style as
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
// 1. setting by Connection
conn.SetReadTimeout(timeout)
// or
// 2. setting with Option
netpoll.NewEventLoop(handler, netpoll.WithReadTimeout(timeout))
...
}
Idle Timeout
Idle Timeout
utilizes theTCP KeepAlive
mechanism to kick out dead connections and reduce maintenance overhead. When using Netpoll, there is generally no need to create and close connections frequently, and idle connections have little effect. When the connection is inactive for a long time, in order to prevent dead connection caused by suspended animation, hang of the opposite end, abnormal disconnection, etc., the connection will be actively closed after theIdle Timeout
.- The default minimum value of
Idle Timeout
is10min
, which can be configured throughConnection
API orEventLoop.Option
, for example:
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
// 1. setting by Connection
conn.SetIdleTimeout(timeout)
// or
// 2. setting with Option
netpoll.NewEventLoop(handler, netpoll.WithIdleTimeout(timeout))
...
}
OnRequest
refers to the callback triggered by Netpoll when a read event occurs on the connection. On the
Server side, when creating the EventLoop
, you can register an OnRequest
, which will be triggered when each
connection data arrives and perform business processing. On the Client side, there is no OnRequest
by default, and it
can be set via API when needed. E.g:
package main
import (
"context"
"github.com/cloudwego/netpoll"
)
func main() {
var onRequest netpoll.OnRequest = handler
// 1. on server side
evl, _ := netpoll.NewEventLoop(onRequest, opts...)
...
// 2. on client side
conn, _ := netpoll.DialConnection(network, address, timeout)
conn.SetOnRequest(handler)
...
}
func handler(ctx context.Context, connection netpoll.Connection) (err error) {
... handling ...
return nil
}
CloseCallback
refers to the callback triggered by Netpoll when the connection is closed, which is used to
perform additional processing after the connection is closed.
Netpoll is able to perceive the connection status. When the connection is closed by peer or cleaned up by
self, it will actively trigger CloseCallback
instead of returning an error on the next Read
or Write
(the way
of net.Conn
).
Connection
provides API for adding CloseCallback
, callbacks that have been added cannot be removed, and multiple
callbacks are supported.
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
// add close callback
var cb netpoll.CloseCallback = callback
conn.AddCloseCallback(cb)
...
}
func callback(connection netpoll.Connection) error {
return nil
}
If your server is running on a physical machine, the number of P created by the Go process is equal to the number of CPUs of the machine. But the server may not use so many cores. In this case, too many pollers will cause performance degradation.
There are several solutions:
- Use the
taskset
command to limit CPU usage, such as:
taskset -c 0-3 $run_your_server
- Actively set the number of P, for instance:
package main
import (
"runtime"
)
func init() {
runtime.GOMAXPROCS(num_you_want)
}
- Actively set the number of pollers, e.g:
package main
import (
"github.com/cloudwego/netpoll"
)
func init() {
netpoll.SetNumLoops(num_you_want)
}