diff --git a/pkg/network/server.go b/pkg/network/server.go index ba8e99edde..7d0c47aed6 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -319,7 +319,6 @@ func (s *Server) Shutdown() { } close(s.quit) <-s.relayFin - _ = s.log.Sync() } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 2ca95885fa..6590da7821 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -133,7 +133,7 @@ func TestServerRegisterPeer(t *testing.T) { ps[i].version = &payload.Version{Nonce: uint32(i), UserAgent: []byte("fake")} } - startWithCleanup(t, s) + go s.Start() s.register <- ps[0] require.Eventually(t, func() bool { return 1 == s.PeerCount() }, time.Second, time.Millisecond*10) @@ -173,6 +173,8 @@ func TestServerRegisterPeer(t *testing.T) { } return false }, time.Second, time.Millisecond*50) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestGetBlocksByIndex(t *testing.T) { @@ -384,20 +386,12 @@ func startTestServer(t *testing.T, protocolCfg ...func(*config.Blockchain)) *Ser } else { s = newTestServer(t, srvCfg) } - startWithCleanup(t, s) return s } -func startWithCleanup(t *testing.T, s *Server) { - go s.Start() - t.Cleanup(func() { - s.Shutdown() - }) -} - func TestBlock(t *testing.T) { s := startTestServer(t) - + go s.Start() s.chain.(*fakechain.FakeChain).Blockheight.Store(12344) require.Equal(t, uint32(12344), s.chain.BlockHeight()) @@ -405,13 +399,16 @@ func TestBlock(t *testing.T) { b.Index = 12345 s.testHandleMessage(t, nil, CMDBlock, b) require.Eventually(t, func() bool { return s.chain.BlockHeight() == 12345 }, 2*time.Second, time.Millisecond*500) + s.Shutdown() + + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestConsensus(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) - startWithCleanup(t, s) + go s.Start() s.chain.(*fakechain.FakeChain).Blockheight.Store(4) p := newLocalPeer(t, s) @@ -450,13 +447,15 @@ func TestConsensus(t *testing.T) { msg := newConsensusMessage(s.chain.BlockHeight()+1, s.chain.BlockHeight()+2) require.Error(t, s.handleMessage(p, msg)) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestTransaction(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) - startWithCleanup(t, s) + go s.Start() t.Run("good", func(t *testing.T) { tx := newDummyTx() @@ -502,6 +501,8 @@ func TestTransaction(t *testing.T) { return false }, 2*time.Second, time.Millisecond*500) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func (s *Server) testHandleGetData(t *testing.T, invType payload.InventoryType, hs, notFound []util.Uint256, found payload.Payload) { @@ -529,6 +530,7 @@ func (s *Server) testHandleGetData(t *testing.T, invType payload.InventoryType, func TestGetData(t *testing.T) { s := startTestServer(t) + go s.Start() s.chain.(*fakechain.FakeChain).UtilityTokenBalance = big.NewInt(1000000) t.Run("block", func(t *testing.T) { @@ -582,10 +584,13 @@ func TestGetData(t *testing.T) { notFound := []util.Uint256{hs[0], hs[2]} s.testHandleGetData(t, payload.P2PNotaryRequestType, hs, notFound, r) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func initGetBlocksTest(t *testing.T) (*Server, []*block.Block) { s := startTestServer(t) + go s.Start() var blocks []*block.Block for i := uint32(12); i <= 15; i++ { @@ -624,6 +629,8 @@ func TestGetBlocks(t *testing.T) { msg := NewMessage(CMDGetBlocks, &payload.GetBlocks{HashStart: util.Uint256{}, Count: -1}) require.Error(t, s.handleMessage(p, msg)) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestGetBlockByIndex(t *testing.T) { @@ -658,6 +665,8 @@ func TestGetBlockByIndex(t *testing.T) { expected = blocks s.testHandleMessage(t, p, CMDGetBlockByIndex, &payload.GetBlockByIndex{IndexStart: blocks[0].Index, Count: -1}) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestGetHeaders(t *testing.T) { @@ -700,10 +709,13 @@ func TestGetHeaders(t *testing.T) { t.Run("distribute requests between peers", func(t *testing.T) { testGetBlocksByIndex(t, CMDGetHeaders) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestInv(t *testing.T) { s := startTestServer(t) + go s.Start() s.chain.(*fakechain.FakeChain).UtilityTokenBalance = big.NewInt(10000000) var actual []util.Uint256 @@ -764,20 +776,26 @@ func TestInv(t *testing.T) { }) require.Equal(t, []util.Uint256{hs[0], hs[2]}, actual) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestHandleGetMPTData(t *testing.T) { t.Run("P2PStateExchange extensions off", func(t *testing.T) { s := startTestServer(t) + go s.Start() p := newLocalPeer(t, s) p.handshaked = 1 msg := NewMessage(CMDGetMPTData, &payload.MPTInventory{ Hashes: []util.Uint256{{1, 2, 3}}, }) require.Error(t, s.handleMessage(p, msg)) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) check := func(t *testing.T, s *Server) { + go s.Start() var recvResponse atomic.Bool r1 := random.Uint256() r2 := random.Uint256() @@ -806,6 +824,8 @@ func TestHandleGetMPTData(t *testing.T) { s.testHandleMessage(t, p, CMDGetMPTData, payload.NewMPTInventory(hs)) require.Eventually(t, recvResponse.Load, time.Second, time.Millisecond) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } t.Run("KeepOnlyLatestState on", func(t *testing.T) { s := startTestServer(t, func(c *config.Blockchain) { @@ -826,12 +846,15 @@ func TestHandleGetMPTData(t *testing.T) { func TestHandleMPTData(t *testing.T) { t.Run("P2PStateExchange extensions off", func(t *testing.T) { s := startTestServer(t) + go s.Start() p := newLocalPeer(t, s) p.handshaked = 1 msg := NewMessage(CMDMPTData, &payload.MPTData{ Nodes: [][]byte{{1, 2, 3}}, }) require.Error(t, s.handleMessage(p, msg)) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) t.Run("good", func(t *testing.T) { @@ -844,7 +867,7 @@ func TestHandleMPTData(t *testing.T) { return nil }, } - startWithCleanup(t, s) + go s.Start() p := newLocalPeer(t, s) p.handshaked = 1 @@ -852,11 +875,14 @@ func TestHandleMPTData(t *testing.T) { Nodes: expected, }) require.NoError(t, s.handleMessage(p, msg)) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) } func TestRequestMPTNodes(t *testing.T) { s := startTestServer(t) + go s.Start() var actual []util.Uint256 p := newLocalPeer(t, s) @@ -898,10 +924,13 @@ func TestRequestMPTNodes(t *testing.T) { require.NoError(t, s.requestMPTNodes(p, expected)) require.Equal(t, expected[:payload.MaxMPTHashesCount], actual) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestRequestTx(t *testing.T) { s := startTestServer(t) + go s.Start() var actual []util.Uint256 p := newLocalPeer(t, s) @@ -943,10 +972,13 @@ func TestRequestTx(t *testing.T) { s.RequestTx(expected...) require.Equal(t, expected, actual) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestAddrs(t *testing.T) { s := startTestServer(t) + go s.Start() ips := make([][16]byte, 4) copy(ips[0][:], net.IPv4(1, 2, 3, 4)) @@ -991,6 +1023,8 @@ func TestAddrs(t *testing.T) { msg := NewMessage(CMDAddr, pl) require.Error(t, s.handleMessage(p, msg)) }) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } type feerStub struct { @@ -1004,6 +1038,7 @@ func (f feerStub) GetBaseExecFee() int64 { return interop func TestMemPool(t *testing.T) { s := startTestServer(t) + go s.Start() var actual []util.Uint256 p := newLocalPeer(t, s) @@ -1024,6 +1059,8 @@ func TestMemPool(t *testing.T) { s.testHandleMessage(t, p, CMDMempool, payload.NullPayload{}) require.ElementsMatch(t, expected, actual) + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) } func TestVerifyNotaryRequest(t *testing.T) { @@ -1081,20 +1118,27 @@ func TestVerifyNotaryRequest(t *testing.T) { func TestTryInitStateSync(t *testing.T) { t.Run("module inactive", func(t *testing.T) { s := startTestServer(t) + go s.Start() s.tryInitStateSync() + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) t.Run("module already initialized", func(t *testing.T) { s := startTestServer(t) + go s.Start() ss := &fakechain.FakeStateSync{} ss.IsActiveFlag.Store(true) ss.IsInitializedFlag.Store(true) s.stateSync = ss s.tryInitStateSync() + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) t.Run("good", func(t *testing.T) { s := startTestServer(t) + go s.Start() for _, h := range []uint32{10, 8, 7, 4, 11, 4} { p := newLocalPeer(t, s) p.handshaked = 1 @@ -1117,6 +1161,8 @@ func TestTryInitStateSync(t *testing.T) { ss.IsActiveFlag.Store(true) s.stateSync = ss s.tryInitStateSync() + s.Shutdown() + require.Eventually(t, func() bool { return s.log.Sync() == nil }, 2*time.Second, time.Millisecond*500) }) }