Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs/transfer: Added checks for available tso nodes #8530

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@ func transferPrimary(c *gin.Context) {
}

if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil {
constant.SchedulingServiceName, svr.Name(), newPrimary, 0, nil); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcs/tso/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,15 @@ func transferPrimary(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
// only members of specific group are valid primary candidates.
group := svr.GetKeyspaceGroupManager().GetKeyspaceGroups()[keyspaceGroupID]
memberMap := make(map[string]bool, len(group.Members))
for _, member := range group.Members {
memberMap[member.Address] = true
}

if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(),
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID); err != nil {
constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID, memberMap); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/mcs/utils/expected_primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func watchExpectedPrimary(ctx context.Context,
// TransferPrimary transfers the primary of the specified service.
// keyspaceGroupID is optional, only used for TSO service.
func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName,
oldPrimary, newPrimary string, keyspaceGroupID uint32) error {
oldPrimary, newPrimary string, keyspaceGroupID uint32, tsoMembersMap map[string]bool) error {
if lease == nil {
return errors.New("current lease is nil, please check leadership")
}
Expand All @@ -139,6 +139,10 @@ func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName

var primaryIDs []string
for _, member := range entries {
// only members of specific group are valid primary candidates for TSO service.
if tsoMembersMap != nil && !tsoMembersMap[member.ServiceAddr] {
continue
}
if (newPrimary == "" && member.Name != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
}
Expand Down
68 changes: 55 additions & 13 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pdClient "github.com/tikv/pd/client/http"
bs "github.com/tikv/pd/pkg/basicserver"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
Expand All @@ -41,6 +46,9 @@ type memberTestSuite struct {
backendEndpoints string
pdClient pdClient.Client

// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
tsoAvailMembers map[string]bool
tsoNodes map[string]bs.Server
schedulingNodes map[string]bs.Server
}
Expand All @@ -51,6 +59,7 @@ func TestMemberTestSuite(t *testing.T) {

func (suite *memberTestSuite) SetupTest() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestAPICluster(suite.ctx, 1)
Expand All @@ -65,15 +74,24 @@ func (suite *memberTestSuite) SetupTest() {

// TSO
nodes := make(map[string]bs.Server)
// mock 3 tso nodes, which is more than the default replica count(DefaultKeyspaceGroupReplicaCount).
for i := 0; i < 3; i++ {
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
nodes[s.GetAddr()] = s
suite.cleanupFunc = append(suite.cleanupFunc, func() {
cleanup()
})
}
tests.WaitForPrimaryServing(re, nodes)
primary := tests.WaitForPrimaryServing(re, nodes)
members := mustGetKeyspaceGroupMembers(re, nodes[primary].(*tso.Server))
// Get the tso nodes
suite.tsoNodes = nodes
// We only test `DefaultKeyspaceGroupID` here.
// tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID.
suite.tsoAvailMembers = make(map[string]bool)
for _, member := range members[constant.DefaultKeyspaceGroupID].Group.Members {
suite.tsoAvailMembers[member.Address] = true
}

// Scheduling
nodes = make(map[string]bs.Server)
Expand All @@ -100,6 +118,8 @@ func (suite *memberTestSuite) TearDownTest() {
suite.pdClient.Close()
}
suite.cluster.Destroy()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func (suite *memberTestSuite) TestMembers() {
Expand All @@ -124,7 +144,7 @@ func (suite *memberTestSuite) TestPrimary() {
re.NotEmpty(primary)
}

func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() {
func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() {
re := suite.Require()
primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso")
re.NoError(err)
Expand All @@ -143,20 +163,18 @@ func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() {
primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)

// Close old and new primary to mock campaign primary
// Close non-primary node.
for _, member := range nodes {
if member.GetAddr() != primary {
nodes[member.Name()].Close()
break
}
}
nodes[primary].Close()
tests.WaitForPrimaryServing(re, nodes)

// primary should be different with before
onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
// primary should be same with before.
curPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
re.NotEqual(primary, onlyPrimary)
re.Equal(primary, curPrimary)
}
}

Expand Down Expand Up @@ -200,6 +218,9 @@ func (suite *memberTestSuite) TestTransferPrimary() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -251,6 +272,9 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand All @@ -270,15 +294,13 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() {
re.NoError(err)
re.NotEqual(primary, newPrimary)

// Close old and new primary to mock campaign primary
nodes[primary].Close()
// Close primary to push other nodes campaign primary
nodes[newPrimary].Close()
tests.WaitForPrimaryServing(re, nodes)
// Primary should be different with before
onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
anotherPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service)
re.NoError(err)
re.NotEqual(primary, onlyPrimary)
re.NotEqual(newPrimary, onlyPrimary)
re.NotEqual(newPrimary, anotherPrimary)
}
}

Expand All @@ -304,6 +326,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() {
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -356,6 +381,9 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
// Test transfer primary to a specific node
var newPrimary string
for _, member := range nodes {
if service == "tso" && !suite.tsoAvailMembers[member.GetAddr()] {
continue
}
if member.GetAddr() != primary {
newPrimary = member.Name()
break
Expand Down Expand Up @@ -390,3 +418,17 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
re.NotEqual(newPrimary, onlyPrimary)
}
}

func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map[uint32]*apis.KeyspaceGroupMember {
httpReq, err := http.NewRequest(http.MethodGet, server.GetAddr()+"/tso/api/v1/keyspace-groups/members", nil)
re.NoError(err)
httpResp, err := tests.TestDialClient.Do(httpReq)
re.NoError(err)
defer httpResp.Body.Close()
data, err := io.ReadAll(httpResp.Body)
re.NoError(err)
re.Equal(http.StatusOK, httpResp.StatusCode, string(data))
var resp map[uint32]*apis.KeyspaceGroupMember
re.NoError(json.Unmarshal(data, &resp))
return resp
}