Skip to content

Commit

Permalink
增强batchDeleteInstanceMeta方法的场景兼容性 (#837)
Browse files Browse the repository at this point in the history
* Fix k8s deploy (#819)

* fix issue #629 (#693)

* docs:优化错误信息描述

* Update zh.toml

* fix:修复eureka心跳协议错误码不兼容问题

* fix:修复eureka心跳协议错误码不兼容问题

* unit:添加单元测试

* test:调整测试配置文件位置

* fix:issue #692

* fix:issue #692

* fix:issue #692

* fix:issue #692

* docs:add error code desc

* fix:调整license-checker的触发

* fix:调整license-checker的触发

* fix:k8s standalone deploy

* [ISSUE #789] Leader Election (#817)

* Add leader election

* fix test

* add sql delta

* feat:优化eureka插件大小写敏感,默认大小写不敏感 (#820) (#821)

* feat: 支持eureka双向数据同步

* fix:golintci warnings

* feat: 调整import顺序

* fix:修复golint格式化问题

* fix:修复导入问题

* fix: 修复导入顺序问题

* Fix k8s deploy (#819)

* fix issue #629 (#693)

* docs:优化错误信息描述

* Update zh.toml

* fix:修复eureka心跳协议错误码不兼容问题

* fix:修复eureka心跳协议错误码不兼容问题

* unit:添加单元测试

* test:调整测试配置文件位置

* fix:issue #692

* fix:issue #692

* fix:issue #692

* fix:issue #692

* docs:add error code desc

* fix:调整license-checker的触发

* fix:调整license-checker的触发

* fix:k8s standalone deploy

* feat:优化大小写问题,eureka插件默认大小写不敏感,无需配置

* feat:读取的时候进行大写转换

* feat:修改读取时候大小写

* feat:处理replication大小写

* fix:修复用例失败问题

* fix:修复用例失败问题

Co-authored-by: liaochuntao <[email protected]>

Co-authored-by: liaochuntao <[email protected]>

* fix: 解决eureka同步时,出现存量服务同步失败的问题 (#823)

* feat: 支持eureka双向数据同步

* fix:golintci warnings

* feat: 调整import顺序

* fix:修复golint格式化问题

* fix:修复导入问题

* fix: 修复导入顺序问题

* feat:优化大小写问题,eureka插件默认大小写不敏感,无需配置

* feat:读取的时候进行大写转换

* feat:修改读取时候大小写

* feat:处理replication大小写

* fix:修复用例失败问题

* fix:修复用例失败问题

* fix:修复eureka心跳复制失败的问题

* fix:解决自注册时候,服务不存在需要动态创建的问题

* fix:修复服务不存在,创建报错问题

* feat:修复polaris之间走eureka同步接口出现报错的问题 (#830)

* fix: eureka同步时,实例不存在没有返回404,导致eureka触发不了重注册 (#833)

* 增强batchDeleteInstanceMeta方法的场景兼容性

解决以下sql
delete from instance_metadata where id in (,?)

Co-authored-by: liaochuntao <[email protected]>
Co-authored-by: Shichao <[email protected]>
Co-authored-by: andrew shan <[email protected]>
  • Loading branch information
4 people authored Dec 2, 2022
1 parent 10e0a43 commit a444c2a
Show file tree
Hide file tree
Showing 11 changed files with 665 additions and 49 deletions.
5 changes: 5 additions & 0 deletions store/boltdb/maintain.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type maintainStore struct {
handler BoltHandler
}

// IsLeader
func (m *maintainStore) IsLeader() bool {
return true
}

// BatchCleanDeletedInstances
func (m *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) {
fields := []string{insFieldValid}
Expand Down
2 changes: 2 additions & 0 deletions store/maintain_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package store

type MaintainStore interface {
// IsLeader whether it is leader node
IsLeader() bool

// BatchCleanDeletedInstances batch clean soft deleted instances
BatchCleanDeletedInstances(batchSize uint32) (uint32, error)
Expand Down
15 changes: 14 additions & 1 deletion store/mock/api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions store/sqldb/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func (s *stableStore) Initialize(conf *store.Config) error {

s.start = true
s.newStore()

if err := s.maintainStore.StartLeaderElection(); err != nil {
log.Errorf("[Store][database] leader election start failed")
return err
}
log.Infof("[Store][database] leader election start successfully")
return nil
}

Expand Down Expand Up @@ -206,6 +212,10 @@ func (s *stableStore) Destroy() error {
_ = s.slave.Close()
}

if s.maintainStore != nil {
s.maintainStore.StopLeaderElection()
}

s.master = nil
s.masterTx = nil
s.slave = nil
Expand Down
9 changes: 5 additions & 4 deletions store/sqldb/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,17 +902,18 @@ func addInstanceMeta(tx *BaseTx, id string, meta map[string]string) error {
func batchDeleteInstanceMeta(tx *BaseTx, instances []*model.Instance) error {
ids := make([]interface{}, 0, len(instances))
builder := strings.Builder{}
for idx, entry := range instances {
for _, entry := range instances {
// If instance is first registration, no need to participate in the following METADATA cleaning action
if entry.FirstRegis {
continue
}

if idx > 0 {

ids = append(ids, entry.ID())

if len(ids) > 1 {
builder.WriteString(",")
}
builder.WriteString("?")
ids = append(ids, entry.ID())
}

if len(ids) == 0 {
Expand Down
250 changes: 250 additions & 0 deletions store/sqldb/maintain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package sqldb

import (
"context"
"sync/atomic"
"time"

"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/store"
)

const (
DefaultElectKey = "polaris-server"
TickTime = 2
LeaseTime = 10
)

// maintainStore implement MaintainStore interface
type maintainStore struct {
master *BaseDB
le *leaderElectionStateMachine
}

// LeaderElectionStore store inteface
type LeaderElectionStore interface {
// GetVersion get current version
GetVersion(key string) (int64, error)
// CompareAndSwapVersion cas version
CompareAndSwapVersion(key string, curVersion int64, newVersion int64, leader string) (bool, error)
// CheckMtimeExpired check mtime expired
CheckMtimeExpired(key string, leaseTime int32) (bool, error)
}

// leaderElectionStore
type leaderElectionStore struct {
master *BaseDB
}

// GetVersion
func (l *leaderElectionStore) GetVersion(key string) (int64, error) {
log.Debugf("[Store][database] get version (%s)", key)
mainStr := "select version from leader_election where elect_key = ?"

var count int64
err := l.master.DB.QueryRow(mainStr, key).Scan(&count)
if err != nil {
log.Errorf("[Store][database] get version (%s), err: %s", key, err.Error())
}
return count, store.Error(err)
}

// CompareAndSwapVersion
func (l *leaderElectionStore) CompareAndSwapVersion(key string, curVersion int64, newVersion int64, leader string) (bool, error) {
log.Debugf("[Store][database] compare and swap version (%s, %d, %d, %s)", key, curVersion, newVersion, leader)
mainStr := "update leader_election set leader = ?, version = ? where elect_key = ? and version = ?"
result, err := l.master.DB.Exec(mainStr, leader, newVersion, key, curVersion)
if err != nil {
log.Errorf("[Store][database] compare and swap version, err: %s", err.Error())
return false, store.Error(err)
}
rows, err := result.RowsAffected()
if err != nil {
log.Errorf("[Store][database] compare and swap version, get RowsAffected err: %s", err.Error())
return false, store.Error(err)
}
return (rows > 0), nil
}

// CheckMtimeExpired
func (l *leaderElectionStore) CheckMtimeExpired(key string, leaseTime int32) (bool, error) {
log.Debugf("[Store][database] check mtime expired (%s, %d)", key, leaseTime)
mainStr := "select count(1) from leader_election where elect_key = ? and mtime < FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?)"

var count int32
err := l.master.DB.QueryRow(mainStr, key, leaseTime).Scan(&count)
if err != nil {
log.Errorf("[Store][database] check mtime expired (%s), err: %s", key, err.Error())
}
return (count > 0), store.Error(err)
}

// leaderElectionStateMachine
type leaderElectionStateMachine struct {
leStore LeaderElectionStore
leaderFlag int32
version int64
ctx context.Context
cancel context.CancelFunc
}

// isLeader
func isLeader(flag int32) bool {
return flag > 0
}

// mainLoop
func (le *leaderElectionStateMachine) mainLoop() {
log.Infof("[Store][database] leader election started")
ticker := time.NewTicker(TickTime * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
le.tick()
case <-le.ctx.Done():
log.Infof("[Store][database] leader election stopped")
return
}
}
}

// tick
func (le *leaderElectionStateMachine) tick() {
if le.isLeader() {
r, err := le.heartbeat()
if err != nil {
log.Errorf("[Store][database] leader heartbeat err (%s), change to follower state", err.Error())
le.changeToFollower()
return
}
if !r {
le.changeToFollower()
}
} else {
dead, err := le.checkLeaderDead()
if err != nil {
log.Errorf("[Store][database] check leader dead err (%s), stay follower state", err.Error())
return
}
if !dead {
return
}
r, err := le.elect()
if err != nil {
log.Errorf("[Store][database] elect leader err (%s), stay follower state", err.Error())
return
}
if r {
le.changeToLeader()
}
}
}

// changeToLeader
func (le *leaderElectionStateMachine) changeToLeader() {
log.Infof("[Store][database] change from follower to leader")
atomic.StoreInt32(&le.leaderFlag, 1)
}

// changeToFollower
func (le *leaderElectionStateMachine) changeToFollower() {
log.Infof("[Store][database] change from leader to follower")
atomic.StoreInt32(&le.leaderFlag, 0)
}

// checkLeaderDead
func (le *leaderElectionStateMachine) checkLeaderDead() (bool, error) {
return le.leStore.CheckMtimeExpired(DefaultElectKey, LeaseTime)
}

// elect
func (le *leaderElectionStateMachine) elect() (bool, error) {
curVersion, err := le.leStore.GetVersion(DefaultElectKey)
if err != nil {
return false, err
}
le.version = curVersion + 1
return le.leStore.CompareAndSwapVersion(DefaultElectKey, curVersion, le.version, utils.LocalHost)
}

// heartbeat
func (le *leaderElectionStateMachine) heartbeat() (bool, error) {
curVersion := le.version
le.version = curVersion + 1
return le.leStore.CompareAndSwapVersion(DefaultElectKey, curVersion, le.version, utils.LocalHost)
}

// isLeader
func (le *leaderElectionStateMachine) isLeader() bool {
return isLeader(le.leaderFlag)
}

// isLeaderAtomic
func (le *leaderElectionStateMachine) isLeaderAtomic() bool {
return isLeader(atomic.LoadInt32(&le.leaderFlag))
}

// StartLeaderElection
func (m *maintainStore) StartLeaderElection() error {
ctx, cancel := context.WithCancel(context.TODO())
le := &leaderElectionStateMachine{
leStore: &leaderElectionStore{master: m.master},
leaderFlag: 0,
version: 0,
ctx: ctx,
cancel: cancel,
}
m.le = le
go le.mainLoop()
return nil
}

// StopLeaderElection
func (m *maintainStore) StopLeaderElection() {
if m.le != nil {
m.le.cancel()
}
m.le = nil
}

// IsLeader
func (maintain *maintainStore) IsLeader() bool {
return maintain.le.isLeaderAtomic()
}

// BatchCleanDeletedInstances batch clean soft deleted instances
func (maintain *maintainStore) BatchCleanDeletedInstances(batchSize uint32) (uint32, error) {
log.Infof("[Store][database] batch clean soft deleted instances(%d)", batchSize)
mainStr := "delete from instance where flag = 1 limit ?"
result, err := maintain.master.Exec(mainStr, batchSize)
if err != nil {
log.Errorf("[Store][database] batch clean soft deleted instances(%d), err: %s", batchSize, err.Error())
return 0, store.Error(err)
}

rows, err := result.RowsAffected()
if err != nil {
log.Warnf("[Store][database] batch clean soft deleted instances(%d), get RowsAffected err: %s", batchSize, err.Error())
return 0, store.Error(err)
}

return uint32(rows), nil
}
Loading

0 comments on commit a444c2a

Please sign in to comment.