Skip to content

Commit

Permalink
add ctxStore
Browse files Browse the repository at this point in the history
  • Loading branch information
sevennt committed Jan 3, 2024
1 parent 68b343b commit f457702
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
4 changes: 3 additions & 1 deletion examples/grpc/direct/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package main
import (
"context"

"github.com/gotomicro/ego/server/egovernor"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/gotomicro/ego/server/egovernor"

"github.com/gotomicro/ego"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/examples/helloworld"
Expand Down Expand Up @@ -36,6 +37,7 @@ func (g Greeter) SayHello(ctx context.Context, request *helloworld.HelloRequest)
if request.Name == "error" {
return nil, status.Error(codes.Unavailable, "error")
}
egrpc.CtxStoreSet(ctx, "x-biz-guid", "123")
// header := metadata.Pairs("x-header-key", "val")
// err := grpc.SendHeader(context, header)
// if err != nil {
Expand Down
48 changes: 33 additions & 15 deletions server/egrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ import (

sentinel "github.com/alibaba/sentinel-golang/api"
sentinelBase "github.com/alibaba/sentinel-golang/core/base"
"github.com/gotomicro/ego/core/eerrors"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/esentinel"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/gotomicro/ego/internal/ecode"
"github.com/gotomicro/ego/internal/egrpcinteceptor"
"github.com/gotomicro/ego/internal/tools"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand All @@ -32,6 +22,17 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/gotomicro/ego/core/eerrors"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/esentinel"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/transport"
"github.com/gotomicro/ego/core/util/xstring"
"github.com/gotomicro/ego/internal/ecode"
"github.com/gotomicro/ego/internal/egrpcinteceptor"
"github.com/gotomicro/ego/internal/tools"
)

func traceUnaryServerInterceptor() grpc.UnaryServerInterceptor {
Expand Down Expand Up @@ -206,8 +207,21 @@ func (c *Container) prometheusStreamServerInterceptor(ss grpc.ServerStream, info
emetric.ServerHandleCounter.Inc(emetric.TypeGRPCStream, info.FullMethod, getPeerName(ss.Context()), pbStatus.Message(), strconv.Itoa(ecode.GrpcToHTTPStatusCode(pbStatus.Code())), serviceName)
}

type ctxStore struct {
kvs map[string]any
}

type ctxStoreStruct struct{}

// CtxStoreSet 从ctx中尝试获取ctxStore,并往其中插入kv
func CtxStoreSet(ctx context.Context, k string, v any) {
skv, _ := ctx.Value(ctxStoreStruct{}).(*ctxStore)
skv.kvs[k] = v
}

func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) {
ctx = context.WithValue(ctx, ctxStoreStruct{}, &ctxStore{kvs: map[string]any{}})
// 默认过滤掉该探活日志
if c.config.EnableSkipHealthLog && info.FullMethod == "/grpc.health.v1.Health/Check" {
return handler(ctx, req)
Expand Down Expand Up @@ -270,9 +284,13 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
elog.FieldPeerIP(getPeerIP(ctx)),
)

skv, _ := ctx.Value(ctxStoreStruct{}).(*ctxStore)
for _, key := range loggerKeys {
if v, ok := skv.kvs[key]; ok {
fields = append(fields, elog.Any(strings.ToLower(key), v))
}
if value := tools.ContextValue(ctx, key); value != "" {
fields = append(fields, elog.FieldCustomKeyValue(key, value))
fields = append(fields, elog.FieldCustomKeyValue(strings.ToLower(key), value))
}
}

Expand Down Expand Up @@ -319,7 +337,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
c.prometheusUnaryServerInterceptor(ctx, info, spbStatus, cost)
}()

//if enableCPUUsage(ctx) {
// if enableCPUUsage(ctx) {
// var stat = xcpu.Stat{}
// xcpu.ReadStat(&stat)
// if stat.Usage > 0 {
Expand All @@ -330,7 +348,7 @@ func (c *Container) defaultUnaryServerInterceptor() grpc.UnaryServerInterceptor
// c.logger.Error("set header error", elog.FieldErr(err))
// }
// }
//}
// }
return handler(ctx, req)
}
}
Expand All @@ -349,9 +367,9 @@ func (c *Container) prometheusUnaryServerInterceptor(ctx context.Context, info *
}

// enableCPUUsage 是否开启cpu利用率
//func enableCPUUsage(ctx context.Context) bool {
// func enableCPUUsage(ctx context.Context) bool {
// return tools.GrpcHeaderValue(ctx, "enable-cpu-usage") == "true"
//}
// }

// getPeerName 获取对端应用名称
func getPeerName(ctx context.Context) string {
Expand Down

0 comments on commit f457702

Please sign in to comment.