diff --git a/client/egrpc/interceptor.go b/client/egrpc/interceptor.go index 87124c61..cadfbc62 100644 --- a/client/egrpc/interceptor.go +++ b/client/egrpc/interceptor.go @@ -348,7 +348,13 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor { } if c.config.EnableAccessInterceptorReq { - fields = append(fields, elog.Any("req", json.RawMessage(xstring.JSON(req)))) + var reqMap = map[string]interface{}{ + "payload": xstring.JSON(req), + } + if md, ok := metadata.FromOutgoingContext(ctx); ok { + reqMap["metadata"] = md + } + fields = append(fields, elog.Any("req", reqMap)) } if c.config.EnableAccessInterceptorRes { fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(res)))) diff --git a/client/ehttp/component.go b/client/ehttp/component.go index 1ce48359..cfe0c0d2 100644 --- a/client/ehttp/component.go +++ b/client/ehttp/component.go @@ -4,9 +4,13 @@ import ( "net" "net/http" "net/http/cookiejar" + "net/url" + "strings" "time" "github.com/go-resty/resty/v2" + "github.com/gotomicro/ego/client/ehttp/resolver" + "github.com/gotomicro/ego/core/eregistry" "golang.org/x/net/publicsuffix" "github.com/gotomicro/ego/core/eapp" @@ -22,9 +26,23 @@ type Component struct { config *Config logger *elog.Component *resty.Client + builder resolver.Builder } func newComponent(name string, config *Config, logger *elog.Component) *Component { + // addr可以为空 + // 以下方法是为了支持k8s解析域名, k8s:///svc-user:9002 http接口,一定要是三斜线,跟gRPC统一 + egoTarget, err := parseTarget(config.Addr) + if err != nil { + elog.Panic("parse addr error", elog.FieldErr(err), elog.FieldKey(config.Addr)) + } + addr := strings.ReplaceAll(config.Addr, egoTarget.Scheme+"://", "http://") + builder := resolver.Get(egoTarget.Scheme) + resolver, err := builder.Build(addr) + if err != nil { + elog.Panic("build resolver error", elog.FieldErr(err), elog.FieldKey(config.Addr)) + } + // resty的默认方法,无法设置长连接个数,和是否开启长连接,这里重新构造http client。 cookieJar, _ := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List}) interceptors := []interceptor{fixedInterceptor, logInterceptor, metricInterceptor, traceInterceptor} @@ -32,9 +50,9 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen SetDebug(config.RawDebug). SetTimeout(config.ReadTimeout). SetHeader("app", eapp.Name()). - SetBaseURL(config.Addr) + SetBaseURL(addr) for _, interceptor := range interceptors { - onBefore, onAfter, onErr := interceptor(name, config, logger) + onBefore, onAfter, onErr := interceptor(name, config, logger, resolver) if onBefore != nil { cli.OnBeforeRequest(onBefore) } @@ -47,11 +65,32 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen } return &Component{ - name: name, - config: config, - logger: logger, - Client: cli, + name: name, + config: config, + logger: logger, + Client: cli, + builder: builder, + } +} + +func parseTarget(addr string) (eregistry.Target, error) { + target, err := url.Parse(addr) + if err != nil { + return eregistry.Target{}, err + } + endpoint := target.Path + if endpoint == "" { + endpoint = target.Opaque + } + endpoint = strings.TrimPrefix(endpoint, "/") + + egoTarget := eregistry.Target{ + Protocol: eregistry.ProtocolHTTP, + Scheme: target.Scheme, + Endpoint: endpoint, + Authority: target.Host, } + return egoTarget, nil } func createTransport(config *Config) *http.Transport { diff --git a/client/ehttp/interceptor.go b/client/ehttp/interceptor.go index c598cef6..ba9f984c 100644 --- a/client/ehttp/interceptor.go +++ b/client/ehttp/interceptor.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-resty/resty/v2" + "github.com/gotomicro/ego/client/ehttp/resolver" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" @@ -23,7 +24,7 @@ import ( "github.com/gotomicro/ego/core/util/xdebug" ) -type interceptor func(name string, cfg *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) +type interceptor func(name string, cfg *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) func logAccess(name string, config *Config, logger *elog.Component, req *resty.Request, res *resty.Response, err error) { u := req.Context().Value(urlKey{}).(*url.URL) @@ -90,7 +91,7 @@ func beg(ctx context.Context) time.Time { return begTime } -func fixedInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { +func fixedInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { return func(cli *resty.Client, req *resty.Request) error { // 这个URL可能不准,每次请求都需要重复url.Parse(),会增加一定的性能损耗 var concatURL string @@ -114,12 +115,16 @@ func fixedInterceptor(name string, config *Config, logger *elog.Component) (rest } } } + // 只有存在,才会更新 + if builder.GetAddr() != "" { + cli.HostURL = builder.GetAddr() + } req.SetContext(context.WithValue(context.WithValue(req.Context(), begKey{}, time.Now()), urlKey{}, u)) return nil }, nil, nil } -func logInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { +func logInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { afterFn := func(cli *resty.Client, response *resty.Response) error { logAccess(name, config, logger, response.Request, response, nil) return nil @@ -134,7 +139,7 @@ func logInterceptor(name string, config *Config, logger *elog.Component) (resty. return nil, afterFn, errorFn } -func metricInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { +func metricInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { if !config.EnableMetricsInterceptor { return nil, nil, nil } @@ -157,7 +162,7 @@ func metricInterceptor(name string, config *Config, logger *elog.Component) (res return nil, afterFn, errorFn } -func traceInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { +func traceInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) { tracer := etrace.NewTracer(trace.SpanKindClient) attrs := []attribute.KeyValue{ semconv.RPCSystemKey.String("http"), diff --git a/client/ehttp/resolver/resolver.go b/client/ehttp/resolver/resolver.go new file mode 100644 index 00000000..9fdcb3a2 --- /dev/null +++ b/client/ehttp/resolver/resolver.go @@ -0,0 +1,177 @@ +package resolver + +import ( + "context" + "net/url" + "strings" + + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/resolver" + + "github.com/gotomicro/ego/core/constant" + "github.com/gotomicro/ego/core/eregistry" + "github.com/gotomicro/ego/server" +) + +var ( + // m is a map from scheme to resolver builder. + m = make(map[string]Builder) + // defaultScheme is the default scheme to use. + defaultScheme = "http" +) + +// Builder creates a resolver that will be used to watch name resolution updates. +type Builder interface { + // Build creates a new resolver for the given target. + // gRPC dial calls Build synchronously, and fails if the returned error is + // not nil. + Build(addr string) (Resolver, error) + // Scheme returns the scheme supported by this resolver. + Scheme() string +} + +type Resolver interface { + GetAddr() string +} + +// Register ... +func Register(name string, reg eregistry.Registry) { + b := &baseBuilder{ + name: name, + reg: reg, + } + m[b.Scheme()] = b +} + +func Get(scheme string) Builder { + if b, ok := m[scheme]; ok { + return b + } + return nil +} + +type baseBuilder struct { + name string + reg eregistry.Registry +} + +// Build ... +func (b *baseBuilder) Build(addr string) (Resolver, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + target, err := url.Parse(addr) + if err != nil { + return nil, err + } + endpoint := target.Path + if endpoint == "" { + endpoint = target.Opaque + } + endpoint = strings.TrimPrefix(endpoint, "/") + + egoTarget := eregistry.Target{ + Protocol: eregistry.ProtocolHTTP, + Scheme: target.Scheme, + Endpoint: endpoint, + Authority: target.Host, + } + + endpoints, err := b.reg.WatchServices(ctx, egoTarget) + if err != nil { + cancel() + return nil, err + } + + br := &baseResolver{ + target: egoTarget, + stop: make(chan struct{}), + reg: b.reg, + cancel: cancel, + nodeInfo: make(map[string]*attributes.Attributes), + } + br.run(endpoints) + return br, nil +} + +// Scheme ... +func (b *baseBuilder) Scheme() string { + return b.name +} + +type baseResolver struct { + target eregistry.Target // 使用ego的target,因为官方的target后续会不兼容 + stop chan struct{} + reg eregistry.Registry + cancel context.CancelFunc + addrSlices []string + nodeInfo map[string]*attributes.Attributes // node节点的属性 +} + +func (b *baseResolver) GetAddr() string { + for key := range b.nodeInfo { + return "http://" + key + } + return "" +} + +// Close ... +func (b *baseResolver) Close() { + b.stop <- struct{}{} + b.cancel() +} + +// run 更新节点信息 +// State +// +// Addresses []Address{ IP列表 +// Addr: IP 地址, +// ServerName: 应用名称, 如:svc-user +// Attributes: 节点基本信息: server.ServiceInfo +// } +// Attributes: { 用于负载均衡的配置,目前需要通过后台来设置 +// constant.KeyRouteConfig 路由配置 +// constant.KeyProviderConfig 服务提供方元信息 +// constant.KeyConsumerConfig 服务消费方配置信息 +// } +func (b *baseResolver) run(endpoints chan eregistry.Endpoints) { + go func() { + for { + select { + case endpoint := <-endpoints: + var state = resolver.State{ + Addresses: make([]resolver.Address, 0), + Attributes: attributes.New(constant.KeyRouteConfig, endpoint.RouteConfigs). // 路由配置 + WithValue(constant.KeyProviderConfig, endpoint.ProviderConfigs). // 服务提供方元信息 + WithValue(constant.KeyConsumerConfig, endpoint.ConsumerConfigs), // 服务消费方配置信息 + } + // 如果node信息有变更,那么就添加,更新或者删除 + b.tryUpdateAttrs(endpoint.Nodes) + for key, node := range endpoint.Nodes { + var address resolver.Address + address.Addr = node.Address + address.ServerName = b.target.Endpoint + address.Attributes = b.nodeInfo[key] + state.Addresses = append(state.Addresses, address) + } + case <-b.stop: + return + } + } + }() +} + +// tryUpdateAttrs 更新节点数据 +func (b *baseResolver) tryUpdateAttrs(nodes map[string]server.ServiceInfo) { + for addr, node := range nodes { + oldAttr, ok := b.nodeInfo[addr] + newAttr := attributes.New(constant.KeyServiceInfo, node) + if !ok || !oldAttr.Equal(newAttr) { + b.nodeInfo[addr] = newAttr + } + } + for addr := range b.nodeInfo { + if _, ok := nodes[addr]; !ok { + delete(b.nodeInfo, addr) + } + } +} diff --git a/client/ehttp/resolver/resolver_http.go b/client/ehttp/resolver/resolver_http.go new file mode 100644 index 00000000..2b513688 --- /dev/null +++ b/client/ehttp/resolver/resolver_http.go @@ -0,0 +1,29 @@ +package resolver + +type baseHttpBuilder struct { + name string +} + +type baseHttpResolver struct { +} + +func init() { + m["http"] = &baseHttpBuilder{} + m["https"] = &baseHttpBuilder{} + // 用户是没有传协议 + m[""] = &baseHttpBuilder{} +} + +// Build ... +func (b *baseHttpBuilder) Build(addr string) (Resolver, error) { + return &baseHttpResolver{}, nil +} + +// Scheme ... +func (b *baseHttpBuilder) Scheme() string { + return b.name +} + +func (b *baseHttpResolver) GetAddr() string { + return "" +}