Skip to content

Commit

Permalink
fix:修复eureka协议中心跳错误的处理 (#670)
Browse files Browse the repository at this point in the history
* docs:优化错误信息描述

* Update zh.toml

* fix:修复eureka心跳协议错误码不兼容问题

* fix:修复eureka心跳协议错误码不兼容问题

* fix:修复eureka心跳协议错误码不兼容问题

* unit:添加单元测试

* test:调整测试配置文件位置
  • Loading branch information
chuntaojun authored Sep 18, 2022
1 parent 168376b commit b32b1a5
Show file tree
Hide file tree
Showing 14 changed files with 923 additions and 5 deletions.
208 changes: 208 additions & 0 deletions apiserver/eurekaserver/eureka_suit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package eurekaserver

import (
"context"
"fmt"
"os"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/polarismesh/polaris-server/auth"
"github.com/polarismesh/polaris-server/cache"
commonlog "github.com/polarismesh/polaris-server/common/log"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/namespace"
"github.com/polarismesh/polaris-server/plugin"
"github.com/polarismesh/polaris-server/service"
"github.com/polarismesh/polaris-server/service/batch"
"github.com/polarismesh/polaris-server/service/healthcheck"
"github.com/polarismesh/polaris-server/store"
storemock "github.com/polarismesh/polaris-server/store/mock"
"gopkg.in/yaml.v2"

"github.com/polarismesh/polaris-server/testdata"

// 注册相关默认插件
_ "github.com/polarismesh/polaris-server/plugin/auth/defaultauth"
_ "github.com/polarismesh/polaris-server/plugin/auth/platform"
_ "github.com/polarismesh/polaris-server/plugin/cmdb/memory"
_ "github.com/polarismesh/polaris-server/plugin/discoverevent/local"
_ "github.com/polarismesh/polaris-server/plugin/discoverstat/discoverlocal"
_ "github.com/polarismesh/polaris-server/plugin/healthchecker/heartbeatmemory"
_ "github.com/polarismesh/polaris-server/plugin/healthchecker/heartbeatredis"
_ "github.com/polarismesh/polaris-server/plugin/history/logger"
_ "github.com/polarismesh/polaris-server/plugin/password"
_ "github.com/polarismesh/polaris-server/plugin/ratelimit/lrurate"
_ "github.com/polarismesh/polaris-server/plugin/ratelimit/token"
_ "github.com/polarismesh/polaris-server/plugin/statis/local"
_ "github.com/polarismesh/polaris-server/store/boltdb"
_ "github.com/polarismesh/polaris-server/store/sqldb"
)

type Bootstrap struct {
Logger map[string]*commonlog.Options
}

type TestConfig struct {
Bootstrap Bootstrap `yaml:"bootstrap"`
Cache cache.Config `yaml:"cache"`
Namespace namespace.Config `yaml:"namespace"`
Naming service.Config `yaml:"naming"`
HealthChecks healthcheck.Config `yaml:"healthcheck"`
Store store.Config `yaml:"store"`
Auth auth.Config `yaml:"auth"`
Plugin plugin.Config `yaml:"plugin"`
}

type EurekaTestSuit struct {
cfg *TestConfig
server service.DiscoverServer
healthSvr *healthcheck.Server
namespaceSvr namespace.NamespaceOperateServer
cancelFlag bool
updateCacheInterval time.Duration
cancel context.CancelFunc
storage store.Store
}

type options func(cfg *TestConfig)

// 内部初始化函数
func (d *EurekaTestSuit) initialize(t *testing.T, callback func(t *testing.T, s *storemock.MockStore) error, opts ...options) error {
if err := d.loadConfig(); err != nil {
return err
}

for i := range opts {
opts[i](d.cfg)
}

_ = commonlog.Configure(d.cfg.Bootstrap.Logger)

commonlog.DefaultScope().SetOutputLevel(commonlog.ErrorLevel)
commonlog.NamingScope().SetOutputLevel(commonlog.ErrorLevel)
commonlog.CacheScope().SetOutputLevel(commonlog.ErrorLevel)
commonlog.StoreScope().SetOutputLevel(commonlog.ErrorLevel)
commonlog.AuthScope().SetOutputLevel(commonlog.ErrorLevel)

plugin.SetPluginConfig(&d.cfg.Plugin)

ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

// 初始化存储层
ctrl := gomock.NewController(t)
s := storemock.NewMockStore(ctrl)
d.storage = s

if err := callback(t, s); err != nil {
return err
}

// 初始化缓存模块
if err := cache.TestCacheInitialize(ctx, &d.cfg.Cache, d.storage); err != nil {
return err
}

cacheMgn, err := cache.GetCacheManager()
if err != nil {
return err
}

// 批量控制器
namingBatchConfig, err := batch.ParseBatchConfig(d.cfg.Naming.Batch)
if err != nil {
return err
}
healthBatchConfig, err := batch.ParseBatchConfig(d.cfg.HealthChecks.Batch)
if err != nil {
return err
}

batchConfig := &batch.Config{
Register: namingBatchConfig.Register,
Deregister: namingBatchConfig.Register,
ClientRegister: namingBatchConfig.ClientRegister,
ClientDeregister: namingBatchConfig.ClientDeregister,
Heartbeat: healthBatchConfig.Heartbeat,
}

bc, err := batch.NewBatchCtrlWithConfig(d.storage, cacheMgn, batchConfig)
if err != nil {
log.Errorf("new batch ctrl with config err: %s", err.Error())
return err
}
bc.Start(ctx)

if len(d.cfg.HealthChecks.LocalHost) == 0 {
d.cfg.HealthChecks.LocalHost = utils.LocalHost // 补充healthCheck的配置
}
healthCheckServer, err := healthcheck.TestInitialize(ctx, &d.cfg.HealthChecks, d.cfg.Cache.Open, bc, d.storage)
if err != nil {
return err
}
cacheProvider, err := healthCheckServer.CacheProvider()
if err != nil {
return err
}
healthCheckServer.SetServiceCache(cacheMgn.Service())
healthCheckServer.SetInstanceCache(cacheMgn.Instance())

// 为 instance 的 cache 添加 健康检查的 Listener
cacheMgn.AddListener(cache.CacheNameInstance, []cache.Listener{cacheProvider})
cacheMgn.AddListener(cache.CacheNameClient, []cache.Listener{cacheProvider})

d.healthSvr = healthCheckServer
time.Sleep(5 * time.Second)
return nil
}

// 加载配置
func (d *EurekaTestSuit) loadConfig() error {
d.cfg = new(TestConfig)

confFileName := testdata.Path("eureka_apiserver_test.yaml")
if os.Getenv("STORE_MODE") == "sqldb" {
fmt.Printf("run store mode : sqldb\n")
confFileName = testdata.Path("eureka_apiserver_db_test.yaml")
}
file, err := os.Open(confFileName)
if err != nil {
fmt.Printf("[ERROR] %v\n", err)
return err
}

err = yaml.NewDecoder(file).Decode(d.cfg)
if err != nil {
fmt.Printf("[ERROR] %v\n", err)
return err
}

return err
}

func (d *EurekaTestSuit) Destroy() {
d.cancel()
time.Sleep(5 * time.Second)

d.storage.Destroy()
time.Sleep(5 * time.Second)
}
9 changes: 8 additions & 1 deletion apiserver/eurekaserver/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,14 @@ func (h *EurekaServer) updateStatus(ctx context.Context, appId string, instanceI

func (h *EurekaServer) renew(ctx context.Context, appId string, instanceId string) uint32 {
resp := h.healthCheckServer.Report(ctx, &api.Instance{Id: &wrappers.StringValue{Value: instanceId}})
return resp.GetCode().GetValue()
code := resp.GetCode().GetValue()

// 如果目标实例存在,但是没有开启心跳,对于 eureka 来说,仍然属于心跳上报成功
if code == api.HeartbeatOnDisabledIns {
return api.ExecuteSuccess
}

return code
}

func (h *EurekaServer) updateMetadata(ctx context.Context, instanceId string, metadata map[string]string) uint32 {
Expand Down
146 changes: 146 additions & 0 deletions apiserver/eurekaserver/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package eurekaserver

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/model"
"github.com/polarismesh/polaris-server/common/utils"
"github.com/polarismesh/polaris-server/store/mock"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func TestEurekaServer_renew(t *testing.T) {

ins := &model.Instance{
ServiceID: utils.NewUUID(),
Proto: &api.Instance{
Service: utils.NewStringValue("echo"),
Namespace: utils.NewStringValue("default"),
Host: utils.NewStringValue("127.0.0.1"),
Port: utils.NewUInt32Value(8080),
HealthCheck: &api.HealthCheck{
Type: api.HealthCheck_HEARTBEAT,
Heartbeat: &api.HeartbeatHealthCheck{
Ttl: &wrapperspb.UInt32Value{
Value: 5,
},
},
},
},
Valid: true,
}

insId, resp := utils.CheckInstanceTetrad(ins.Proto)
if resp != nil {
t.Fatal(resp.GetInfo().GetValue())
return
}

ins.Proto.Id = utils.NewStringValue(insId)

disableBeatIns := &model.Instance{
ServiceID: utils.NewUUID(),
Proto: &api.Instance{
Service: utils.NewStringValue("echo"),
Namespace: utils.NewStringValue("default"),
Host: utils.NewStringValue("127.0.0.2"),
Port: utils.NewUInt32Value(8081),
HealthCheck: &api.HealthCheck{
Type: api.HealthCheck_HEARTBEAT,
Heartbeat: &api.HeartbeatHealthCheck{
Ttl: &wrapperspb.UInt32Value{
Value: 5,
},
},
},
},
Valid: true,
}

disableBeatInsId, resp := utils.CheckInstanceTetrad(disableBeatIns.Proto)
if resp != nil {
t.Fatal(resp.GetInfo().GetValue())
return
}

disableBeatIns.Proto.Id = utils.NewStringValue(disableBeatInsId)

eurekaSuit := &EurekaTestSuit{}
if err := eurekaSuit.initialize(t, func(t *testing.T, s *mock.MockStore) error {
s.
EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(map[string]*model.Instance{
insId: ins,
disableBeatInsId: disableBeatIns,
}, nil)
s.
EXPECT().
GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(map[string]*model.Service{
ins.ServiceID: {
ID: ins.ServiceID,
Name: ins.Proto.GetService().GetValue(),
Namespace: ins.Proto.GetNamespace().GetValue(),
},
}, nil)

s.EXPECT().GetInstancesCount().AnyTimes().Return(uint32(1), nil)
s.EXPECT().GetUnixSecond().AnyTimes().Return(time.Now().Unix(), nil)
s.EXPECT().Destroy().Return(nil)
return nil
}); err != nil {
t.Fatal(err)
}

defer eurekaSuit.Destroy()

t.Run("eureka客户端心跳上报-实例正常且开启心跳", func(t *testing.T) {
svr := &EurekaServer{
healthCheckServer: eurekaSuit.healthSvr,
}
code := svr.renew(context.Background(), "", insId)
assert.Equalf(t, api.ExecuteSuccess, code, "code need success, actual : %d", code)
})

t.Run("eureka客户端心跳上报-实例未开启心跳", func(t *testing.T) {
svr := &EurekaServer{
healthCheckServer: eurekaSuit.healthSvr,
}
code := svr.renew(context.Background(), "", disableBeatInsId)
assert.Equalf(t, api.ExecuteSuccess, code, "code need success, actual : %d", code)
})

t.Run("eureka客户端心跳上报-实例不存在", func(t *testing.T) {
svr := &EurekaServer{
healthCheckServer: eurekaSuit.healthSvr,
}
code := svr.renew(context.Background(), "", utils.NewUUID())
assert.Equalf(t, api.NotFoundResource, code, "code need notfound, actual : %d", code)
})

}
1 change: 1 addition & 0 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func StartDiscoverComponents(ctx context.Context, cfg *boot_config.Config, s sto
return err
}
healthCheckServer.SetServiceCache(cacheMgn.Service())
healthCheckServer.SetInstanceCache(cacheMgn.Instance())

// 为 instance 的 cache 添加 健康检查的 Listener
cacheMgn.AddListener(cache.CacheNameInstance, []cache.Listener{cacheProvider})
Expand Down
Loading

0 comments on commit b32b1a5

Please sign in to comment.