Skip to content

Commit

Permalink
go/scheduler: Change scheduler functions to return ok
Browse files Browse the repository at this point in the history
  • Loading branch information
peternose committed Oct 5, 2023
1 parent 88ff0dc commit 48623ef
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 46 deletions.
16 changes: 8 additions & 8 deletions go/consensus/cometbft/apps/roothash/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
livenessStats.TotalRounds++

// Record if the highest-ranked scheduler received enough commitments.
firstSchedulerIdx, err := rtState.Committee.SchedulerIdx(round, 0)
if err != nil {
return err
firstSchedulerIdx, ok := rtState.Committee.SchedulerIdx(round, 0)
if !ok {
// Should never happen.
return fmt.Errorf("failed to query primary scheduler, no workers in committee")
}
firstScheduler := rtState.Committee.Members[firstSchedulerIdx]

Expand Down Expand Up @@ -331,13 +332,12 @@ func (app *rootHashApplication) failRound(
)

// Record that the scheduler did not receive enough commitments.
schedulerIdx, err := rtState.Committee.SchedulerIdx(round, 0)
if err != nil {
// No workers in the committee.
return err
firstSchedulerIdx, ok := rtState.Committee.SchedulerIdx(round, 0)
if !ok {
return fmt.Errorf("failed to query primary scheduler, no workers in committee")
}

rtState.LivenessStatistics.MissedProposals[schedulerIdx]++
rtState.LivenessStatistics.MissedProposals[firstSchedulerIdx]++

if err := app.finalizeBlock(ctx, rtState, block.RoundFailed, nil); err != nil {
return fmt.Errorf("failed to emit empty block: %w", err)
Expand Down
7 changes: 4 additions & 3 deletions go/oasis-node/cmd/control/runtime_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,11 @@ func doRuntimeStats(cmd *cobra.Command, args []string) { //nolint:gocyclo
continue
}
// Set committee info.
var ok bool
currentCommittee = state.Committee
currentScheduler, err = currentCommittee.Scheduler(currentRound, 0)
if err != nil {
logger.Error("failed to query transaction scheduler",
currentScheduler, ok = currentCommittee.Scheduler(currentRound, 0)
if !ok {
logger.Error("failed to query primary scheduler, no workers in committee",
"err", err,
"height", height,
)
Expand Down
7 changes: 2 additions & 5 deletions go/oasis-node/cmd/debug/byzantine/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ func schedulerCheckScheduled(committee *scheduler.Committee, nodeID signature.Pu
}

func schedulerCheckPrimaryScheduler(committee *scheduler.Committee, nodeID signature.PublicKey, round uint64) bool {
rank, err := committee.SchedulerRank(round, nodeID)
if err != nil {
panic(err)
}
return rank == 0
rank, ok := committee.SchedulerRank(round, nodeID)
return ok && rank == 0
}
4 changes: 2 additions & 2 deletions go/roothash/api/commitment/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func (p *Pool) AddVerifiedExecutorCommitment(c *scheduler.Committee, ec *Executo
}

// Ensure that the scheduler is allowed to schedule transactions.
rank, err := c.SchedulerRank(ec.Header.Header.Round, ec.Header.SchedulerID)
if err != nil {
rank, ok := c.SchedulerRank(ec.Header.Header.Round, ec.Header.SchedulerID)
if !ok {
// Reject commitments with invalid schedulers.
logger.Debug("executor commitment's scheduler is not in the committee",
"round", ec.Header.Header.Round,
Expand Down
12 changes: 6 additions & 6 deletions go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ func (s *runtimeState) generateExecutorCommitments(t *testing.T, consensus conse
inMsgsHash.Empty()

// Gather executor nodes, starting with the scheduler.
schedulerIdx, err := executorCommittee.committee.SchedulerIdx(parent.Header.Round, rank)
require.NoError(err, "SchedulerIdx")
schedulerIdx, ok := executorCommittee.committee.SchedulerIdx(parent.Header.Round, rank)
require.True(ok, "SchedulerIdx")
schedulerID := executorCommittee.workers[schedulerIdx].Signer.Public()

executorNodes := make([]*registryTests.TestNode, 0, len(executorCommittee.workers))
Expand Down Expand Up @@ -523,8 +523,8 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co
liveRounds[i] = 1
}

schedulerIdx, err := s.executorCommittee.committee.SchedulerIdx(parent.Block.Header.Round, 0)
require.NoError(err, "SchedulerIdx")
schedulerIdx, ok := s.executorCommittee.committee.SchedulerIdx(parent.Block.Header.Round, 0)
require.True(ok, "SchedulerIdx")
finalizedProposals[schedulerIdx]++

require.Equal(uint64(1), livenessStatistics.TotalRounds, "there should be one finalized round")
Expand Down Expand Up @@ -565,8 +565,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
missedProposals := make([]uint64, len(livenessStatistics.MissedProposals))

var schedulerIdx int
schedulerIdx, err = s.executorCommittee.committee.SchedulerIdx(blk.Block.Header.Round, 0)
require.NoError(err, "SchedulerIdx")
schedulerIdx, ok := s.executorCommittee.committee.SchedulerIdx(blk.Block.Header.Round, 0)
require.True(ok, "SchedulerIdx")
missedProposals[schedulerIdx]++

require.Zero(livenessStatistics.TotalRounds, "there should be no finalized rounds")
Expand Down
28 changes: 14 additions & 14 deletions go/scheduler/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,20 @@ func (c *Committee) IsBackupWorker(id signature.PublicKey) bool {
// Scheduler returns the scheduler with the given rank in the committee's scheduling order
// for the given round.
//
// If no scheduler with the given rank is found, it returns an error.
func (c *Committee) Scheduler(round uint64, rank uint64) (*CommitteeNode, error) {
idx, err := c.SchedulerIdx(round, rank)
if err != nil {
return nil, err
// If no scheduler with the given rank is found, it returns false.
func (c *Committee) Scheduler(round uint64, rank uint64) (*CommitteeNode, bool) {
idx, ok := c.SchedulerIdx(round, rank)
if !ok {
return nil, false
}
return c.Members[idx], nil
return c.Members[idx], true
}

// SchedulerIdx returns the index of the scheduler with the given rank in the committee's
// scheduling order for the given round.
//
// If no scheduler with the given rank is found, it returns an error.
func (c *Committee) SchedulerIdx(round uint64, rank uint64) (int, error) {
// If no scheduler with the given rank is found, it returns false.
func (c *Committee) SchedulerIdx(round uint64, rank uint64) (int, bool) {
var total uint64

for _, n := range c.Members {
Expand All @@ -219,20 +219,20 @@ func (c *Committee) SchedulerIdx(round uint64, rank uint64) (int, error) {
}

if rank >= total {
return 0, fmt.Errorf("no worker with the given rank in the committee")
return 0, false
}

idx := (rank + total - round%total) % total

return int(idx), nil
return int(idx), true
}

// SchedulerRank returns the position (index) of a node with the given public key in the committee's
// scheduling order for the given round. A lower rank indicates higher scheduling priority.
//
// If the node is not a worker in the committee and, therefore, not allowed to schedule transactions
// for the given round, it returns an error.
func (c *Committee) SchedulerRank(round uint64, id signature.PublicKey) (uint64, error) {
// for the given round, it returns false.
func (c *Committee) SchedulerRank(round uint64, id signature.PublicKey) (uint64, bool) {
var (
total uint64
idx uint64
Expand All @@ -252,12 +252,12 @@ func (c *Committee) SchedulerRank(round uint64, id signature.PublicKey) (uint64,
}

if !isWorker {
return 0, fmt.Errorf("node is not a worker in the committee")
return 0, false
}

rank := (round + idx) % total

return rank, nil
return rank, true
}

// String returns a string representation of a Committee.
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (n *Node) GetStatus() (*api.Status, error) {
status.ExecutorRoles = cmte.Roles

// Include scheduler rank.
if rank, err := cmte.Committee.SchedulerRank(status.LatestRound+1, epoch.identity.NodeSigner.Public()); err != nil {
if rank, ok := cmte.Committee.SchedulerRank(status.LatestRound+1, epoch.identity.NodeSigner.Public()); ok {
status.SchedulerRank = rank
}

Expand Down
10 changes: 5 additions & 5 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,9 +1453,9 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
// Compute node's rank when scheduling transactions.
id := n.commonNode.Identity.NodeSigner.Public()
n.committee = n.epoch.GetExecutorCommittee().Committee
n.rank, err = n.committee.SchedulerRank(round, id)
if err != nil {
n.rank = math.MaxUint64
n.rank = math.MaxUint64
if rank, ok := n.committee.SchedulerRank(round, id); ok {
n.rank = rank
}

n.logger.Debug("node is an executor member",
Expand Down Expand Up @@ -1546,8 +1546,8 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) {
if ec.Header.Header.Round != round {
continue
}
rank, err := n.committee.SchedulerRank(round, ec.Header.SchedulerID)
if err != nil {
rank, ok := n.committee.SchedulerRank(round, ec.Header.SchedulerID)
if !ok {
continue
}
poolRank = rank
Expand Down
4 changes: 2 additions & 2 deletions go/worker/compute/executor/committee/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (h *committeeMsgHandler) HandleMessage(_ context.Context, _ signature.Publi
// Before opening the signed dispatch message, verify that it was actually signed by one
// of the transaction schedulers.
committee := epoch.GetExecutorCommittee().Committee
rank, err := committee.SchedulerRank(proposal.Header.Round, proposal.NodeID)
if err != nil {
rank, ok := committee.SchedulerRank(proposal.Header.Round, proposal.NodeID)
if !ok {
// Invalid scheduler, do not forward.
return p2pError.Permanent(errMsgFromNonTxnSched)
}
Expand Down

0 comments on commit 48623ef

Please sign in to comment.