From 3ce54396e343d472c1616b4c3a16c9f9a62305a7 Mon Sep 17 00:00:00 2001 From: Kyle Xiao Date: Mon, 11 Nov 2024 18:13:58 +0800 Subject: [PATCH] ci: updated the latest conf and use supported Go versions --- .github/workflows/pr-check.yml | 47 +++++++++------ .github/workflows/release-check.yml | 25 -------- .golangci.yaml | 32 ++++++++++ connection_errors.go | 11 +--- connection_impl.go | 9 +-- connection_onevent.go | 21 ++++--- connection_reactor.go | 12 ++-- connection_test.go | 80 +++++++++++++------------ eventloop.go | 17 +++--- fd_operator_cache_test.go | 2 +- net_dialer.go | 2 +- net_dialer_test.go | 8 +-- net_listener.go | 4 +- net_netfd.go | 13 ++-- net_sock.go | 38 ++++++------ netpoll_server.go | 6 +- netpoll_unix.go | 2 +- netpoll_unix_test.go | 92 ++++++++++++++--------------- nocopy_linkbuffer.go | 18 +++--- nocopy_linkbuffer_test.go | 16 +++-- poll_default.go | 5 +- poll_default_bsd.go | 20 +++---- poll_default_bsd_norace.go | 1 - poll_default_linux.go | 20 +++---- poll_default_linux_norace.go | 1 - poll_default_linux_test.go | 20 ++++--- poll_manager.go | 2 +- poll_manager_test.go | 12 ++-- poll_test.go | 26 ++++---- sys_epoll_linux.go | 2 +- sys_epoll_linux_arm64.go | 2 +- sys_epoll_linux_loong64.go | 2 +- sys_exec_test.go | 31 ++++------ sys_sendmsg_bsd.go | 2 +- sys_sendmsg_linux.go | 2 +- 35 files changed, 304 insertions(+), 299 deletions(-) delete mode 100644 .github/workflows/release-check.yml create mode 100644 .golangci.yaml diff --git a/.github/workflows/pr-check.yml b/.github/workflows/pr-check.yml index 798696ea..ff3e6863 100644 --- a/.github/workflows/pr-check.yml +++ b/.github/workflows/pr-check.yml @@ -6,13 +6,8 @@ jobs: compatibility-test: strategy: matrix: - go: [ 1.15, 1.22 ] - # - "ubuntu-latest" is for Linux with X64 CPU, hosted by GitHub, - # fewer CPUs but high speed international network - # - "ARM64" is for Linux with ARM64 CPU, hosted by bytedance, - # more CPUs but inside CN internet which may download go cache slowly. - # GitHub don't have free runner with ARM CPU. - os: [ ubuntu-latest, ARM64 ] + go: [ 1.18, 1.23 ] + os: [ X64, ARM64 ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -20,10 +15,12 @@ jobs: uses: actions/setup-go@v5 with: go-version: ${{ matrix.go }} + cache: false - name: Unit Test run: go test -timeout=2m -v -race -covermode=atomic -coverprofile=coverage.out ./... - name: Benchmark run: go test -bench=. -benchmem -run=none ./... + windows-test: runs-on: windows-latest steps: @@ -31,22 +28,38 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: 1.22 + go-version: stable - name: Build Test run: go vet -v ./... - style-test: + + compliant: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up Go - uses: actions/setup-go@v5 - with: - go-version: 1.16 + - name: Check License Header uses: apache/skywalking-eyes/header@v0.4.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Lint - run: | - test -z "$(gofmt -s -l .)" - go vet -stdmethods=false $(go list ./...) + + - name: Check Spell + uses: crate-ci/typos@v1.13.14 + + golangci-lint: + runs-on: [ self-hosted, X64 ] + steps: + - uses: actions/checkout@v4 + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: stable + # for self-hosted, the cache path is shared across projects + # and it works well without the cache of github actions + # Enable it if we're going to use Github only + cache: false + + - name: Golangci Lint + # https://golangci-lint.run/ + uses: golangci/golangci-lint-action@v6 + with: + version: latest diff --git a/.github/workflows/release-check.yml b/.github/workflows/release-check.yml deleted file mode 100644 index 0e75ca1e..00000000 --- a/.github/workflows/release-check.yml +++ /dev/null @@ -1,25 +0,0 @@ -name: Release Check - -on: - pull_request: - branches: - - main - -jobs: - build: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Check Source Branch - run: python2 -c "exit(0 if '${{ github.head_ref }}'.startswith('release') or '${{ github.head_ref }}'.startswith('hotfix') else 1)" - - - name: Check Version - run: | - # get version code, runner not support grep -E here - SOURCE_VERSION=`grep 'Version\s*=\s*\"v[0-9]\{1,\}\.[0-9]\{1,\}\.[0-9]\{1,\}\"' *.go | awk -F '\"' '{print $(NF-1)}'` - git checkout main - MASTER_VERSION=`grep 'Version\s*=\s*\"v[0-9]\{1,\}\.[0-9]\{1,\}\.[0-9]\{1,\}\"' *.go | awk -F '\"' '{print $(NF-1)}'` - git checkout ${{Head.SHA}} - # check version update - python2 -c "exit(0 if list(map(int,'${SOURCE_VERSION#v}'.split('.')[:3])) > list(map(int,'${MASTER_VERSION#v}'.split('.')[:3])) else 1)" diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 00000000..6c588de2 --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,32 @@ +output: + # Format: colored-line-number|line-number|json|tab|checkstyle|code-climate|junit-xml|github-actions + format: colored-line-number +# All available settings of specific linters. +# Refer to https://golangci-lint.run/usage/linters +linters-settings: + gofumpt: + # Choose whether to use the extra rules. + # Default: false + extra-rules: true + goimports: + # Put imports beginning with prefix after 3rd-party packages. + # It's a comma-separated list of prefixes. + local-prefixes: github.com/cloudwego/kitex + govet: + # Disable analyzers by name. + # Run `go tool vet help` to see all analyzers. + disable: + - stdmethods +linters: + enable: + - gofumpt + - goimports + - gofmt + disable: + - errcheck + - typecheck + - deadcode + - varcheck + - staticcheck +issues: + exclude-use-default: true diff --git a/connection_errors.go b/connection_errors.go index f14070ab..203f8e79 100644 --- a/connection_errors.go +++ b/connection_errors.go @@ -44,7 +44,7 @@ const ErrnoMask = 0xFF // wrap Errno, implement xerrors.Wrapper func Exception(err error, suffix string) error { - var no, ok = err.(syscall.Errno) + no, ok := err.(syscall.Errno) if !ok { if suffix == "" { return err @@ -54,9 +54,7 @@ func Exception(err error, suffix string) error { return &exception{no: no, suffix: suffix} } -var ( - _ net.Error = (*exception)(nil) -) +var _ net.Error = (*exception)(nil) type exception struct { no syscall.Errno @@ -100,10 +98,7 @@ func (e *exception) Timeout() bool { case ErrDialTimeout, ErrReadTimeout, ErrWriteTimeout: return true } - if e.no.Timeout() { - return true - } - return false + return e.no.Timeout() } func (e *exception) Temporary() bool { diff --git a/connection_impl.go b/connection_impl.go index aa15e1e9..49913658 100644 --- a/connection_impl.go +++ b/connection_impl.go @@ -484,8 +484,8 @@ func (c *connection) flush() error { return nil } // TODO: Let the upper layer pass in whether to use ZeroCopy. - var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs) - var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy) + bs := c.outputBuffer.GetBytes(c.outputBarrier.bs) + n, err := sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy) if err != nil && err != syscall.EAGAIN { return Exception(err, "when flush") } @@ -510,10 +510,7 @@ func (c *connection) flush() error { func (c *connection) waitFlush() (err error) { if c.writeTimeout == 0 { - select { - case err = <-c.writeTrigger: - } - return err + return <-c.writeTrigger } // set write timeout diff --git a/connection_onevent.go b/connection_onevent.go index 5dc986da..8efcd98d 100644 --- a/connection_onevent.go +++ b/connection_onevent.go @@ -94,7 +94,7 @@ func (c *connection) AddCloseCallback(callback CloseCallback) error { if callback == nil { return nil } - var cb = &callbackNode{} + cb := &callbackNode{} cb.fn = callback if pre := c.closeCallbacks.Load(); pre != nil { cb.pre = pre.(*callbackNode) @@ -132,7 +132,7 @@ func (c *connection) onPrepare(opts *options) (err error) { // onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished. func (c *connection) onConnect() { - var onConnect, _ = c.onConnectCallback.Load().(OnConnect) + onConnect, _ := c.onConnectCallback.Load().(OnConnect) if onConnect == nil { c.changeState(connStateNone, connStateConnected) return @@ -141,17 +141,17 @@ func (c *connection) onConnect() { // it never happens because onDisconnect will not lock connecting if c.connected == 0 return } - var onRequest, _ = c.onRequestCallback.Load().(OnRequest) + onRequest, _ := c.onRequestCallback.Load().(OnRequest) c.onProcess(onConnect, onRequest) } // when onDisconnect called, c.IsActive() must return false func (c *connection) onDisconnect() { - var onDisconnect, _ = c.onDisconnectCallback.Load().(OnDisconnect) + onDisconnect, _ := c.onDisconnectCallback.Load().(OnDisconnect) if onDisconnect == nil { return } - var onConnect, _ = c.onConnectCallback.Load().(OnConnect) + onConnect, _ := c.onConnectCallback.Load().(OnConnect) if onConnect == nil { // no need lock if onConnect is nil // it's ok to force set state to disconnected since onConnect is nil @@ -170,12 +170,11 @@ func (c *connection) onDisconnect() { return } // OnConnect is not finished yet, return and let onConnect helps to call onDisconnect - return } // onRequest is responsible for executing the closeCallbacks after the connection has been closed. func (c *connection) onRequest() (needTrigger bool) { - var onRequest, ok = c.onRequestCallback.Load().(OnRequest) + onRequest, ok := c.onRequestCallback.Load().(OnRequest) if !ok { return true } @@ -270,8 +269,8 @@ func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (proces } // task exits panicked = false - return - } + } // end of task closure func + // add new task runTask(c.ctx, task) return true @@ -280,7 +279,7 @@ func (c *connection) onProcess(onConnect OnConnect, onRequest OnRequest) (proces // closeCallback . // It can be confirmed that closeCallback and onRequest will not be executed concurrently. // If onRequest is still running, it will trigger closeCallback on exit. -func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) { +func (c *connection) closeCallback(needLock, needDetach bool) (err error) { if needLock && !c.lock(processing) { return nil } @@ -290,7 +289,7 @@ func (c *connection) closeCallback(needLock bool, needDetach bool) (err error) { logger.Printf("NETPOLL: closeCallback[%v,%v] detach operator failed: %v", needLock, needDetach, err) } } - var latest = c.closeCallbacks.Load() + latest := c.closeCallbacks.Load() if latest == nil { return nil } diff --git a/connection_reactor.go b/connection_reactor.go index 25b4dec5..65ee2448 100644 --- a/connection_reactor.go +++ b/connection_reactor.go @@ -37,9 +37,9 @@ func (c *connection) onHup(p Poll) error { // It depends on closing by user if OnConnect and OnRequest is nil, otherwise it needs to be released actively. // It can be confirmed that the OnRequest goroutine has been exited before closeCallback executing, // and it is safe to close the buffer at this time. - var onConnect = c.onConnectCallback.Load() - var onRequest = c.onRequestCallback.Load() - var needCloseByUser = onConnect == nil && onRequest == nil + onConnect := c.onConnectCallback.Load() + onRequest := c.onRequestCallback.Load() + needCloseByUser := onConnect == nil && onRequest == nil if !needCloseByUser { // already PollDetach when call OnHup c.closeCallback(true, false) @@ -69,8 +69,8 @@ func (c *connection) onClose() error { // closeBuffer recycle input & output LinkBuffer. func (c *connection) closeBuffer() { - var onConnect, _ = c.onConnectCallback.Load().(OnConnect) - var onRequest, _ = c.onRequestCallback.Load().(OnRequest) + onConnect, _ := c.onConnectCallback.Load().(OnConnect) + onRequest, _ := c.onRequestCallback.Load().(OnRequest) // if client close the connection, we cannot ensure that the poller is not process the buffer, // so we need to check the buffer length, and if it's an "unclean" close operation, let's give up to reuse the buffer if c.inputBuffer.Len() == 0 || onConnect != nil || onRequest != nil { @@ -108,7 +108,7 @@ func (c *connection) inputAck(n int) (err error) { c.maxSize = mallocMax } - var needTrigger = true + needTrigger := true if length == n { // first start onRequest needTrigger = c.onRequest() } diff --git a/connection_test.go b/connection_test.go index 65dd69ff..65d761f4 100644 --- a/connection_test.go +++ b/connection_test.go @@ -33,10 +33,10 @@ import ( ) func BenchmarkConnectionIO(b *testing.B) { - var dataSize = 1024 * 16 - var writeBuffer = make([]byte, dataSize) - var rfd, wfd = GetSysFdPairs() - var rconn, wconn = new(connection), new(connection) + dataSize := 1024 * 16 + writeBuffer := make([]byte, dataSize) + rfd, wfd := GetSysFdPairs() + rconn, wconn := new(connection), new(connection) rconn.init(&netFD{fd: rfd}, &options{onRequest: func(ctx context.Context, connection Connection) error { read, _ := connection.Reader().Next(dataSize) _ = wconn.Reader().Release() @@ -57,13 +57,13 @@ func BenchmarkConnectionIO(b *testing.B) { } func TestConnectionWrite(t *testing.T) { - var cycle, caps = 10000, 256 - var msg, buf = make([]byte, caps), make([]byte, caps) + cycle, caps := 10000, 256 + msg, buf := make([]byte, caps), make([]byte, caps) var wg sync.WaitGroup wg.Add(1) var count int32 - var expect = int32(cycle * caps) - var opts = &options{} + expect := int32(cycle * caps) + opts := &options{} opts.onRequest = func(ctx context.Context, connection Connection) error { n, err := connection.Read(buf) MustNil(t, err) @@ -74,7 +74,7 @@ func TestConnectionWrite(t *testing.T) { } r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} + rconn, wconn := &connection{}, &connection{} rconn.init(&netFD{fd: r}, opts) wconn.init(&netFD{fd: w}, opts) @@ -91,10 +91,10 @@ func TestConnectionWrite(t *testing.T) { func TestConnectionLargeWrite(t *testing.T) { // ci machine don't have 4GB memory, so skip test t.Skipf("skip large write test for ci job") - var totalSize = 1024 * 1024 * 1024 * 4 + totalSize := 1024 * 1024 * 1024 * 4 var wg sync.WaitGroup wg.Add(1) - var opts = &options{} + opts := &options{} opts.onRequest = func(ctx context.Context, connection Connection) error { if connection.Reader().Len() < totalSize { return nil @@ -108,7 +108,7 @@ func TestConnectionLargeWrite(t *testing.T) { } r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} + rconn, wconn := &connection{}, &connection{} rconn.init(&netFD{fd: r}, opts) wconn.init(&netFD{fd: w}, opts) @@ -124,15 +124,15 @@ func TestConnectionLargeWrite(t *testing.T) { func TestConnectionRead(t *testing.T) { r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} + rconn, wconn := &connection{}, &connection{} err := rconn.init(&netFD{fd: r}, nil) MustNil(t, err) err = wconn.init(&netFD{fd: w}, nil) MustNil(t, err) - var size = 256 - var cycleTime = 1000 - var msg = make([]byte, size) + size := 256 + cycleTime := 1000 + msg := make([]byte, size) var wg sync.WaitGroup wg.Add(1) go func() { @@ -162,14 +162,14 @@ func TestConnectionNoCopyReadString(t *testing.T) { }() r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} + rconn, wconn := &connection{}, &connection{} rconn.init(&netFD{fd: r}, nil) wconn.init(&netFD{fd: w}, nil) - var size, cycleTime = 256, 100 + size, cycleTime := 256, 100 // record historical data, check data consistency - var readBucket = make([]string, cycleTime) - var trigger = make(chan struct{}) + readBucket := make([]string, cycleTime) + trigger := make(chan struct{}) // read data go func() { @@ -188,7 +188,7 @@ func TestConnectionNoCopyReadString(t *testing.T) { }() // write data - var msg = make([]byte, size) + msg := make([]byte, size) for i := 0; i < cycleTime; i++ { byt := 'a' + byte(i%26) for c := 0; c < size; c++ { @@ -213,15 +213,15 @@ func TestConnectionNoCopyReadString(t *testing.T) { func TestConnectionReadAfterClosed(t *testing.T) { r, w := GetSysFdPairs() - var rconn = &connection{} + rconn := &connection{} rconn.init(&netFD{fd: r}, nil) - var size = 256 - var msg = make([]byte, size) + size := 256 + msg := make([]byte, size) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - var buf, err = rconn.Reader().Next(size) + buf, err := rconn.Reader().Next(size) MustNil(t, err) Equal(t, len(buf), size) }() @@ -233,10 +233,10 @@ func TestConnectionReadAfterClosed(t *testing.T) { func TestConnectionWaitReadHalfPacket(t *testing.T) { r, w := GetSysFdPairs() - var rconn = &connection{} + rconn := &connection{} rconn.init(&netFD{fd: r}, nil) - var size = pagesize * 2 - var msg = make([]byte, size) + size := pagesize * 2 + msg := make([]byte, size) // write half packet syscall.Write(w, msg[:size/2]) @@ -250,7 +250,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - var buf, err = rconn.Reader().Next(size) + buf, err := rconn.Reader().Next(size) Equal(t, atomic.LoadInt64(&rconn.waitReadSize), int64(0)) MustNil(t, err) Equal(t, len(buf), size) @@ -323,7 +323,7 @@ func TestLargeBufferWrite(t *testing.T) { wg.Add(1) bufferSize := 2 * 1024 * 1024 // 2MB round := 128 - //start large buffer writing + // start large buffer writing go func() { defer wg.Done() for i := 1; i <= round+1; i++ { @@ -410,11 +410,11 @@ func TestConnectionLargeMemory(t *testing.T) { runtime.ReadMemStats(&start) r, w := GetSysFdPairs() - var rconn = &connection{} + rconn := &connection{} rconn.init(&netFD{fd: r}, nil) var wg sync.WaitGroup - var rn, wn = 1024, 1 * 1024 * 1024 + rn, wn := 1024, 1*1024*1024 wg.Add(1) go func() { @@ -423,7 +423,7 @@ func TestConnectionLargeMemory(t *testing.T) { MustNil(t, err) }() - var msg = make([]byte, rn) + msg := make([]byte, rn) for i := 0; i < wn/rn; i++ { n, err := syscall.Write(w, msg) if err != nil { @@ -441,6 +441,7 @@ func TestConnectionLargeMemory(t *testing.T) { // TestSetTCPNoDelay is used to verify the connection initialization set the TCP_NODELAY correctly func TestSetTCPNoDelay(t *testing.T) { fd, err := sysSocket(syscall.AF_INET, syscall.SOCK_STREAM, 0) + MustNil(t, err) conn := &connection{} conn.init(&netFD{network: "tcp", fd: fd}, nil) @@ -602,9 +603,10 @@ func TestParallelShortConnection(t *testing.T) { if err != nil { return err } - //t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive()) + // t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive()) return nil }) + MustNil(t, err) go func() { el.Serve(ln) }() @@ -654,7 +656,7 @@ func TestConnectionServerClose(t *testing.T) { var wg sync.WaitGroup el, err := NewEventLoop( func(ctx context.Context, connection Connection) error { - //t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) + // t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr()) defer wg.Done() buf, err := connection.Reader().Next(len(PONG)) // pong Equal(t, string(buf), PONG) @@ -677,18 +679,20 @@ func TestConnectionServerClose(t *testing.T) { err = connection.Writer().Flush() MustNil(t, err) connection.AddCloseCallback(func(connection Connection) error { - //t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) + // t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr()) wg.Done() return nil }) return ctx }), WithOnPrepare(func(connection Connection) context.Context { - //t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) + // t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr()) defer wg.Done() return context.WithValue(context.Background(), "prepare", "true") }), ) + MustNil(t, err) + defer el.Shutdown(context.Background()) go func() { err := el.Serve(ln) @@ -725,7 +729,7 @@ func TestConnectionServerClose(t *testing.T) { err = conn.SetOnRequest(clientOnRequest) MustNil(t, err) conn.AddCloseCallback(func(connection Connection) error { - //t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) + // t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr()) defer wg.Done() return nil }) diff --git a/eventloop.go b/eventloop.go index 425cd95f..14a73b88 100644 --- a/eventloop.go +++ b/eventloop.go @@ -68,13 +68,14 @@ type OnPrepare func(connection Connection) context.Context // // An example usage in TCP Proxy scenario: // -// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context { -// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second) -// return context.WithValue(ctx, downstreamKey, downstream) -// } -// func onRequest(ctx context.Context, upstream netpoll.Connection) error { -// downstream := ctx.Value(downstreamKey).(netpoll.Connection) -// } +// func onConnect(ctx context.Context, upstream netpoll.Connection) context.Context { +// downstream, _ := netpoll.DialConnection("tcp", downstreamAddr, time.Second) +// return context.WithValue(ctx, downstreamKey, downstream) +// } +// +// func onRequest(ctx context.Context, upstream netpoll.Connection) error { +// downstream := ctx.Value(downstreamKey).(netpoll.Connection) +// } type OnConnect func(ctx context.Context, connection Connection) context.Context // OnDisconnect is called once connection is going to be closed. @@ -89,7 +90,7 @@ type OnDisconnect func(ctx context.Context, connection Connection) // func OnRequest(ctx context, connection Connection) error { // input := connection.Reader().Next(n) // handling input data... -// send, _ := connection.Writer().Malloc(l) +// send, _ := connection.Writer().Malloc(l) // copy(send, output) // connection.Flush() // return nil diff --git a/fd_operator_cache_test.go b/fd_operator_cache_test.go index a92f15fb..1865bc23 100644 --- a/fd_operator_cache_test.go +++ b/fd_operator_cache_test.go @@ -27,7 +27,7 @@ func TestPersistFDOperator(t *testing.T) { opcache := newOperatorCache() // init size := 2048 - var ops = make([]*FDOperator, size) + ops := make([]*FDOperator, size) for i := 0; i < size; i++ { op := opcache.alloc() op.FD = i diff --git a/net_dialer.go b/net_dialer.go index 74edd91c..c575a1a9 100644 --- a/net_dialer.go +++ b/net_dialer.go @@ -105,7 +105,7 @@ func (d *dialer) dialTCP(ctx context.Context, network, address string) (connecti } var firstErr error // The error from the first address is most relevant. - var tcpAddr = &TCPAddr{} + tcpAddr := &TCPAddr{} for _, ipaddr := range ipaddrs { tcpAddr.IP = ipaddr.IP tcpAddr.Port = portnum diff --git a/net_dialer_test.go b/net_dialer_test.go index 88a16af8..d92e6a95 100644 --- a/net_dialer_test.go +++ b/net_dialer_test.go @@ -118,7 +118,7 @@ func TestDialerFdAlloc(t *testing.T) { go func() { el1.Serve(ln) }() - var ctx1, cancel1 = context.WithTimeout(context.Background(), time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() defer el1.Shutdown(ctx1) @@ -147,7 +147,7 @@ func TestFDClose(t *testing.T) { go func() { el1.Serve(ln) }() - var ctx1, cancel1 = context.WithTimeout(context.Background(), time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() defer el1.Shutdown(ctx1) @@ -177,7 +177,7 @@ func TestDialerThenClose(t *testing.T) { go func() { el1.Serve(ln1) }() - var ctx1, cancel1 = context.WithTimeout(context.Background(), time.Second) + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) defer cancel1() defer el1.Shutdown(ctx1) @@ -187,7 +187,7 @@ func TestDialerThenClose(t *testing.T) { go func() { el2.Serve(ln2) }() - var ctx2, cancel2 = context.WithTimeout(context.Background(), time.Second) + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) defer cancel2() defer el2.Shutdown(ctx2) diff --git a/net_listener.go b/net_listener.go index e7f9edc1..523e6d43 100644 --- a/net_listener.go +++ b/net_listener.go @@ -89,7 +89,7 @@ func (ln *listener) Accept() (net.Conn, error) { return ln.UDPAccept() } // tcp - var fd, sa, err = syscall.Accept(ln.fd) + fd, sa, err := syscall.Accept(ln.fd) if err != nil { /* https://man7.org/linux/man-pages/man2/accept.2.html EAGAIN or EWOULDBLOCK @@ -104,7 +104,7 @@ func (ln *listener) Accept() (net.Conn, error) { } return nil, err } - var nfd = &netFD{} + nfd := &netFD{} nfd.fd = fd nfd.localAddr = ln.addr nfd.network = ln.addr.Network() diff --git a/net_netfd.go b/net_netfd.go index 81acc109..8b4c028f 100644 --- a/net_netfd.go +++ b/net_netfd.go @@ -20,14 +20,9 @@ import ( "time" ) -var ( - // aLongTimeAgo is a non-zero time, far in the past, used for - // immediate cancelation of dials. - aLongTimeAgo = time.Unix(1, 0) - // nonDeadline and noCancel are just zero values for - // readability with functions taking too many parameters. - noDeadline = time.Time{} -) +// nonDeadline and noCancel are just zero values for +// readability with functions taking too many parameters. +var noDeadline = time.Time{} type netFD struct { // file descriptor @@ -55,7 +50,7 @@ type netFD struct { } func newNetFD(fd, family, sotype int, net string) *netFD { - var ret = &netFD{} + ret := &netFD{} ret.fd = fd ret.network = net ret.family = family diff --git a/net_sock.go b/net_sock.go index a3d318c7..c6ec98e8 100644 --- a/net_sock.go +++ b/net_sock.go @@ -55,29 +55,29 @@ func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, soty // address family, both AF_INET and AF_INET6, and a wildcard address // like the following: // -// - A listen for a wildcard communication domain, "tcp" or -// "udp", with a wildcard address: If the platform supports -// both IPv6 and IPv4-mapped IPv6 communication capabilities, -// or does not support IPv4, we use a dual stack, AF_INET6 and -// IPV6_V6ONLY=0, wildcard address listen. The dual stack -// wildcard address listen may fall back to an IPv6-only, -// AF_INET6 and IPV6_V6ONLY=1, wildcard address listen. -// Otherwise we prefer an IPv4-only, AF_INET, wildcard address -// listen. +// - A listen for a wildcard communication domain, "tcp" or +// "udp", with a wildcard address: If the platform supports +// both IPv6 and IPv4-mapped IPv6 communication capabilities, +// or does not support IPv4, we use a dual stack, AF_INET6 and +// IPV6_V6ONLY=0, wildcard address listen. The dual stack +// wildcard address listen may fall back to an IPv6-only, +// AF_INET6 and IPV6_V6ONLY=1, wildcard address listen. +// Otherwise we prefer an IPv4-only, AF_INET, wildcard address +// listen. // -// - A listen for a wildcard communication domain, "tcp" or -// "udp", with an IPv4 wildcard address: same as above. +// - A listen for a wildcard communication domain, "tcp" or +// "udp", with an IPv4 wildcard address: same as above. // -// - A listen for a wildcard communication domain, "tcp" or -// "udp", with an IPv6 wildcard address: same as above. +// - A listen for a wildcard communication domain, "tcp" or +// "udp", with an IPv6 wildcard address: same as above. // -// - A listen for an IPv4 communication domain, "tcp4" or "udp4", -// with an IPv4 wildcard address: We use an IPv4-only, AF_INET, -// wildcard address listen. +// - A listen for an IPv4 communication domain, "tcp4" or "udp4", +// with an IPv4 wildcard address: We use an IPv4-only, AF_INET, +// wildcard address listen. // -// - A listen for an IPv6 communication domain, "tcp6" or "udp6", -// with an IPv6 wildcard address: We use an IPv6-only, AF_INET6 -// and IPV6_V6ONLY=1, wildcard address listen. +// - A listen for an IPv6 communication domain, "tcp6" or "udp6", +// with an IPv6 wildcard address: We use an IPv6-only, AF_INET6 +// and IPV6_V6ONLY=1, wildcard address listen. // // Otherwise guess: If the addresses are IPv4 then returns AF_INET, // or else returns AF_INET6. It also returns a boolean value what diff --git a/netpoll_server.go b/netpoll_server.go index 248ae909..61ed09c2 100644 --- a/netpoll_server.go +++ b/netpoll_server.go @@ -66,7 +66,7 @@ func (s *server) Close(ctx context.Context) error { for { activeConn := 0 s.connections.Range(func(key, value interface{}) bool { - var conn, ok = value.(gracefulExit) + conn, ok := value.(gracefulExit) if !ok || conn.isIdle() { value.(Connection).Close() } else { @@ -162,12 +162,12 @@ func (s *server) OnHup(p Poll) error { func (s *server) onAccept(conn Conn) { // store & register connection - var nconn = new(connection) + nconn := new(connection) nconn.init(conn, s.opts) if !nconn.IsActive() { return } - var fd = conn.Fd() + fd := conn.Fd() nconn.AddCloseCallback(func(connection Connection) error { s.connections.Delete(fd) return nil diff --git a/netpoll_unix.go b/netpoll_unix.go index 4eb25a05..5dd1b7fb 100644 --- a/netpoll_unix.go +++ b/netpoll_unix.go @@ -155,7 +155,7 @@ func (evl *eventLoop) Serve(ln net.Listener) error { // Shutdown signals a shutdown a begins server closing. func (evl *eventLoop) Shutdown(ctx context.Context) error { evl.Lock() - var svr = evl.svr + svr := evl.svr evl.svr = nil evl.Unlock() diff --git a/netpoll_unix_test.go b/netpoll_unix_test.go index 24689fd0..c441af57 100644 --- a/netpoll_unix_test.go +++ b/netpoll_unix_test.go @@ -66,7 +66,7 @@ func Assert(t *testing.T, cond bool, val ...interface{}) { var testPort int32 = 10000 -// getTestAddress return a unique port for every tests, so all tests will not share a same listerner +// getTestAddress return a unique port for every tests, so all tests will not share a same listener func getTestAddress() string { return fmt.Sprintf("127.0.0.1:%d", atomic.AddInt32(&testPort, 1)) } @@ -80,9 +80,9 @@ func TestEqual(t *testing.T) { } func TestOnConnect(t *testing.T) { - var network, address = "tcp", getTestAddress() + network, address := "tcp", getTestAddress() req, resp := "ping", "pong" - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { return nil }, @@ -102,7 +102,7 @@ func TestOnConnect(t *testing.T) { } }), ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) for i := 0; i < 1024; i++ { @@ -124,8 +124,8 @@ func TestOnConnect(t *testing.T) { } func TestOnConnectWrite(t *testing.T) { - var network, address = "tcp", getTestAddress() - var loop = newTestEventLoop(network, address, + network, address := "tcp", getTestAddress() + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { return nil }, @@ -135,7 +135,7 @@ func TestOnConnectWrite(t *testing.T) { return ctx }), ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) s, err := conn.Reader().ReadString(5) MustNil(t, err) @@ -147,11 +147,11 @@ func TestOnConnectWrite(t *testing.T) { func TestOnDisconnect(t *testing.T) { type ctxKey struct{} - var network, address = "tcp", getTestAddress() + network, address := "tcp", getTestAddress() var canceled, closed int32 var conns int32 = 100 req := "ping" - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { cancelFunc, _ := ctx.Value(ctxKey{}).(context.CancelFunc) MustTrue(t, cancelFunc != nil) @@ -183,7 +183,7 @@ func TestOnDisconnect(t *testing.T) { ) for i := int32(0); i < conns; i++ { - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) _, err = conn.Writer().WriteString(req) @@ -208,11 +208,11 @@ func TestOnDisconnect(t *testing.T) { func TestOnDisconnectWhenOnConnect(t *testing.T) { type ctxPrepareKey struct{} type ctxConnectKey struct{} - var network, address = "tcp", getTestAddress() + network, address := "tcp", getTestAddress() var conns int32 = 100 var wg sync.WaitGroup wg.Add(int(conns) * 3) - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, _ = connection.Reader().Next(connection.Reader().Len()) return nil @@ -243,7 +243,7 @@ func TestOnDisconnectWhenOnConnect(t *testing.T) { ) for i := int32(0); i < conns; i++ { - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) err = conn.Close() t.Logf("Close: %v", conn.LocalAddr()) @@ -256,34 +256,34 @@ func TestOnDisconnectWhenOnConnect(t *testing.T) { } func TestGracefulExit(t *testing.T) { - var network, address = "tcp", getTestAddress() + network, address := "tcp", getTestAddress() // exit without processing connections - var eventLoop1 = newTestEventLoop(network, address, + eventLoop1 := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { return nil }) - var _, err = DialConnection(network, address, time.Second) + _, err := DialConnection(network, address, time.Second) MustNil(t, err) err = eventLoop1.Shutdown(context.Background()) MustNil(t, err) // exit with processing connections trigger := make(chan struct{}) - var eventLoop2 = newTestEventLoop(network, address, + eventLoop2 := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { <-trigger return nil }) for i := 0; i < 10; i++ { // connect success - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) _, err = conn.Write(make([]byte, 16)) MustNil(t, err) } // shutdown timeout - var ctx2, cancel2 = context.WithTimeout(context.Background(), time.Millisecond*100) + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel2() err = eventLoop2.Shutdown(ctx2) MustTrue(t, err != nil) @@ -295,30 +295,30 @@ func TestGracefulExit(t *testing.T) { // exit with read connections size := 16 - var eventLoop3 = newTestEventLoop(network, address, + eventLoop3 := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(size) MustNil(t, err) return nil }) for i := 0; i < 10; i++ { - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) if i%2 == 0 { _, err := conn.Write(make([]byte, size)) MustNil(t, err) } } - var ctx3, cancel3 = context.WithTimeout(context.Background(), 5*time.Second) + ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second) defer cancel3() err = eventLoop3.Shutdown(ctx3) MustNil(t, err) } func TestCloseCallbackWhenOnRequest(t *testing.T) { - var network, address = "tcp", getTestAddress() - var requested, closed = make(chan struct{}), make(chan struct{}) - var loop = newTestEventLoop(network, address, + network, address := "tcp", getTestAddress() + requested, closed := make(chan struct{}), make(chan struct{}) + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(connection.Reader().Len()) MustNil(t, err) @@ -331,7 +331,7 @@ func TestCloseCallbackWhenOnRequest(t *testing.T) { return nil }, ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) _, err = conn.Writer().WriteString("hello") MustNil(t, err) @@ -347,9 +347,9 @@ func TestCloseCallbackWhenOnRequest(t *testing.T) { } func TestCloseCallbackWhenOnConnect(t *testing.T) { - var network, address = "tcp", getTestAddress() - var connected, closed = make(chan struct{}), make(chan struct{}) - var loop = newTestEventLoop(network, address, + network, address := "tcp", getTestAddress() + connected, closed := make(chan struct{}), make(chan struct{}) + loop := newTestEventLoop(network, address, nil, WithOnConnect(func(ctx context.Context, connection Connection) context.Context { err := connection.AddCloseCallback(func(connection Connection) error { @@ -361,7 +361,7 @@ func TestCloseCallbackWhenOnConnect(t *testing.T) { return ctx }), ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) err = conn.Close() MustNil(t, err) @@ -374,11 +374,11 @@ func TestCloseCallbackWhenOnConnect(t *testing.T) { } func TestCloseConnWhenOnConnect(t *testing.T) { - var network, address = "tcp", "localhost:8888" + network, address := "tcp", "localhost:8888" conns := 10 var wg sync.WaitGroup wg.Add(conns) - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, nil, WithOnConnect(func(ctx context.Context, connection Connection) context.Context { defer wg.Done() @@ -392,7 +392,7 @@ func TestCloseConnWhenOnConnect(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) if err != nil { return } @@ -409,9 +409,9 @@ func TestCloseConnWhenOnConnect(t *testing.T) { } func TestServerReadAndClose(t *testing.T) { - var network, address = "tcp", getTestAddress() - var sendMsg = []byte("hello") - var loop = newTestEventLoop(network, address, + network, address := "tcp", getTestAddress() + sendMsg := []byte("hello") + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(len(sendMsg)) MustNil(t, err) @@ -421,7 +421,7 @@ func TestServerReadAndClose(t *testing.T) { }, ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) _, err = conn.Writer().WriteBinary(sendMsg) MustNil(t, err) @@ -441,10 +441,10 @@ func TestServerReadAndClose(t *testing.T) { } func TestServerPanicAndClose(t *testing.T) { - var network, address = "tcp", getTestAddress() - var sendMsg = []byte("hello") + network, address := "tcp", getTestAddress() + sendMsg := []byte("hello") var paniced int32 - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(len(sendMsg)) MustNil(t, err) @@ -453,7 +453,7 @@ func TestServerPanicAndClose(t *testing.T) { }, ) - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) _, err = conn.Writer().WriteBinary(sendMsg) MustNil(t, err) @@ -478,7 +478,7 @@ func TestClientWriteAndClose(t *testing.T) { packetsize, packetnum = 1000 * 5, 1 recvbytes int32 = 0 ) - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { buf, err := connection.Reader().Next(connection.Reader().Len()) if errors.Is(err, ErrConnClosed) { @@ -494,7 +494,7 @@ func TestClientWriteAndClose(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - var conn, err = DialConnection(network, address, time.Second) + conn, err := DialConnection(network, address, time.Second) MustNil(t, err) sendMsg := make([]byte, packetsize) for j := 0; j < packetnum; j++ { @@ -537,9 +537,9 @@ func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { MustNil(t, err) }() - var network, address = "tcp", getTestAddress() + network, address := "tcp", getTestAddress() var connected int32 - var loop = newTestEventLoop(network, address, + loop := newTestEventLoop(network, address, func(ctx context.Context, connection Connection) error { buf, err := connection.Reader().Next(connection.Reader().Len()) connection.Writer().WriteBinary(buf) @@ -576,7 +576,7 @@ func TestServerAcceptWhenTooManyOpenFiles(t *testing.T) { }() // we should use telnet manually - var connections = 1 + connections := 1 for atomic.LoadInt32(&connected) < int32(connections) { t.Logf("connected=%d", atomic.LoadInt32(&connected)) time.Sleep(time.Second) diff --git a/nocopy_linkbuffer.go b/nocopy_linkbuffer.go index 7843767f..a8c89755 100644 --- a/nocopy_linkbuffer.go +++ b/nocopy_linkbuffer.go @@ -31,17 +31,19 @@ const BinaryInplaceThreshold = block4k // LinkBufferCap that can be modified marks the minimum value of each node of LinkBuffer. var LinkBufferCap = block4k -var _ Reader = &LinkBuffer{} -var _ Writer = &LinkBuffer{} +var ( + _ Reader = &LinkBuffer{} + _ Writer = &LinkBuffer{} +) // NewLinkBuffer size defines the initial capacity, but there is no readable data. func NewLinkBuffer(size ...int) *LinkBuffer { - var buf = &LinkBuffer{} + buf := &LinkBuffer{} var l int if len(size) > 0 { l = size[0] } - var node = newLinkBufferNode(l) + node := newLinkBufferNode(l) buf.head, buf.read, buf.flush, buf.write = node, node, node, node return buf } @@ -343,7 +345,7 @@ func (b *UnsafeLinkBuffer) Slice(n int) (r Reader, err error) { return p, nil } // multiple nodes - var l = b.read.Len() + l := b.read.Len() node := b.read.Refer(l) b.read = b.read.next @@ -428,7 +430,7 @@ func (b *UnsafeLinkBuffer) Flush() (err error) { // Append implements Writer. func (b *UnsafeLinkBuffer) Append(w Writer) (err error) { - var buf, ok = w.(*LinkBuffer) + buf, ok := w.(*LinkBuffer) if !ok { return errors.New("unsupported writer which is not LinkBuffer") } @@ -683,7 +685,6 @@ func (b *UnsafeLinkBuffer) resetTail(maxSize int) { b.write.next = newLinkBufferNode(0) b.write = b.write.next b.flush = b.write - return } // indexByte returns the index of the first instance of c in buffer, or -1 if c is not present in buffer. @@ -778,7 +779,7 @@ func (b *LinkBuffer) memorySize() (bytes int) { // newLinkBufferNode create or reuse linkBufferNode. // Nodes with size <= 0 are marked as readonly, which means the node.buf is not allocated by this mcache. func newLinkBufferNode(size int) *linkBufferNode { - var node = linkedPool.Get().(*linkBufferNode) + node := linkedPool.Get().(*linkBufferNode) // reset node offset node.off, node.malloc, node.refer, node.mode = 0, 0, 1, defaultLinkBufferMode if size <= 0 { @@ -824,7 +825,6 @@ func (node *linkBufferNode) Reset() { } node.off, node.malloc = 0, 0 node.buf = node.buf[:0] - return } func (node *linkBufferNode) Next(n int) (p []byte) { diff --git a/nocopy_linkbuffer_test.go b/nocopy_linkbuffer_test.go index 1e84fabb..c731d76b 100644 --- a/nocopy_linkbuffer_test.go +++ b/nocopy_linkbuffer_test.go @@ -108,7 +108,6 @@ func TestLinkBufferGetBytes(t *testing.T) { actualLen += len(bs[i]) } Equal(t, actualLen, expectedLen) - } // TestLinkBufferWithZero test more case with n is invalid. @@ -480,8 +479,8 @@ func TestLinkBufferWriteBinary(t *testing.T) { LinkBufferCap = 8 // new b: cap=16, len=9 - var b = make([]byte, 16) - var buf = NewLinkBuffer() + b := make([]byte, 16) + buf := NewLinkBuffer() buf.WriteBinary(b[:9]) buf.Flush() @@ -489,7 +488,7 @@ func TestLinkBufferWriteBinary(t *testing.T) { // WriteBinary/Malloc etc. cannot start from b[9:] buf.WriteBinary([]byte{1}) Equal(t, b[9], byte(0)) - var bs, err = buf.Malloc(1) + bs, err := buf.Malloc(1) MustNil(t, err) bs[0] = 2 buf.Flush() @@ -500,7 +499,7 @@ func TestLinkBufferWriteDirect(t *testing.T) { // clean & new LinkBufferCap = 32 - var buf = NewLinkBuffer() + buf := NewLinkBuffer() bt, _ := buf.Malloc(32) bt[0] = 'a' bt[1] = 'b' @@ -688,7 +687,6 @@ func BenchmarkLinkBufferConcurrentReadWrite(b *testing.B) { buf.Release() } } - }) } @@ -831,14 +829,14 @@ func BenchmarkLinkBufferPoolGet(b *testing.B) { } func BenchmarkCopyString(b *testing.B) { - var s = make([]byte, 128*1024) + s := make([]byte, 128*1024) // benchmark b.ReportAllocs() b.SetParallelism(100) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { - var v = make([]byte, 1024) + v := make([]byte, 1024) for pb.Next() { copy(v, s) } @@ -855,7 +853,7 @@ func BenchmarkLinkBufferNoCopyRead(b *testing.B) { b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { - var buffer = NewLinkBuffer(pagesize) + buffer := NewLinkBuffer(pagesize) for pb.Next() { buf, err := buffer.Malloc(totalSize) if len(buf) != totalSize || err != nil { diff --git a/poll_default.go b/poll_default.go index b35ff5a6..6f9b413f 100644 --- a/poll_default.go +++ b/poll_default.go @@ -56,11 +56,10 @@ func (p *defaultPoll) onhups() { // readall read all left data before close connection func readall(op *FDOperator, br barrier) (total int, err error) { - var bs = br.bs - var ivs = br.ivs + ivs := br.ivs var n int for { - bs = op.Inputs(br.bs) + bs := op.Inputs(br.bs) if len(bs) == 0 { return total, nil } diff --git a/poll_default_bsd.go b/poll_default_bsd.go index 9c8aa8c9..8e8dfe00 100644 --- a/poll_default_bsd.go +++ b/poll_default_bsd.go @@ -60,8 +60,8 @@ type defaultPoll struct { // Wait implements Poll. func (p *defaultPoll) Wait() error { // init - var size, caps = 1024, barriercap - var events, barriers = make([]syscall.Kevent_t, size), make([]barrier, size) + size, caps := 1024, barriercap + events, barriers := make([]syscall.Kevent_t, size), make([]barrier, size) for i := range barriers { barriers[i].bs = make([][]byte, caps) barriers[i].ivs = make([]syscall.Iovec, caps) @@ -78,14 +78,14 @@ func (p *defaultPoll) Wait() error { return err } for i := 0; i < n; i++ { - var fd = int(events[i].Ident) + fd := int(events[i].Ident) // trigger if fd == 0 { // clean trigger atomic.StoreUint32(&p.trigger, 0) continue } - var operator = p.getOperator(fd, unsafe.Pointer(&events[i].Udata)) + operator := p.getOperator(fd, unsafe.Pointer(&events[i].Udata)) if operator == nil || !operator.do() { continue } @@ -102,9 +102,9 @@ func (p *defaultPoll) Wait() error { operator.OnRead(p) } else { // only for connection - var bs = operator.Inputs(barriers[i].bs) + bs := operator.Inputs(barriers[i].bs) if len(bs) > 0 { - var n, err = ioread(operator.FD, bs, barriers[i].ivs) + n, err := ioread(operator.FD, bs, barriers[i].ivs) operator.InputAck(n) totalRead += n if err != nil { @@ -135,10 +135,10 @@ func (p *defaultPoll) Wait() error { operator.OnWrite(p) } else { // only for connection - var bs, supportZeroCopy = operator.Outputs(barriers[i].bs) + bs, supportZeroCopy := operator.Outputs(barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = iosend(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) + n, err := iosend(operator.FD, bs, barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil { p.appendHup(operator) @@ -157,7 +157,7 @@ func (p *defaultPoll) Wait() error { // TODO: Close will bad file descriptor here func (p *defaultPoll) Close() error { - var err = syscall.Close(p.fd) + err := syscall.Close(p.fd) return err } @@ -176,7 +176,7 @@ func (p *defaultPoll) Trigger() error { // Control implements Poll. func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { - var evs = make([]syscall.Kevent_t, 1) + evs := make([]syscall.Kevent_t, 1) evs[0].Ident = uint64(operator.FD) p.setOperator(unsafe.Pointer(&evs[0].Udata), operator) switch event { diff --git a/poll_default_bsd_norace.go b/poll_default_bsd_norace.go index 8a0266d0..49326e6a 100644 --- a/poll_default_bsd_norace.go +++ b/poll_default_bsd_norace.go @@ -29,5 +29,4 @@ func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { } func (p *defaultPoll) delOperator(operator *FDOperator) { - } diff --git a/poll_default_linux.go b/poll_default_linux.go index e1215fb5..a536d22c 100644 --- a/poll_default_linux.go +++ b/poll_default_linux.go @@ -28,16 +28,16 @@ func openPoll() (Poll, error) { } func openDefaultPoll() (*defaultPoll, error) { - var poll = new(defaultPoll) + poll := new(defaultPoll) poll.buf = make([]byte, 8) - var p, err = EpollCreate(0) + p, err := EpollCreate(0) if err != nil { return nil, err } poll.fd = p - var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) + r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) if e0 != 0 { _ = syscall.Close(poll.fd) return nil, e0 @@ -63,7 +63,7 @@ type defaultPoll struct { wop *FDOperator // eventfd, wake epoll_wait buf []byte // read wfd trigger msg trigger uint32 // trigger flag - m sync.Map // only used in go:race + m sync.Map //nolint:unused // only used in go:race opcache *operatorCache // operator cache // fns for handle events Reset func(size, caps int) @@ -90,7 +90,7 @@ func (a *pollArgs) reset(size, caps int) { // Wait implements Poll. func (p *defaultPoll) Wait() (err error) { // init - var caps, msec, n = barriercap, -1, 0 + caps, msec, n := barriercap, -1, 0 p.Reset(128, caps) // wait for { @@ -153,9 +153,9 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { operator.OnRead(p) } else if operator.Inputs != nil { // for connection - var bs = operator.Inputs(p.barriers[i].bs) + bs := operator.Inputs(p.barriers[i].bs) if len(bs) > 0 { - var n, err = ioread(operator.FD, bs, p.barriers[i].ivs) + n, err := ioread(operator.FD, bs, p.barriers[i].ivs) operator.InputAck(n) totalRead += n if err != nil { @@ -199,10 +199,10 @@ func (p *defaultPoll) handler(events []epollevent) (closed bool) { operator.OnWrite(p) } else if operator.Outputs != nil { // for connection - var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs) + bs, supportZeroCopy := operator.Outputs(p.barriers[i].bs) if len(bs) > 0 { // TODO: Let the upper layer pass in whether to use ZeroCopy. - var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) + n, err := iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy) operator.OutputAck(n) if err != nil { p.appendHup(operator) @@ -243,7 +243,7 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error { // op.inuse() op.unused() // op.FD -- T1 op.FD = 0 -- T2 // T1 and T2 may happen together - var fd = operator.FD + fd := operator.FD var op int var evt epollevent p.setOperator(unsafe.Pointer(&evt.data), operator) diff --git a/poll_default_linux_norace.go b/poll_default_linux_norace.go index 29d5e6be..3787071d 100644 --- a/poll_default_linux_norace.go +++ b/poll_default_linux_norace.go @@ -28,5 +28,4 @@ func (p *defaultPoll) setOperator(ptr unsafe.Pointer, operator *FDOperator) { } func (p *defaultPoll) delOperator(operator *FDOperator) { - } diff --git a/poll_default_linux_test.go b/poll_default_linux_test.go index 58517d47..c6441906 100644 --- a/poll_default_linux_test.go +++ b/poll_default_linux_test.go @@ -27,7 +27,7 @@ import ( ) func TestEpollEvent(t *testing.T) { - var epollfd, err = EpollCreate(0) + epollfd, err := EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) @@ -83,6 +83,7 @@ func TestEpollEvent(t *testing.T) { MustNil(t, err) n, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) + Equal(t, n, 1) Equal(t, events[0].data, eventdata3) _, err = syscall.Read(rfd, recv) MustTrue(t, err == nil && string(recv) == string(send)) @@ -94,7 +95,7 @@ func TestEpollEvent(t *testing.T) { } func TestEpollWait(t *testing.T) { - var epollfd, err = EpollCreate(0) + epollfd, err := EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) @@ -148,6 +149,7 @@ func TestEpollWait(t *testing.T) { rfd2, wfd2 := GetSysFdPairs() defer syscall.Close(wfd2) err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd2, event) + MustNil(t, err) err = syscall.Close(rfd2) MustNil(t, err) _, err = epollWaitUntil(epollfd, events, -1) @@ -162,7 +164,7 @@ func TestEpollWait(t *testing.T) { } func TestEpollETClose(t *testing.T) { - var epollfd, err = EpollCreate(0) + epollfd, err := EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) rfd, wfd := GetSysFdPairs() @@ -175,6 +177,7 @@ func TestEpollETClose(t *testing.T) { // EPOLL: init state err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + MustNil(t, err) _, err = epollWaitUntil(epollfd, events, -1) MustNil(t, err) Assert(t, events[0].events&syscall.EPOLLIN == 0) @@ -196,6 +199,7 @@ func TestEpollETClose(t *testing.T) { // EPOLLIN and EPOLLOUT rfd, wfd = GetSysFdPairs() err = EpollCtl(epollfd, unix.EPOLL_CTL_ADD, rfd, event) + MustNil(t, err) err = syscall.Close(wfd) MustNil(t, err) n, err = epollWaitUntil(epollfd, events, 100) @@ -212,7 +216,7 @@ func TestEpollETClose(t *testing.T) { } func TestEpollETDel(t *testing.T) { - var epollfd, err = EpollCreate(0) + epollfd, err := EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) rfd, wfd := GetSysFdPairs() @@ -244,7 +248,7 @@ func TestEpollConnectSameFD(t *testing.T) { Port: 12345, Addr: [4]byte{127, 0, 0, 1}, } - var loop = newTestEventLoop("tcp", "127.0.0.1:12345", + loop := newTestEventLoop("tcp", "127.0.0.1:12345", func(ctx context.Context, connection Connection) error { _, err := connection.Reader().Next(connection.Reader().Len()) return err @@ -252,7 +256,7 @@ func TestEpollConnectSameFD(t *testing.T) { ) defer loop.Shutdown(context.Background()) - var epollfd, err = EpollCreate(0) + epollfd, err := EpollCreate(0) MustNil(t, err) defer syscall.Close(epollfd) events := make([]epollevent, 128) @@ -287,8 +291,8 @@ func TestEpollConnectSameFD(t *testing.T) { Assert(t, events[0].events&syscall.EPOLLRDHUP == 0) Assert(t, events[0].events&syscall.EPOLLERR == 0) // forget to del fd - //err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) - //MustNil(t, err) + // err = EpollCtl(epollfd, unix.EPOLL_CTL_DEL, fd1, event1) + // MustNil(t, err) err = syscall.Close(fd1) // close fd1 MustNil(t, err) diff --git a/poll_manager.go b/poll_manager.go index 602250e4..43c8e31f 100644 --- a/poll_manager.go +++ b/poll_manager.go @@ -88,7 +88,7 @@ func (m *manager) Run() (err error) { if numLoops == len(m.polls) { return nil } - var polls = make([]Poll, numLoops) + polls := make([]Poll, numLoops) if numLoops < len(m.polls) { // shrink polls copy(polls, m.polls[:numLoops]) diff --git a/poll_manager_test.go b/poll_manager_test.go index f61c5282..6a592293 100644 --- a/poll_manager_test.go +++ b/poll_manager_test.go @@ -25,13 +25,13 @@ import ( func TestPollManager(t *testing.T) { r, w := GetSysFdPairs() - var rconn, wconn = &connection{}, &connection{} + rconn, wconn := &connection{}, &connection{} err := rconn.init(&netFD{fd: r}, nil) MustNil(t, err) err = wconn.init(&netFD{fd: w}, nil) MustNil(t, err) - var msg = []byte("hello world") + msg := []byte("hello world") n, err := wconn.Write(msg) MustNil(t, err) Equal(t, n, len(msg)) @@ -65,8 +65,9 @@ func TestPollManagerSetNumLoops(t *testing.T) { t.Logf("old=%d, new=%d", startGs, newGs) // change pollers + numLoops := 100 oldGs := newGs - err := pm.SetNumLoops(100) + err := pm.SetNumLoops(numLoops) MustNil(t, err) newGs = runtime.NumGoroutine() t.Logf("old=%d, new=%d", oldGs, newGs) @@ -75,8 +76,8 @@ func TestPollManagerSetNumLoops(t *testing.T) { // trigger polls adjustment var wg sync.WaitGroup finish := make(chan struct{}) - oldGs = startGs + 32 // 32 self goroutines - for i := 0; i < 32; i++ { + moreGs := 32 + for i := 0; i < moreGs; i++ { wg.Add(1) go func() { poll := pm.Pick() @@ -87,5 +88,6 @@ func TestPollManagerSetNumLoops(t *testing.T) { }() } wg.Wait() + Assert(t, startGs+numLoops+moreGs == runtime.NumGoroutine()) close(finish) } diff --git a/poll_test.go b/poll_test.go index b3c7f2e8..01395955 100644 --- a/poll_test.go +++ b/poll_test.go @@ -30,8 +30,8 @@ import ( func TestPollTrigger(t *testing.T) { t.Skip() var trigger int - var stop = make(chan error) - var p, err = openDefaultPoll() + stop := make(chan error) + p, err := openDefaultPoll() MustNil(t, err) go func() { @@ -54,28 +54,28 @@ func TestPollTrigger(t *testing.T) { func TestPollMod(t *testing.T) { var rn, wn, hn int32 - var read = func(p Poll) error { + read := func(p Poll) error { atomic.AddInt32(&rn, 1) return nil } - var write = func(p Poll) error { + write := func(p Poll) error { atomic.AddInt32(&wn, 1) return nil } - var hup = func(p Poll) error { + hup := func(p Poll) error { atomic.AddInt32(&hn, 1) return nil } - var stop = make(chan error) - var p, err = openDefaultPoll() + stop := make(chan error) + p, err := openDefaultPoll() MustNil(t, err) go func() { stop <- p.Wait() }() - var rfd, wfd = GetSysFdPairs() - var rop = &FDOperator{FD: rfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} - var wop = &FDOperator{FD: wfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} + rfd, wfd := GetSysFdPairs() + rop := &FDOperator{FD: rfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} + wop := &FDOperator{FD: wfd, OnRead: read, OnWrite: write, OnHup: hup, poll: p} var r, w, h int32 r, w, h = atomic.LoadInt32(&rn), atomic.LoadInt32(&wn), atomic.LoadInt32(&hn) Assert(t, r == 0 && w == 0 && h == 0, r, w, h) @@ -115,7 +115,7 @@ func TestPollMod(t *testing.T) { } func TestPollClose(t *testing.T) { - var p, err = openDefaultPoll() + p, err := openDefaultPoll() MustNil(t, err) var wg sync.WaitGroup wg.Add(1) @@ -129,9 +129,9 @@ func TestPollClose(t *testing.T) { func BenchmarkPollMod(b *testing.B) { b.StopTimer() - var p, _ = openDefaultPoll() + p, _ := openDefaultPoll() r, _ := GetSysFdPairs() - var operator = &FDOperator{FD: r} + operator := &FDOperator{FD: r} p.Control(operator, PollReadable) // benchmark diff --git a/sys_epoll_linux.go b/sys_epoll_linux.go index 528c34ed..1047652b 100644 --- a/sys_epoll_linux.go +++ b/sys_epoll_linux.go @@ -51,7 +51,7 @@ func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { // EpollWait implements epoll_wait. func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { var r0 uintptr - var _p0 = unsafe.Pointer(&events[0]) + _p0 := unsafe.Pointer(&events[0]) if msec == 0 { r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) } else { diff --git a/sys_epoll_linux_arm64.go b/sys_epoll_linux_arm64.go index e8d6094d..082b52cb 100644 --- a/sys_epoll_linux_arm64.go +++ b/sys_epoll_linux_arm64.go @@ -49,7 +49,7 @@ func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { // EpollWait implements epoll_wait. func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { var r0 uintptr - var _p0 = unsafe.Pointer(&events[0]) + _p0 := unsafe.Pointer(&events[0]) if msec == 0 { r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) } else { diff --git a/sys_epoll_linux_loong64.go b/sys_epoll_linux_loong64.go index ecf36c13..24f98569 100644 --- a/sys_epoll_linux_loong64.go +++ b/sys_epoll_linux_loong64.go @@ -52,7 +52,7 @@ func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { // EpollWait implements epoll_wait. func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { var r0 uintptr - var _p0 = unsafe.Pointer(&events[0]) + _p0 := unsafe.Pointer(&events[0]) if msec == 0 { r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) } else { diff --git a/sys_exec_test.go b/sys_exec_test.go index f35bb6e0..2f2f033f 100644 --- a/sys_exec_test.go +++ b/sys_exec_test.go @@ -26,7 +26,7 @@ import ( func TestIovecs(t *testing.T) { var got int var bs [][]byte - var ivs = make([]syscall.Iovec, 4) + ivs := make([]syscall.Iovec, 4) // case 1 bs = [][]byte{ @@ -75,7 +75,7 @@ func TestIovecs(t *testing.T) { func TestWritev(t *testing.T) { r, w := GetSysFdPairs() - var barrier = barrier{} + barrier := barrier{} barrier.bs = [][]byte{ []byte(""), // len=0 []byte("first line"), // len=10 @@ -86,7 +86,7 @@ func TestWritev(t *testing.T) { wn, err := writev(w, barrier.bs, barrier.ivs) MustNil(t, err) Equal(t, wn, 31) - var p = make([]byte, 50) + p := make([]byte, 50) rn, err := syscall.Read(r, p) MustNil(t, err) Equal(t, rn, 31) @@ -105,7 +105,7 @@ func TestReadv(t *testing.T) { w3, _ := syscall.Write(w, vs[2]) Equal(t, w1+w2+w3, 31) - var barrier = barrier{ + barrier := barrier{ bs: make([][]byte, 4), } res := [][]byte{ @@ -114,9 +114,7 @@ func TestReadv(t *testing.T) { make([]byte, 11), make([]byte, 10), } - for i := range res { - barrier.bs[i] = res[i] - } + copy(barrier.bs, res) barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) rn, err := readv(r, barrier.bs, barrier.ivs) MustNil(t, err) @@ -128,7 +126,7 @@ func TestReadv(t *testing.T) { func TestSendmsg(t *testing.T) { r, w := GetSysFdPairs() - var barrier = barrier{} + barrier := barrier{} barrier.bs = [][]byte{ []byte(""), // len=0 []byte("first line"), // len=10 @@ -139,7 +137,7 @@ func TestSendmsg(t *testing.T) { wn, err := sendmsg(w, barrier.bs, barrier.ivs, false) MustNil(t, err) Equal(t, wn, 31) - var p = make([]byte, 50) + p := make([]byte, 50) rn, err := syscall.Read(r, p) MustNil(t, err) Equal(t, rn, 31) @@ -157,14 +155,13 @@ func BenchmarkWrite(b *testing.B) { for { syscall.Read(r, buffer) } - }() // benchmark b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - var wmsg = make([]byte, len(message)*5) + wmsg := make([]byte, len(message)*5) var n int for j := 0; j < size; j++ { n += copy(wmsg[n:], message) @@ -178,7 +175,7 @@ func BenchmarkWritev(b *testing.B) { r, w := GetSysFdPairs() message := "hello, world!" size := 5 - var barrier = barrier{} + barrier := barrier{} barrier.bs = make([][]byte, size) barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) for i := range barrier.bs { @@ -190,7 +187,6 @@ func BenchmarkWritev(b *testing.B) { for { syscall.Read(r, buffer) } - }() // benchmark @@ -206,7 +202,7 @@ func BenchmarkSendmsg(b *testing.B) { r, w := GetSysFdPairs() message := "hello, world!" size := 5 - var barrier = barrier{} + barrier := barrier{} barrier.bs = make([][]byte, size) barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) for i := range barrier.bs { @@ -218,7 +214,6 @@ func BenchmarkSendmsg(b *testing.B) { for { syscall.Read(r, buffer) } - }() // benchmark @@ -244,14 +239,13 @@ func BenchmarkRead(b *testing.B) { for { syscall.Write(w, wmsg) } - }() // benchmark b.ReportAllocs() b.StartTimer() for i := 0; i < b.N; i++ { - var buffer = make([]byte, size*len(message)) + buffer := make([]byte, size*len(message)) syscall.Read(r, buffer) } } @@ -261,7 +255,7 @@ func BenchmarkReadv(b *testing.B) { r, w := GetSysFdPairs() message := "hello, world!" size := 5 - var barrier = barrier{} + barrier := barrier{} barrier.bs = make([][]byte, size) barrier.ivs = make([]syscall.Iovec, len(barrier.bs)) for i := range barrier.bs { @@ -272,7 +266,6 @@ func BenchmarkReadv(b *testing.B) { for { writeAll(w, []byte(message)) } - }() // benchmark diff --git a/sys_sendmsg_bsd.go b/sys_sendmsg_bsd.go index b808a5d2..1a001a9b 100644 --- a/sys_sendmsg_bsd.go +++ b/sys_sendmsg_bsd.go @@ -31,7 +31,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er if iovLen == 0 { return 0, nil } - var msghdr = syscall.Msghdr{ + msghdr := syscall.Msghdr{ Iov: &ivs[0], Iovlen: int32(iovLen), } diff --git a/sys_sendmsg_linux.go b/sys_sendmsg_linux.go index a5a63c58..cd24fadc 100644 --- a/sys_sendmsg_linux.go +++ b/sys_sendmsg_linux.go @@ -36,7 +36,7 @@ func sendmsg(fd int, bs [][]byte, ivs []syscall.Iovec, zerocopy bool) (n int, er if iovLen == 0 { return 0, nil } - var msghdr = syscall.Msghdr{ + msghdr := syscall.Msghdr{ Iov: &ivs[0], Iovlen: uint64(iovLen), }