From f457702566ee8128fe4f0a037c5c4823af11b732 Mon Sep 17 00:00:00 2001 From: sevennt Date: Wed, 3 Jan 2024 15:49:47 +0800 Subject: [PATCH] add ctxStore --- examples/grpc/direct/server/main.go | 4 ++- server/egrpc/interceptor.go | 48 ++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/examples/grpc/direct/server/main.go b/examples/grpc/direct/server/main.go index 692e7d5b..5e312371 100644 --- a/examples/grpc/direct/server/main.go +++ b/examples/grpc/direct/server/main.go @@ -3,10 +3,11 @@ package main import ( "context" - "github.com/gotomicro/ego/server/egovernor" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/gotomicro/ego/server/egovernor" + "github.com/gotomicro/ego" "github.com/gotomicro/ego/core/elog" "github.com/gotomicro/ego/examples/helloworld" @@ -36,6 +37,7 @@ func (g Greeter) SayHello(ctx context.Context, request *helloworld.HelloRequest) if request.Name == "error" { return nil, status.Error(codes.Unavailable, "error") } + egrpc.CtxStoreSet(ctx, "x-biz-guid", "123") // header := metadata.Pairs("x-header-key", "val") // err := grpc.SendHeader(context, header) // if err != nil { diff --git a/server/egrpc/interceptor.go b/server/egrpc/interceptor.go index f436c6ef..83aa4959 100644 --- a/server/egrpc/interceptor.go +++ b/server/egrpc/interceptor.go @@ -12,16 +12,6 @@ import ( sentinel "github.com/alibaba/sentinel-golang/api" sentinelBase "github.com/alibaba/sentinel-golang/core/base" - "github.com/gotomicro/ego/core/eerrors" - "github.com/gotomicro/ego/core/elog" - "github.com/gotomicro/ego/core/emetric" - "github.com/gotomicro/ego/core/esentinel" - "github.com/gotomicro/ego/core/etrace" - "github.com/gotomicro/ego/core/transport" - "github.com/gotomicro/ego/core/util/xstring" - "github.com/gotomicro/ego/internal/ecode" - "github.com/gotomicro/ego/internal/egrpcinteceptor" - "github.com/gotomicro/ego/internal/tools" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -32,6 +22,17 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" + + "github.com/gotomicro/ego/core/eerrors" + "github.com/gotomicro/ego/core/elog" + "github.com/gotomicro/ego/core/emetric" + "github.com/gotomicro/ego/core/esentinel" + "github.com/gotomicro/ego/core/etrace" + "github.com/gotomicro/ego/core/transport" + "github.com/gotomicro/ego/core/util/xstring" + "github.com/gotomicro/ego/internal/ecode" + "github.com/gotomicro/ego/internal/egrpcinteceptor" + "github.com/gotomicro/ego/internal/tools" ) func traceUnaryServerInterceptor() grpc.UnaryServerInterceptor { @@ -206,8 +207,21 @@ func (c *Container) prometheusStreamServerInterceptor(ss grpc.ServerStream, info emetric.ServerHandleCounter.Inc(emetric.TypeGRPCStream, info.FullMethod, getPeerName(ss.Context()), pbStatus.Message(), strconv.Itoa(ecode.GrpcToHTTPStatusCode(pbStatus.Code())), serviceName) } +type ctxStore struct { + kvs map[string]any +} + +type ctxStoreStruct struct{} + +// CtxStoreSet 从ctx中尝试获取ctxStore,并往其中插入kv +func CtxStoreSet(ctx context.Context, k string, v any) { + skv, _ := ctx.Value(ctxStoreStruct{}).(*ctxStore) + skv.kvs[k] = v +} + func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) { + ctx = context.WithValue(ctx, ctxStoreStruct{}, &ctxStore{kvs: map[string]any{}}) // 默认过滤掉该探活日志 if c.config.EnableSkipHealthLog && info.FullMethod == "/grpc.health.v1.Health/Check" { return handler(ctx, req) @@ -270,9 +284,13 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor elog.FieldPeerIP(getPeerIP(ctx)), ) + skv, _ := ctx.Value(ctxStoreStruct{}).(*ctxStore) for _, key := range loggerKeys { + if v, ok := skv.kvs[key]; ok { + fields = append(fields, elog.Any(strings.ToLower(key), v)) + } if value := tools.ContextValue(ctx, key); value != "" { - fields = append(fields, elog.FieldCustomKeyValue(key, value)) + fields = append(fields, elog.FieldCustomKeyValue(strings.ToLower(key), value)) } } @@ -319,7 +337,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor c.prometheusUnaryServerInterceptor(ctx, info, spbStatus, cost) }() - //if enableCPUUsage(ctx) { + // if enableCPUUsage(ctx) { // var stat = xcpu.Stat{} // xcpu.ReadStat(&stat) // if stat.Usage > 0 { @@ -330,7 +348,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor // c.logger.Error("set header error", elog.FieldErr(err)) // } // } - //} + // } return handler(ctx, req) } } @@ -349,9 +367,9 @@ func (c *Container) prometheusUnaryServerInterceptor(ctx context.Context, info * } // enableCPUUsage 是否开启cpu利用率 -//func enableCPUUsage(ctx context.Context) bool { +// func enableCPUUsage(ctx context.Context) bool { // return tools.GrpcHeaderValue(ctx, "enable-cpu-usage") == "true" -//} +// } // getPeerName 获取对端应用名称 func getPeerName(ctx context.Context) string {