Skip to content

Commit

Permalink
fix(grpcproxy): support grpc context propagation in grpc proxy;fix th…
Browse files Browse the repository at this point in the history
…e problem that watch exits when recvLoop exits but sendLoop still running
  • Loading branch information
saiwl committed May 15, 2024
1 parent 7b9013d commit 98844f0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
16 changes: 16 additions & 0 deletions server/etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"google.golang.org/grpc/metadata"
"io"
"log"
"math"
Expand Down Expand Up @@ -422,6 +423,20 @@ func mustListenCMux(lg *zap.Logger, tlsinfo *transport.TLSInfo) cmux.CMux {
return cmux.New(l)
}

func contextPropagationUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
ctx = metadata.NewOutgoingContext(ctx, md)
}
return handler(ctx, req)
}
}

func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
if grpcProxyEnableOrdering {
vf := ordering.NewOrderViolationSwitchEndpointClosure(client)
Expand Down Expand Up @@ -467,6 +482,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
}
grpcChainUnaryList := []grpc.UnaryServerInterceptor{
grpc_prometheus.UnaryServerInterceptor,
contextPropagationUnaryServerInterceptor(),
}
if grpcProxyEnableLogging {
grpcChainStreamList = append(grpcChainStreamList,
Expand Down
7 changes: 4 additions & 3 deletions server/proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {

// post to stopc => terminate server stream; can't use a waitgroup
// since all goroutines will only terminate after Watch() exits.
stopc := make(chan struct{}, 3)
stopc := make(chan struct{}, 2)
leaderc := make(chan struct{}, 1)
go func() {
defer func() { stopc <- struct{}{} }()
wps.recvLoop()
Expand All @@ -134,15 +135,15 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}()
// tear down watch if leader goes down or entire watch proxy is terminated
go func() {
defer func() { stopc <- struct{}{} }()
defer func() { leaderc <- struct{}{} }()
select {
case <-lostLeaderC:
case <-ctx.Done():
case <-wp.ctx.Done():
}
}()

<-stopc
<-leaderc
cancel()

// recv/send may only shutdown after function exits;
Expand Down

0 comments on commit 98844f0

Please sign in to comment.