
Commit

Merge branch 'main' of github.com:etcd-io/etcd into issue-16339
RaphSku committed Dec 16, 2023
2 parents c1934ba + 4e98636 commit 31cbb37
Showing 13 changed files with 251 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG/CHANGELOG-3.4.md
@@ -4,6 +4,9 @@ Previous change logs can be found at [CHANGELOG-3.3](https://github.com/etcd-io/

## v3.4.29 (tbd)

### etcd server
- [Disable following HTTP redirects in peer communication](https://github.com/etcd-io/etcd/pull/17112)

### Dependencies
- Compile binaries using go [1.20.12](https://github.com/etcd-io/etcd/pull/17076).

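The redirect change above ([pull 17112](https://github.com/etcd-io/etcd/pull/17112)) stops the peer HTTP client from following redirects. As an illustration only (not etcd's exact code), this is how redirect following is typically disabled with Go's net/http:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// A client that never follows redirects: CheckRedirect returning
	// http.ErrUseLastResponse makes the client hand back the 3xx response
	// itself instead of chasing the Location header.
	client := &http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}

	resp, err := client.Get("http://example.com/old-path") // illustrative URL
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status without following redirects:", resp.Status)
}
```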
1 change: 1 addition & 0 deletions README.md
@@ -9,6 +9,7 @@
[![Releases](https://img.shields.io/github/release/etcd-io/etcd/all.svg?style=flat-square)](https://github.com/etcd-io/etcd/releases)
[![LICENSE](https://img.shields.io/github/license/etcd-io/etcd.svg?style=flat-square)](https://github.com/etcd-io/etcd/blob/main/LICENSE)
[![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/etcd-io/etcd/badge)](https://api.securityscorecards.dev/projects/github.com/etcd-io/etcd)
<a href="https://actuated.dev/"><img alt="Arm CI sponsored by Actuated" src="https://docs.actuated.dev/images/actuated-badge.png" width="120px"></img></a>

**Note**: The `main` branch may be in an *unstable or even broken state* during development. For stable versions, see [releases][github-release].

5 changes: 4 additions & 1 deletion client/v3/lease.go
@@ -549,9 +549,12 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
timer := time.NewTimer(time.Second)
defer timer.Stop()
for {
timer.Reset(time.Second)
select {
case <-time.After(time.Second):
case <-timer.C:
case <-l.donec:
return
}
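The client/v3/lease.go hunk above replaces a fresh `time.After` on every loop iteration with a single timer that is reset each pass, so the loop no longer allocates a new timer per iteration. A standalone sketch of the same pattern, with an illustrative period and loop body:

```go
package main

import (
	"fmt"
	"time"
)

// pollLoop wakes up once per period until done is closed, reusing one timer
// instead of allocating a fresh one via time.After on every iteration.
func pollLoop(period time.Duration, done <-chan struct{}) {
	timer := time.NewTimer(period)
	defer timer.Stop()
	for {
		timer.Reset(period)
		select {
		case <-timer.C:
			fmt.Println("tick: reap expired keep-alive channels here")
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		pollLoop(100*time.Millisecond, done)
		close(finished)
	}()
	time.Sleep(350 * time.Millisecond)
	close(done)
	<-finished
}
```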
4 changes: 3 additions & 1 deletion contrib/README.md
@@ -2,6 +2,8 @@

Scripts and files which may be useful but aren't part of the core etcd project.

* [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions
* [lock](lock) - example addressing the expired lease problem of distributed locking with etcd
* [mixin](mixin) - customisable set of Grafana dashboard and Prometheus alerts for etcd
* [raftexample](raftexample) - an example distributed key-value store using raft
* [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions
* [systemd/etcd3-multinode](systemd/etcd3-multinode) - multi-node cluster setup with systemd
4 changes: 4 additions & 0 deletions server/etcdserver/bootstrap.go
@@ -90,18 +90,22 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
}

cfg.Logger.Info("bootstrapping cluster")
cluster, err := bootstrapCluster(cfg, bwal, prt)
if err != nil {
backend.Close()
return nil, err
}

cfg.Logger.Info("bootstrapping storage")
s := bootstrapStorage(cfg, st, backend, bwal, cluster)

if err = cluster.Finalize(cfg, s); err != nil {
backend.Close()
return nil, err
}

cfg.Logger.Info("bootstrapping raft")
raft := bootstrapRaft(cfg, cluster, s.wal)
return &bootstrappedServer{
prt: prt,
49 changes: 47 additions & 2 deletions server/etcdserver/server.go
@@ -298,8 +298,10 @@ type EtcdServer struct {
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
b, err := bootstrap(cfg)
if err != nil {
cfg.Logger.Error("bootstrap failed", zap.Error(err))
return nil, err
}
cfg.Logger.Info("bootstrap successfully")

defer func() {
if err != nil {
@@ -392,8 +394,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
if !srv.ensureLeadership() {
srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(srv.MemberId())))
return lease.ErrNotPrimary
}

srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
return nil
})
}

@@ -842,7 +851,19 @@ func (s *EtcdServer) run() {

func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
s.GoAttach(func() {
// We shouldn't revoke any leases if the current member isn't the leader,
// because the operation should only be performed by the leader. When
// the leader gets blocked on the raft loop, for example while writing WAL
// entries, it can't process any events or messages from raft. It may still
// think it is the leader even though the leadership has already changed.
// Refer to https://github.com/etcd-io/etcd/issues/15247
lg := s.Logger()
if !s.ensureLeadership() {
lg.Warn("Ignore the lease revoking request because current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.MemberId())))
return
}

// Increase the throughput of the expired-lease deletion process through parallelization
c := make(chan struct{}, maxPendingRevokes)
for _, curLease := range leases {
@@ -875,6 +896,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
})
}

// ensureLeadership checks whether current member is still the leader.
func (s *EtcdServer) ensureLeadership() bool {
lg := s.Logger()

ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
defer cancel()
if err := s.linearizableReadNotify(ctx); err != nil {
lg.Warn("Failed to check current member's leadership",
zap.Error(err))
return false
}

newLeaderId := s.raftStatus().Lead
if newLeaderId != uint64(s.MemberId()) {
lg.Warn("Current member isn't a leader",
zap.Uint64("local-member-id", uint64(s.MemberId())),
zap.Uint64("new-lead", newLeaderId))
return false
}

return true
}

// Cleanup removes allocated objects by EtcdServer.NewServer in
// situation that EtcdServer::Start was not called (that takes care of cleanup).
func (s *EtcdServer) Cleanup() {
@@ -1975,7 +2019,9 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
lg := s.Logger()
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
lg.Error("Validation on configuration change failed", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)

@@ -1988,7 +2034,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
return false, err
}

lg := s.Logger()
*confState = *s.r.ApplyConfChange(cc)
s.beHooks.SetConfState(confState)
switch cc.Type {
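Two things happen in the server.go hunks above: leader-only lease operations are now gated by `ensureLeadership` (a linearizable read followed by a leader check), and expired-lease revocation is fanned out through a buffered channel (`maxPendingRevokes`) that bounds concurrency. A self-contained sketch of that bounded-parallelism pattern, using illustrative names rather than etcd's types:

```go
package main

import (
	"fmt"
	"sync"
)

// revokeAll revokes every lease ID with at most maxInFlight revocations
// running concurrently; the buffered channel works as a counting semaphore.
func revokeAll(ids []int64, maxInFlight int, revoke func(int64)) {
	sem := make(chan struct{}, maxInFlight)
	var wg sync.WaitGroup
	for _, id := range ids {
		sem <- struct{}{} // blocks once maxInFlight revocations are pending
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done
			revoke(id)
		}(id)
	}
	wg.Wait()
}

func main() {
	ids := []int64{1, 2, 3, 4, 5}
	revokeAll(ids, 2, func(id int64) {
		fmt.Println("revoking lease", id)
	})
}
```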
10 changes: 10 additions & 0 deletions server/etcdserver/v3_server.go
@@ -278,6 +278,16 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)

func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
if s.isLeader() {
// If s.isLeader() returns true, but we fail to ensure the current
// member's leadership, there are a couple of possibilities:
// 1. the current member is stuck writing WAL entries;
// 2. the current member is isolated from the network;
// 3. the current member is no longer the leader (possibly due to #1 above).
// In such cases, we just return an error to the client, so that the client
// can switch to another member to continue the lease keep-alive operation.
if !s.ensureLeadership() {
return -1, lease.ErrNotPrimary
}
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
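Per the new comment, a member that cannot confirm its leadership now rejects the renewal with `lease.ErrNotPrimary`, so the client can retry the keep-alive against another member. A hedged sketch of the client side using clientv3 with multiple endpoints (addresses and TTL are placeholders):

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Several endpoints so the client can fall back to another member when
	// one of them cannot serve the keep-alive (placeholder addresses).
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	grant, err := cli.Grant(ctx, 10) // 10s TTL, illustrative
	if err != nil {
		log.Fatal(err)
	}

	// KeepAlive renews the lease in the background; responses arrive on ch.
	ch, err := cli.KeepAlive(ctx, grant.ID)
	if err != nil {
		log.Fatal(err)
	}
	for resp := range ch {
		log.Printf("lease %x renewed, TTL=%d", resp.ID, resp.TTL)
	}
}
```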
10 changes: 7 additions & 3 deletions server/lease/lessor.go
@@ -75,7 +75,7 @@ type RangeDeleter func() TxnDelete

// Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
// avoid circular dependency with mvcc.
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error

type LeaseID int64

@@ -422,7 +422,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
// of RAFT entries written per lease to a max of 2 per checkpoint interval.
if clearRemainingTTL {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
return -1, err
}
}

le.mu.Lock()
@@ -656,7 +658,9 @@ func (le *lessor) checkpointScheduledLeases() {
le.mu.Unlock()

if len(cps) != 0 {
le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
return
}
}
if len(cps) < maxLeaseCheckpointBatchSize {
return
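With `Checkpointer` now returning an error, `Renew` aborts when a checkpoint is rejected instead of silently continuing. A toy, self-contained mirror of that contract; the types and names below are illustrative, not etcd's:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Illustrative stand-ins for the real etcd types.
type checkpointRequest struct{ leaseID int64 }

// checkpointer mirrors the new contract: it may fail, for example when the
// member cannot prove it is still the leader.
type checkpointer func(ctx context.Context, r checkpointRequest) error

var errNotPrimary = errors.New("not primary")

// renew mirrors lessor.Renew's new behavior: a rejected checkpoint aborts the
// renewal with -1 and the error instead of being ignored.
func renew(ctx context.Context, cp checkpointer, id int64) (int64, error) {
	if err := cp(ctx, checkpointRequest{leaseID: id}); err != nil {
		return -1, err
	}
	return 10, nil // pretend the renewed TTL is 10 seconds
}

func main() {
	deny := checkpointer(func(ctx context.Context, r checkpointRequest) error {
		return errNotPrimary
	})
	if _, err := renew(context.Background(), deny, 42); err != nil {
		fmt.Println("renew rejected:", err)
	}
}
```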
6 changes: 4 additions & 2 deletions server/lease/lessor_test.go
@@ -269,10 +269,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer os.RemoveAll(dir)

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
}
return nil
}
defer le.Stop()
// Set checkpointer
@@ -556,7 +557,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
close(checkpointedC)
if len(lc.Checkpoints) != 1 {
t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
@@ -565,6 +566,7 @@
if c.Remaining_TTL != 1 {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
return nil
})
_, err := le.Grant(1, 2)
if err != nil {
17 changes: 9 additions & 8 deletions tests/e2e/ctl_v3_member_no_proxy_test.go
@@ -40,28 +40,27 @@ func TestMemberReplace(t *testing.T) {
require.NoError(t, err)
defer epc.Close()

memberId := rand.Int() % len(epc.Procs)
member := epc.Procs[memberId]
memberIdx := rand.Int() % len(epc.Procs)
member := epc.Procs[memberIdx]
memberName := member.Config().Name
var endpoints []string
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...)
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints)
require.NoError(t, err)

c := epc.Etcdctl()
memberID, found, err := getMemberIdByName(ctx, c, memberName)
memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, true, "Member not found")

// Need to wait health interval for cluster to accept member changes
time.Sleep(etcdserver.HealthInterval)

t.Logf("Removing member %s", memberName)
_, err = c.MemberRemove(ctx, memberID)
_, err = cc.MemberRemove(ctx, memberID)
require.NoError(t, err)
_, found, err = getMemberIdByName(ctx, c, memberName)
_, found, err = getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, false, "Expected member to be removed")
for member.IsRunning() {
@@ -82,12 +81,14 @@
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
require.NoError(t, err)

// Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687.
time.Sleep(100 * time.Millisecond)
t.Logf("Starting member %s", memberName)
err = member.Start(ctx)
require.NoError(t, err)
testutils.ExecuteUntil(ctx, t, func() {
for {
_, found, err := getMemberIdByName(ctx, c, memberName)
_, found, err := getMemberIdByName(ctx, cc, memberName)
if err != nil || !found {
time.Sleep(10 * time.Millisecond)
continue
(3 additional changed files not shown)
