From 67f17166bf2ba337dafb8e0ea8eea5f74a990767 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Fri, 1 Dec 2023 14:42:46 +0000
Subject: [PATCH] Safeguard lease operations by double checking the leadership

1. Ignore lease revoking requests from an old leader.
2. Double check the current member's leadership before performing a lease
   renew request.
3. etcdserver: ensure the current member's leadership before performing a
   lease checkpoint request.

Signed-off-by: Benjamin Wang
---
 server/etcdserver/server.go        | 44 +++++++++++++++++++++++++++++-
 server/etcdserver/v3_server.go     | 10 +++++++
 server/lease/lessor.go             | 10 +++++--
 server/lease/lessor_test.go        |  6 ++--
 tests/integration/v3_lease_test.go |  2 +-
 5 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index ac5f0c7ab29..1feb8bc379d 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -394,8 +394,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 
 	if srv.Cfg.EnableLeaseCheckpoint {
 		// setting checkpointer enables lease checkpoint feature.
-		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
+			if !srv.ensureLeadership() {
+				srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
+					zap.Uint64("local-member-id", uint64(srv.MemberId())))
+				return lease.ErrNotPrimary
+			}
 			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
+			return nil
 		})
 	}
 
@@ -844,7 +851,19 @@ func (s *EtcdServer) run() {
 
 func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	s.GoAttach(func() {
+		// We shouldn't revoke any leases if the current member isn't a leader,
+		// because the operation should only be performed by the leader. When
+		// the leader gets blocked on the raft loop, such as writing WAL entries,
+		// it can't process any events or messages from raft. It may still think
+		// it is the leader even though the leader has already changed.
+		// Refer to https://github.com/etcd-io/etcd/issues/15247
 		lg := s.Logger()
+		if !s.ensureLeadership() {
+			lg.Warn("Ignore the lease revoking request because current member isn't a leader",
+				zap.Uint64("local-member-id", uint64(s.MemberId())))
+			return
+		}
+
 		// Increases throughput of expired leases deletion process through parallelization
 		c := make(chan struct{}, maxPendingRevokes)
 		for _, curLease := range leases {
@@ -877,6 +896,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	})
 }
 
+// ensureLeadership checks whether the current member is still the leader.
+func (s *EtcdServer) ensureLeadership() bool {
+	lg := s.Logger()
+
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+	if err := s.linearizableReadNotify(ctx); err != nil {
+		lg.Warn("Failed to check current member's leadership",
+			zap.Error(err))
+		return false
+	}
+
+	newLeaderId := s.raftStatus().Lead
+	if newLeaderId != uint64(s.MemberId()) {
+		lg.Warn("Current member isn't a leader",
+			zap.Uint64("local-member-id", uint64(s.MemberId())),
+			zap.Uint64("new-lead", newLeaderId))
+		return false
+	}
+
+	return true
+}
+
 // Cleanup removes allocated objects by EtcdServer.NewServer in
 // situation that EtcdServer::Start was not called (that takes care of cleanup).
 func (s *EtcdServer) Cleanup() {
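The ensureLeadership helper added above is a two-step check: wait for a linearizable read to complete (a member that is stuck writing WAL entries or cut off from the quorum cannot finish it within the request timeout), then confirm that the raft-reported leader is still the local member. Below is a minimal, self-contained Go sketch of that pattern; the cluster type and its callbacks are hypothetical stand-ins for etcd's linearizableReadNotify and raftStatus().Lead, not real etcd APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNotPrimary = errors.New("not primary: current member is not the leader")

// cluster exposes the two facts the check needs: a linearizable read
// barrier (it cannot complete unless the member still reaches a working
// quorum) and the leader currently observed by the local raft node.
type cluster struct {
	localID  uint64
	leaderID func() uint64
	readWait func(ctx context.Context) error
}

// ensureLeadership waits for a linearizable read and then re-checks the
// leader ID, mirroring the two-step check in the patch above.
func ensureLeadership(ctx context.Context, c cluster, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := c.readWait(ctx); err != nil {
		return fmt.Errorf("leadership check failed: %w", err)
	}
	if lead := c.leaderID(); lead != c.localID {
		return fmt.Errorf("%w (leader is %d)", errNotPrimary, lead)
	}
	return nil
}

func main() {
	// Fake cluster: member 1 still believes it leads, but the post-read
	// view reports member 2 as the leader, so the revoke is skipped.
	c := cluster{
		localID:  1,
		leaderID: func() uint64 { return 2 },
		readWait: func(ctx context.Context) error { return nil },
	}
	if err := ensureLeadership(context.Background(), c, time.Second); err != nil {
		fmt.Println("skip revoking leases:", err)
	}
}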
diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go
index 07d1f546c9f..49067ae5203 100644
--- a/server/etcdserver/v3_server.go
+++ b/server/etcdserver/v3_server.go
@@ -278,6 +278,16 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
 	if s.isLeader() {
+		// If s.isLeader() returns true, but we fail to ensure the current
+		// member's leadership, there are a couple of possibilities:
+		//   1. the current member is stuck writing WAL entries;
+		//   2. the current member is isolated from the rest of the cluster;
+		//   3. the current member isn't the leader anymore (possibly due to #1 above).
+		// In such cases, we just return an error to the client, so that the client
+		// can switch to another member to continue the lease keep-alive operation.
+		if !s.ensureLeadership() {
+			return -1, lease.ErrNotPrimary
+		}
 		if err := s.waitAppliedIndex(); err != nil {
 			return 0, err
 		}
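The comment above describes the intended client behaviour: when a member cannot prove its leadership, the keep-alive should be retried against another member rather than the same, possibly stuck or isolated, one. Below is a short, self-contained Go sketch of that retry loop, assuming a hypothetical renewFunc and endpoint names; this is not the etcd clientv3 API.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errNotPrimary = errors.New("not primary")

// renewFunc stands in for a single keep-alive RPC against one member.
type renewFunc func(ctx context.Context, endpoint string, leaseID int64) (ttl int64, err error)

// renewOnAnyMember tries each endpoint in turn; a "not primary" answer is
// treated as a hint to try the next member rather than as a fatal error.
func renewOnAnyMember(ctx context.Context, eps []string, leaseID int64, renew renewFunc) (int64, error) {
	var lastErr error
	for _, ep := range eps {
		ttl, err := renew(ctx, ep, leaseID)
		if err == nil {
			return ttl, nil
		}
		if !errors.Is(err, errNotPrimary) {
			return 0, err
		}
		lastErr = err
	}
	return 0, fmt.Errorf("no member accepted the renewal: %w", lastErr)
}

func main() {
	// m1 still claims leadership but fails the double check; m2 accepts.
	renew := func(ctx context.Context, ep string, id int64) (int64, error) {
		if ep == "m1" {
			return 0, errNotPrimary
		}
		return 5, nil
	}
	ttl, err := renewOnAnyMember(context.Background(), []string{"m1", "m2", "m3"}, 8, renew)
	fmt.Println(ttl, err) // 5 <nil>
}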
diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index 75a6834a7ae..fc29b9b6663 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -75,7 +75,7 @@ type RangeDeleter func() TxnDelete
 
 // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
 // avoid circular dependency with mvcc.
-type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
+type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error
 
 type LeaseID int64
 
@@ -422,7 +422,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 		// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
 		// of RAFT entries written per lease to a max of 2 per checkpoint interval.
 		if clearRemainingTTL {
-			le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
+			if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
+				return -1, err
+			}
 		}
 
 		le.mu.Lock()
@@ -656,7 +658,9 @@ func (le *lessor) checkpointScheduledLeases() {
 	le.mu.Unlock()
 
 	if len(cps) != 0 {
-		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
+		if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
+			return
+		}
 	}
 	if len(cps) < maxLeaseCheckpointBatchSize {
 		return
diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go
index ae9ad52e820..736909169da 100644
--- a/server/lease/lessor_test.go
+++ b/server/lease/lessor_test.go
@@ -269,10 +269,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer os.RemoveAll(dir)
 	le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
-	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
 		}
+		return nil
 	}
 	defer le.Stop()
 	// Set checkpointer
@@ -556,7 +557,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
-	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
+	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
 		close(checkpointedC)
 		if len(lc.Checkpoints) != 1 {
 			t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
@@ -565,6 +566,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 		if c.Remaining_TTL != 1 {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
+		return nil
 	})
 	_, err := le.Grant(1, 2)
 	if err != nil {
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 5fc0d93b74f..0be08b6df7b 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -747,7 +747,7 @@ func TestV3LeaseFailover(t *testing.T) {
 
 	// send keep alive to old leader until the old leader starts
 	// to drop lease request.
-	var expectedExp time.Time
+	expectedExp := time.Now().Add(5 * time.Second)
 	for {
 		if err = lac.Send(lreq); err != nil {
 			break