From ffeca6db4541681ca7de6bd67e390a235c396d51 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 7 Nov 2023 14:02:12 +0100 Subject: [PATCH 1/7] go/oasis-node/grpc: remove unneeded datadir check --- .changelog/5430.trivial.md | 0 go/oasis-node/cmd/common/grpc/grpc.go | 6 ------ 2 files changed, 6 deletions(-) create mode 100644 .changelog/5430.trivial.md diff --git a/.changelog/5430.trivial.md b/.changelog/5430.trivial.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/go/oasis-node/cmd/common/grpc/grpc.go b/go/oasis-node/cmd/common/grpc/grpc.go index 84cf4d98dc9..5afe5516a19 100644 --- a/go/oasis-node/cmd/common/grpc/grpc.go +++ b/go/oasis-node/cmd/common/grpc/grpc.go @@ -3,7 +3,6 @@ package grpc import ( "crypto/tls" - "errors" "fmt" "os" "strings" @@ -64,11 +63,6 @@ func NewServerTCP(cert *tls.Certificate, installWrapper bool) (*cmnGrpc.Server, // This internally takes a snapshot of the current global tracer, so // make sure you initialize the global tracer before calling this. func NewServerLocal(installWrapper bool) (*cmnGrpc.Server, error) { - dataDir := common.DataDir() - if dataDir == "" { - return nil, errors.New("data directory must be set") - } - config := &cmnGrpc.ServerConfig{ Name: "internal", Path: common.InternalSocketPath(), From 972711e414ee0d6db0fae592f24de1f025645bf0 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Thu, 9 Nov 2023 10:03:36 +0100 Subject: [PATCH 2/7] go/runtime/host/multi: Add runtime ID to the logger --- go/runtime/host/multi/multi.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/go/runtime/host/multi/multi.go b/go/runtime/host/multi/multi.go index e194b3174e2..65402ee0f24 100644 --- a/go/runtime/host/multi/multi.go +++ b/go/runtime/host/multi/multi.go @@ -106,14 +106,15 @@ func (ah *aggregatedHost) stopPassthrough() { type Aggregate struct { l sync.RWMutex - id common.Namespace - logger *logging.Logger + id common.Namespace hosts map[version.Version]*aggregatedHost active *aggregatedHost next *aggregatedHost notifier *pubsub.Broker + + logger *logging.Logger } // ID implements host.Runtime. @@ -197,7 +198,6 @@ func (agg *Aggregate) Call(ctx context.Context, body *protocol.Body) (*protocol. _, err = nextHost.Call(ctx, body) if err != nil { agg.logger.Warn("failed to propagate runtime request to next version", - "id", agg.ID(), "err", err, ) } @@ -293,7 +293,6 @@ func (agg *Aggregate) SetVersion(active version.Version, next *version.Version) defer agg.l.Unlock() agg.logger.Info("set version", - "id", agg.ID(), "active", active, "next", next, ) @@ -312,7 +311,6 @@ func (agg *Aggregate) setActiveVersionLocked(version version.Version) error { // Ensure that we know about the new version. if next == nil { agg.logger.Error("SetVersion: unknown version", - "id", agg.ID(), "version", version, ) @@ -345,7 +343,6 @@ func (agg *Aggregate) setActiveVersionLocked(version version.Version) error { // the part that does all the work is async. Log something.
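// The "id", agg.ID() key/value pairs removed throughout this patch are made
// redundant by the New() change at the end of the patch, which binds the
// runtime ID to the logger once:
//
//	logger: logging.GetLogger("runtime/host/multi").With("runtime_id", id),
//
// After that, every call such as agg.logger.Warn("...", "err", err) carries
// the "runtime_id" field automatically.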
agg.logger.Error("SetVersion: failed to start sub-host", "err", err, - "id", agg.ID(), "version", version, ) } @@ -369,7 +366,6 @@ func (agg *Aggregate) stopActiveLocked() { } agg.logger.Debug("stopActiveLocked", - "id", agg.ID(), "version", agg.active.version, ) @@ -392,7 +388,6 @@ func (agg *Aggregate) stopActiveLocked() { continue } agg.logger.Debug("stopActiveLocked: stopped old sub-host", - "id", agg.ID(), "version", agg.active.version, ) break @@ -419,7 +414,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error // Ensure that we know about the next version. if next == nil { agg.logger.Warn("unsupported next version", - "id", agg.ID(), "version", version, ) // Active version must be unaffected. @@ -440,7 +434,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error // Warn in case the next version is changed but the previous one was not activated yet. agg.logger.Warn("overwriting next version without activation", - "id", agg.ID(), "version", version, "previous_version", agg.next.version, ) @@ -453,7 +446,6 @@ func (agg *Aggregate) setNextVersionLocked(maybeVersion *version.Version) error // the part that does all the work is async. Log something. agg.logger.Error("failed to start next version sub-host", "err", err, - "id", agg.ID(), "version", version, ) } @@ -478,7 +470,6 @@ func (agg *Aggregate) stopNextLocked() { } agg.logger.Debug("stopNextLocked", - "id", agg.ID(), "version", agg.next.version, ) @@ -510,7 +501,7 @@ func New( agg := &Aggregate{ id: id, - logger: logging.GetLogger("runtime/host/multi"), + logger: logging.GetLogger("runtime/host/multi").With("runtime_id", id), hosts: make(map[version.Version]*aggregatedHost), notifier: pubsub.NewBroker(false), } From 13194b11651c2a7a400cbcfd8522a13bccf44afb Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Thu, 9 Nov 2023 14:33:10 +0100 Subject: [PATCH 3/7] go/consensus/roothash: Filter executor commitments by runtime ID Compute executor committee workers no longer have to verify the signatures of observed commitments simply to identify them as invalid. --- .changelog/5436.feature.md | 4 ++ go/consensus/cometbft/api/api.go | 9 ++++ .../cometbft/apps/roothash/roothash.go | 8 ++- .../cometbft/apps/roothash/transactions.go | 10 ++-- go/consensus/cometbft/roothash/roothash.go | 50 ++++++++++--------- go/roothash/api/api.go | 2 +- go/roothash/api/grpc.go | 9 ++-- go/worker/compute/executor/committee/node.go | 2 +- 8 files changed, 54 insertions(+), 40 deletions(-) create mode 100644 .changelog/5436.feature.md diff --git a/.changelog/5436.feature.md b/.changelog/5436.feature.md new file mode 100644 index 00000000000..2cfcf5f5898 --- /dev/null +++ b/.changelog/5436.feature.md @@ -0,0 +1,4 @@ +go/consensus/roothash: Filter executor commitments by runtime ID + +Compute executor committee workers no longer have to verify the signatures +of observed commitments simply to identify them as invalid. 
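Taken together with the gRPC and worker changes below, the subscription becomes runtime-scoped. A minimal sketch of the consumer side, using the method shapes from this patch (consensus, runtimeID and handle are placeholder wiring):

	ch, sub, err := consensus.RootHash().WatchExecutorCommitments(ctx, runtimeID)
	if err != nil {
		return err
	}
	defer sub.Close()
	for ec := range ch {
		// Only commitments observed for runtimeID are delivered, so the worker no
		// longer needs to verify signatures merely to discard foreign commitments.
		handle(ec)
	}

On the producer side, commitments observed during CheckTx are handed to the new ExecutorCommitmentNotifier, which the CometBFT roothash service client implements by broadcasting on a per-runtime pubsub broker (see the transactions.go and roothash.go hunks below).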
diff --git a/go/consensus/cometbft/api/api.go b/go/consensus/cometbft/api/api.go index 2270f802365..3efd0a43d9f 100644 --- a/go/consensus/cometbft/api/api.go +++ b/go/consensus/cometbft/api/api.go @@ -14,6 +14,7 @@ import ( cmtrpctypes "github.com/cometbft/cometbft/rpc/core/types" cmttypes "github.com/cometbft/cometbft/types" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" @@ -23,6 +24,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/consensus/api/events" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/crypto" + "github.com/oasisprotocol/oasis-core/go/roothash/api/commitment" mkvsNode "github.com/oasisprotocol/oasis-core/go/storage/mkvs/node" ) @@ -380,3 +382,10 @@ var MessageStateSyncCompleted = messageKind(0) func CometBFTChainID(chainContext string) string { return chainContext[:cmttypes.MaxChainIDLen] } + +// ExecutorCommitmentNotifier is an executor commitment notifier interface. +type ExecutorCommitmentNotifier interface { + // DeliverExecutorCommitment delivers an executor commitment observed + // in the consensus layer P2P network. + DeliverExecutorCommitment(runtimeID common.Namespace, ec *commitment.ExecutorCommitment) +} diff --git a/go/consensus/cometbft/apps/roothash/roothash.go b/go/consensus/cometbft/apps/roothash/roothash.go index 43c7d0d0b28..e0285c12c7c 100644 --- a/go/consensus/cometbft/apps/roothash/roothash.go +++ b/go/consensus/cometbft/apps/roothash/roothash.go @@ -8,7 +8,6 @@ import ( beacon "github.com/oasisprotocol/oasis-core/go/beacon/api" "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/common/pubsub" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api" governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/governance/api" @@ -32,8 +31,7 @@ var _ tmapi.Application = (*rootHashApplication)(nil) type rootHashApplication struct { state tmapi.ApplicationState md tmapi.MessageDispatcher - - ecNotifier *pubsub.Broker + ecn tmapi.ExecutorCommitmentNotifier } func (app *rootHashApplication) Name() string { @@ -402,8 +400,8 @@ func (app *rootHashApplication) EndBlock(ctx *tmapi.Context) (types.ResponseEndB } // New constructs a new roothash application instance. -func New(ecNotifier *pubsub.Broker) tmapi.Application { +func New(ecn tmapi.ExecutorCommitmentNotifier) tmapi.Application { return &rootHashApplication{ - ecNotifier: ecNotifier, + ecn: ecn, } } diff --git a/go/consensus/cometbft/apps/roothash/transactions.go b/go/consensus/cometbft/apps/roothash/transactions.go index 684742d315d..dd196bbe08e 100644 --- a/go/consensus/cometbft/apps/roothash/transactions.go +++ b/go/consensus/cometbft/apps/roothash/transactions.go @@ -47,12 +47,10 @@ func (app *rootHashApplication) executorCommit( cc *roothash.ExecutorCommit, ) (err error) { if ctx.IsCheckOnly() { - // In case an executor commit notifier has been set up, push all commits into channel. - if app.ecNotifier != nil { - for _, ec := range cc.Commits { - ec := ec - app.ecNotifier.Broadcast(&ec) - } + // Notify subscribers about observed commitments. 
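// The ec := ec shadow below is deliberate: before Go 1.22 the range variable is
// reused across iterations, so taking &ec without the copy would hand every
// subscriber a pointer to the same, repeatedly overwritten value.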
+ for _, ec := range cc.Commits { + ec := ec + app.ecn.DeliverExecutorCommitment(cc.ID, &ec) } return nil } diff --git a/go/consensus/cometbft/roothash/roothash.go b/go/consensus/cometbft/roothash/roothash.go index 42d36f6944e..3085513d0ee 100644 --- a/go/consensus/cometbft/roothash/roothash.go +++ b/go/consensus/cometbft/roothash/roothash.go @@ -42,6 +42,7 @@ type runtimeBrokers struct { blockNotifier *pubsub.Broker eventNotifier *pubsub.Broker + ecNotifier *pubsub.Broker lastBlockHeight int64 lastBlock *block.Block @@ -79,8 +80,6 @@ type serviceClient struct { trackedRuntime map[common.Namespace]*trackedRuntime pruneHandler *pruneHandler - - ecNotifier *pubsub.Broker } // Implements api.Backend. @@ -240,8 +239,9 @@ func (sc *serviceClient) WatchEvents(_ context.Context, id common.Namespace) (<- } // Implements api.Backend. -func (sc *serviceClient) WatchExecutorCommitments(_ context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) { - sub := sc.ecNotifier.Subscribe() +func (sc *serviceClient) WatchExecutorCommitments(_ context.Context, id common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) { + notifiers := sc.getRuntimeNotifiers(id) + sub := notifiers.ecNotifier.Subscribe() ch := make(chan *commitment.ExecutorCommitment) sub.Unwrap(ch) @@ -361,6 +361,7 @@ func (sc *serviceClient) getRuntimeNotifiers(id common.Namespace) *runtimeBroker notifiers = &runtimeBrokers{ blockNotifier: pubsub.NewBroker(false), eventNotifier: pubsub.NewBroker(false), + ecNotifier: pubsub.NewBroker(false), } sc.runtimeNotifiers[id] = notifiers } @@ -596,6 +597,12 @@ func (sc *serviceClient) DeliverEvent(ctx context.Context, height int64, tx cmtt return nil } +// Implements api.ExecutorCommitmentNotifier. +func (sc *serviceClient) DeliverExecutorCommitment(runtimeID common.Namespace, ec *commitment.ExecutorCommitment) { + notifiers := sc.getRuntimeNotifiers(runtimeID) + notifiers.ecNotifier.Broadcast(ec) +} + func (sc *serviceClient) processFinalizedEvent( ctx context.Context, height int64, @@ -862,36 +869,33 @@ func New( ctx context.Context, backend tmapi.Backend, ) (ServiceClient, error) { - // Create the general executor commitment notifier. - ecNotifier := pubsub.NewBroker(false) + sc := serviceClient{ + ctx: ctx, + logger: logging.GetLogger("cometbft/roothash"), + backend: backend, + allBlockNotifier: pubsub.NewBroker(false), + runtimeNotifiers: make(map[common.Namespace]*runtimeBrokers), + genesisBlocks: make(map[common.Namespace]*block.Block), + queryCh: make(chan cmtpubsub.Query, runtimeRegistry.MaxRuntimeCount), + cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount), + trackedRuntime: make(map[common.Namespace]*trackedRuntime), + } // Initialize and register the CometBFT service component. - a := app.New(ecNotifier) + a := app.New(&sc) if err := backend.RegisterApplication(a); err != nil { return nil, err } + sc.querier = a.QueryFactory().(*app.QueryFactory) // Register a consensus state prune handler to make sure that we don't prune blocks that haven't // yet been indexed by the roothash backend. 
- ph := &pruneHandler{ + sc.pruneHandler = &pruneHandler{ logger: logging.GetLogger("cometbft/roothash/prunehandler"), } - backend.Pruner().RegisterHandler(ph) + backend.Pruner().RegisterHandler(sc.pruneHandler) - return &serviceClient{ - ctx: ctx, - logger: logging.GetLogger("cometbft/roothash"), - backend: backend, - querier: a.QueryFactory().(*app.QueryFactory), - allBlockNotifier: pubsub.NewBroker(false), - runtimeNotifiers: make(map[common.Namespace]*runtimeBrokers), - genesisBlocks: make(map[common.Namespace]*block.Block), - queryCh: make(chan cmtpubsub.Query, runtimeRegistry.MaxRuntimeCount), - cmdCh: make(chan interface{}, runtimeRegistry.MaxRuntimeCount), - trackedRuntime: make(map[common.Namespace]*trackedRuntime), - pruneHandler: ph, - ecNotifier: ecNotifier, - }, nil + return &sc, nil } func init() { diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index ba7a2c3140e..07b2139232f 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -146,7 +146,7 @@ type Backend interface { // // Note that these commitments may not have been processed by consensus, commitments may be // received in any order and duplicates are possible. - WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) + WatchExecutorCommitments(ctx context.Context, runtimeID common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) // TrackRuntime adds a runtime the history of which should be tracked. TrackRuntime(ctx context.Context, history BlockHistory) error diff --git a/go/roothash/api/grpc.go b/go/roothash/api/grpc.go index bd77c3628ed..96036795a8a 100644 --- a/go/roothash/api/grpc.go +++ b/go/roothash/api/grpc.go @@ -371,12 +371,13 @@ func handlerWatchEvents(srv interface{}, stream grpc.ServerStream) error { } func handlerWatchExecutorCommitments(srv interface{}, stream grpc.ServerStream) error { - if err := stream.RecvMsg(nil); err != nil { + var runtimeID common.Namespace + if err := stream.RecvMsg(&runtimeID); err != nil { return err } ctx := stream.Context() - ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx) + ch, sub, err := srv.(Backend).WatchExecutorCommitments(ctx, runtimeID) if err != nil { return err } @@ -556,14 +557,14 @@ func (c *roothashClient) WatchEvents(ctx context.Context, runtimeID common.Names return ch, sub, nil } -func (c *roothashClient) WatchExecutorCommitments(ctx context.Context) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) { +func (c *roothashClient) WatchExecutorCommitments(ctx context.Context, runtimeID common.Namespace) (<-chan *commitment.ExecutorCommitment, pubsub.ClosableSubscription, error) { ctx, sub := pubsub.NewContextSubscription(ctx) stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], methodWatchExecutorCommitments.FullName()) if err != nil { return nil, nil, err } - if err = stream.SendMsg(nil); err != nil { + if err = stream.SendMsg(runtimeID); err != nil { return nil, nil, err } if err = stream.CloseSend(); err != nil { diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 12fccf9530a..87178b39744 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1456,7 +1456,7 @@ func (n *Node) worker() { defer txSub.Close() // Subscribe to gossiped executor commitments. 
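// With this change the worker subscribes only to commitments for its own
// runtime: the runtime ID travels as the stream's single request message
// (stream.SendMsg(runtimeID) followed by CloseSend() on the client,
// stream.RecvMsg(&runtimeID) in the gRPC handler above), and the executor
// committee node below passes n.commonNode.Runtime.ID() when calling
// WatchExecutorCommitments.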
- n.ecCh, ecSub, err = n.commonNode.Consensus.RootHash().WatchExecutorCommitments(n.ctx) + n.ecCh, ecSub, err = n.commonNode.Consensus.RootHash().WatchExecutorCommitments(n.ctx, n.commonNode.Runtime.ID()) if err != nil { n.logger.Error("failed to subscribe to executor commitments", "err", err, From 228db89c93c587e6fa8e1b1215bd1f8330a6ab67 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 13 Nov 2023 09:46:55 +0100 Subject: [PATCH 4/7] go/runtime/host/sandbox: Release lock before calling into runtime Similar to how this is handled in the multi runtime host, we need to release the lock before calling into the runtime as otherwise this could lead to a deadlock in certain situations. --- .changelog/5438.bugfix.md | 5 +++ go/runtime/host/sandbox/sandbox.go | 58 ++++++++++++++++-------------- 2 files changed, 37 insertions(+), 26 deletions(-) create mode 100644 .changelog/5438.bugfix.md diff --git a/.changelog/5438.bugfix.md b/.changelog/5438.bugfix.md new file mode 100644 index 00000000000..72a6b617761 --- /dev/null +++ b/.changelog/5438.bugfix.md @@ -0,0 +1,5 @@ +go/runtime/host/sandbox: Release lock before calling into runtime + +Similar to how this is handled in the multi runtime host, we need to +release the lock before calling into the runtime as otherwise this could +lead to a deadlock in certain situations. diff --git a/go/runtime/host/sandbox/sandbox.go b/go/runtime/host/sandbox/sandbox.go index 241a8455264..bfac2433e06 100644 --- a/go/runtime/host/sandbox/sandbox.go +++ b/go/runtime/host/sandbox/sandbox.go @@ -136,21 +136,13 @@ func (r *sandboxedRuntime) ID() common.Namespace { } // Implements host.Runtime. -func (r *sandboxedRuntime) GetInfo(ctx context.Context) (rsp *protocol.RuntimeInfoResponse, err error) { - callFn := func() error { - r.RLock() - defer r.RUnlock() - - if r.conn == nil { - return errRuntimeNotReady - } - rsp, err = r.conn.GetInfo() - return err +func (r *sandboxedRuntime) GetInfo(ctx context.Context) (*protocol.RuntimeInfoResponse, error) { + conn, err := r.getConnection(ctx) + if err != nil { + return nil, err } - // Retry call in case the runtime is not yet ready. - err = backoff.Retry(callFn, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), ctx)) - return + return conn.GetInfo() } // Implements host.Runtime. @@ -165,25 +157,39 @@ func (r *sandboxedRuntime) GetCapabilityTEE() (*node.CapabilityTEE, error) { } // Implements host.Runtime. -func (r *sandboxedRuntime) Call(ctx context.Context, body *protocol.Body) (rsp *protocol.Body, err error) { - callFn := func() error { +func (r *sandboxedRuntime) Call(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { + conn, err := r.getConnection(ctx) + if err != nil { + return nil, err + } + + // Take care to release lock before calling into the runtime as otherwise this could lead to a + // deadlock in case the runtime makes a call that acquires the cross node lock and at the same + // time SetVersion is being called to update the version with the cross node lock acquired. + + return conn.Call(ctx, body) +} + +func (r *sandboxedRuntime) getConnection(ctx context.Context) (protocol.Connection, error) { + var conn protocol.Connection + getConnFn := func() error { r.RLock() defer r.RUnlock() if r.conn == nil { return errRuntimeNotReady } - rsp, err = r.conn.Call(ctx, body) - if err != nil { - // All protocol-level errors are permanent. - return backoff.Permanent(err) - } + conn = r.conn + return nil } - // Retry call in case the runtime is not yet ready. 
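// Note the semantic shift in the hunks above: the backoff retry now only waits
// for the sandboxed runtime's connection to become available
// (errRuntimeNotReady), while errors returned by conn.Call itself are no longer
// wrapped in backoff.Permanent and simply propagate to the caller. The read
// lock is held only inside getConnection, so no lock is held while the runtime
// processes the call.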
- err = backoff.Retry(callFn, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), ctx)) - return + err := backoff.Retry(getConnFn, backoff.WithContext(cmnBackoff.NewExponentialBackOff(), ctx)) + if err != nil { + return nil, err + } + + return conn, nil } // Implements host.Runtime. @@ -421,8 +427,8 @@ func (r *sandboxedRuntime) startProcess() (err error) { } ok = true - r.Lock() r.process = p + r.Lock() r.conn = pc r.capabilityTEE = ev.CapabilityTEE r.Unlock() @@ -464,9 +470,9 @@ func (r *sandboxedRuntime) handleAbortRequest(rq *abortRequest) error { // Remove the process so it will be respanwed (it would be respawned either way, but with an // additional "unexpected termination" message). - r.Lock() r.conn.Close() r.process = nil + r.Lock() r.conn = nil r.capabilityTEE = nil r.Unlock() @@ -580,9 +586,9 @@ func (r *sandboxedRuntime) manager() { "err", r.process.Error(), ) - r.Lock() r.conn.Close() r.process = nil + r.Lock() r.conn = nil r.capabilityTEE = nil r.Unlock() From 612096cfd79f27646e3132c02890473a30e98bbd Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 13 Nov 2023 10:03:19 +0100 Subject: [PATCH 5/7] go/worker/compute: Bound batch execution time --- .changelog/5438.bugfix.2.md | 1 + go/worker/compute/executor/committee/node.go | 33 ++++++++++++++----- go/worker/compute/executor/committee/state.go | 6 ++-- 3 files changed, 28 insertions(+), 12 deletions(-) create mode 100644 .changelog/5438.bugfix.2.md diff --git a/.changelog/5438.bugfix.2.md b/.changelog/5438.bugfix.2.md new file mode 100644 index 00000000000..97da869af33 --- /dev/null +++ b/.changelog/5438.bugfix.2.md @@ -0,0 +1 @@ +go/worker/compute: Bound batch execution time diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 87178b39744..db6761e4940 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -45,6 +45,10 @@ var ( getInfoTimeout = 5 * time.Second ) +// executeBatchTimeoutFactor is the factor F in calculation of the batch execution timeout using +// the formula F * ProposerTimeout to ensure that a broken runtime doesn't block forever. +const executeBatchTimeoutFactor = 3 + // Node is a committee node. type Node struct { // nolint: maligned runtimeReady bool @@ -186,7 +190,7 @@ func (n *Node) transitionState(state NodeState) { } func (n *Node) transitionStateToProcessing(ctx context.Context, proposal *commitment.Proposal, rank uint64, batch transaction.RawBatch) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) done := make(chan struct{}) n.transitionState(StateProcessingBatch{ @@ -220,7 +224,7 @@ func (n *Node) transitionStateToProcessingFailure( "max_batch_size", maxBatchSize, ) - cancel := func() {} + cancel := func(_ error) {} done := make(chan struct{}) close(done) @@ -415,7 +419,7 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { return } - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancelCause(ctx) done := make(chan struct{}) n.transitionState(StateProcessingBatch{ @@ -686,15 +690,26 @@ func (n *Node) runtimeExecuteTxBatch( batchRuntimeProcessingTime.With(n.getMetricLabels()).Observe(time.Since(rtStartTime).Seconds()) }() - rsp, err := rt.Call(ctx, rq) + // Ensure batch execution is bounded. 
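// context.WithTimeoutCause (Go 1.21+) works like WithTimeout, except that once
// the deadline fires context.Cause(ctx) returns the supplied error ("proposer
// timeout expired") rather than the bare context.DeadlineExceeded; the error
// log below surfaces it via the "cause" field. The WithCancelCause call sites
// in this patch use the same mechanism, e.g. state.Cancel(errors.New("batch
// aborted")) records why a batch was cancelled.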
+ proposerTimeout := state.Runtime.TxnScheduler.ProposerTimeout + callCtx, cancelCallFn := context.WithTimeoutCause( + ctx, + executeBatchTimeoutFactor*proposerTimeout, + errors.New("proposer timeout expired"), + ) + defer cancelCallFn() + + rsp, err := rt.Call(callCtx, rq) switch { case err == nil: case errors.Is(err, context.Canceled): // Context was canceled while the runtime was processing a request. - n.logger.Error("batch processing aborted by context, restarting runtime") + n.logger.Error("batch processing aborted by context, restarting runtime", + "cause", context.Cause(callCtx), + ) // Abort the runtime, so we can start processing the next batch. - abortCtx, cancel := context.WithTimeout(n.ctx, abortTimeout) + abortCtx, cancel := context.WithTimeout(ctx, abortTimeout) defer cancel() if err = rt.Abort(abortCtx, false); err != nil { @@ -778,7 +793,7 @@ func (n *Node) abortBatch(state *StateProcessingBatch) { n.logger.Warn("aborting processing batch") // Stop processing. - state.Cancel() + state.Cancel(errors.New("batch aborted")) // Discard the result if there was any. select { @@ -1500,8 +1515,8 @@ func (n *Node) worker() { var wg sync.WaitGroup defer wg.Wait() - ctx, cancel := context.WithCancel(n.ctx) - defer cancel() + ctx, cancel := context.WithCancelCause(n.ctx) + defer cancel(errors.New("round finished")) wg.Add(1) go func() { diff --git a/go/worker/compute/executor/committee/state.go b/go/worker/compute/executor/committee/state.go index fd731ad7c1a..85bbbd50b16 100644 --- a/go/worker/compute/executor/committee/state.go +++ b/go/worker/compute/executor/committee/state.go @@ -141,7 +141,7 @@ type StateProcessingBatch struct { // Timing for this batch. batchStartTime time.Time // Function for cancelling batch processing. - cancelFn context.CancelFunc + cancelFn context.CancelCauseFunc // Channel which will provide the result. done chan struct{} } @@ -157,8 +157,8 @@ func (s StateProcessingBatch) String() string { } // Cancel invokes the cancellation function and waits for the processing to actually stop. -func (s *StateProcessingBatch) Cancel() { - s.cancelFn() +func (s *StateProcessingBatch) Cancel(cause error) { + s.cancelFn(cause) <-s.done } From 0d9765cceabec8e5f14350f87140717f8c64df65 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 13 Nov 2023 10:16:38 +0100 Subject: [PATCH 6/7] runtime: Add compile-time feature for debug logging --- runtime/Cargo.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e154b7652b3..f7a1ac9ac94 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -70,6 +70,11 @@ jsonrpc = { version = "0.13.0", features = ["simple_uds"] } tempfile = "3.4.0" tendermint-testgen = "0.30.0" +[features] +default = [] +# Enables debug-level logging in release builds. +debug-logging = ["slog/max_level_debug", "slog/release_max_level_debug"] + [[bin]] name = "fuzz-mkvs-proof" path = "fuzz/mkvs_proof.rs" From afcf6c956f54978372a6e151afb3f7dd4bef1ae5 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 13 Nov 2023 10:53:01 +0100 Subject: [PATCH 7/7] runtime: Remove incorrectly implemented abort requests --- runtime/src/dispatcher.rs | 26 ++------------------------ runtime/src/protocol.rs | 5 +---- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/runtime/src/dispatcher.rs b/runtime/src/dispatcher.rs index ff14d0e6858..4c172e727b9 100644 --- a/runtime/src/dispatcher.rs +++ b/runtime/src/dispatcher.rs @@ -1,10 +1,7 @@ //! Runtime call dispatcher. 
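// With this patch the dispatcher no longer keeps an abort flag or handles a
// Command::Abort variant; abort_and_wait() is removed and, as the protocol.rs
// hunk further below shows, RuntimeAbortRequest is now answered with
// ProtocolError::MethodNotSupported instead of being forwarded to the
// dispatcher.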
use std::{ convert::TryInto, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Condvar, Mutex, - }, + sync::{Arc, Condvar, Mutex}, thread, }; @@ -150,7 +147,6 @@ struct State { #[derive(Debug)] enum Command { Request(u64, Body), - Abort(mpsc::Sender<()>), } /// Runtime call dispatcher. @@ -158,7 +154,6 @@ pub struct Dispatcher { logger: Logger, queue_tx: mpsc::Sender, identity: Arc, - abort_batch: Arc, state: Mutex>, state_cond: Condvar, @@ -179,7 +174,6 @@ impl Dispatcher { logger: get_logger("runtime/dispatcher"), queue_tx: tx, identity, - abort_batch: Arc::new(AtomicBool::new(false)), state: Mutex::new(None), state_cond: Condvar::new(), tokio_runtime, @@ -212,17 +206,6 @@ impl Dispatcher { Ok(()) } - /// Signals to dispatcher that it should abort and waits for the abort to - /// complete. - pub fn abort_and_wait(&self) -> AnyResult<()> { - self.abort_batch.store(true, Ordering::SeqCst); - // Queue an abort command and wait for it to be processed. - let (tx, mut rx) = mpsc::channel(1); - self.queue_tx.blocking_send(Command::Abort(tx))?; - rx.blocking_recv(); - Ok(()) - } - fn run(self: &Arc, initializer: Box, mut rx: mpsc::Receiver) { // Wait for the state to be available. let ProtocolState { @@ -249,10 +232,9 @@ impl Dispatcher { consensus_verifier: &consensus_verifier, }; let post_init_state = initializer.init(pre_init_state); - let mut txn_dispatcher = post_init_state + let txn_dispatcher = post_init_state .txn_dispatcher .unwrap_or_else(|| Box::::default()); - txn_dispatcher.set_abort_batch_flag(self.abort_batch.clone()); let state = State { protocol: protocol.clone(), @@ -294,10 +276,6 @@ impl Dispatcher { protocol.send_response(id, response).unwrap(); }); } - Command::Abort(tx) => { - // Request to abort processing. - tx.send(()).await.unwrap(); - } } } }); diff --git a/runtime/src/protocol.rs b/runtime/src/protocol.rs index ad8fa451a96..140ac03a0c2 100644 --- a/runtime/src/protocol.rs +++ b/runtime/src/protocol.rs @@ -367,10 +367,7 @@ impl Protocol { } Body::RuntimeAbortRequest {} => { info!(self.logger, "Received worker abort request"); - self.ensure_initialized()?; - self.dispatcher.abort_and_wait()?; - info!(self.logger, "Handled worker abort request"); - Ok(Some(Body::RuntimeAbortResponse {})) + Err(ProtocolError::MethodNotSupported.into()) } // Attestation-related requests.