Skip to content

Commit

Permalink
cleaner code
Browse files Browse the repository at this point in the history
  • Loading branch information
vadiminshakov committed Dec 18, 2023
1 parent cabeefa commit fb6db0d
Showing 1 changed file with 51 additions and 42 deletions.
93 changes: 51 additions & 42 deletions core/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,49 +79,9 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq

// propose
log.Infof("propose key %s", req.Key)
votes := make([]*entity.Vote, 0, len(c.followers))
for nodename, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Propose")
}
var (
resp *pb.Response
err error
)
isAccepted := true

for resp == nil || resp != nil && resp.Type == pb.Type_NACK {
resp, err = follower.Propose(ctx, &pb.ProposeRequest{Key: req.Key,
Value: req.Value,
CommitType: ctype,
Index: c.height})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
log.Errorf(err.Error())
isAccepted = false
}
votes = append(votes, &entity.Vote{
Node: nodename,
IsAccepted: isAccepted,
})

if resp == nil || resp.Type != pb.Type_ACK {
log.Warnf("follower %s not acknowledged msg %v", nodename, req)
}

if resp != nil && resp.Index > c.height {
log.Warnf("сoordinator has stale height [%d], update to [%d] and try to send again", c.height, resp.Index)
c.height = resp.Index
votes = votes[:0]
}
}
if err := c.propose(ctx, req, ctype); err != nil {
return nil, errors.Wrap(err, "failed to send propose")
}
if err := c.vlog.Set(c.height, req.Key, req.Value); err != nil {
return nil, errors.Wrap(err, "failed to save msg in the coordinator's log")
}
c.vlog.SetVotes(c.height, votes)

// precommit phase only for three-phase mode
if c.config.CommitType == server.THREE_PHASE {
Expand Down Expand Up @@ -196,6 +156,55 @@ func (c *coordinatorImpl) Broadcast(ctx context.Context, req entity.BroadcastReq
return &entity.BroadcastResponse{entity.ResponseAck, c.height}, nil
}

func (c *coordinatorImpl) propose(ctx context.Context, req entity.BroadcastRequest, ctype pb.CommitType) error {
votes := make([]*entity.Vote, 0, len(c.followers))
var span zipkin.Span
for nodename, follower := range c.followers {
if c.tracer != nil {
span, ctx = c.tracer.StartSpanFromContext(ctx, "Propose")
}
var (
resp *pb.Response
err error
)
isAccepted := true

for resp == nil || resp != nil && resp.Type == pb.Type_NACK {
resp, err = follower.Propose(ctx, &pb.ProposeRequest{Key: req.Key,
Value: req.Value,
CommitType: ctype,
Index: c.height})
if c.tracer != nil && span != nil {
span.Finish()
}
if err != nil {
log.Errorf(err.Error())
isAccepted = false
}
votes = append(votes, &entity.Vote{
Node: nodename,
IsAccepted: isAccepted,
})

if resp == nil || resp.Type != pb.Type_ACK {
log.Warnf("follower %s not acknowledged msg %v", nodename, req)
}

if resp != nil && resp.Index > c.height {
log.Warnf("сoordinator has stale height [%d], update to [%d] and try to send again", c.height, resp.Index)
c.height = resp.Index
votes = votes[:0]
}
}
}
if err := c.vlog.Set(c.height, req.Key, req.Value); err != nil {
return errors.Wrap(err, "failed to save msg in the coordinator's log")
}
c.vlog.SetVotes(c.height, votes)

return nil
}

func (c *coordinatorImpl) Height() uint64 {
return c.height
}
Expand Down

0 comments on commit fb6db0d

Please sign in to comment.