Skip to content

Commit

Permalink
[P2P] refactor: peerstore provider (part 1) (#804)
Browse files Browse the repository at this point in the history
## Description

1. Simplify `PeerstoreProvider` interface
2. Extend it to support retrieval of the unstaked actor peerstore
3. Implement the extended interface in `persistencePeerstoreProvider`

### Before

```mermaid
classDiagram 

class InterruptableModule {
    <<interface>>
    Start() error
    Stop() error
}
class IntegratableModule {
    <<interface>>
    +SetBus(bus Bus)
    +GetBus() Bus
}
class InitializableModule {
    <<interface>>
    +GetModuleName() string
    +Create(bus Bus, opts ...Option) (Module, error)
}
class Module {
    <<interface>>
}

Module --|> InitializableModule
Module --|> IntegratableModule
Module --|> InterruptableModule


class PeerstoreProvider {
    <<interface>>
    +GetStakedPeerstoreAtHeight(height int) (Peerstore, error)
    +GetP2PConfig() *P2PConfig
}

class persistencePeerstoreProvider

class rpcPeerstoreProvider

persistencePeerstoreProvider --|> PeerstoreProvider

rpcPeerstoreProvider --|> PeerstoreProvider
PeerstoreProvider --|> Module
```

### After

```mermaid
classDiagram 

class IntegratableModule {
    <<interface>>
    +GetBus() Bus
    +SetBus(bus Bus)
}

class PeerstoreProvider {
    <<interface>>
    +GetStakedPeerstoreAtHeight(height int) (Peerstore, error)
    +GetUnstakedPeerstore() (Peerstore, error)
}

class persistencePeerstoreProvider
class rpcPeerstoreProvider
class p2pModule

class unstakedPeerstoreProvider {
    <<interface>>
    +GetUnstakedPeerstore() (Peerstore, error)
}

persistencePeerstoreProvider --|> PeerstoreProvider
persistencePeerstoreProvider --> p2pModule : from Bus
rpcPeerstoreProvider --> p2pModule : from Bus
p2pModule --|> unstakedPeerstoreProvider

rpcPeerstoreProvider --|> PeerstoreProvider
rpcPeerstoreProvider --|> Module
PeerstoreProvider --|> IntegratableModule

class Module {
    <<interface>>
}

Module --|> InitializableModule
Module --|> IntegratableModule
Module --|> InterruptableModule
```

## Issue

Realted:
- #810

Dependants:
- #505 
- #806

## Type of change

Please mark the relevant option(s):

- [ ] New feature, functionality or library
- [ ] Bug fix
- [ ] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other <!-- add details here if it a different type of change -->

## List of changes

- Replaced embedded `modules.Module` with simpler
`modules.IntegratableModule` in `PeerstoreProvider` interface
- Removed unused `PeerstoreProvider#GetP2PConfig()` method
- Added `PeerstoreProvider#GetUnstakedPeerstore()` method
- Added `p2pPeerstoreProvider` implementation of `PeerstoreProvider`
interface
- Added `Factory` generic type 

## Testing

- [ ] `make develop_test`; if any code changes were made
- [x] `make test_e2e` on [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any code changes were made
- [ ] `e2e-devnet-test` passes tests on
[DevNet](https://pocketnetwork.notion.site/How-to-DevNet-ff1598f27efe44c09f34e2aa0051f0dd);
if any code was changed
- [x] [Docker Compose
LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md);
if any major functionality was changed or introduced
- [x] [k8s
LocalNet](https://github.com/pokt-network/pocket/blob/main/build/localnet/README.md);
if any infrastructure or configuration changes were made

<!-- REMOVE this comment block after following the instructions
 If you added additional tests or infrastructure, describe it here.
 Bonus points for images and videos or gifs.
-->

## Required Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have added, or updated, [`godoc` format
comments](https://go.dev/blog/godoc) on touched members (see:
[tip.golang.org/doc/comment](https://tip.golang.org/doc/comment))
- [ ] I have tested my changes using the available tooling
- [ ] I have updated the corresponding CHANGELOG

### If Applicable Checklist

- [ ] I have updated the corresponding README(s); local and/or global
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added, or updated,
[mermaid.js](https://mermaid-js.github.io) diagrams in the corresponding
README(s)
- [ ] I have added, or updated, documentation and
[mermaid.js](https://mermaid-js.github.io) diagrams in `shared/docs/*`
if I updated `shared/*`README(s)
  • Loading branch information
bryanchriswhite authored Jun 13, 2023
1 parent f72e1f0 commit 9cb0ee9
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 60 deletions.
2 changes: 1 addition & 1 deletion app/client/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func persistentPreRun(cmd *cobra.Command, _ []string) {
func setupPeerstoreProvider(rm runtime.Manager, rpcURL string) {
bus := rm.GetBus()
modulesRegistry := bus.GetModulesRegistry()
pstoreProvider := rpcABP.NewRPCPeerstoreProvider(
pstoreProvider := rpcABP.Create(
rpcABP.WithP2PConfig(rm.GetConfig().P2P),
rpcABP.WithCustomRPCURL(rpcURL),
)
Expand Down
4 changes: 4 additions & 0 deletions app/client/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.33] - 2023-06-13

- Renamed `NewRPCPeerstoreProvider()` and `NewPersistencePeerstoreProvider()` to `Create()` (per package)

## [0.0.0.32] - 2023-05-25

- Add the `nonInteractive` flag in a couple spots where it was missing
Expand Down
4 changes: 4 additions & 0 deletions consensus/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.54] - 2023-06-13

- Fix tests

## [0.0.0.53] - 2023-06-08

- Add consensus README
Expand Down
16 changes: 16 additions & 0 deletions consensus/e2e_tests/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/benbjohnson/clock"
"github.com/golang/mock/gomock"

"github.com/pokt-network/pocket/consensus"
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/internal/testutil"
persistenceMocks "github.com/pokt-network/pocket/persistence/types/mocks"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
Expand Down Expand Up @@ -432,6 +434,20 @@ func basePersistenceMock(t *testing.T, _ modules.EventsChannel, bus modules.Bus)
Return(bus.GetRuntimeMgr().GetGenesis().Validators, nil).
AnyTimes()

persistenceReadContextMock.
EXPECT().
GetAllStakedActors(gomock.Any()).
DoAndReturn(func(height int64) ([]*coreTypes.Actor, error) {
genesisState := bus.GetRuntimeMgr().GetGenesis()
return testutil.Concatenate[*coreTypes.Actor](
genesisState.Validators,
genesisState.Servicers,
genesisState.Fishermen,
genesisState.Applications,
), nil
}).
AnyTimes()

persistenceReadContextMock.
EXPECT().
GetBlockHash(gomock.Any()).
Expand Down
10 changes: 10 additions & 0 deletions internal/testutil/slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package testutil

// Concatenate appends the contents of multiple slices of any type (T) into a
// single slice of type T.
func Concatenate[T any](tt ...[]T) (result []T) {
for _, t := range tt {
result = append(result, t...)
}
return result
}
8 changes: 8 additions & 0 deletions p2p/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.54] - 2023-06-13

- Replaced embedded `modules.Module` with simpler `modules.IntegratableModule` in `PeerstoreProvider` interface
- Removed unused `PeerstoreProvider#GetP2PConfig()` method
- Added `PeerstoreProvider#GetUnstakedPeerstore()` method
- Added temporary `unstakedPeerstoreProvider` interface
- Renamed `NewRPCPeerstoreProvider()` and `NewPersistencePeerstoreProvider()` to `Create()` (per package)

## [0.0.0.53] - 2023-06-01

- Moved nonce field from RainTreeMessage to PocketEnvelope protobuf types
Expand Down
2 changes: 1 addition & 1 deletion p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (m *p2pModule) bootstrap() error {
continue
}

pstoreProvider := rpcABP.NewRPCPeerstoreProvider(
pstoreProvider := rpcABP.Create(
rpcABP.WithP2PConfig(
m.GetBus().GetRuntimeMgr().GetConfig().P2P,
),
Expand Down
21 changes: 15 additions & 6 deletions p2p/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package p2p
import (
"errors"
"fmt"

"github.com/libp2p/go-libp2p"
libp2pHost "github.com/libp2p/go-libp2p/core/host"
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
persABP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/persistence"
persPSP "github.com/pokt-network/pocket/p2p/providers/peerstore_provider/persistence"
"github.com/pokt-network/pocket/p2p/raintree"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
Expand All @@ -24,8 +28,6 @@ import (
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"github.com/pokt-network/pocket/telemetry"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

var _ modules.P2PModule = &p2pModule{}
Expand Down Expand Up @@ -231,14 +233,21 @@ func (m *p2pModule) setupDependencies() error {
// bus, if one is registered, otherwise returns a new `persistencePeerstoreProvider`.
func (m *p2pModule) setupPeerstoreProvider() error {
m.logger.Debug().Msg("setupPeerstoreProvider")

// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
pstoreProviderModule, err := m.GetBus().GetModulesRegistry().GetModule(peerstore_provider.ModuleName)
if err != nil {
m.logger.Debug().Msg("creating new persistence peerstore...")
pstoreProviderModule = persABP.NewPersistencePeerstoreProvider(m.GetBus())
} else if pstoreProviderModule != nil {
m.logger.Debug().Msg("loaded persistence peerstore...")
pstoreProvider, err := persPSP.Create(m.GetBus())
if err != nil {
return err
}

m.pstoreProvider = pstoreProvider
return nil
}

m.logger.Debug().Msg("loaded persistence peerstore...")
pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider)
if !ok {
return fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
Expand Down
10 changes: 7 additions & 3 deletions p2p/providers/peerstore_provider/peerstore_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package peerstore_provider
import (
"github.com/pokt-network/pocket/logger"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/runtime/configs"
coreTypes "github.com/pokt-network/pocket/shared/core/types"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
Expand All @@ -16,10 +15,15 @@ const ModuleName = "peerstore_provider"

// PeerstoreProvider is an interface that provides Peerstore accessors
type PeerstoreProvider interface {
modules.Module
modules.IntegratableModule

// GetStakedPeerstoreAtHeight returns a peerstore containing all staked peers
// at a given height. These peers communicate via the p2p module's staked actor
// router.
GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error)
GetP2PConfig() *configs.P2PConfig
// GetUnstakedPeerstore returns a peerstore containing all peers which
// communicate via the p2p module's unstaked actor router.
GetUnstakedPeerstore() (typesP2P.Peerstore, error)
}

func ActorsToPeerstore(abp PeerstoreProvider, actors []*coreTypes.Actor) (pstore typesP2P.Peerstore, errs error) {
Expand Down
40 changes: 22 additions & 18 deletions p2p/providers/peerstore_provider/persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@ package persistence
import (
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
)

var _ peerstore_provider.PeerstoreProvider = &persistencePeerstoreProvider{}
var (
_ peerstore_provider.PeerstoreProvider = &persistencePeerstoreProvider{}
_ persistencePStoreProviderFactory = &persistencePeerstoreProvider{}
)

type persistencePStoreProviderOption func(*persistencePeerstoreProvider)
type persistencePStoreProviderFactory = modules.FactoryWithOptions[peerstore_provider.PeerstoreProvider, persistencePStoreProviderOption]

// TECHDEBT(#810): refactor to implement `Submodule` interface.
type persistencePeerstoreProvider struct {
base_modules.IntegratableModule
base_modules.InterruptableModule
}

func NewPersistencePeerstoreProvider(bus modules.Bus, options ...func(*persistencePeerstoreProvider)) *persistencePeerstoreProvider {
func Create(bus modules.Bus, options ...persistencePStoreProviderOption) (peerstore_provider.PeerstoreProvider, error) {
return new(persistencePeerstoreProvider).Create(bus, options...)
}

func (*persistencePeerstoreProvider) Create(bus modules.Bus, options ...persistencePStoreProviderOption) (peerstore_provider.PeerstoreProvider, error) {
pabp := &persistencePeerstoreProvider{
IntegratableModule: *base_modules.NewIntegratableModule(bus),
}
Expand All @@ -24,35 +33,30 @@ func NewPersistencePeerstoreProvider(bus modules.Bus, options ...func(*persisten
o(pabp)
}

return pabp
}

func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(persistencePeerstoreProvider).Create(bus, options...)
}

func (*persistencePeerstoreProvider) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return NewPersistencePeerstoreProvider(bus), nil
return pabp, nil
}

func (*persistencePeerstoreProvider) GetModuleName() string {
return peerstore_provider.ModuleName
}

func (pabp *persistencePeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
readCtx, err := pabp.GetBus().GetPersistenceModule().NewReadContext(int64(height))
// GetStakedPeerstoreAtHeight implements the respective `PeerstoreProvider` interface method.
func (persistencePSP *persistencePeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
readCtx, err := persistencePSP.GetBus().GetPersistenceModule().NewReadContext(int64(height))
if err != nil {
return nil, err
}
defer readCtx.Release()

// TECHDEBT(#818): consider all staked actors, not just validators.
validators, err := readCtx.GetAllValidators(int64(height))
if err != nil {
return nil, err
}
return peerstore_provider.ActorsToPeerstore(pabp, validators)
return peerstore_provider.ActorsToPeerstore(persistencePSP, validators)
}

func (pabp *persistencePeerstoreProvider) GetP2PConfig() *configs.P2PConfig {
return pabp.GetBus().GetRuntimeMgr().GetConfig().P2P
// GetStakedPeerstoreAtHeight implements the respective `PeerstoreProvider` interface method.
func (persistencePSP *persistencePeerstoreProvider) GetUnstakedPeerstore() (typesP2P.Peerstore, error) {
return peerstore_provider.GetUnstakedPeerstore(persistencePSP.GetBus())
}
30 changes: 13 additions & 17 deletions p2p/providers/peerstore_provider/rpc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ func init() {
rpcHost = runtime.GetEnv("RPC_HOST", defaults.DefaultRPCHost)
}

// TECHDEBT(#810): refactor to implement `Submodule` interface.
type rpcPeerstoreProvider struct {
// TECHDEBT(#810): simplify once submodules are more convenient to retrieve.
base_modules.IntegratableModule
base_modules.InterruptableModule

Expand All @@ -37,7 +39,7 @@ type rpcPeerstoreProvider struct {
rpcClient *rpc.ClientWithResponses
}

func NewRPCPeerstoreProvider(options ...modules.ModuleOption) *rpcPeerstoreProvider {
func Create(options ...modules.ModuleOption) *rpcPeerstoreProvider {
rabp := &rpcPeerstoreProvider{
rpcURL: fmt.Sprintf("http://%s:%s", rpcHost, defaults.DefaultRPCPort), // TODO: Make port configurable
}
Expand All @@ -51,27 +53,24 @@ func NewRPCPeerstoreProvider(options ...modules.ModuleOption) *rpcPeerstoreProvi
return rabp
}

func Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return new(rpcPeerstoreProvider).Create(bus, options...)
}

// TECHDEBT(#810): refactor to implement `Submodule` interface.
func (*rpcPeerstoreProvider) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) {
return NewRPCPeerstoreProvider(options...), nil
return Create(options...), nil
}

func (*rpcPeerstoreProvider) GetModuleName() string {
return peerstore_provider.ModuleName
}

func (rabp *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
func (rpcPSP *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typesP2P.Peerstore, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

var (
h int64 = int64(height)
actorType rpc.ActorTypesEnum = "validator"
)
response, err := rabp.rpcClient.GetV1P2pStakedActorsAddressBookWithResponse(ctx, &rpc.GetV1P2pStakedActorsAddressBookParams{Height: &h, ActorType: &actorType})
response, err := rpcPSP.rpcClient.GetV1P2pStakedActorsAddressBookWithResponse(ctx, &rpc.GetV1P2pStakedActorsAddressBookParams{Height: &h, ActorType: &actorType})
if err != nil {
return nil, err
}
Expand All @@ -91,22 +90,19 @@ func (rabp *rpcPeerstoreProvider) GetStakedPeerstoreAtHeight(height uint64) (typ
})
}

return peerstore_provider.ActorsToPeerstore(rabp, coreActors)
return peerstore_provider.ActorsToPeerstore(rpcPSP, coreActors)
}

func (rabp *rpcPeerstoreProvider) GetP2PConfig() *configs.P2PConfig {
if rabp.p2pCfg == nil {
return rabp.GetBus().GetRuntimeMgr().GetConfig().P2P
}
return rabp.p2pCfg
func (rpcPSP *rpcPeerstoreProvider) GetUnstakedPeerstore() (typesP2P.Peerstore, error) {
return peerstore_provider.GetUnstakedPeerstore(rpcPSP.GetBus())
}

func (rabp *rpcPeerstoreProvider) initRPCClient() {
rpcClient, err := rpc.NewClientWithResponses(rabp.rpcURL)
func (rpcPSP *rpcPeerstoreProvider) initRPCClient() {
rpcClient, err := rpc.NewClientWithResponses(rpcPSP.rpcURL)
if err != nil {
log.Fatalf("could not create RPC client: %v", err)
}
rabp.rpcClient = rpcClient
rpcPSP.rpcClient = rpcClient
}

// options
Expand Down
34 changes: 34 additions & 0 deletions p2p/providers/peerstore_provider/unstaked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package peerstore_provider

import (
"fmt"

typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/modules"
)

// unstakedPeerstoreProvider is an interface which the p2p module supports in
// order to allow access to the unstaked-actor-router's peerstore.
//
// NB: this peerstore includes all actors which participate in P2P (e.g. full
// and light clients but also validators, servicers, etc.).
//
// TECHDEBT(#811): will become unnecessary after `modules.P2PModule#GetUnstakedPeerstore` is added.`
// CONSIDERATION: split `PeerstoreProvider` into `StakedPeerstoreProvider` and `UnstakedPeerstoreProvider`.
// (see: https://github.com/pokt-network/pocket/pull/804#issuecomment-1576531916)
type unstakedPeerstoreProvider interface {
GetUnstakedPeerstore() (typesP2P.Peerstore, error)
}

func GetUnstakedPeerstore(bus modules.Bus) (typesP2P.Peerstore, error) {
p2pModule := bus.GetP2PModule()
if p2pModule == nil {
return nil, fmt.Errorf("p2p module is not registered to bus and is required")
}

unstakedPSP, ok := p2pModule.(unstakedPeerstoreProvider)
if !ok {
return nil, fmt.Errorf("p2p module does not implement unstakedPeerstoreProvider")
}
return unstakedPSP.GetUnstakedPeerstore()
}
Loading

0 comments on commit 9cb0ee9

Please sign in to comment.