From 143a6964e150dac417f4a8713da05d812503a095 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 7 Dec 2023 08:26:22 +0100 Subject: [PATCH 01/15] Adding RandomFilter Use RandomFilter to select a given amount of indexes randomly. Added suggestion from @PascalinDe --- mino/option.go | 29 +++++++++++++++++++++++++++++ mino/option_test.go | 34 ++++++++++++++++++++++++++-------- 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/mino/option.go b/mino/option.go index cc9858964..7d19c23a8 100644 --- a/mino/option.go +++ b/mino/option.go @@ -5,6 +5,7 @@ package mino import ( + "math/rand" "sort" ) @@ -68,6 +69,19 @@ func IndexFilter(index int) FilterUpdater { } } +// RejectFilter removes the given index +func RejectFilter(index int) FilterUpdater { + return func(filters *Filter) { + arr := filters.Indices + i := sort.IntSlice(arr).Search(index) + // do nothing if the element is not there + if i == len(arr) || arr[i] != index { + return + } + filters.Indices = append(filters.Indices[0:i], filters.Indices[i+1:]...) + } +} + // RangeFilter is a filter to include a range of indices. func RangeFilter(start, end int) FilterUpdater { return func(filters *Filter) { @@ -95,3 +109,18 @@ func ListFilter(indices []int) FilterUpdater { filters.Indices = indices } } + +// RandomFilter shuffles the elements of the Index and then limits the size +// of Indices to 'count'. +// If there are less than 'count' elements, only the shuffling takes place. +func RandomFilter(count int) FilterUpdater { + return func(filters *Filter) { + rand.Shuffle(len(filters.Indices), + func(i, j int) { + filters.Indices[i], filters.Indices[j] = filters.Indices[j], filters.Indices[i] + }) + if len(filters.Indices) > count { + filters.Indices = filters.Indices[:count] + } + } +} diff --git a/mino/option_test.go b/mino/option_test.go index 0c1b63e8a..15965b577 100644 --- a/mino/option_test.go +++ b/mino/option_test.go @@ -33,20 +33,38 @@ func TestFilter_RotateFilter(t *testing.T) { func TestFilter_IndexFilter(t *testing.T) { filters := &Filter{Indices: []int{}} - IndexFilter(1)(filters) - require.Equal(t, filters.Indices, []int{1}) + require.Equal(t, []int{1}, filters.Indices) IndexFilter(2)(filters) - require.Equal(t, filters.Indices, []int{1, 2}) + require.Equal(t, []int{1, 2}, filters.Indices) - IndexFilter(0)(filters) - require.Equal(t, filters.Indices, []int{0, 1, 2}) + IndexFilter(2)(filters) + require.Equal(t, []int{1, 2}, filters.Indices) IndexFilter(0)(filters) - IndexFilter(1)(filters) - IndexFilter(2)(filters) - require.Equal(t, filters.Indices, []int{0, 1, 2}) + require.Equal(t, []int{0, 1, 2}, filters.Indices) +} + +func TestFilter_RejectFilter(t *testing.T) { + filters := &Filter{Indices: []int{1, 2, 3, 4}} + + testCases := []struct { + filterVal int + expected []int + }{ + {0, []int{1, 2, 3, 4}}, + {5, []int{1, 2, 3, 4}}, + {2, []int{1, 3, 4}}, + {1, []int{3, 4}}, + {4, []int{3}}, + {3, []int{}}, + } + + for _, tc := range testCases { + RejectFilter(tc.filterVal)(filters) + require.Equal(t, tc.expected, filters.Indices) + } } func TestFilter_RangeFilter(t *testing.T) { From bca499a44e863e69eef441b451daaad4c964aee2 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 7 Dec 2023 08:28:16 +0100 Subject: [PATCH 02/15] Protocol definitions for fastsync --- core/ordering/cosipbft/fastsync/json/json.go | 114 +++++++++++++++++ .../cosipbft/fastsync/json/json_test.go | 76 +++++++++++ .../ordering/cosipbft/fastsync/types/types.go | 118 ++++++++++++++++++ .../cosipbft/fastsync/types/types_test.go | 90 +++++++++++++ 4 
files changed, 398 insertions(+) create mode 100644 core/ordering/cosipbft/fastsync/json/json.go create mode 100644 core/ordering/cosipbft/fastsync/json/json_test.go create mode 100644 core/ordering/cosipbft/fastsync/types/types.go create mode 100644 core/ordering/cosipbft/fastsync/types/types_test.go diff --git a/core/ordering/cosipbft/fastsync/json/json.go b/core/ordering/cosipbft/fastsync/json/json.go new file mode 100644 index 000000000..0c868561e --- /dev/null +++ b/core/ordering/cosipbft/fastsync/json/json.go @@ -0,0 +1,114 @@ +package json + +import ( + "encoding/json" + + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/serde" + "golang.org/x/xerrors" +) + +func init() { + types.RegisterMessageFormat(serde.FormatJSON, msgFormat{}) +} + +// RequestCatchupMessageJSON is the JSON representation of a request catchup +// message. +type RequestCatchupMessageJSON struct { + SplitMessageSize uint64 + Latest uint64 +} + +// CatchupMessageJSON is the JSON representation of all the new BlockLinks. +type CatchupMessageJSON struct { + SplitMessage bool + BlockLinks []json.RawMessage +} + +// MessageJSON is the JSON representation of a sync message. +type MessageJSON struct { + Request *RequestCatchupMessageJSON `json:",omitempty"` + Catchup *CatchupMessageJSON `json:",omitempty"` +} + +// MsgFormat is the format engine to encode and decode sync messages. +// +// - implements serde.FormatEngine +type msgFormat struct{} + +// Encode implements serde.FormatEngine. It returns the JSON data of the message +// if appropriate, otherwise an error. +func (fmt msgFormat) Encode(ctx serde.Context, msg serde.Message) ([]byte, error) { + var m MessageJSON + + switch in := msg.(type) { + case types.RequestCatchupMessage: + request := RequestCatchupMessageJSON{ + SplitMessageSize: in.GetSplitMessageSize(), + Latest: in.GetLatest(), + } + + m.Request = &request + case types.CatchupMessage: + bls := in.GetBlockLinks() + catchup := CatchupMessageJSON{ + SplitMessage: in.GetSplitMessage(), + BlockLinks: make([]json.RawMessage, len(bls)), + } + + for i, bl := range bls { + blBuf, err := bl.Serialize(ctx) + if err != nil { + return nil, xerrors.Errorf("failed to encode blocklink: %v", err) + } + catchup.BlockLinks[i] = blBuf + } + + m.Catchup = &catchup + default: + return nil, xerrors.Errorf("unsupported message '%T'", msg) + } + + data, err := ctx.Marshal(m) + if err != nil { + return nil, xerrors.Errorf("marshal failed: %v", err) + } + + return data, nil +} + +// Decode implements serde.FormatEngine. It returns the message associated to +// the data if appropriate, otherwise an error. 
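+// Exactly one of the two fields of MessageJSON is expected to be set: a
+// Request decodes into a RequestCatchupMessage, while a Catchup additionally
+// needs a types.LinkKey factory in the context to rebuild the block links.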
+func (fmt msgFormat) Decode(ctx serde.Context, data []byte) (serde.Message, error) { + m := MessageJSON{} + err := ctx.Unmarshal(data, &m) + if err != nil { + return nil, xerrors.Errorf("unmarshal failed: %v", err) + } + + if m.Request != nil { + return types.NewRequestCatchupMessage(m.Request.SplitMessageSize, m.Request.Latest), nil + } + + if m.Catchup != nil { + fac := ctx.GetFactory(types.LinkKey{}) + + factory, ok := fac.(otypes.LinkFactory) + if !ok { + return nil, xerrors.Errorf("invalid link factory '%T'", fac) + } + + var blockLinks = make([]otypes.BlockLink, len(m.Catchup.BlockLinks)) + for i, blBuf := range m.Catchup.BlockLinks { + blockLinks[i], err = factory.BlockLinkOf(ctx, blBuf) + if err != nil { + return nil, xerrors.Errorf("failed to decode blockLink: %v", err) + } + } + + return types.NewCatchupMessage(m.Catchup.SplitMessage, blockLinks), nil + } + + return nil, xerrors.New("message is empty") +} diff --git a/core/ordering/cosipbft/fastsync/json/json_test.go b/core/ordering/cosipbft/fastsync/json/json_test.go new file mode 100644 index 000000000..acb9117b0 --- /dev/null +++ b/core/ordering/cosipbft/fastsync/json/json_test.go @@ -0,0 +1,76 @@ +package json + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/serde" + "go.dedis.ch/dela/testing/fake" +) + +func TestMsgFormat_Encode(t *testing.T) { + format := msgFormat{} + + ctx := fake.NewContext() + + data, err := format.Encode(ctx, types.NewCatchupMessage(false, []otypes.BlockLink{fakeLink{}})) + require.NoError(t, err) + require.Equal(t, `{"Catchup":{"SplitMessage":false,"BlockLinks":[{}]}}`, string(data)) + + data, err = format.Encode(ctx, types.NewRequestCatchupMessage(1, 3)) + require.NoError(t, err) + require.Equal(t, `{"Request":{"SplitMessageSize":1,"Latest":3}}`, string(data)) + + _, err = format.Encode(ctx, fake.Message{}) + require.EqualError(t, err, "unsupported message 'fake.Message'") + + _, err = format.Encode(ctx, + types.NewCatchupMessage(false, []otypes.BlockLink{fakeLink{err: fake.GetError()}})) + require.EqualError(t, err, fake.Err("failed to encode blocklink")) +} + +func TestMsgFormat_Decode(t *testing.T) { + format := msgFormat{} + + ctx := fake.NewContext() + ctx = serde.WithFactory(ctx, types.LinkKey{}, fakeLinkFac{}) + + msg, err := format.Decode(ctx, []byte(`{"Catchup":{"SplitMessage":true,"BlockLinks":[{}]}}`)) + require.NoError(t, err) + require.Equal(t, types.NewCatchupMessage(true, []otypes.BlockLink{fakeLink{}}), msg) + + msg, err = format.Decode(ctx, []byte(`{"Request":{"SplitMessageSize":1,"Latest":3}}`)) + require.NoError(t, err) + require.Equal(t, types.NewRequestCatchupMessage(1, 3), msg) + + _, err = format.Decode(ctx, []byte(`{}`)) + require.EqualError(t, err, "message is empty") + + _, err = format.Decode(fake.NewBadContext(), []byte(`{}`)) + require.EqualError(t, err, fake.Err("unmarshal failed")) +} + +// ----------------------------------------------------------------------------- +// Utility functions + +type fakeLink struct { + otypes.BlockLink + + err error +} + +func (link fakeLink) Serialize(serde.Context) ([]byte, error) { + return []byte("{}"), link.err +} + +type fakeLinkFac struct { + otypes.LinkFactory + + err error +} + +func (fac fakeLinkFac) BlockLinkOf(serde.Context, []byte) (otypes.BlockLink, error) { + return fakeLink{}, fac.err +} diff --git a/core/ordering/cosipbft/fastsync/types/types.go 
b/core/ordering/cosipbft/fastsync/types/types.go
new file mode 100644
index 000000000..22e9d0fa4
--- /dev/null
+++ b/core/ordering/cosipbft/fastsync/types/types.go
@@ -0,0 +1,118 @@
+package types
+
+import (
+	"go.dedis.ch/dela/core/ordering/cosipbft/types"
+	"go.dedis.ch/dela/serde"
+	"go.dedis.ch/dela/serde/registry"
+	"golang.org/x/xerrors"
+)
+
+var msgFormats = registry.NewSimpleRegistry()
+
+// RegisterMessageFormat registers the engine for the given format.
+func RegisterMessageFormat(f serde.Format, e serde.FormatEngine) {
+	msgFormats.Register(f, e)
+}
+
+// RequestCatchupMessage is sent by a node which wants to catch up to the latest
+// block.
+type RequestCatchupMessage struct {
+	splitMessageSize uint64
+	latest           uint64
+}
+
+// NewRequestCatchupMessage creates a RequestCatchupMessage.
+func NewRequestCatchupMessage(splitMessageSize, latest uint64) RequestCatchupMessage {
+	return RequestCatchupMessage{splitMessageSize: splitMessageSize, latest: latest}
+}
+
+// GetLatest returns the first block index the sender is missing.
+func (m RequestCatchupMessage) GetLatest() uint64 {
+	return m.latest
+}
+
+// GetSplitMessageSize returns the size at which a message should be split.
+func (m RequestCatchupMessage) GetSplitMessageSize() uint64 {
+	return m.splitMessageSize
+}
+
+// Serialize implements serde.Message. It returns the serialized data for this
+// message.
+func (m RequestCatchupMessage) Serialize(ctx serde.Context) ([]byte, error) {
+	format := msgFormats.Get(ctx.GetFormat())
+
+	data, err := format.Encode(ctx, m)
+	if err != nil {
+		return nil, xerrors.Errorf("encoding failed: %v", err)
+	}
+
+	return data, nil
+}
+
+// CatchupMessage carries all the blocks, not just the links, so that the
+// node can re-create the correct global state.
+// 'splitMessage' is true if the sending node has more blocks to send.
+type CatchupMessage struct {
+	splitMessage bool
+	blockLinks   []types.BlockLink
+}
+
+// NewCatchupMessage creates a reply to RequestCatchupMessage.
+func NewCatchupMessage(splitMessage bool, blockLinks []types.BlockLink) CatchupMessage {
+	return CatchupMessage{splitMessage: splitMessage, blockLinks: blockLinks}
+}
+
+// GetBlockLinks returns the BlockLinks of the catchup.
+func (m CatchupMessage) GetBlockLinks() []types.BlockLink {
+	return m.blockLinks
+}
+
+// GetSplitMessage returns whether the sending node has more blocks.
+func (m CatchupMessage) GetSplitMessage() bool {
+	return m.splitMessage
+}
+
+// Serialize implements serde.Message. It returns the serialized data for this
+// message.
+func (m CatchupMessage) Serialize(ctx serde.Context) ([]byte, error) {
+	format := msgFormats.Get(ctx.GetFormat())
+
+	data, err := format.Encode(ctx, m)
+	if err != nil {
+		return nil, xerrors.Errorf("encoding failed: %v", err)
+	}
+
+	return data, nil
+}
+
+// LinkKey is the key of the block link factory.
+type LinkKey struct{}
+
+// MessageFactory is a message factory for sync messages.
+//
+// - implements serde.Factory
+type MessageFactory struct {
+	linkFac types.LinkFactory
+}
+
+// NewMessageFactory creates a new message factory.
+func NewMessageFactory(fac types.LinkFactory) MessageFactory {
+	return MessageFactory{
+		linkFac: fac,
+	}
+}
+
+// Deserialize implements serde.Factory. It returns the message associated to
+// the data if appropriate, otherwise an error.
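+// The block-link factory is injected into the context under LinkKey, so
+// that the format engine can decode the block links of a CatchupMessage.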
+func (fac MessageFactory) Deserialize(ctx serde.Context, data []byte) (serde.Message, error) { + format := msgFormats.Get(ctx.GetFormat()) + + ctx = serde.WithFactory(ctx, LinkKey{}, fac.linkFac) + + msg, err := format.Decode(ctx, data) + if err != nil { + return nil, xerrors.Errorf("decoding failed: %v", err) + } + + return msg, nil +} diff --git a/core/ordering/cosipbft/fastsync/types/types_test.go b/core/ordering/cosipbft/fastsync/types/types_test.go new file mode 100644 index 000000000..e73afa9a1 --- /dev/null +++ b/core/ordering/cosipbft/fastsync/types/types_test.go @@ -0,0 +1,90 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/core/validation/simple" + "go.dedis.ch/dela/serde" + "go.dedis.ch/dela/testing/fake" +) + +var testCalls = &fake.Call{} + +func init() { + RegisterMessageFormat(fake.GoodFormat, + fake.Format{Msg: CatchupMessage{}, Call: testCalls}) + RegisterMessageFormat(fake.BadFormat, fake.NewBadFormat()) +} + +func TestRequestCatchupMessage_GetChain(t *testing.T) { + m := NewRequestCatchupMessage(1, 42) + + require.Equal(t, uint64(42), m.GetLatest()) + require.Equal(t, uint64(1), m.GetSplitMessageSize()) +} + +func TestRequestCatchupMessage_Serialize(t *testing.T) { + m := NewRequestCatchupMessage(1, 42) + + data, err := m.Serialize(fake.NewContext()) + require.NoError(t, err) + require.Equal(t, fake.GetFakeFormatValue(), data) + + _, err = m.Serialize(fake.NewBadContext()) + require.EqualError(t, err, fake.Err("encoding failed")) +} + +func TestCatchupMessage_GetBlockLinks(t *testing.T) { + m := NewCatchupMessage(false, makeChain(t, 0, 2)) + + require.Equal(t, 2, len(m.GetBlockLinks())) + require.Equal(t, false, m.GetSplitMessage()) +} + +func TestCatchupMessage_Serialize(t *testing.T) { + m := NewCatchupMessage(false, makeChain(t, 0, 2)) + + data, err := m.Serialize(fake.NewContext()) + require.NoError(t, err) + require.Equal(t, fake.GetFakeFormatValue(), data) + + _, err = m.Serialize(fake.NewBadContext()) + require.EqualError(t, err, fake.Err("encoding failed")) +} + +func TestMessageFactory_Deserialize(t *testing.T) { + testCalls.Clear() + + linkFac := types.NewLinkFactory(nil, nil, nil) + + fac := NewMessageFactory(linkFac) + + msg, err := fac.Deserialize(fake.NewContext(), nil) + require.NoError(t, err) + require.Equal(t, CatchupMessage{}, msg) + + factory := testCalls.Get(0, 0).(serde.Context).GetFactory(LinkKey{}) + require.NotNil(t, factory) + + _, err = fac.Deserialize(fake.NewBadContext(), nil) + require.EqualError(t, err, fake.Err("decoding failed")) +} + +// ----------------------------------------------------------------------------- +// Utility functions + +func makeChain(t *testing.T, start, count uint64) []types.BlockLink { + blocks := make([]types.BlockLink, count) + + for index := uint64(0); index < count; index++ { + block, err := types.NewBlock(simple.NewResult(nil), types.WithIndex(index)) + require.NoError(t, err) + + blocks[index-start], err = types.NewBlockLink(types.Digest{}, block) + require.NoError(t, err) + } + + return blocks +} From 06b1b5c6ea63a45e18f2be5fc7ba5f18ce08f559 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 7 Dec 2023 08:30:52 +0100 Subject: [PATCH 03/15] Adding the fastsync protocol --- core/ordering/cosipbft/fastsync/default.go | 316 ++++++++++++++++++ .../cosipbft/fastsync/default_test.go | 162 +++++++++ core/ordering/cosipbft/fastsync/fastsync.go | 48 +++ serde/json/json.go | 1 + 4 files changed, 527 insertions(+) 
create mode 100644 core/ordering/cosipbft/fastsync/default.go create mode 100644 core/ordering/cosipbft/fastsync/default_test.go create mode 100644 core/ordering/cosipbft/fastsync/fastsync.go diff --git a/core/ordering/cosipbft/fastsync/default.go b/core/ordering/cosipbft/fastsync/default.go new file mode 100644 index 000000000..fb7dbdce1 --- /dev/null +++ b/core/ordering/cosipbft/fastsync/default.go @@ -0,0 +1,316 @@ +package fastsync + +import ( + "context" + "io" + "sync" + "time" + + "github.com/rs/zerolog" + "go.dedis.ch/dela" + "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" + "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/types" + "go.dedis.ch/dela/core/ordering/cosipbft/pbft" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/crypto" + "go.dedis.ch/dela/internal/tracing" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/serde/json" + "golang.org/x/xerrors" +) + +var timeoutSync = 20 * time.Second +var protocolName = "fastsync" + +// fastSync is a block synchronizer which quickly catches up to the +// latest block. +// +// - implements fastsync.Synchronizer +type fastSync struct { + logger zerolog.Logger + rpc mino.RPC + pbftsm pbft.StateMachine + blocks blockstore.BlockStore + + Mino mino.Mino + + latest *uint64 + catchUpLock *sync.Mutex + + // This is for debugging + syncMessages *int +} + +// NewSynchronizer creates a new block synchronizer. +func NewSynchronizer(param blocksync.SyncParam) Synchronizer { + latest := param.Blocks.Len() + + logger := dela.Logger.With().Str("addr", param.Mino.GetAddress().String()).Logger() + + h := &handler{ + latest: &latest, + catchUpLock: new(sync.Mutex), + logger: logger, + genesis: param.Genesis, + blocks: param.Blocks, + pbftsm: param.PBFT, + verifierFac: param.VerifierFactory, + } + + fac := types.NewMessageFactory(param.LinkFactory) + + s := fastSync{ + logger: logger, + rpc: mino.MustCreateRPC(param.Mino, "fastsync", h, fac), + pbftsm: param.PBFT, + blocks: param.Blocks, + latest: &latest, + catchUpLock: h.catchUpLock, + Mino: param.Mino, + } + + return s +} + +// Sync implements fastsync.Synchronizer. +// It asks the other nodes what their latest block is, and then chooses some +// nodes randomly to request catching up the missing blocks. +func (s fastSync) Sync(ctx context.Context, players mino.Players, config Config) error { + if players.Len() == 0 { + return xerrors.Errorf("need at least 1 node to contact") + } + ctx = context.WithValue(ctx, tracing.ProtocolKey, protocolName) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeoutSync)) + defer cancel() + + // Make sure that the address of this node is at the beginning of the list. + addresses := []mino.Address{s.Mino.GetAddress()} + for iter := players.AddressIterator(); iter.HasNext(); { + addr := iter.GetNext() + if !s.Mino.GetAddress().Equal(addr) { + addresses = append(addresses, addr) + } + } + players = mino.NewAddresses(addresses...) + sender, rcvr, err := s.rpc.Stream(ctx, players) + if err != nil { + return xerrors.Errorf("stream failed: %v", err) + } + + // Send a catchup-request to f+1 nodes with our latest known block, + // but not to this node. + // This should be enough, because the protocol supposes there are only + // f byzantine nodes, so this should contact at least one healthy node. 
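+	// Example arithmetic: with players.Len() == 20 (as in the tests),
+	// f = (20-1)/3 = 6, so the request below goes to f+1 = 7 randomly
+	// chosen nodes among indices 1..19, index 0 being this node.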
+ f := (players.Len() - 1) / 3 + nodes := players.Take(mino.RangeFilter(1, players.Len()), + mino.RandomFilter(f+1)) + + // Send the request as many times as needed, because with a + // SplitMessageSize < size(all missing blocks), multiple requests are + // needed. + blockCount := s.blocks.Len() + for { + more, err := s.requestSync(ctx, sender, rcvr, nodes, config.SplitMessageSize) + if err != nil { + return xerrors.Errorf("error while requesting sync updates: %v", err) + } + if s.syncMessages != nil { + *s.syncMessages += 1 + } + + if !more { + break + } else if blockCount == s.blocks.Len() { + s.logger.Warn().Msgf("one of the nodes returned it has more blocks, "+ + "but didn't deliver: %v", nodes) + break + } + + blockCount = s.blocks.Len() + } + + return nil +} + +// requestSync asks all 'nodes' to send their block updates. +// The return is a boolean indicating whether at least one node indicated +// there are more blocks. +// This might be wrong, as there is no check whether the sending node is +// byzantine or not. +func (s fastSync) requestSync( + ctx context.Context, + sender mino.Sender, + rcvr mino.Receiver, + nodes mino.Players, + sms uint64, +) (bool, error) { + // Send the messages to all nodes + s.logger.Debug().Msgf("Sending catchup req to %+v", nodes) + errs := sender.Send(types.NewRequestCatchupMessage(sms, s.blocks.Len()), + iter2arr(nodes.AddressIterator())...) + errCount := 0 + for err := range errs { + errCount += 1 + if err != nil { + s.logger.Warn().Err(err).Msgf("announcement failed to one node") + } + } + if errCount == nodes.Len() { + return false, xerrors.Errorf("contacted %d nodes, but all failed: %v", len(errs), errs) + } + + // Wait for all replies, supposing that there are no more than f nodes + // not replying or replying with wrong blocks. + + replies := make(map[string]struct{}) + moreBlocks := false + for len(replies) < nodes.Len() { + s.logger.Debug().Msgf("Waiting for replies: %d < %d", len(replies), nodes.Len()) + from, msg, err := rcvr.Recv(ctx) + if err == context.Canceled || err == context.DeadlineExceeded || err == io.EOF { + return moreBlocks, nil + } + if err != nil { + s.logger.Debug().Err(err).Msg("sync finished with error") + return false, nil + } + + _, received := replies[from.String()] + if received { + s.logger.Warn().Msgf("received two fastsync messages from %s", from) + continue + } + + catchup, ok := msg.(types.CatchupMessage) + if ok { + s.logger.Trace().Msgf("Got %d blocks from %v", + len(catchup.GetBlockLinks()), from) + + replies[from.String()] = struct{}{} + moreBlocks = moreBlocks || catchup.GetSplitMessage() + + for _, bl := range catchup.GetBlockLinks() { + if bl.GetBlock().GetIndex() >= s.blocks.Len() { + err := s.pbftsm.CatchUp(bl) + if err != nil { + s.logger.Warn().Err(err).Msg("while using block to catchup") + } + } + } + } + } + + return moreBlocks, nil +} + +// handler is a Mino handler for the synchronization messages. +// +// - implements mino.Handler +type handler struct { + mino.UnsupportedHandler + + latest *uint64 + catchUpLock *sync.Mutex + + logger zerolog.Logger + blocks blockstore.BlockStore + genesis blockstore.GenesisStore + pbftsm pbft.StateMachine + verifierFac crypto.VerifierFactory +} + +// Stream implements mino.Handler. It waits for a request message and then +// replies with eventually missing BlockLinks of the requester. 
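+// Because a reply may be split according to the requested SplitMessageSize,
+// the handler keeps answering follow-up requests on the same stream until
+// all blocks up to its own head have been sent.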
+func (h *handler) Stream(out mino.Sender, in mino.Receiver) error {
+	h.logger.Debug().Msg("Starting stream")
+	ctx := context.Background()
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(timeoutSync))
+	defer cancel()
+
+	for sentAllBlocks := false; !sentAllBlocks; {
+		m, orch, err := h.waitRequest(ctx, in)
+		if err != nil {
+			return xerrors.Errorf("no request: %v", err)
+		}
+
+		blReply, err := h.getBlocks(m)
+		if err != nil {
+			return xerrors.Errorf("creating blocks to send failed: %v", err)
+		}
+
+		sentAllBlocks = m.GetLatest()+uint64(len(blReply)) == h.blocks.Len()
+		err = <-out.Send(types.NewCatchupMessage(
+			!sentAllBlocks, blReply), orch)
+		if err != nil {
+			return xerrors.Errorf("sending reply failed: %v", err)
+		}
+	}
+
+	h.logger.Debug().Msg("done sending catchup blocks")
+
+	return nil
+}
+
+// getBlocks creates a reply that will only overflow the given message-size
+// budget by at most one block.
+func (h *handler) getBlocks(m *types.RequestCatchupMessage) ([]otypes.BlockLink, error) {
+	var blReply []otypes.BlockLink
+
+	msgSize := uint64(0)
+	if h.blocks.Len() > m.GetLatest() {
+		for index := m.GetLatest(); index < h.blocks.Len(); index++ {
+			bl, err := h.blocks.GetByIndex(index)
+			if err != nil {
+				return blReply, xerrors.Errorf("failed to get block with index %d", index)
+			}
+			blReply = append(blReply, bl)
+			b, err := bl.Serialize(json.NewContext())
+			if err != nil {
+				return blReply, xerrors.Errorf("failed to serialize block %d: %v", index, err)
+			}
+			msgSize += uint64(len(b))
+
+			if m.GetSplitMessageSize() > 0 && msgSize >= m.GetSplitMessageSize() {
+				h.logger.Debug().Msgf("splitting message because size %d >= %d",
+					msgSize, m.GetSplitMessageSize())
+				break
+			}
+		}
+		h.logger.Debug().Msgf("Sending blocks %d..%d", m.GetLatest(),
+			m.GetLatest()+uint64(len(blReply))-1)
+	} else {
+		h.logger.Debug().Msg("No new blocks to send")
+	}
+
+	return blReply, nil
+}
+
+func (h *handler) waitRequest(
+	ctx context.Context,
+	in mino.Receiver,
+) (*types.RequestCatchupMessage, mino.Address, error) {
+
+	for {
+		orch, msg, err := in.Recv(ctx)
+		if err != nil {
+			return nil, nil, xerrors.Errorf("receiver failed: %v", err)
+		}
+
+		// Only a RequestCatchupMessage is handled; any other message is
+		// ignored and the handler keeps waiting.
+ m, ok := msg.(types.RequestCatchupMessage) + if ok { + return &m, orch, nil + } + } +} + +func iter2arr(iter mino.AddressIterator) []mino.Address { + var addrs []mino.Address + for iter.HasNext() { + addrs = append(addrs, iter.GetNext()) + } + + return addrs +} diff --git a/core/ordering/cosipbft/fastsync/default_test.go b/core/ordering/cosipbft/fastsync/default_test.go new file mode 100644 index 000000000..3848aa20f --- /dev/null +++ b/core/ordering/cosipbft/fastsync/default_test.go @@ -0,0 +1,162 @@ +package fastsync + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.dedis.ch/dela/core/ordering/cosipbft/authority" + "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" + "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" + "go.dedis.ch/dela/core/ordering/cosipbft/pbft" + otypes "go.dedis.ch/dela/core/ordering/cosipbft/types" + "go.dedis.ch/dela/core/txn/signed" + "go.dedis.ch/dela/core/validation/simple" + "go.dedis.ch/dela/mino" + "go.dedis.ch/dela/mino/minoch" + "go.dedis.ch/dela/testing/fake" +) + +func TestDefaultSync_Basic(t *testing.T) { + n := 20 + f := (n - 1) / 3 + num := 10 + + syncs, genesis, roster := makeNodes(t, n) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := syncs[0].Sync(ctx, roster, Config{SplitMessageSize: 1}) + require.NoError(t, err) + + storeBlocks(t, syncs[0].blocks, num, genesis.GetHash().Bytes()...) + + // Test only a subset of the roster to prepare for the next test. + for node := 1; node < n; node++ { + // Send the sync call to the leader + contact := roster.Take(mino.IndexFilter(0)) + if node >= 2*f+1 { + // Now that there are 2f+1 nodes with the block, sync with + // the whole roster. + contact = roster + } + err = syncs[node].Sync(ctx, contact, Config{}) + require.NoError(t, err) + } + + for i := 0; i < n; i++ { + require.Equal(t, uint64(num), syncs[i].blocks.Len(), strconv.Itoa(i)) + } +} + +func TestDefaultSync_SplitMessage(t *testing.T) { + num := 10 + + tests := []struct { + sms uint64 + msgs int + }{ + {0, 1}, + {1, 10}, + {255, 10}, + {256, 5}, + {1024, 2}, + {2550, 1}, + } + for _, test := range tests { + syncs, genesis, roster := makeNodes(t, 2) + + ctx, cancel := context.WithCancel(context.Background()) + + storeBlocks(t, syncs[0].blocks, num, genesis.GetHash().Bytes()...) 
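+
+		// Each block here serializes to 255 bytes (as the expected message
+		// counts above imply), so e.g. a SplitMessageSize of 256 packs two
+		// blocks per message and yields 5 catchup messages for the 10 blocks.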
+
+		syncsReceived := 0
+		syncs[1].syncMessages = &syncsReceived
+		err := syncs[1].Sync(ctx, roster, Config{SplitMessageSize: test.sms})
+		require.NoError(t, err)
+		require.Equal(t, test.msgs, syncsReceived)
+		require.Equal(t, uint64(num), syncs[1].blocks.Len())
+		cancel()
+	}
+}
+
+// -----------------------------------------------------------------------------
+// Utility functions
+
+func makeNodes(t *testing.T, n int) ([]fastSync, otypes.Genesis, mino.Players) {
+	manager := minoch.NewManager()
+
+	syncs := make([]fastSync, n)
+	addrs := make([]mino.Address, n)
+
+	ro := authority.FromAuthority(fake.NewAuthority(3, fake.NewSigner))
+
+	genesis, err := otypes.NewGenesis(ro)
+	require.NoError(t, err)
+
+	for i := 0; i < n; i++ {
+		m := minoch.MustCreate(manager, fmt.Sprintf("node%d", i))
+
+		addrs[i] = m.GetAddress()
+
+		genstore := blockstore.NewGenesisStore()
+		require.NoError(t, genstore.Set(genesis))
+
+		blocks := blockstore.NewInMemory()
+		blockFac := otypes.NewBlockFactory(simple.NewResultFactory(signed.NewTransactionFactory()))
+		csFac := authority.NewChangeSetFactory(m.GetAddressFactory(), fake.PublicKeyFactory{})
+		linkFac := otypes.NewLinkFactory(blockFac, fake.SignatureFactory{}, csFac)
+
+		param := blocksync.SyncParam{
+			Mino:            m,
+			Blocks:          blocks,
+			Genesis:         genstore,
+			LinkFactory:     linkFac,
+			ChainFactory:    otypes.NewChainFactory(linkFac),
+			PBFT:            testSM{blocks: blocks},
+			VerifierFactory: fake.VerifierFactory{},
+		}
+
+		syncs[i] = NewSynchronizer(param).(fastSync)
+	}
+
+	return syncs, genesis, mino.NewAddresses(addrs...)
+}
+
+// storeBlocks creates n new blocks and stores them, linking each block to
+// the previous one.
+func storeBlocks(t *testing.T, blocks blockstore.BlockStore, n int, from ...byte) {
+	prev := otypes.Digest{}
+	copy(prev[:], from)
+
+	for i := 0; i < n; i++ {
+		block, err := otypes.NewBlock(simple.NewResult(nil), otypes.WithIndex(uint64(i)))
+		require.NoError(t, err)
+
+		link, err := otypes.NewBlockLink(prev, block,
+			otypes.WithSignatures(fake.Signature{}, fake.Signature{}))
+		require.NoError(t, err)
+
+		err = blocks.Store(link)
+		require.NoError(t, err)
+
+		prev = block.GetHash()
+	}
+}
+
+type testSM struct {
+	pbft.StateMachine
+
+	blocks blockstore.BlockStore
+}
+
+func (sm testSM) CatchUp(link otypes.BlockLink) error {
+	err := sm.blocks.Store(link)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/core/ordering/cosipbft/fastsync/fastsync.go b/core/ordering/cosipbft/fastsync/fastsync.go
new file mode 100644
index 000000000..f46ebc4aa
--- /dev/null
+++ b/core/ordering/cosipbft/fastsync/fastsync.go
@@ -0,0 +1,48 @@
+// Package fastsync defines a block synchronizer for the ordering service.
+//
+// The block synchronizer is to be called in two situations:
+// - if a node is starting up, to make sure it's up-to-date with other nodes
+// - if a node receives a request for a block it doesn't hold the parent of
+//
+// To make it really simple, the node sends a catchup request in parallel to
+// f+1 random nodes.
+// As long as there are enough honest nodes, this will allow the node to
+// catch up to the latest block.
+// One optimization would be to send the requests serially, waiting for the
+// reply before going on.
+// But this would involve timeouts and would take much longer.
+// So we suppose the node is not that much behind and thus will not waste too
+// much bandwidth.
+//
+// Possible improvements:
+// - make the protocol more efficient in the presence of byzantine nodes:
+// The node broadcasts a request indicating which is the last block in storage.
+// It receives offers from different nodes, and contacts the n nodes with the +// most recent block, where n must be bigger than the maximum number of +// byzantine nodes. +// +// Documentation Last Review: 22.11.2023 +package fastsync + +import ( + "context" + + "go.dedis.ch/dela/mino" +) + +// Config of the current run of the fastsync. +// For future expansion and to make it similar to blocksync, +// this is held as a struct. +type Config struct { + // The size at which the message will be split. + // If the encoding of all blocks is bigger than this value, the + // message is sent as-is. + SplitMessageSize uint64 +} + +// Synchronizer is an interface to synchronize a node with the participants. +type Synchronizer interface { + // Sync sends a synchronization request message to f+1 random participants, + // which will return BlockLinks to the latest block. + Sync(ctx context.Context, players mino.Players, config Config) error +} diff --git a/serde/json/json.go b/serde/json/json.go index 880085e0e..470758f9e 100644 --- a/serde/json/json.go +++ b/serde/json/json.go @@ -11,6 +11,7 @@ import ( _ "go.dedis.ch/dela/core/access/darc/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/authority/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/blocksync/json" + _ "go.dedis.ch/dela/core/ordering/cosipbft/fastsync/json" _ "go.dedis.ch/dela/core/ordering/cosipbft/json" _ "go.dedis.ch/dela/core/txn/signed/json" _ "go.dedis.ch/dela/core/validation/simple/json" From de866cb698a8de53a658822138138eea4543dd21 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 7 Dec 2023 09:30:06 +0100 Subject: [PATCH 04/15] Making fastsync the default protocol Adds the fastsync as default protocol to cosipbft. Adds a 'WithBlockSync' option for the old, slow protocol. Updates all the tests. --- cli/node/memcoin/mod_test.go | 48 ++++++++++--- core/ordering/cosipbft/cosipbft.go | 65 ++++++++++++----- core/ordering/cosipbft/cosipbft_test.go | 21 ++++-- core/ordering/cosipbft/proc.go | 96 +++++++++++++++++++++---- core/ordering/cosipbft/proc_test.go | 2 +- mino/minoch/mod.go | 3 +- mino/minoch/rpc.go | 2 +- 7 files changed, 187 insertions(+), 50 deletions(-) diff --git a/cli/node/memcoin/mod_test.go b/cli/node/memcoin/mod_test.go index df7c92638..56a341347 100644 --- a/cli/node/memcoin/mod_test.go +++ b/cli/node/memcoin/mod_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "encoding/base64" "fmt" "io" "net" @@ -14,6 +15,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.dedis.ch/kyber/v3/pairing/bn256" ) // This test creates a chain with initially 3 nodes. It then adds node 4 and 5 @@ -74,28 +76,49 @@ func TestMemcoin_Scenario_SetupAndTransactions(t *testing.T) { args = append([]string{ os.Args[0], "--config", node1, "ordering", "roster", "add", - "--wait", "60s"}, + "--wait", "60s", + }, getExport(t, node4)..., ) err = run(args) require.NoError(t, err) + // Add the certificate and push two new blocks to make sure node4 is + // fully participating + shareCert(t, node4, node1, "//127.0.0.1:2111") + publicKey, err := bn256.NewSuiteG2().Point().MarshalBinary() + require.NoError(t, err) + publicKeyHex := base64.StdEncoding.EncodeToString(publicKey) + argsAccess := []string{ + os.Args[0], + "--config", node1, "access", "add", + "--identity", publicKeyHex, + } + for i := 0; i < 2; i++ { + err = runWithCfg(argsAccess, config{}) + require.NoError(t, err) + } + // Add node 5 which should be participating. + // This makes sure that node 4 is actually participating and caught up. 
+ // If node 4 is not participating, there would be too many faulty nodes + // after adding node 5. args = append([]string{ os.Args[0], "--config", node1, "ordering", "roster", "add", - "--wait", "60s"}, + "--wait", "60s", + }, getExport(t, node5)..., ) err = run(args) require.NoError(t, err) - // Run a few transactions. - for i := 0; i < 5; i++ { - err = runWithCfg(args, config{}) - require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2115") + // Run 2 new transactions + for i := 0; i < 2; i++ { + err = runWithCfg(argsAccess, config{}) + require.NoError(t, err) } // Test a timeout waiting for a transaction. @@ -146,12 +169,14 @@ func TestMemcoin_Scenario_RestartNode(t *testing.T) { args := append([]string{ os.Args[0], "--config", node1, "ordering", "roster", "add", - "--wait", "60s"}, + "--wait", "60s", + }, getExport(t, node1)..., ) err = run(args) - require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210") + require.EqualError(t, err, + "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210") } // ----------------------------------------------------------------------------- @@ -230,7 +255,12 @@ func waitDaemon(t *testing.T, daemons []string) bool { func makeNodeArg(path string, port uint16) []string { return []string{ - os.Args[0], "--config", path, "start", "--listen", "tcp://127.0.0.1:" + strconv.Itoa(int(port)), + os.Args[0], + "--config", + path, + "start", + "--listen", + "tcp://127.0.0.1:" + strconv.Itoa(int(port)), } } diff --git a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go index f11eeba43..583af3307 100644 --- a/core/ordering/cosipbft/cosipbft.go +++ b/core/ordering/cosipbft/cosipbft.go @@ -6,7 +6,7 @@ // protocol and the followers wait for incoming messages to update their own // state machines and reply with signatures when the leader candidate is valid. // If the leader fails to send a candidate, or finalize it, the followers will -// timeout after some time and move to a view change state. +// time out after some time and move to a view change state. // // The view change procedure is always waiting on the leader+1 confirmation // before moving to leader+2, leader+3, etc. It means that if not enough nodes @@ -43,6 +43,7 @@ import ( "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" "go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync" "go.dedis.ch/dela/core/ordering/cosipbft/pbft" "go.dedis.ch/dela/core/ordering/cosipbft/types" "go.dedis.ch/dela/core/store" @@ -80,6 +81,9 @@ const ( // RoundMaxWait is the maximum amount for the backoff. RoundMaxWait = 5 * time.Minute + // DefaultFastSyncMessageSize defines when a fast sync message will be split. + DefaultFastSyncMessageSize = 1e6 + rpcName = "cosipbft" ) @@ -115,9 +119,10 @@ type Service struct { } type serviceTemplate struct { - hashFac crypto.HashFactory - blocks blockstore.BlockStore - genesis blockstore.GenesisStore + hashFac crypto.HashFactory + blocks blockstore.BlockStore + genesis blockstore.GenesisStore + syncMethod syncMethodType } // ServiceOption is the type of option to set some fields of the service. @@ -144,8 +149,15 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption { } } +// WithBlockSync enables the old, slow syncing algorithm in the cosipbft module. 
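+// Fastsync is the default; a usage sketch for keeping the old behaviour
+// would be: srvc, err := NewServiceStruct(param, WithBlockSync()).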
+func WithBlockSync() ServiceOption { + return func(tmpl *serviceTemplate) { + tmpl.syncMethod = syncMethodBlock + } +} + // ServiceParam is the different components to provide to the service. All the -// fields are mandatory and it will panic if any is nil. +// fields are mandatory, and it will panic if any is nil. type ServiceParam struct { Mino mino.Mino Cosi cosi.CollectiveSigning @@ -220,10 +232,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, erro ChainFactory: chainFac, VerifierFactory: param.Cosi.GetVerifierFactory(), } - - bs := blocksync.NewSynchronizer(syncparam) - - proc.sync = bs + if tmpl.syncMethod == syncMethodBlock { + proc.bsync = blocksync.NewSynchronizer(syncparam) + } else { + proc.fsync = fastsync.NewSynchronizer(syncparam) + } fac := types.NewMessageFactory( types.NewGenesisFactory(proc.rosterFac), @@ -275,6 +288,20 @@ func NewServiceStart(s *Service) { go s.watchBlocks() if s.genesis.Exists() { + if s.syncMethod() == syncMethodFast { + ctx, done := context.WithCancel(context.Background()) + roster, err := s.readRoster(s.tree.Get()) + if err != nil { + panic("couldn't get roster of latest block: " + err.Error()) + } + err = s.fsync.Sync(ctx, roster, + fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize}) + if err != nil { + s.logger.Warn().Msgf("while syncing with other nodes: %+v", err) + } + done() + } + // If the genesis already exists, the service can start right away to // participate in the chain. close(s.started) @@ -541,17 +568,21 @@ func (s *Service) doLeaderRound( s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started") - // Send a synchronization to the roster so that they can learn about the - // latest block of the chain. - err := s.sync.Sync(ctx, roster, - blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())}) - if err != nil { - return xerrors.Errorf("sync failed: %v", err) + // When using blocksync, the updates are sent before every new block, which + // uses a lot of bandwidth if there are more than just a few blocks. + if s.syncMethod() == syncMethodBlock { + // Send a synchronization to the roster so that they can learn about the + // latest block of the chain. + err := s.bsync.Sync(ctx, roster, + blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())}) + if err != nil { + return xerrors.Errorf("sync failed: %v", err) + } } s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started") - err = s.doPBFT(ctx) + err := s.doPBFT(ctx) if err != nil { return xerrors.Errorf("pbft failed: %v", err) } @@ -677,7 +708,7 @@ func (s *Service) doPBFT(ctx context.Context) error { block, err = types.NewBlock( data, types.WithTreeRoot(root), - types.WithIndex(uint64(s.blocks.Len())), + types.WithIndex(s.blocks.Len()), types.WithHashFactory(s.hashFactory)) if err != nil { diff --git a/core/ordering/cosipbft/cosipbft_test.go b/core/ordering/cosipbft/cosipbft_test.go index 616983f00..3efaeb12c 100644 --- a/core/ordering/cosipbft/cosipbft_test.go +++ b/core/ordering/cosipbft/cosipbft_test.go @@ -43,14 +43,25 @@ import ( "go.dedis.ch/dela/testing/fake" ) +func TestService_Scenario_Basic_Blocksync(t *testing.T) { + testserviceScenarioBasic(t, syncMethodBlock) +} +func TestService_Scenario_Basic_Fastsync(t *testing.T) { + testserviceScenarioBasic(t, syncMethodFast) +} + // This test is known to be VERY flaky on Windows. // Further investigation is needed. 
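+// testserviceScenarioBasic runs the same basic scenario with the sync
+// method selected by one of the two wrapper tests above.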
-func TestService_Scenario_Basic(t *testing.T) { +func testserviceScenarioBasic(t *testing.T, sm syncMethodType) { if testing.Short() { t.Skip("Skipping flaky test") } - nodes, ro, clean := makeAuthority(t, 5) + var opts []ServiceOption + if sm == syncMethodBlock { + opts = append(opts, WithBlockSync()) + } + nodes, ro, clean := makeAuthority(t, 5, opts...) defer clean() signer := nodes[0].signer @@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) { closing: make(chan struct{}), } srvc.blocks = blockstore.NewInMemory() - srvc.sync = fakeSync{} + srvc.bsync = fakeSync{} srvc.pool = mem.NewPool() srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) @@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) { srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) srvc.pbftsm = fakeSM{} - srvc.sync = fakeSync{err: fake.GetError()} + srvc.bsync = fakeSync{err: fake.GetError()} ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) { srvc.tree = blockstore.NewTreeCache(fakeTree{}) srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) srvc.pbftsm = fakeSM{} - srvc.sync = fakeSync{} + srvc.bsync = fakeSync{} require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner()))) diff --git a/core/ordering/cosipbft/proc.go b/core/ordering/cosipbft/proc.go index a703c8c2f..3a294586c 100644 --- a/core/ordering/cosipbft/proc.go +++ b/core/ordering/cosipbft/proc.go @@ -16,6 +16,7 @@ import ( "go.dedis.ch/dela/core/ordering/cosipbft/blockstore" "go.dedis.ch/dela/core/ordering/cosipbft/blocksync" "go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange" + "go.dedis.ch/dela/core/ordering/cosipbft/fastsync" "go.dedis.ch/dela/core/ordering/cosipbft/pbft" "go.dedis.ch/dela/core/ordering/cosipbft/types" "go.dedis.ch/dela/core/store" @@ -28,6 +29,13 @@ import ( "golang.org/x/xerrors" ) +type syncMethodType bool + +const ( + syncMethodFast syncMethodType = false + syncMethodBlock syncMethodType = true +) + // Processor processes the messages to run a collective signing PBFT consensus. // // - implements cosi.Reactor @@ -38,7 +46,8 @@ type processor struct { logger zerolog.Logger pbftsm pbft.StateMachine - sync blocksync.Synchronizer + bsync blocksync.Synchronizer + fsync fastsync.Synchronizer tree blockstore.TreeCache pool pool.Pool watcher core.Observable @@ -49,20 +58,32 @@ type processor struct { context serde.Context genesis blockstore.GenesisStore blocks blockstore.BlockStore + // catchup sends catchup requests to the players to get new blocks + catchup chan mino.Players started chan struct{} } func newProcessor() *processor { - return &processor{ + proc := &processor{ watcher: core.NewWatcher(), context: json.NewContext(), started: make(chan struct{}), + catchup: make(chan mino.Players), + } + go proc.catchupHandler() + return proc +} + +func (h *processor) syncMethod() syncMethodType { + if h.bsync != nil { + return syncMethodBlock } + return syncMethodFast } // Invoke implements cosi.Reactor. It processes the messages from the collective -// signature module. The messages are either from the the prepare or the commit +// signature module. The messages are either from the prepare or the commit // phase. 
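+// With fastsync enabled, a BlockMessage whose index is ahead of the local
+// block store triggers a request on the catchup channel, and the node
+// refuses to sign until it has caught up.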
func (h *processor) Invoke(from mino.Address, msg serde.Message) ([]byte, error) { switch in := msg.(type) { @@ -70,16 +91,39 @@ func (h *processor) Invoke(from mino.Address, msg serde.Message) ([]byte, error) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - blocks := h.blocks.Watch(ctx) - - // In case the node is falling behind the chain, it gives it a chance to - // catch up before moving forward. - latest := h.sync.GetLatest() + if h.syncMethod() == syncMethodFast { + // Check if a catchup is needed for fastsync + var roster mino.Players + if h.blocks.Len() == 0 && in.GetBlock().GetIndex() > 0 { + h.logger.Info().Msgf("node joined an existing blockchain from %+v", from) + roster = mino.NewAddresses(from) + } else if in.GetBlock().GetIndex() > h.blocks.Len() { + h.logger.Warn().Msgf("node got asked to sign block-index %d, "+ + "but has only %d blocks", in.GetBlock().GetIndex(), + h.blocks.Len()) + var err error + roster, err = h.getCurrentRoster() + if err != nil { + return nil, xerrors.Errorf("failed to get roster: %v", err) + } + } - if latest > h.blocks.Len() { - for link := range blocks { - if link.GetBlock().GetIndex() >= latest { - cancel() + if roster != nil { + h.catchup <- roster + return nil, xerrors.Errorf("needed to catch up") + } + } else { + blocks := h.blocks.Watch(ctx) + + // In case the node is falling behind the chain, it gives it a chance to + // catch up before moving forward. + latest := h.bsync.GetLatest() + + if latest > h.blocks.Len() { + for link := range blocks { + if link.GetBlock().GetIndex() >= latest { + cancel() + } } } } @@ -144,9 +188,14 @@ func (h *processor) Process(req mino.Request) (serde.Message, error) { return nil, h.storeGenesis(msg.GetGenesis().GetRoster(), &root) case types.DoneMessage: - err := h.pbftsm.Finalize(msg.GetID(), msg.GetSignature()) - if err != nil { - return nil, xerrors.Errorf("pbftsm finalized failed: %v", err) + if h.pbftsm.GetState() == pbft.InitialState { + h.logger.Warn().Msgf("Got block without commit from %v - catching up", req.Address) + h.catchup <- mino.NewAddresses(req.Address) + } else { + err := h.pbftsm.Finalize(msg.GetID(), msg.GetSignature()) + if err != nil { + return nil, xerrors.Errorf("pbftsm finalized failed: %v", err) + } } case types.ViewMessage: param := pbft.ViewParam{ @@ -250,3 +299,20 @@ func (h *processor) makeAccess(store store.Snapshot, roster authority.Authority) return nil } + +// catchupHandler listens to incoming requests for potentially missing blocks. 
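+// The requests arrive on the catchup channel as the set of players to
+// contact.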
+// It is started as a go-routine +func (h *processor) catchupHandler() { + for players := range h.catchup { + if h.syncMethod() == syncMethodFast { + ctx, cancel := context.WithCancel(context.Background()) + err := h.fsync.Sync(ctx, players, + fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize}) + if err != nil { + h.logger.Err(err) + } + cancel() + } + } + panic("catchup channel got closed - this should not happen") +} diff --git a/core/ordering/cosipbft/proc_test.go b/core/ordering/cosipbft/proc_test.go index 4f2728d4c..16ace5e93 100644 --- a/core/ordering/cosipbft/proc_test.go +++ b/core/ordering/cosipbft/proc_test.go @@ -24,7 +24,7 @@ func TestProcessor_BlockMessage_Invoke(t *testing.T) { proc := newProcessor() proc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{}) - proc.sync = fakeSync{latest: 1} + proc.bsync = fakeSync{latest: 1} proc.blocks = fakeStore{} proc.pbftsm = fakeSM{ state: pbft.InitialState, diff --git a/mino/minoch/mod.go b/mino/minoch/mod.go index 65d152f27..b8af3b8b8 100644 --- a/mino/minoch/mod.go +++ b/mino/minoch/mod.go @@ -10,7 +10,6 @@ // instance should drop the message. // // Documentation Last Review: 06.10.2020 -// package minoch import ( @@ -85,7 +84,7 @@ func (m *Minoch) GetAddress() mino.Address { return address{id: m.identifier} } -// AddFilter adds the filter to all of the RPCs. This must be called before +// AddFilter adds the filter to all the RPCs. This must be called before // receiving requests. func (m *Minoch) AddFilter(filter Filter) { m.filters = append(m.filters, filter) diff --git a/mino/minoch/rpc.go b/mino/minoch/rpc.go index f2b4c67e4..1133b5714 100644 --- a/mino/minoch/rpc.go +++ b/mino/minoch/rpc.go @@ -211,7 +211,7 @@ func (c RPC) Stream(ctx context.Context, memship mino.Players) (mino.Sender, min case env := <-in: for _, to := range env.to { output := orchRecv.out - if !to.(address).orchestrator { + if !to.(address).orchestrator || !to.Equal(orchAddr) { output = outs[to.String()].out } From 252cde85aa903232360cfa8788b1b6b7a0c68093 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 25 Jan 2024 09:25:58 +0100 Subject: [PATCH 05/15] Using main for github workflow --- .github/workflows/go_lint.yml | 2 +- .github/workflows/go_test.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go_lint.yml b/.github/workflows/go_lint.yml index 4da140707..c2f1b85c1 100644 --- a/.github/workflows/go_lint.yml +++ b/.github/workflows/go_lint.yml @@ -26,4 +26,4 @@ jobs: run: make lint - name: Vet - run: make vet \ No newline at end of file + run: make vet diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index 9d8527fee..b7e1d8257 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -2,7 +2,7 @@ name: Go test on: push: - branches: [ master ] + branches: [ main ] pull_request: types: - opened From d96665a2e633b31b40afbd6fc624cc154ece83f7 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Mon, 22 Jan 2024 16:49:57 +0100 Subject: [PATCH 06/15] Updating the timeouts to make pass 1000 votes - increased the default timeouts (tests set their own timeout) - increased maximum message size over grpc --- core/ordering/cosipbft/cosipbft.go | 2 +- core/ordering/cosipbft/fastsync/default.go | 2 +- mino/minogrpc/mod.go | 1 + mino/minogrpc/rpc.go | 21 +++++++++++++++------ mino/minogrpc/server.go | 6 ++++-- mino/minogrpc/session/mod.go | 11 +++++++++-- mod.go | 2 ++ 7 files changed, 33 insertions(+), 12 deletions(-) diff --git 
a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go index 583af3307..555e78b5d 100644 --- a/core/ordering/cosipbft/cosipbft.go +++ b/core/ordering/cosipbft/cosipbft.go @@ -62,7 +62,7 @@ import ( const ( // DefaultRoundTimeout is the maximum round time the service waits // for an event to happen. - DefaultRoundTimeout = 10 * time.Second + DefaultRoundTimeout = 200 * time.Second // DefaultFailedRoundTimeout is the maximum round time the service waits // for an event to happen, after a round has failed, thus letting time diff --git a/core/ordering/cosipbft/fastsync/default.go b/core/ordering/cosipbft/fastsync/default.go index fb7dbdce1..7857c5b76 100644 --- a/core/ordering/cosipbft/fastsync/default.go +++ b/core/ordering/cosipbft/fastsync/default.go @@ -184,7 +184,7 @@ func (s fastSync) requestSync( catchup, ok := msg.(types.CatchupMessage) if ok { - s.logger.Trace().Msgf("Got %d blocks from %v", + s.logger.Info().Msgf("Got %d fastsync blocks from %v", len(catchup.GetBlockLinks()), from) replies[from.String()] = struct{}{} diff --git a/mino/minogrpc/mod.go b/mino/minogrpc/mod.go index f562688bf..3d58ae482 100644 --- a/mino/minogrpc/mod.go +++ b/mino/minogrpc/mod.go @@ -281,6 +281,7 @@ func NewMinogrpc(listen net.Addr, public *url.URL, router router.Router, opts .. otgrpc.SpanDecorator(decorateServerTrace))), grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer, otgrpc.SpanDecorator(decorateServerTrace))), + grpc.MaxRecvMsgSize(session.MaxMessageSize), } if !tmpl.serveTLS { diff --git a/mino/minogrpc/rpc.go b/mino/minogrpc/rpc.go index 0e16b7bc4..c9c5117a5 100644 --- a/mino/minogrpc/rpc.go +++ b/mino/minogrpc/rpc.go @@ -7,6 +7,8 @@ package minogrpc import ( context "context" + "sync" + "github.com/rs/xid" "go.dedis.ch/dela" "go.dedis.ch/dela/internal/tracing" @@ -15,10 +17,10 @@ import ( "go.dedis.ch/dela/mino/minogrpc/session" "go.dedis.ch/dela/serde" "golang.org/x/xerrors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "sync" ) // RPC represents an RPC that has been registered by a client, which allows @@ -32,8 +34,10 @@ type RPC struct { } // Call implements mino.RPC. It calls the RPC on each provided address. -func (rpc *RPC) Call(ctx context.Context, - req serde.Message, players mino.Players) (<-chan mino.Response, error) { +func (rpc *RPC) Call( + ctx context.Context, + req serde.Message, players mino.Players, +) (<-chan mino.Response, error) { data, err := req.Serialize(rpc.overlay.context) if err != nil { @@ -75,7 +79,8 @@ func (rpc *RPC) Call(ctx context.Context, header := metadata.New(map[string]string{headerURIKey: rpc.uri}) newCtx := metadata.NewOutgoingContext(ctx, header) - callResp, err := cl.Call(newCtx, sendMsg) + callResp, err := cl.Call(newCtx, sendMsg, + grpc.MaxCallRecvMsgSize(session.MaxMessageSize)) if err != nil { resp := mino.NewResponseWithError( addr, @@ -134,7 +139,11 @@ func (rpc *RPC) Call(ctx context.Context, // If C has to send a message to B, it will send it through node A. Similarly, // if D has to send a message to G, it will move up the tree through B, A and // finally C. 
-func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, mino.Receiver, error) { +func (rpc RPC) Stream(ctx context.Context, players mino.Players) ( + mino.Sender, + mino.Receiver, + error, +) { if players == nil || players.Len() == 0 { return nil, nil, xerrors.New("empty list of addresses") } @@ -180,7 +189,7 @@ func (rpc RPC) Stream(ctx context.Context, players mino.Players) (mino.Sender, m ctx = metadata.NewOutgoingContext(ctx, md) - stream, err := client.Stream(ctx) + stream, err := client.Stream(ctx, grpc.MaxCallRecvMsgSize(session.MaxMessageSize)) if err != nil { rpc.overlay.connMgr.Release(gw) diff --git a/mino/minogrpc/server.go b/mino/minogrpc/server.go index d4a307ba0..68c187192 100644 --- a/mino/minogrpc/server.go +++ b/mino/minogrpc/server.go @@ -112,7 +112,8 @@ func (o overlayServer) Join(ctx context.Context, req *ptypes.JoinRequest) ( client := ptypes.NewOverlayClient(conn) - _, err = client.Share(ctx, req.GetChain()) + _, err = client.Share(ctx, req.GetChain(), + grpc.MaxCallRecvMsgSize(session.MaxMessageSize)) if err != nil { res <- xerrors.Errorf("couldn't call share: %v", err) return @@ -585,7 +586,7 @@ func (o *overlay) Join(addr *url.URL, token string, certHash []byte) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - resp, err := client.Join(ctx, req) + resp, err := client.Join(ctx, req, grpc.MaxCallRecvMsgSize(session.MaxMessageSize)) if err != nil { return xerrors.Errorf("couldn't call join: %v", err) } @@ -663,6 +664,7 @@ func (mgr *connManager) Acquire(to mino.Address) (grpc.ClientConnInterface, erro Backoff: backoff.DefaultConfig, MinConnectTimeout: defaultMinConnectTimeout, }), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(session.MaxMessageSize)), grpc.WithUnaryInterceptor( otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.SpanDecorator(decorateClientTrace)), ), diff --git a/mino/minogrpc/session/mod.go b/mino/minogrpc/session/mod.go index 0009993cd..f6927688b 100644 --- a/mino/minogrpc/session/mod.go +++ b/mino/minogrpc/session/mod.go @@ -40,6 +40,9 @@ import ( // HandshakeKey is the key to the handshake store in the headers. const HandshakeKey = "handshake" +// MaxMessageSize that will be sent using grpc +var MaxMessageSize = int(1e9) + // ConnectionManager is an interface required by the session to open and release // connections to the relays. 
 type ConnectionManager interface {
@@ -445,7 +448,10 @@ func (s *session) setupRelay(p parent, addr mino.Address) (Relay, error) {
 
 	cl := ptypes.NewOverlayClient(conn)
 
-	stream, err := cl.Stream(ctx, grpc.WaitForReady(false))
+	stream, err := cl.Stream(ctx,
+		grpc.WaitForReady(false),
+		grpc.MaxCallRecvMsgSize(MaxMessageSize),
+	)
 	if err != nil {
 		s.connMgr.Release(addr)
 		return nil, xerrors.Errorf("client: %v", err)
 	}
@@ -592,7 +598,8 @@ func (r *unicastRelay) Send(ctx context.Context, p router.Packet) (*ptypes.Ack,
 	ctx = metadata.NewOutgoingContext(ctx, r.md)
 
-	ack, err := client.Forward(ctx, &ptypes.Packet{Serialized: data})
+	ack, err := client.Forward(ctx, &ptypes.Packet{Serialized: data},
+		grpc.MaxCallRecvMsgSize(MaxMessageSize))
 	if err != nil {
 		return nil, xerrors.Errorf("client: %w", err)
 	}
diff --git a/mod.go b/mod.go
index 302e32a3e..2f50d4eae 100644
--- a/mod.go
+++ b/mod.go
@@ -12,6 +12,7 @@ package dela
 
 import (
+	"fmt"
 	"os"
 	"time"
 
@@ -73,6 +74,7 @@ func init() {
 	default:
 		level = zerolog.TraceLevel
 	}
+	fmt.Println("LogLevel is:", logLevel, level)
 
 	Logger = Logger.Level(level)
 
 	PromCollectors = append(PromCollectors, promWarns, promErrs)

From 208765592b2f1dc743226c3461ffb81114d8aae4 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Tue, 27 Feb 2024 16:44:47 +0100
Subject: [PATCH 07/15] 1h rounds...

---
 core/ordering/cosipbft/cosipbft.go | 2 +-
 mod.go                             | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go
index 555e78b5d..4876040e6 100644
--- a/core/ordering/cosipbft/cosipbft.go
+++ b/core/ordering/cosipbft/cosipbft.go
@@ -62,7 +62,7 @@ import (
 const (
 	// DefaultRoundTimeout is the maximum round time the service waits
 	// for an event to happen.
-	DefaultRoundTimeout = 200 * time.Second
+	DefaultRoundTimeout = time.Hour
 
 	// DefaultFailedRoundTimeout is the maximum round time the service waits
 	// for an event to happen, after a round has failed, thus letting time
diff --git a/mod.go b/mod.go
index 2f50d4eae..302e32a3e 100644
--- a/mod.go
+++ b/mod.go
@@ -12,7 +12,6 @@ package dela
 
 import (
-	"fmt"
 	"os"
 	"time"
 
@@ -74,7 +73,6 @@ func init() {
 	default:
 		level = zerolog.TraceLevel
 	}
-	fmt.Println("LogLevel is:", logLevel, level)
 
 	Logger = Logger.Level(level)
 
 	PromCollectors = append(PromCollectors, promWarns, promErrs)

From 3f0bb983993024a6c0a210c0365beae0504a61bd Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:40:56 +0100
Subject: [PATCH 08/15] Comments and repetitions

---
 core/txn/pool/controller/action.go | 2 +-
 dkg/pedersen/dkg.go                | 4 ++--
 dkg/pedersen/pedersen.go           | 5 ++---
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/txn/pool/controller/action.go b/core/txn/pool/controller/action.go
index 1f9762dae..0baea8efb 100644
--- a/core/txn/pool/controller/action.go
+++ b/core/txn/pool/controller/action.go
@@ -25,7 +25,7 @@ var getManager = func(signer crypto.Signer, s signed.Client) txn.Manager {
 	return signed.NewManager(signer, s)
 }
 
-// addAction describes an action to add an new transaction to the pool.
+// addAction describes an action to add a new transaction to the pool.
 //
 // - implements node.ActionTemplate
 type addAction struct {
diff --git a/dkg/pedersen/dkg.go b/dkg/pedersen/dkg.go
index 5e9fc8020..d528f37c7 100644
--- a/dkg/pedersen/dkg.go
+++ b/dkg/pedersen/dkg.go
@@ -826,7 +826,7 @@ func (s *instance) handleDecrypt(
 ) error {
 
 	if !s.startRes.Done() {
-		return xerrors.Errorf("you must first initialize DKG. Did you call setup() first?")
+		return xerrors.Errorf(initDkgFirst)
 	}
 
 	S := suite.Point().Mul(s.privShare.V, msg.K)
@@ -849,7 +849,7 @@ func (s *instance) handleReencryptRequest(
 ) error {
 
 	if !s.startRes.Done() {
-		return xerrors.Errorf("you must first initialize DKG. Did you call setup() first?")
+		return xerrors.Errorf(initDkgFirst)
 	}
 
 	ui := s.getUI(msg.K, msg.PubK)
diff --git a/dkg/pedersen/pedersen.go b/dkg/pedersen/pedersen.go
index 6b6e2a087..5ffa017aa 100644
--- a/dkg/pedersen/pedersen.go
+++ b/dkg/pedersen/pedersen.go
@@ -47,7 +47,7 @@ var (
 	// associated with the `dkg-decrypt` protocol.
 	protocolNameDecrypt = "dkg-decrypt"
 	// protocolNameReencrypt denotes the value of the protocol span tag
-	//// associated with the `dkg-reencrypt` protocol.
+	// associated with the `dkg-reencrypt` protocol.
 	protocolNameReencrypt = "dkg-reencrypt"
 	// ProtocolNameResharing denotes the value of the protocol span tag
 	// associated with the `dkg-resharing` protocol.
@@ -336,8 +336,7 @@ func (a *Actor) VerifiableEncrypt(message []byte, GBar kyber.Point) (types.Ciphe
 	[]byte, error) {
 
 	if !a.startRes.Done() {
-		return types.Ciphertext{}, nil, xerrors.Errorf("you must first initialize " +
-			"DKG. Did you call setup() first?")
+		return types.Ciphertext{}, nil, xerrors.Errorf(initDkgFirst)
 	}
 
 	// Embed the message (or as much of it as will fit) into a curve point.

From 3c1f396b4a2e677a488567d9ef1ec16599556eda Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:41:16 +0100
Subject: [PATCH 09/15] Remove existing socket file upon startup

---
 cli/node/daemon.go | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/cli/node/daemon.go b/cli/node/daemon.go
index 8a38fcda4..9ef000620 100644
--- a/cli/node/daemon.go
+++ b/cli/node/daemon.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"os"
 	"path/filepath"
 	"sync"
 	"time"
@@ -98,6 +99,14 @@ type socketDaemon struct {
 // Listen implements node.Daemon. It starts the daemon by creating the unix
 // socket file at the given path.
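The hunk continues just below: Listen begins by clearing a socket file left behind by an unclean shutdown, because binding a unix socket fails when the path already exists. The stat-then-remove pair works; a single unconditional os.Remove is equivalent, since a missing path reports fs.ErrNotExist, which can simply be ignored. A sketch under that assumption, with cleanSocket and socketpath as illustrative names rather than repository code:

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

// cleanSocket removes a leftover unix socket file if one exists. A missing
// file is not an error: os.Remove then reports fs.ErrNotExist.
func cleanSocket(socketpath string) error {
	err := os.Remove(socketpath)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return fmt.Errorf("couldn't clear dangling socket file: %v", err)
	}
	return nil
}

func main() {
	if err := cleanSocket("/tmp/example-daemon.sock"); err != nil {
		fmt.Println(err)
	}
}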
 func (d *socketDaemon) Listen() error {
+	_, err := os.Stat(d.socketpath)
+	if err == nil {
+		d.logger.Warn().Msg("Cleaning existing socket file")
+		err := os.Remove(d.socketpath)
+		if err != nil {
+			return xerrors.Errorf("couldn't clear dangling socketpath: %v", err)
+		}
+	}
 	socket, err := d.listenFn("unix", d.socketpath)
 	if err != nil {
 		return xerrors.Errorf("couldn't bind socket: %v", err)

From 8d2944f247f8bb642e5c660aab27eb6d6e7b84a5 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:41:52 +0100
Subject: [PATCH 10/15] Increase logging

---
 core/ordering/cosipbft/blockstore/disk.go  | 4 ++++
 core/ordering/cosipbft/fastsync/default.go | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/core/ordering/cosipbft/blockstore/disk.go b/core/ordering/cosipbft/blockstore/disk.go
index 2ab4a33bc..d6af59d48 100644
--- a/core/ordering/cosipbft/blockstore/disk.go
+++ b/core/ordering/cosipbft/blockstore/disk.go
@@ -11,6 +11,7 @@ import (
 	"encoding/binary"
 	"sync"
 
+	"go.dedis.ch/dela"
 	"go.dedis.ch/dela/core"
 	"go.dedis.ch/dela/core/ordering/cosipbft/types"
 	"go.dedis.ch/dela/core/store"
@@ -87,6 +88,9 @@ func (s *InDisk) Load() error {
 		s.last = link
 
 		s.indices[link.GetBlock().GetHash()] = link.GetBlock().GetIndex()
+		if s.length%100 == 0 {
+			dela.Logger.Info().Msgf("Loaded %d blocks", s.length)
+		}
 
 		return nil
 	})
diff --git a/core/ordering/cosipbft/fastsync/default.go b/core/ordering/cosipbft/fastsync/default.go
index 7857c5b76..e7b77d3dd 100644
--- a/core/ordering/cosipbft/fastsync/default.go
+++ b/core/ordering/cosipbft/fastsync/default.go
@@ -201,6 +201,8 @@ func (s fastSync) requestSync(
 		}
 	}
 
+	s.logger.Info().Msgf("Currently have %d blocks", s.blocks.Len())
+
 	return moreBlocks, nil
 }

From c153b69b3ae92348058b549843c2f13f31250722 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:43:25 +0100
Subject: [PATCH 11/15] Put fast block sync into a single place

---
 core/ordering/cosipbft/cosipbft.go | 27 ++++++++++++---------------
 core/ordering/cosipbft/proc.go     | 13 +++++++++----
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go
index 4876040e6..6c8281c09 100644
--- a/core/ordering/cosipbft/cosipbft.go
+++ b/core/ordering/cosipbft/cosipbft.go
@@ -288,23 +288,20 @@ func NewServiceStart(s *Service) {
 	go s.watchBlocks()
 
 	if s.genesis.Exists() {
+		// If the genesis already exists, and all blocks are loaded,
+		// the service can start right away to participate in the chain.
+		close(s.started)
 		if s.syncMethod() == syncMethodFast {
-			ctx, done := context.WithCancel(context.Background())
-			roster, err := s.readRoster(s.tree.Get())
-			if err != nil {
-				panic("couldn't get roster of latest block: " + err.Error())
-			}
-			err = s.fsync.Sync(ctx, roster,
-				fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize})
-			if err != nil {
-				s.logger.Warn().Msgf("while syncing with other nodes: %+v", err)
-			}
-			done()
+			go func() {
+				roster, err := s.getCurrentRoster()
+				if err != nil {
+					s.logger.Err(err).Msg("Couldn't get roster")
+				} else {
+					s.logger.Info().Msg("Triggering catchup")
+					s.catchup <- roster
+				}
+			}()
 		}
-
-		// If the genesis already exists, the service can start right away to
-		// participate in the chain.
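With close(s.started) hoisted above the trigger, startup no longer blocks on a full fastsync; the goroutine merely funnels the roster into s.catchup, so every sync request converges on the single handler in proc.go below, which is the point of this commit. A toy sketch of that single-consumer funnel, with a string standing in for the roster type:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// catchup mimics s.catchup: every place that detects a need to sync
	// sends here, and exactly one handler performs the work.
	catchup := make(chan string)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for roster := range catchup { // single consumer, like catchupHandler
			fmt.Println("syncing with", roster)
		}
	}()

	catchup <- "roster-at-startup" // what NewServiceStart's goroutine now does
	close(catchup)
	wg.Wait()
}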
-		close(s.started)
 	}
 }

diff --git a/core/ordering/cosipbft/proc.go b/core/ordering/cosipbft/proc.go
index 3a294586c..425583176 100644
--- a/core/ordering/cosipbft/proc.go
+++ b/core/ordering/cosipbft/proc.go
@@ -8,6 +8,7 @@ package cosipbft
 
 import (
 	"context"
+	"time"
 
 	"github.com/rs/zerolog"
 	"go.dedis.ch/dela/core"
@@ -306,10 +307,14 @@ func (h *processor) catchupHandler() {
 	for players := range h.catchup {
 		if h.syncMethod() == syncMethodFast {
 			ctx, cancel := context.WithCancel(context.Background())
-			err := h.fsync.Sync(ctx, players,
-				fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize})
-			if err != nil {
-				h.logger.Err(err)
+			for {
+				err := h.fsync.Sync(ctx, players,
+					fastsync.Config{SplitMessageSize: DefaultFastSyncMessageSize})
+				if err == nil {
+					break
+				}
+				h.logger.Err(err).Msg("Couldn't sync - trying again in 10 seconds")
+				time.Sleep(10 * time.Second)
 			}
 			cancel()
 		}

From 556fbdd5ac290f2c3b3e4cb6b8b666d460221e65 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:44:11 +0100
Subject: [PATCH 12/15] Fix comparisons and updates of block indexes and nonces

---
 core/ordering/cosipbft/fastsync/default.go |  2 +-
 core/txn/signed/example_test.go            | 16 +++++++++++++---
 core/txn/signed/signed.go                  | 11 +++++++++++
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/core/ordering/cosipbft/fastsync/default.go b/core/ordering/cosipbft/fastsync/default.go
index e7b77d3dd..95c93f811 100644
--- a/core/ordering/cosipbft/fastsync/default.go
+++ b/core/ordering/cosipbft/fastsync/default.go
@@ -241,7 +241,7 @@ func (h *handler) Stream(out mino.Sender, in mino.Receiver) error {
 			return xerrors.Errorf("creating blocks to send failed: %v", err)
 		}
 
-		sentAllBlocks = m.GetLatest()+uint64(len(blReply)) == h.blocks.Len()
+		sentAllBlocks = m.GetLatest()+uint64(len(blReply)) >= h.blocks.Len()
 		err = <-out.Send(types.NewCatchupMessage(
 			!sentAllBlocks, blReply), orch)
 		if err != nil {
diff --git a/core/txn/signed/example_test.go b/core/txn/signed/example_test.go
index 51e6fae7f..830c6af29 100644
--- a/core/txn/signed/example_test.go
+++ b/core/txn/signed/example_test.go
@@ -10,7 +10,8 @@
 func ExampleTransactionManager_Make() {
 	signer := bls.NewSigner()
 
-	manager := NewManager(signer, exampleClient{nonce: 5})
+	nonce := uint64(0)
+	manager := NewManager(signer, exampleClient{nonce: &nonce})
 
 	tx, err := manager.Make()
 	if err != nil {
@@ -19,6 +20,7 @@ func ExampleTransactionManager_Make() {
 
 	fmt.Println(tx.GetNonce())
 
+	nonce = uint64(5)
 	err = manager.Sync()
 	if err != nil {
 		panic("failed to synchronize: " + err.Error())
@@ -31,8 +33,16 @@ func ExampleTransactionManager_Make() {
 
 	fmt.Println(tx.GetNonce())
 
+	tx, err = manager.Make()
+	if err != nil {
+		panic("failed to create second transaction: " + err.Error())
+	}
+
+	fmt.Println(tx.GetNonce())
+
 	// Output: 0
 	// 5
+	// 6
 }
 
 // exampleClient is an example of a manager client. It always synchronizes the
 //
 // - implements signed.Client
 type exampleClient struct {
-	nonce uint64
+	nonce *uint64
 }
 
 // GetNonce implements signed.Client. It always returns the same nonce for
 // simplicity.
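The catchupHandler change above retries a failed fastsync indefinitely at a fixed ten-second cadence. The sketch below shows the same loop with one hypothetical refinement: selecting on ctx.Done() lets a shutdown interrupt the pause, which the bare time.Sleep cannot; retrySync and sync are placeholder names standing in for h.fsync.Sync with its roster and config already bound:

package main

import (
	"context"
	"log"
	"time"
)

// retrySync calls sync until it succeeds, pausing between attempts and
// giving up when the context is cancelled.
func retrySync(ctx context.Context, sync func(context.Context) error) {
	for {
		err := sync(ctx)
		if err == nil {
			return
		}
		log.Printf("couldn't sync - trying again in 10 seconds: %v", err)
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}

func main() {
	retrySync(context.Background(), func(context.Context) error {
		return nil // succeeds on the first try; a failure would loop
	})
}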
 func (cl exampleClient) GetNonce(identity access.Identity) (uint64, error) {
-	return cl.nonce, nil
+	return *cl.nonce, nil
 }
diff --git a/core/txn/signed/signed.go b/core/txn/signed/signed.go
index ef0d1d9b4..31a7a8f7f 100644
--- a/core/txn/signed/signed.go
+++ b/core/txn/signed/signed.go
@@ -306,6 +306,17 @@ func (mgr *TransactionManager) Make(args ...txn.Arg) (txn.Transaction, error) {
 
 	opts = append(opts, WithHashFactory(mgr.hashFac))
 
+	nonce, err := mgr.client.GetNonce(mgr.signer.GetPublicKey())
+	if err != nil {
+		dela.Logger.Err(err).Msg("Couldn't update nonce")
+	}
+	// Only update our nonce if the stored nonce is bigger than ours.
+	// This allows having transactions in the pool which are not yet accepted,
+	// but still have the correct nonce.
+	if nonce > mgr.nonce {
+		mgr.nonce = nonce
+	}
+
 	tx, err := NewTransaction(mgr.nonce, mgr.signer.GetPublicKey(), opts...)
 	if err != nil {
 		return nil, xerrors.Errorf("failed to create tx: %v", err)
 	}

From 21f54f0b46f66141764950fb007ec2f43005bf4c Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:48:40 +0100
Subject: [PATCH 13/15] Doing some magic timeout adjustments

---
 core/ordering/cosipbft/cosipbft.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/ordering/cosipbft/cosipbft.go b/core/ordering/cosipbft/cosipbft.go
index 6c8281c09..3832fc4d5 100644
--- a/core/ordering/cosipbft/cosipbft.go
+++ b/core/ordering/cosipbft/cosipbft.go
@@ -62,7 +62,7 @@ import (
 const (
 	// DefaultRoundTimeout is the maximum round time the service waits
 	// for an event to happen.
-	DefaultRoundTimeout = time.Hour
+	DefaultRoundTimeout = 10 * time.Minute
 
 	// DefaultFailedRoundTimeout is the maximum round time the service waits
 	// for an event to happen, after a round has failed, thus letting time
@@ -72,14 +72,14 @@ const (
 
 	// DefaultTransactionTimeout is the maximum allowed age of transactions
 	// before a view change is executed.
-	DefaultTransactionTimeout = 30 * time.Second
+	DefaultTransactionTimeout = 5 * time.Minute
 
 	// RoundWait is the constant value of the exponential backoff use between
 	// round failures.
-	RoundWait = 5 * time.Millisecond
+	RoundWait = 50 * time.Millisecond
 
 	// RoundMaxWait is the maximum amount for the backoff.
-	RoundMaxWait = 5 * time.Minute
+	RoundMaxWait = 10 * time.Minute
 
 	// DefaultFastSyncMessageSize defines when a fast sync message will be split.
 	DefaultFastSyncMessageSize = 1e6

From fa266d7ecd92201f265fc45483b8f04bbd5c8bb7 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Wed, 28 Feb 2024 09:58:33 +0100
Subject: [PATCH 14/15] Fix tests

---
 .../contracts/viewchange/viewchange_test.go |  2 +-
 core/ordering/cosipbft/cosipbft_test.go     |  4 ++--
 core/txn/signed/signed_test.go              |  2 +-
 test/cosidela_test.go                       |  7 ++-----
 test/integration_test.go                    |  3 ---
 testing/fake/crypto.go                      | 14 ++++++++++++++
 6 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/ordering/cosipbft/contracts/viewchange/viewchange_test.go b/core/ordering/cosipbft/contracts/viewchange/viewchange_test.go
index 1f718ff81..2a21083a4 100644
--- a/core/ordering/cosipbft/contracts/viewchange/viewchange_test.go
+++ b/core/ordering/cosipbft/contracts/viewchange/viewchange_test.go
@@ -22,7 +22,7 @@ func TestRegisterContract(t *testing.T) {
 }
 
 func TestNewTransaction(t *testing.T) {
-	mgr := NewManager(signed.NewManager(fake.NewSigner(), nil))
+	mgr := NewManager(signed.NewManager(fake.NewSigner(), fake.NewClient()))
 
 	tx, err := mgr.Make(authority.New(nil, nil))
 	require.NoError(t, err)
diff --git a/core/ordering/cosipbft/cosipbft_test.go b/core/ordering/cosipbft/cosipbft_test.go
index 3efaeb12c..44c9eb030 100644
--- a/core/ordering/cosipbft/cosipbft_test.go
+++ b/core/ordering/cosipbft/cosipbft_test.go
@@ -175,7 +175,7 @@ func TestService_Scenario_ViewChange_Request(t *testing.T) {
 	require.Equal(t, leader, nodes[0].onet.GetAddress())
 
 	// let enough time for a round to run
-	time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
+	time.Sleep(time.Second)
 
 	require.Equal(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
 	require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
@@ -214,7 +214,7 @@ func TestService_Scenario_ViewChange_NoRequest(t *testing.T) {
 	require.NoError(t, err)
 
 	// let enough time for a round to run
-	time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
+	time.Sleep(time.Second)
 
 	require.NotEqual(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
 	require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
diff --git a/core/txn/signed/signed_test.go b/core/txn/signed/signed_test.go
index b8c47227a..d32ac180b 100644
--- a/core/txn/signed/signed_test.go
+++ b/core/txn/signed/signed_test.go
@@ -160,7 +160,7 @@ func TestTransactionFactory_Deserialize(t *testing.T) {
 }
 
 func TestManager_Make(t *testing.T) {
-	mgr := NewManager(fake.NewSigner(), nil)
+	mgr := NewManager(fake.NewSigner(), fake.NewClient())
 
 	tx, err := mgr.Make(txn.Arg{Key: "a", Value: []byte{1, 2, 3}})
 	require.NoError(t, err)
diff --git a/test/cosidela_test.go b/test/cosidela_test.go
index 9bafad70a..d45d0928d 100644
--- a/test/cosidela_test.go
+++ b/test/cosidela_test.go
@@ -399,16 +399,13 @@ func (a accessstore) Delete(key []byte) error {
 	return nil
 }
 
-// txClient return monotically increasing nonce
+// txClient always returns a 0 nonce
 //
 // - implements signed.Client
 type txClient struct {
-	nonce uint64
 }
 
 // GetNonce implements signed.Client
 func (c *txClient) GetNonce(access.Identity) (uint64, error) {
-	res := c.nonce
-	c.nonce++
-	return res, nil
+	return 0, nil
 }
diff --git a/test/integration_test.go b/test/integration_test.go
index 2e2cb6fb4..da6df36b2 100644
--- a/test/integration_test.go
+++ b/test/integration_test.go
@@ -124,9 +124,6 @@ func addAndWait(
 	node cosiDelaNode,
 	args ...txn.Arg,
 ) error {
-	err := manager.Sync()
-	require.NoError(t, err)
-
 	tx, err := manager.Make(args...)
 	if err != nil {
 		return xerrors.Errorf("failed to make tx: %v", err)
diff --git a/testing/fake/crypto.go b/testing/fake/crypto.go
index 90a3f5acb..5fd36dc57 100644
--- a/testing/fake/crypto.go
+++ b/testing/fake/crypto.go
@@ -3,6 +3,7 @@ package fake
 import (
 	"hash"
 
+	"go.dedis.ch/dela/core/access"
 	"go.dedis.ch/dela/crypto"
 	"go.dedis.ch/dela/serde"
 )
@@ -253,6 +254,19 @@ func (s Signer) Aggregate(...crypto.Signature) (crypto.Signature, error) {
 	return Signature{}, s.err
 }
 
+type Client struct {
+	nonce uint64
+}
+
+// NewClient returns a fake client.
+func NewClient() Client {
+	return Client{}
+}
+
+func (c Client) GetNonce(access.Identity) (uint64, error) {
+	return c.nonce, nil
+}
+
 // Verifier is a fake implementation of crypto.Verifier.
 //
 // - implements crypto.Verifier

From c1fc3e1879203be3c0ea5d278e0c4eec0d25d9a8 Mon Sep 17 00:00:00 2001
From: Linus Gasser
Date: Tue, 5 Mar 2024 13:55:58 +0100
Subject: [PATCH 15/15] Don't use the sync beforehand - the manager.Make takes care of that

---
 core/txn/pool/controller/action.go      | 5 -----
 core/txn/pool/controller/action_test.go | 9 +--------
 core/txn/pool/controller/controller.go  | 2 +-
 3 files changed, 2 insertions(+), 14 deletions(-)

diff --git a/core/txn/pool/controller/action.go b/core/txn/pool/controller/action.go
index 0baea8efb..15810ab92 100644
--- a/core/txn/pool/controller/action.go
+++ b/core/txn/pool/controller/action.go
@@ -62,11 +62,6 @@ func (a *addAction) Execute(ctx node.Context) error {
 
 	manager := getManager(signer, a.client)
 
-	err = manager.Sync()
-	if err != nil {
-		return xerrors.Errorf("failed to sync manager: %v", err)
-	}
-
 	tx, err := manager.Make(args...)
 	if err != nil {
 		return xerrors.Errorf("creating transaction: %v", err)
 	}
diff --git a/core/txn/pool/controller/action_test.go b/core/txn/pool/controller/action_test.go
index 4865360cc..abad3cf41 100644
--- a/core/txn/pool/controller/action_test.go
+++ b/core/txn/pool/controller/action_test.go
@@ -48,19 +48,12 @@ func TestExecute(t *testing.T) {
 	err = action.Execute(ctx)
 	require.EqualError(t, err, "failed to include tx: "+fake.Err("failed to add"))
 
-	getManager = func(c crypto.Signer, s signed.Client) txn.Manager {
-		return badManager{}
-	}
-
-	err = action.Execute(ctx)
-	require.EqualError(t, err, "creating transaction: "+fake.Err("make fail"))
-
 	getManager = func(c crypto.Signer, s signed.Client) txn.Manager {
 		return badManager{failSync: true}
 	}
 
 	err = action.Execute(ctx)
-	require.EqualError(t, err, "failed to sync manager: "+fake.Err("sync fail"))
+	require.EqualError(t, err, "creating transaction: "+fake.Err("make fail"))
 
 	err = os.WriteFile(keyFile, []byte("bad signer"), os.ModePerm)
 	require.NoError(t, err)
diff --git a/core/txn/pool/controller/controller.go b/core/txn/pool/controller/controller.go
index 9d70373c1..921339509 100644
--- a/core/txn/pool/controller/controller.go
+++ b/core/txn/pool/controller/controller.go
@@ -63,7 +63,7 @@ func (miniController) OnStop(inj node.Injector) error {
 	return nil
 }
 
-// client return monotically increasing nonce
+// client returns a monotonically increasing nonce
 //
 // - implements signed.Client
 type client struct {