Skip to content

Commit

Permalink
etcdserver: fix incorrect metrics generated when clients cancel watches
Browse files Browse the repository at this point in the history
Before this patch, a client which cancels the context for a watch results in the
server generating a `rpctypes.ErrGRPCNoLeader` error that leads to the recording of
a gRPC `Unavailable` metric in association with the client watch cancellation.
The metric looks like this:

    grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"}

So, the watch server has misidentified the error as a server error and then
propagates the mistake to metrics, leading to a false indicator that the leader
has been lost. This false signal then leads to false alerting.

The commit 9c103dd introduced an interceptor which wraps
watch streams requiring a leader, causing those streams to be actively canceled
when leader loss is detected.

However, the error handling code assumes all stream context cancellations are
from the interceptor. This assumption is broken when the context was canceled
because of a client stream cancellation.

The core challenge is lack of information conveyed via `context.Context` which
is shared by both the send and receive sides of the stream handling and is
subject to cancellation by all paths (including the gRPC library itself). If any
piece of the system cancels the shared context, there's no way for a context
consumer to understand who cancelled the context or why.

To solve the ambiguity of the stream interceptor code specifically, this patch
introduces a custom context struct which the interceptor uses to expose a custom
error through the context when the interceptor decides to actively cancel a
stream. Now the consuming side can more safely assume a generic context
cancellation can be propagated as a cancellation, and the server generated
leader error is preserved and propagated normally without any special inference.

When a client cancels the stream, there remains a race in the error handling
code between the send and receive goroutines whereby the underlying gRPC error
is lost in the case where the send path returns and is handled first, but this
issue can be taken separately as no matter which path wins, we can detect a
generic cancellation.

This is a replacement of #11375.

Fixes #10289, #9725, #9576, #9166
  • Loading branch information
ironcladlou committed Sep 29, 2020
1 parent b47cd2f commit 1f41059
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 10 deletions.
48 changes: 44 additions & 4 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
"go.etcd.io/etcd/v3/pkg/types"
"go.etcd.io/etcd/v3/raft"

pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"

pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
)

const (
Expand Down Expand Up @@ -231,8 +232,13 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNoLeader
}

cctx, cancel := context.WithCancel(ss.Context())
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
cancelCtx, cancelFn := context.WithCancel(ss.Context())
monitorCtx := &leaderMonitoringContext{
Context: cancelCtx,
cancel: cancelFn,
}
cancelForLeaderLoss := context.CancelFunc(monitorCtx.CancelForLeaderLoss)
ss = serverStreamWithCtx{ctx: monitorCtx, cancel: &cancelForLeaderLoss, ServerStream: ss}

smap.mu.Lock()
smap.streams[ss] = struct{}{}
Expand All @@ -242,7 +248,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
cancel()
monitorCtx.Cancel()
}()
}
}
Expand All @@ -251,6 +257,40 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

// leaderMonitoringContext wraps a context and provides a custom error when
// the CancelForLeaderLoss() method is used to cancel the context. This is
// so downstream context users can disambiguate the reason for the cancellation
// which could be from the client (for example) or from this interceptor code.
//
// Values are used through a pointer; the struct embeds a sync.Mutex and so
// must not be copied.
type leaderMonitoringContext struct {
// Context is the wrapped context. Its Err() is what this type's Err()
// falls back to when no explicit cancelReason has been recorded.
context.Context

// lock is held by Cancel, CancelForLeaderLoss and Err while they touch
// cancel and cancelReason.
lock sync.Mutex
// cancel is the function invoked by Cancel and CancelForLeaderLoss to
// cancel the wrapped context.
cancel context.CancelFunc
// cancelReason is nil until CancelForLeaderLoss sets it to
// rpctypes.ErrGRPCNoLeader; once non-nil it is what Err() returns.
cancelReason error
}

// Cancel cancels the wrapped context without recording a custom reason, so a
// later Err() call reports whatever the wrapped context itself reports.
// The cancel function is invoked while the context's lock is held.
func (c *leaderMonitoringContext) Cancel() {
	mu := &c.lock
	mu.Lock()
	defer mu.Unlock()

	c.cancel()
}

// CancelForLeaderLoss cancels the wrapped context and records
// rpctypes.ErrGRPCNoLeader as the cancellation reason, so that Err() reports
// leader loss rather than a generic cancellation.
//
// The reason is recorded only if the wrapped context has not already finished.
// If it was cancelled earlier (for example by the client), callers may already
// have observed that error through Err(), and the context.Context contract
// requires successive Err() calls to keep returning the same non-nil error.
func (c *leaderMonitoringContext) CancelForLeaderLoss() {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Consult the embedded context directly: c.Err() would try to re-acquire
	// c.lock and deadlock.
	if c.Context.Err() == nil {
		c.cancelReason = rpctypes.ErrGRPCNoLeader
	}
	c.cancel()
}

// Err reports why the context ended. When an explicit cancellation reason has
// been recorded it takes precedence; otherwise the wrapped context's own
// Err() result is returned (which may be nil).
func (c *leaderMonitoringContext) Err() error {
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.cancelReason == nil {
		// No custom reason recorded: defer to the wrapped context.
		return c.Context.Err()
	}
	return c.cancelReason
}

type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
11 changes: 6 additions & 5 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"sync"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver"
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/mvcc"
"go.etcd.io/etcd/v3/mvcc/mvccpb"

"go.uber.org/zap"
)

const minWatchProgressInterval = 100 * time.Millisecond
Expand Down Expand Up @@ -199,13 +199,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {

select {
case err = <-errc:
if err == context.Canceled {
err = rpctypes.ErrGRPCWatchCanceled
}
close(sws.ctrlStream)

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
err = rpctypes.ErrGRPCWatchCanceled
}
}

Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true","reason":""}`},
} {
i++
Expand All @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
cx.t.Fatal(err)
}

if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
Expand Down

0 comments on commit 1f41059

Please sign in to comment.