Skip to content

Commit

Permalink
Merge pull request #5439 from oasisprotocol/kostko/stable/23.0.x/back…
Browse files Browse the repository at this point in the history
…port-multi-2

[BACKPORT/23.0.x] Multiple backports
  • Loading branch information
kostko authored Nov 13, 2023
2 parents 55372e9 + afcf6c9 commit 622d0bd
Show file tree
Hide file tree
Showing 18 changed files with 131 additions and 125 deletions.
Empty file added .changelog/5430.trivial.md
Empty file.
4 changes: 4 additions & 0 deletions .changelog/5436.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/consensus/roothash: Filter executor commitments by runtime ID

Compute executor committee workers no longer have to verify the signatures
of observed commitments simply to identify them as invalid.
1 change: 1 addition & 0 deletions .changelog/5438.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute: Bound batch execution time
5 changes: 5 additions & 0 deletions .changelog/5438.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/runtime/host/sandbox: Release lock before calling into runtime

Similar to how this is handled in the multi runtime host, we need to
release the lock before calling into the runtime as otherwise this could
lead to a deadlock in certain situations.
9 changes: 9 additions & 0 deletions go/consensus/cometbft/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cmtrpctypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/consensus/api/events"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/consensus/cometbft/crypto"
"github.com/oasisprotocol/oasis-core/go/roothash/api/commitment"
mkvsNode "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
)

Expand Down Expand Up @@ -380,3 +382,10 @@ var MessageStateSyncCompleted = messageKind(0)
func CometBFTChainID(chainContext string) string {
return chainContext[:cmttypes.MaxChainIDLen]
}

// ExecutorCommitmentNotifier is an executor commitment notifier interface.
type ExecutorCommitmentNotifier interface {
// DeliverExecutorCommitment delivers an executor commitment observed
// in the consensus layer P2P network.
DeliverExecutorCommitment(runtimeID common.Namespace, ec *commitment.ExecutorCommitment)
}
8 changes: 3 additions & 5 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api"
governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/governance/api"
Expand All @@ -32,8 +31,7 @@ var _ tmapi.Application = (*rootHashApplication)(nil)
type rootHashApplication struct {
state tmapi.ApplicationState
md tmapi.MessageDispatcher

ecNotifier *pubsub.Broker
ecn tmapi.ExecutorCommitmentNotifier
}

