diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 5946361b8093..39a83029b169 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "google.golang.org/grpc/metadata" "io" "log" "math" @@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux { return cmux.New(l) } +func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + if md, ok := metadata.FromIncomingContext(ctx); ok { + ctx = metadata.NewOutgoingContext(ctx, md) + } + return handler(ctx, req) + } +} + func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { if grpcProxyEnableOrdering { vf := ordering.NewOrderViolationSwitchEndpointClosure(client) @@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } grpcChainUnaryList := []grpc.UnaryServerInterceptor{ grpc_prometheus.UnaryServerInterceptor, + contextPropagationUnaryServerInterceptor(), } if grpcProxyEnableLogging { grpcChainStreamList = append(grpcChainStreamList, diff --git a/server/proxy/grpcproxy/watch.go b/server/proxy/grpcproxy/watch.go index 90eb21d4a40f..ca1ff85f858a 100644 --- a/server/proxy/grpcproxy/watch.go +++ b/server/proxy/grpcproxy/watch.go @@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { // post to stopc => terminate server stream; can't use a waitgroup // since all goroutines will only terminate after Watch() exits. - stopc := make(chan struct{}, 3) + stopc := make(chan struct{}, 2) + leaderc := make(chan struct{}, 1) go func() { defer func() { stopc <- struct{}{} }() wps.recvLoop() @@ -134,7 +135,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { }() // tear down watch if leader goes down or entire watch proxy is terminated go func() { - defer func() { stopc <- struct{}{} }() + defer func() { leaderc <- struct{}{} }() select { case <-lostLeaderC: case <-ctx.Done(): @@ -142,7 +143,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { } }() - <-stopc + <-leaderc cancel() // recv/send may only shutdown after function exits;