Skip to content

Commit

Permalink
Merge branch 'main' into persistence/savepoints-initial
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanlott committed Jun 29, 2023
2 parents 9f73b31 + fd30526 commit 7ee9be3
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 110 deletions.
2 changes: 1 addition & 1 deletion app/client/keybase/hashicorp/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMain(m *testing.M) {
// pulls an image, creates a container based on it and runs it
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "vault",
Tag: "latest",
Tag: "1.13.3",
Env: []string{
"VAULT_DEV_ROOT_TOKEN_ID=dev-only-token",
"VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200",
Expand Down
44 changes: 43 additions & 1 deletion p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,19 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"go.uber.org/multierr"
"github.com/pokt-network/pocket/shared/modules"
)

var (
_ typesP2P.RouterConfig = &baseConfig{}
_ typesP2P.RouterConfig = &UnicastRouterConfig{}
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
Expand All @@ -22,6 +32,14 @@ type baseConfig struct {
PeerstoreProvider providers.PeerstoreProvider
}

type UnicastRouterConfig struct {
Logger *modules.Logger
Host host.Host
ProtocolID protocol.ID
MessageHandler typesP2P.MessageHandler
PeerHandler func(peer typesP2P.Peer) error
}

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Expand Down Expand Up @@ -57,6 +75,30 @@ func (cfg *baseConfig) IsValid() (err error) {
if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}
return nil
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *UnicastRouterConfig) IsValid() (err error) {
if cfg.Logger == nil {
err = multierr.Append(err, fmt.Errorf("logger not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.ProtocolID == "" {
err = multierr.Append(err, fmt.Errorf("protocol id not configured"))
}

if cfg.MessageHandler == nil {
err = multierr.Append(err, fmt.Errorf("message handler not configured"))
}

if cfg.PeerHandler == nil {
err = multierr.Append(err, fmt.Errorf("peer handler not configured"))
}
return err
}

Expand Down
15 changes: 15 additions & 0 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,17 @@ type p2pModule struct {
identity libp2p.Option
listenAddrs libp2p.Option

// TECHDEBT(#810): register the providers to the module registry instead of
// holding a reference in the module struct and passing via router config.
//
// Assigned during creation via `#setupDependencies()`.
currentHeightProvider providers.CurrentHeightProvider
pstoreProvider providers.PeerstoreProvider
nonceDeduper *mempool.GenericFIFOSet[uint64, uint64]

// TECHDEBT(#810): register the routers to the module registry instead of
// holding a reference in the module struct. This will improve testability.
//
// Assigned during `#Start()`. TLDR; `host` listens on instantiation.
// and `router` depends on `host`.
router typesP2P.Router
Expand Down Expand Up @@ -252,6 +258,9 @@ func (m *p2pModule) setupPeerstoreProvider() error {
if !ok {
return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
}

// TECHDEBT(#810): register the provider to the module registry instead of
// holding a reference in the module struct and passing via router config.
m.pstoreProvider = pstoreProvider

return nil
Expand All @@ -260,6 +269,7 @@ func (m *p2pModule) setupPeerstoreProvider() error {
// setupCurrentHeightProvider attempts to retrieve the current height provider
// from the bus registry, falls back to the consensus module if none is registered.
func (m *p2pModule) setupCurrentHeightProvider() error {
// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
m.logger.Debug().Msg("setupCurrentHeightProvider")
currentHeightProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(current_height_provider.ModuleName)
if err != nil {
Expand All @@ -276,6 +286,9 @@ func (m *p2pModule) setupCurrentHeightProvider() error {
if !ok {
return fmt.Errorf("unexpected current height provider type: %T", currentHeightProviderModule)
}

// TECHDEBT(#810): register the provider to the module registry instead of
// holding a reference in the module struct and passing via router config.
m.currentHeightProvider = currentHeightProvider

return nil
Expand All @@ -294,6 +307,8 @@ func (m *p2pModule) setupNonceDeduper() error {

// setupRouter instantiates the configured router implementation.
func (m *p2pModule) setupRouter() (err error) {
// TECHDEBT(#810): register the router to the module registry instead of
// holding a reference in the module struct. This will improve testability.
m.router, err = raintree.NewRainTreeRouter(
m.GetBus(),
&config.RainTreeConfig{
Expand Down
148 changes: 48 additions & 100 deletions p2p/raintree/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package raintree

import (
"fmt"
"io"
"time"

libp2pHost "github.com/libp2p/go-libp2p/core/host"
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/pokt-network/pocket/p2p/unicast"
"google.golang.org/protobuf/proto"

"github.com/pokt-network/pocket/logger"
Expand All @@ -22,15 +20,9 @@ import (
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
telemetry "github.com/pokt-network/pocket/telemetry"
"github.com/pokt-network/pocket/telemetry"
)

// TECHDEBT(#629): configure timeouts. Consider security exposure vs. real-world conditions.
// TECHDEBT(#629): parameterize and expose via config.
// readStreamTimeout is the duration to wait for a read operation on a
// stream to complete, after which the stream is closed ("timed out").
const readStreamTimeout = time.Second * 10

var (
_ typesP2P.Router = &rainTreeRouter{}
_ modules.IntegratableModule = &rainTreeRouter{}
Expand All @@ -41,10 +33,11 @@ type rainTreeFactory = modules.FactoryWithConfig[typesP2P.Router, *config.RainTr

type rainTreeRouter struct {
base_modules.IntegratableModule
unicast.UnicastRouter

logger *modules.Logger
// handler is the function to call when a message is received.
handler typesP2P.RouterHandler
handler typesP2P.MessageHandler
// host represents a libp2p libp2pNetwork node, it encapsulates a libp2p peerstore
// & connection manager. `libp2p.New` configures and starts listening
// according to options.
Expand Down Expand Up @@ -84,7 +77,6 @@ func (*rainTreeRouter) Create(bus modules.Bus, cfg *config.RainTreeConfig) (type
return nil, err
}

rtr.host.SetStreamHandler(protocol.PoktProtocolID, rtr.handleStream)
return typesP2P.Router(rtr), nil
}

Expand Down Expand Up @@ -191,9 +183,11 @@ func (rtr *rainTreeRouter) sendInternal(data []byte, address cryptoPocket.Addres
return nil
}

// handleRainTreeMsg handles a RainTree message, continuing broadcast propagation
// if applicable. Returns the serialized `PocketEnvelope` data contained within.
func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
// handleRainTreeMsg deserializes a RainTree message to extract the `PocketEnvelope`
// bytes contained within, continues broadcast propagation, if applicable, and
// passes them off to the application by calling the configured `rtr.handler`.
// Intended to be called in a go routine.
func (rtr *rainTreeRouter) handleRainTreeMsg(rainTreeMsgBz []byte) error {
blockHeightInt := rtr.GetBus().GetConsensusModule().CurrentHeight()
blockHeight := fmt.Sprintf("%d", blockHeightInt)

Expand All @@ -207,25 +201,36 @@ func (rtr *rainTreeRouter) handleRainTreeMsg(data []byte) ([]byte, error) {
)

var rainTreeMsg typesP2P.RainTreeMessage
if err := proto.Unmarshal(data, &rainTreeMsg); err != nil {
return nil, err
if err := proto.Unmarshal(rainTreeMsgBz, &rainTreeMsg); err != nil {
return err
}

// TECHDEBT(#763): refactor as "pre-propagation validation"
networkMessage := messaging.PocketEnvelope{}
if err := proto.Unmarshal(rainTreeMsg.Data, &networkMessage); err != nil {
rtr.logger.Error().Err(err).Msg("Error decoding network message")
return nil, err
return err
}

// Continue RainTree propagation
if rainTreeMsg.Level > 0 {
if err := rtr.broadcastAtLevel(rainTreeMsg.Data, rainTreeMsg.Level-1); err != nil {
return nil, err
return err
}
}

// Return the data back to the caller so it can be handled by the app specific bus
return rainTreeMsg.Data, nil
// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if rainTreeMsg.Data == nil {
rtr.logger.Debug().Msg("no data in RainTree message")
return nil
}

// Call configured message handler with the serialized `PocketEnvelope`.
if err := rtr.handler(rainTreeMsg.Data); err != nil {
return fmt.Errorf("handling raintree message: %w", err)
}
return nil
}

// GetPeerstore implements the respective member of `typesP2P.Router`.
Expand Down Expand Up @@ -254,6 +259,7 @@ func (rtr *rainTreeRouter) AddPeer(peer typesP2P.Peer) error {
return nil
}

// RemovePeer implements the respective member of `typesP2P.Router`.
func (rtr *rainTreeRouter) RemovePeer(peer typesP2P.Peer) error {
rtr.peersManager.HandleEvent(
typesP2P.PeerManagerEvent{
Expand All @@ -270,94 +276,42 @@ func (rtr *rainTreeRouter) Size() int {
return rtr.peersManager.GetPeerstore().Size()
}

// handleStream ensures the peerstore contains the remote peer and then reads
// the incoming stream in a new go routine.
func (rtr *rainTreeRouter) handleStream(stream libp2pNetwork.Stream) {
rtr.logger.Debug().Msg("handling incoming stream")
peer, err := utils.PeerFromLibp2pStream(stream)
if err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("parsing remote peer identity")

if err = stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream")
}
return
}

if err := rtr.AddPeer(peer); err != nil {
rtr.logger.Error().Err(err).
Str("address", peer.GetAddress().String()).
Msg("adding remote peer to router")
}

go rtr.readStream(stream)
// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
}

// readStream reads the incoming stream, extracts the serialized `PocketEnvelope`
// data from the incoming `RainTreeMessage`, and passes it to the application by
// calling the configured `rtr.handler`. Intended to be called in a go routine.
func (rtr *rainTreeRouter) readStream(stream libp2pNetwork.Stream) {
// Time out if no data is sent to free resources.
// NB: tests using libp2p's `mocknet` rely on this not returning an error.
if err := stream.SetReadDeadline(newReadStreamDeadline()); err != nil {
// `SetReadDeadline` not supported by `mocknet` streams.
rtr.logger.Error().Err(err).Msg("setting stream read deadline")
}

// log incoming stream
rtr.logStream(stream)

// read stream
rainTreeMsgBz, err := io.ReadAll(stream)
if err != nil {
rtr.logger.Error().Err(err).Msg("reading from stream")
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
}
return
}

// done reading; reset to signal this to remote peer
// NB: failing to reset the stream can easily max out the number of available
// network connections on the receiver's side.
if err := stream.Reset(); err != nil {
rtr.logger.Error().Err(err).Msg("resetting stream (read-side)")
// setupUnicastRouter configures and assigns `rtr.UnicastRouter`.
func (rtr *rainTreeRouter) setupUnicastRouter() error {
unicastRouterCfg := config.UnicastRouterConfig{
Logger: rtr.logger,
Host: rtr.host,
ProtocolID: protocol.PoktProtocolID,
MessageHandler: rtr.handleRainTreeMsg,
PeerHandler: rtr.AddPeer,
}

// extract `PocketEnvelope` from `RainTreeMessage` (& continue propagation)
poktEnvelopeBz, err := rtr.handleRainTreeMsg(rainTreeMsgBz)
unicastRouter, err := unicast.Create(rtr.GetBus(), &unicastRouterCfg)
if err != nil {
rtr.logger.Error().Err(err).Msg("handling raintree message")
return
}

// There was no error, but we don't need to forward this to the app-specific bus.
// For example, the message has already been handled by the application.
if poktEnvelopeBz == nil {
return
}

// call configured handler to forward to app-specific bus
if err := rtr.handler(poktEnvelopeBz); err != nil {
rtr.logger.Error().Err(err).Msg("handling pocket envelope")
return fmt.Errorf("setting up unicast router: %w", err)
}
}

// shouldSendToTarget returns false if target is self.
func shouldSendToTarget(target target) bool {
return !target.isSelf
rtr.UnicastRouter = *unicastRouter
return nil
}

func (rtr *rainTreeRouter) setupDependencies() error {
if err := rtr.setupUnicastRouter(); err != nil {
return err
}

pstore, err := rtr.pstoreProvider.GetStakedPeerstoreAtHeight(rtr.currentHeightProvider.CurrentHeight())
if err != nil {
return err
return fmt.Errorf("getting staked peerstore: %w", err)
}

if err := rtr.setupPeerManager(pstore); err != nil {
return err
return fmt.Errorf("setting up peer manager: %w", err)
}

if err := utils.PopulateLibp2pHost(rtr.host, pstore); err != nil {
Expand All @@ -374,9 +328,3 @@ func (rtr *rainTreeRouter) setupPeerManager(pstore typesP2P.Peerstore) (err erro
func (rtr *rainTreeRouter) getHostname() string {
return rtr.GetBus().GetRuntimeMgr().GetConfig().P2P.Hostname
}

// newReadStreamDeadline returns a future deadline
// based on the read stream timeout duration.
func newReadStreamDeadline() time.Time {
return time.Now().Add(readStreamTimeout)
}
6 changes: 4 additions & 2 deletions p2p/raintree/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

package raintree

import libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
import (
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
)

// RainTreeRouter exports `rainTreeRouter` for testing purposes.
type RainTreeRouter = rainTreeRouter

// HandleStream exports `rainTreeRouter#handleStream` for testing purposes.
func (rtr *rainTreeRouter) HandleStream(stream libp2pNetwork.Stream) {
rtr.handleStream(stream)
rtr.UnicastRouter.HandleStream(stream)
}
Loading

0 comments on commit 7ee9be3

Please sign in to comment.