func (app *rootHashApplication) Name() string {
Expand Down Expand Up @@ -402,8 +400,8 @@ func (app *rootHashApplication) EndBlock(ctx *tmapi.Context) (types.ResponseEndB
}

// New constructs a new roothash application instance.
func New(ecNotifier *pubsub.Broker) tmapi.Application {
func New(ecn tmapi.ExecutorCommitmentNotifier) tmapi.Application {
return &rootHashApplication{
ecNotifier: ecNotifier,
ecn: ecn,
}
}
10 changes: 4 additions & 6 deletions go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ func (app *rootHashApplication) executorCommit(
cc *roothash.ExecutorCommit,
) (err error) {
if ctx.IsCheckOnly() {
// In case an executor commit notifier has been set up, push all commits into channel.
if app.ecNotifier != nil {
for _, ec := range cc.Commits {
ec := ec
app.ecNotifier.Broadcast(&ec)
}
// Notify subscribers about observed commitments.
for _, ec := range cc.Commits {
ec := ec
app.ecn.DeliverExecutorCommitment(cc.ID, &ec)
}
return nil
}
Expand Down
50 changes: 27 additions & 23 deletions go/consensus/cometbft/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type runtimeBrokers struct {

blockNotifier *pubsub.Broker
eventNotifier *pubsub.Broker
ecNotifier *pubsub.Broker

lastBlockHeight int64
lastBlock *block.Block
Expand Down Expand Up @@ -79,8 +80,6 @@ type serviceClient struct {
trackedRuntime map[common.Namespace]*trackedRuntime

pruneHandler *pruneHandler

ecNotifier *pubsub.Broker
}

// Implements api.Backend.
Expand Down Expand Up @@ -240,8 +239,9 @@ func (sc *serviceClient) WatchEvents(_ context.Context, id common.Namespace) (<-
}

// Implements api.Backend.
func (sc *serviceClient) WatchExecutorCommitments(_ context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
sub := sc.ecNotifier.Subscribe()
func (sc *serviceClient) WatchExecutorCommitments(_ context.Context, id common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
notifiers := sc.getRuntimeNotifiers(id)
sub := notifiers.ecNotifier.Subscribe()
ch := make(chan *commitment.ExecutorCommitment)
sub.Unwrap(ch)

Expand Down Expand Up @@ -361,6 +361,7 @@ func (sc *serviceClient) getRuntimeNotifiers(id common.Namespace) *runtimeBroker
notifiers = &runtimeBrokers{
blockNotifier: pubsub.NewBroker(false),
eventNotifier: pubsub.NewBroker(false),
ecNotifier: pubsub.NewBroker(false),
}
sc.runtimeNotifiers[id] = notifiers
}
Expand Down Expand Up @@ -596,6 +597,12 @@ func (sc *serviceClient) DeliverEvent(ctx context.Context, height int64, tx cmtt
return nil
}

// Implements api.ExecutorCommitmentNotifier.
func (sc *serviceClient) DeliverExecutorCommitment(runtimeID common.Namespace, ec *commitment.ExecutorCommitment) {
notifiers := sc.getRuntimeNotifiers(runtimeID)
notifiers.ecNotifier.Broadcast(ec)
}

func (sc *serviceClient) processFinalizedEvent(
ctx context.Context,
height int64,
Expand Down Expand Up @@ -862,36 +869,33 @@ func New(
ctx context.Context,
backend tmapi.Backend,
) (ServiceClient, error) {
// Create the general executor commitment notifier.
ecNotifier := pubsub.NewBroker(false)
sc := serviceClient{
ctx: ctx,
logger: logging.GetLogger("cometbft/roothash"),
backend: backend,
allBlockNotifier: pubsub.NewBroker(false),
runtimeNotifiers: make(map[common.Namespace]*runtimeBrokers),
genesisBlocks: make(map[common.Namespace]*block.Block),
queryCh: make(chan cmtpubsub.Query, runtimeRegistry.MaxRuntimeCount),
cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount),
trackedRuntime: make(map[common.Namespace]*trackedRuntime),
}

// Initialize and register the CometBFT service component.
a := app.New(ecNotifier)
a := app.New(&sc)
if err := backend.RegisterApplication(a); err != nil {
return nil, err
}
sc.querier = a.QueryFactory().(*app.QueryFactory)

// Register a consensus state prune handler to make sure that we don't prune blocks that haven't
// yet been indexed by the roothash backend.
ph := &pruneHandler{
sc.pruneHandler = &pruneHandler{
logger: logging.GetLogger("cometbft/roothash/prunehandler"),
}
backend.Pruner().RegisterHandler(ph)
backend.Pruner().RegisterHandler(sc.pruneHandler)

return &serviceClient{
ctx: ctx,
logger: logging.GetLogger("cometbft/roothash"),
backend: backend,
querier: a.QueryFactory().(*app.QueryFactory),
allBlockNotifier: pubsub.NewBroker(false),
runtimeNotifiers: make(map[common.Namespace]*runtimeBrokers),
genesisBlocks: make(map[common.Namespace]*block.Block),
queryCh: make(chan cmtpubsub.Query, runtimeRegistry.MaxRuntimeCount),
cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount),
trackedRuntime: make(map[common.Namespace]*trackedRuntime),
pruneHandler: ph,
ecNotifier: ecNotifier,
}, nil
return &sc, nil
}

func init() {
Expand Down
6 changes: 0 additions & 6 deletions go/oasis-node/cmd/common/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc

import (
"crypto/tls"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -64,11 +63,6 @@ func NewServerTCP(cert *tls.Certificate, installWrapper bool) (*cmnGrpc.Server,
// This internally takes a snapshot of the current global tracer, so
// make sure you initialize the global tracer before calling this.
func NewServerLocal(installWrapper bool) (*cmnGrpc.Server, error) {
dataDir := common.DataDir()
if dataDir == "" {
return nil, errors.New("data directory must be set")
}

config := &cmnGrpc.ServerConfig{
Name: "internal",
Path: common.InternalSocketPath(),
Expand Down
2 changes: 1 addition & 1 deletion go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Backend interface {
//
// Note that these commitments may not have been processed by consensus, commitments may be
// received in any order and duplicates are possible.
WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error)
WatchExecutorCommitments(ctx context.Context, runtimeID common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error)

// TrackRuntime adds a runtime the history of which should be tracked.
TrackRuntime(ctx context.Context, history BlockHistory) error
Expand Down
9 changes: 5 additions & 4 deletions go/roothash/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,13 @@ func handlerWatchEvents(srv interface{}, stream grpc.ServerStream) error {
}

func handlerWatchExecutorCommitments(srv interface{}, stream grpc.ServerStream) error {
if err := stream.RecvMsg(nil); err != nil {
var runtimeID common.Namespace
if err := stream.RecvMsg(&runtimeID); err != nil {
return err
}

ctx := stream.Context()
ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx)
ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx, runtimeID)
if err != nil {
return err
}
Expand Down Expand Up @@ -556,14 +557,14 @@ func (c *roothashClient) WatchEvents(ctx context.Context, runtimeID common.Names
return ch, sub, nil
}

func (c *roothashClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
func (c *roothashClient) WatchExecutorCommitments(ctx context.Context, runtimeID common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) {
ctx, sub := pubsub.NewContextSubscription(ctx)

stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], methodWatchExecutorCommitments.FullName())
if err != nil {
return nil, nil, err
}
if err = stream.SendMsg(nil); err != nil {
if err = stream.SendMsg(runtimeID); err != nil {
return nil, nil, err
}
if err = stream.CloseSend(); err != nil {
Expand Down
17 changes: 4 additions & 13 deletions go/runtime/host/multi/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ func (ah *aggregatedHost) stopPassthrough() {
type Aggregate struct {
l sync.RWMutex

id common.Namespace
logger *logging.Logger
id common.Namespace

hosts map[version.Version]*aggregatedHost
active *aggregatedHost
next *aggregatedHost

notifier *pubsub.Broker

logger *logging.Logger
}

// ID implements host.Runtime.
Expand Down Expand Up @@ -197,7 +198,6 @@ func (agg *Aggregate) Call(ctx context.Context, body *protocol.Body) (*protocol.
_, err = nextHost.Call(ctx, body)
if err != nil {
agg.logger.Warn("failed to propagate runtime request to next version",
"id", agg.ID(),
"err", err,
)
}
Expand Down Expand Up @@ -293,7 +293,6 @@ func (agg *Aggregate) SetVersion(active version.Version, next *version.Version)
defer agg.l.Unlock()

agg.logger.Info("set version",
"id", agg.ID(),
"active", active,
"next", next,
)
Expand All @@ -312,7 +311,6 @@ func (agg *Aggregate) setActiveVersionLocked(version version.Version) error {
// Ensure that we know about the new version.
if next == nil {
agg.logger.Error("SetVersion: unknown version",
"id", agg.ID(),
"version", version,
)

Expand Down Expand Up @@ -345,7 +343,6 @@ func (agg *Aggregate) setActiveVersionLocked(version version.Version) error {
// the part that does all the work is async. Log something.
agg.logger.Error("SetVersion: failed to start sub-host",
"err", err,
"id", agg.ID(),
"version", version,
)
}
Expand All @@ -369,7 +366,6 @@ func (agg *Aggregate) stopActiveLocked() {
}

agg.logger.Debug("stopActiveLocked",
"id", agg.ID(),
"version", agg.active.version,
)

Expand All @@ -392,7 +388,6 @@ func (agg *Aggregate) stopActiveLocked() {
continue
}
agg.logger.Debug("stopActiveLocked: stopped old sub-host",
"id", agg.ID(),
"version", agg.active.version,
)
break
Expand All @@ -419,7 +414,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error
// Ensure that we know about the next version.
if next == nil {
agg.logger.Warn("unsupported next version",
"id", agg.ID(),
"version", version,
)
// Active version must be unaffected.
Expand All @@ -440,7 +434,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error

// Warn in case the next version is changed but the previous one was not activated yet.
agg.logger.Warn("overwriting next version without activation",
"id", agg.ID(),
"version", version,
"previous_version", agg.next.version,
)
Expand All @@ -453,7 +446,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error
// the part that does all the work is async. Log something.
agg.logger.Error("failed to start next version sub-host",
"err", err,
"id", agg.ID(),
"version", version,
)
}
Expand All @@ -478,7 +470,6 @@ func (agg *Aggregate) stopNextLocked() {
}

agg.logger.Debug("stopNextLocked",
"id", agg.ID(),
"version", agg.next.version,
)

Expand Down Expand Up @@ -510,7 +501,7 @@ func New(

agg := &Aggregate{
id: id,
logger: logging.GetLogger("runtime/host/multi"),
logger: logging.GetLogger("runtime/host/multi").With("runtime_id", id),
hosts: make(map[version.Version]*aggregatedHost),
notifier: pubsub.NewBroker(false),
}
Expand Down
Loading

0 comments on commit 622d0bd

Please sign in to comment.