Skip to content

Commit

Permalink
Merge pull request #354 from askuy/feature/http-k8s-20230728
Browse files Browse the repository at this point in the history
http k8s
  • Loading branch information
askuy authored Jul 28, 2023
2 parents 2fc787a + 167d125 commit 1fa1e38
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 12 deletions.
8 changes: 7 additions & 1 deletion client/egrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,13 @@ func (c *Container) loggerUnaryClientInterceptor() grpc.UnaryClientInterceptor {
}

if c.config.EnableAccessInterceptorReq {
fields = append(fields, elog.Any("req", json.RawMessage(xstring.JSON(req))))
var reqMap = map[string]interface{}{
"payload": xstring.JSON(req),
}
if md, ok := metadata.FromOutgoingContext(ctx); ok {
reqMap["metadata"] = md
}
fields = append(fields, elog.Any("req", reqMap))
}
if c.config.EnableAccessInterceptorRes {
fields = append(fields, elog.Any("res", json.RawMessage(xstring.JSON(res))))
Expand Down
51 changes: 45 additions & 6 deletions client/ehttp/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import (
"net"
"net/http"
"net/http/cookiejar"
"net/url"
"strings"
"time"

"github.com/go-resty/resty/v2"
"github.com/gotomicro/ego/client/ehttp/resolver"
"github.com/gotomicro/ego/core/eregistry"
"golang.org/x/net/publicsuffix"

"github.com/gotomicro/ego/core/eapp"
Expand All @@ -22,19 +26,33 @@ type Component struct {
config *Config
logger *elog.Component
*resty.Client
builder resolver.Builder
}

func newComponent(name string, config *Config, logger *elog.Component) *Component {
// addr可以为空
// 以下方法是为了支持k8s解析域名, k8s:///svc-user:9002 http接口,一定要是三斜线,跟gRPC统一
egoTarget, err := parseTarget(config.Addr)
if err != nil {
elog.Panic("parse addr error", elog.FieldErr(err), elog.FieldKey(config.Addr))
}
addr := strings.ReplaceAll(config.Addr, egoTarget.Scheme+"://", "http://")
builder := resolver.Get(egoTarget.Scheme)
resolver, err := builder.Build(addr)
if err != nil {
elog.Panic("build resolver error", elog.FieldErr(err), elog.FieldKey(config.Addr))
}

// resty的默认方法,无法设置长连接个数,和是否开启长连接,这里重新构造http client。
cookieJar, _ := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
interceptors := []interceptor{fixedInterceptor, logInterceptor, metricInterceptor, traceInterceptor}
cli := resty.NewWithClient(&http.Client{Transport: createTransport(config), Jar: cookieJar}).
SetDebug(config.RawDebug).
SetTimeout(config.ReadTimeout).
SetHeader("app", eapp.Name()).
SetBaseURL(config.Addr)
SetBaseURL(addr)
for _, interceptor := range interceptors {
onBefore, onAfter, onErr := interceptor(name, config, logger)
onBefore, onAfter, onErr := interceptor(name, config, logger, resolver)
if onBefore != nil {
cli.OnBeforeRequest(onBefore)
}
Expand All @@ -47,11 +65,32 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen
}

return &Component{
name: name,
config: config,
logger: logger,
Client: cli,
name: name,
config: config,
logger: logger,
Client: cli,
builder: builder,
}
}

func parseTarget(addr string) (eregistry.Target, error) {
target, err := url.Parse(addr)
if err != nil {
return eregistry.Target{}, err
}
endpoint := target.Path
if endpoint == "" {
endpoint = target.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")

egoTarget := eregistry.Target{
Protocol: eregistry.ProtocolHTTP,
Scheme: target.Scheme,
Endpoint: endpoint,
Authority: target.Host,
}
return egoTarget, nil
}

func createTransport(config *Config) *http.Transport {
Expand Down
15 changes: 10 additions & 5 deletions client/ehttp/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-resty/resty/v2"
"github.com/gotomicro/ego/client/ehttp/resolver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
Expand All @@ -23,7 +24,7 @@ import (
"github.com/gotomicro/ego/core/util/xdebug"
)

type interceptor func(name string, cfg *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook)
type interceptor func(name string, cfg *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook)

func logAccess(name string, config *Config, logger *elog.Component, req *resty.Request, res *resty.Response, err error) {
u := req.Context().Value(urlKey{}).(*url.URL)
Expand Down Expand Up @@ -90,7 +91,7 @@ func beg(ctx context.Context) time.Time {
return begTime
}

func fixedInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
func fixedInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
return func(cli *resty.Client, req *resty.Request) error {
// 这个URL可能不准,每次请求都需要重复url.Parse(),会增加一定的性能损耗
var concatURL string
Expand All @@ -114,12 +115,16 @@ func fixedInterceptor(name string, config *Config, logger *elog.Component) (rest
}
}
}
// 只有存在,才会更新
if builder.GetAddr() != "" {
cli.HostURL = builder.GetAddr()
}
req.SetContext(context.WithValue(context.WithValue(req.Context(), begKey{}, time.Now()), urlKey{}, u))
return nil
}, nil, nil
}

func logInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
func logInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
afterFn := func(cli *resty.Client, response *resty.Response) error {
logAccess(name, config, logger, response.Request, response, nil)
return nil
Expand All @@ -134,7 +139,7 @@ func logInterceptor(name string, config *Config, logger *elog.Component) (resty.
return nil, afterFn, errorFn
}

func metricInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
func metricInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
if !config.EnableMetricsInterceptor {
return nil, nil, nil
}
Expand All @@ -157,7 +162,7 @@ func metricInterceptor(name string, config *Config, logger *elog.Component) (res
return nil, afterFn, errorFn
}

func traceInterceptor(name string, config *Config, logger *elog.Component) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
func traceInterceptor(name string, config *Config, logger *elog.Component, builder resolver.Resolver) (resty.RequestMiddleware, resty.ResponseMiddleware, resty.ErrorHook) {
tracer := etrace.NewTracer(trace.SpanKindClient)
attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("http"),
Expand Down
177 changes: 177 additions & 0 deletions client/ehttp/resolver/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package resolver

import (
"context"
"net/url"
"strings"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"

"github.com/gotomicro/ego/core/constant"
"github.com/gotomicro/ego/core/eregistry"
"github.com/gotomicro/ego/server"
)

var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme = "http"
)

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(addr string) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
Scheme() string
}

type Resolver interface {
GetAddr() string
}

// Register ...
func Register(name string, reg eregistry.Registry) {
b := &baseBuilder{
name: name,
reg: reg,
}
m[b.Scheme()] = b
}

func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}

type baseBuilder struct {
name string
reg eregistry.Registry
}

// Build ...
func (b *baseBuilder) Build(addr string) (Resolver, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
target, err := url.Parse(addr)
if err != nil {
return nil, err
}
endpoint := target.Path
if endpoint == "" {
endpoint = target.Opaque
}
endpoint = strings.TrimPrefix(endpoint, "/")

egoTarget := eregistry.Target{
Protocol: eregistry.ProtocolHTTP,
Scheme: target.Scheme,
Endpoint: endpoint,
Authority: target.Host,
}

endpoints, err := b.reg.WatchServices(ctx, egoTarget)
if err != nil {
cancel()
return nil, err
}

br := &baseResolver{
target: egoTarget,
stop: make(chan struct{}),
reg: b.reg,
cancel: cancel,
nodeInfo: make(map[string]*attributes.Attributes),
}
br.run(endpoints)
return br, nil
}

// Scheme ...
func (b *baseBuilder) Scheme() string {
return b.name
}

type baseResolver struct {
target eregistry.Target // 使用ego的target,因为官方的target后续会不兼容
stop chan struct{}
reg eregistry.Registry
cancel context.CancelFunc
addrSlices []string
nodeInfo map[string]*attributes.Attributes // node节点的属性
}

func (b *baseResolver) GetAddr() string {
for key := range b.nodeInfo {
return "http://" + key
}
return ""
}

// Close ...
func (b *baseResolver) Close() {
b.stop <- struct{}{}
b.cancel()
}

// run 更新节点信息
// State
//
// Addresses []Address{ IP列表
// Addr: IP 地址,
// ServerName: 应用名称, 如:svc-user
// Attributes: 节点基本信息: server.ServiceInfo
// }
// Attributes: { 用于负载均衡的配置,目前需要通过后台来设置
// constant.KeyRouteConfig 路由配置
// constant.KeyProviderConfig 服务提供方元信息
// constant.KeyConsumerConfig 服务消费方配置信息
// }
func (b *baseResolver) run(endpoints chan eregistry.Endpoints) {
go func() {
for {
select {
case endpoint := <-endpoints:
var state = resolver.State{
Addresses: make([]resolver.Address, 0),
Attributes: attributes.New(constant.KeyRouteConfig, endpoint.RouteConfigs). // 路由配置
WithValue(constant.KeyProviderConfig, endpoint.ProviderConfigs). // 服务提供方元信息
WithValue(constant.KeyConsumerConfig, endpoint.ConsumerConfigs), // 服务消费方配置信息
}
// 如果node信息有变更,那么就添加,更新或者删除
b.tryUpdateAttrs(endpoint.Nodes)
for key, node := range endpoint.Nodes {
var address resolver.Address
address.Addr = node.Address
address.ServerName = b.target.Endpoint
address.Attributes = b.nodeInfo[key]
state.Addresses = append(state.Addresses, address)
}
case <-b.stop:
return
}
}
}()
}

// tryUpdateAttrs 更新节点数据
func (b *baseResolver) tryUpdateAttrs(nodes map[string]server.ServiceInfo) {
for addr, node := range nodes {
oldAttr, ok := b.nodeInfo[addr]
newAttr := attributes.New(constant.KeyServiceInfo, node)
if !ok || !oldAttr.Equal(newAttr) {
b.nodeInfo[addr] = newAttr
}
}
for addr := range b.nodeInfo {
if _, ok := nodes[addr]; !ok {
delete(b.nodeInfo, addr)
}
}
}
29 changes: 29 additions & 0 deletions client/ehttp/resolver/resolver_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package resolver

type baseHttpBuilder struct {
name string
}

type baseHttpResolver struct {
}

func init() {
m["http"] = &baseHttpBuilder{}
m["https"] = &baseHttpBuilder{}
// 用户是没有传协议
m[""] = &baseHttpBuilder{}
}

// Build ...
func (b *baseHttpBuilder) Build(addr string) (Resolver, error) {
return &baseHttpResolver{}, nil
}

// Scheme ...
func (b *baseHttpBuilder) Scheme() string {
return b.name
}

func (b *baseHttpResolver) GetAddr() string {
return ""
}

0 comments on commit 1fa1e38

Please sign in to comment.