-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: support dynamic tso service #8517
Conversation
@@ -660,7 +660,7 @@ func (c *client) Close() { | |||
} | |||
} | |||
|
|||
func (c *client) setServiceMode(newMode pdpb.ServiceMode) { | |||
func (c *client) setServiceMode(newMode pdpb.ServiceMode, skipSameMode bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer using a more straightforward word.
func (c *client) setServiceMode(newMode pdpb.ServiceMode, skipSameMode bool) { | |
func (c *client) setServiceMode(newMode pdpb.ServiceMode, force bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not the same as force.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not skipSameMode always?
client/errs/errs.go
Outdated
} | ||
errMsg := err.Error() | ||
return strings.Contains(errMsg, "not found tso address") || | ||
strings.Contains(errMsg, "maximum number of retries exceeded") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error would also occur when the leadership cannot be elected. In which case will this be a misjudgment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't check this error on client side, do you know about the reason?
@@ -406,6 +406,8 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { | |||
backendEndpoints := pdLeaderServer.GetAddr() | |||
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints) | |||
re.NoError(err) | |||
// let service discovery know the TSO service | |||
time.Sleep(500 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can it be replaced with an Eventually
?
6c98636
to
dc6b4a7
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #8517 +/- ##
==========================================
- Coverage 77.54% 71.80% -5.74%
==========================================
Files 474 517 +43
Lines 62359 67531 +5172
==========================================
+ Hits 48358 48493 +135
- Misses 10441 15482 +5041
+ Partials 3560 3556 -4
Flags with carried forward coverage won't be shown. Click here to find out more. |
a36ad5d
to
f2c0c14
Compare
5d11416
to
12c7821
Compare
client/errs/errs.go
Outdated
@@ -38,6 +38,16 @@ func IsLeaderChange(err error) bool { | |||
strings.Contains(errMsg, NotPrimaryErr) | |||
} | |||
|
|||
// IsServiceModeChange will determine whether there is a service mode change. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// IsServiceModeChange will determine whether there is a service mode change. | |
// IsServiceModeChange determines whether there is a service mode change. |
server/forward.go
Outdated
if err != nil { | ||
if needRetry := handleStreamError(err); needRetry { | ||
continue | ||
if s.forwardToTSOService() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can reduce some of the indentation
if s.forwardToTSOService() { | |
if !s.forwardToTSOService() { | |
return s.tsoAllocatorManager.HandleRequest(ctx, tso.GlobalDCLocation, 1) | |
} | |
request := xxxx | |
..... |
@@ -569,6 +590,72 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { | |||
continue | |||
} | |||
|
|||
if s.forwardToTSOService() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe wrap three situation(local tso/ms tso/normal) to three functions. Just a suggestion.
server/cluster/cluster.go
Outdated
if !c.IsServiceIndependent(constant.TSOServiceName) { | ||
// leader tso service exit, tso independent service provide tso | ||
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) | ||
} | ||
if !c.IsServiceIndependent(constant.TSOServiceName) { | ||
log.Info("TSO server starts to provide timestamp") | ||
} | ||
c.SetServiceIndependent(constant.TSOServiceName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if !c.IsServiceIndependent(constant.TSOServiceName) { | |
// leader tso service exit, tso independent service provide tso | |
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) | |
} | |
if !c.IsServiceIndependent(constant.TSOServiceName) { | |
log.Info("TSO server starts to provide timestamp") | |
} | |
c.SetServiceIndependent(constant.TSOServiceName) | |
if !c.IsServiceIndependent(constant.TSOServiceName) { | |
// leader tso service exit, tso independent service provide tso | |
c.tsoAllocator.ResetAllocatorGroup(tso.GlobalDCLocation, true) | |
log.Info("TSO server starts to provide timestamp") | |
} | |
c.SetServiceIndependent(constant.TSOServiceName) |
@@ -390,24 +397,84 @@ func (c *RaftCluster) checkServices() { | |||
} | |||
} | |||
|
|||
// checkTSOService checks the TSO service. | |||
func (c *RaftCluster) checkTSOService() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add more comments for this function and inside this function. Seems there are too many situations in this function
client/pd_service_discovery.go
Outdated
|
||
ctx, cancel := context.WithCancel(c.ctx) | ||
defer cancel() | ||
ticker := time.NewTicker(serviceModeUpdateInterval) | ||
failpoint.Inject("fastUpdateServiceMode", func() { | ||
ticker.Stop() | ||
ticker = time.NewTicker(10 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ticker.Reset()?
client/client.go
Outdated
@@ -713,6 +712,7 @@ func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { | |||
log.Warn("[pd] intend to switch to unknown service mode, just return") | |||
return | |||
} | |||
// Replace the old TSO client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate
@@ -660,7 +660,7 @@ func (c *client) Close() { | |||
} | |||
} | |||
|
|||
func (c *client) setServiceMode(newMode pdpb.ServiceMode) { | |||
func (c *client) setServiceMode(newMode pdpb.ServiceMode, skipSameMode bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not skipSameMode always?
PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
96c89a1
to
0a659c8
Compare
0a659c8
to
80133c9
Compare
80133c9
to
e3f29e6
Compare
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
df3a3ec
to
403519c
Compare
Signed-off-by: Ryan Leung <[email protected]>
403519c
to
039088d
Compare
Signed-off-by: Ryan Leung <[email protected]>
|
||
func (suite *APIServerForward) checkAvailableTSO(re *require.Assertions) { | ||
mcs.WaitForTSOServiceAvailable(suite.ctx, re, suite.pdClient) | ||
func (suite *APIServerForward) checkAvailableTSO(re *require.Assertions, needWait bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer to wrap to two functions:
- checkAvailableTSO
- waitAvailableTSO
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
client/errs/errs.go
Outdated
return false | ||
} | ||
errMsg := err.Error() | ||
return strings.Contains(errMsg, "not found tso address") || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a method to use ErrorCode
or error type to instead of strings.Contains here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a constant as IsLeaderChange
server/cluster/cluster.go
Outdated
if c.IsServiceIndependent(constant.TSOServiceName) { | ||
log.Info("PD server starts to provide timestamp") | ||
} | ||
c.UnsetServiceIndependent(constant.TSOServiceName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Unset
the flag whatever it is tso service or pd server.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can add more comments for independentServices
sync.Map
@@ -569,6 +590,72 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { | |||
continue | |||
} | |||
|
|||
if s.forwardToTSOService() { | |||
if request.GetCount() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we reuse forwardTSO
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me think about how to reduce the duplicated code.
@@ -855,12 +856,16 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { | |||
// MicroServiceConfig is the configuration for micro service. | |||
type MicroServiceConfig struct { | |||
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` | |||
EnableTSOFallback bool `toml:"enable-tso-fallback" json:"enable-tso-fallback,string"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this neccesary? The code about this feature is so big that, I think, this switch can't control them as we expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if we don't want PD to provide the service after tso server is stopped.
server/cluster/cluster.go
Outdated
if c.IsServiceIndependent(constant.TSOServiceName) { | ||
log.Info("PD server starts to provide timestamp") | ||
} | ||
c.UnsetServiceIndependent(constant.TSOServiceName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can add more comments for independentServices
sync.Map
ch := make(chan struct{}) | ||
ch1 := make(chan struct{}) | ||
wg.Add(1) | ||
go func(ctx context.Context, wg *sync.WaitGroup, ch, ch1 chan struct{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we needn't delivery these params
@@ -578,3 +585,212 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { | |||
suite.pdLeader.ResignLeader() | |||
suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) | |||
} | |||
|
|||
func TestTSOServiceSwitch1(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a description for these three UTs. Or make the UT name more meaningful?
ch1 <- struct{}{} | ||
<-ch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer to wrap a function
ch1 <- struct{}{} | |
<-ch | |
waitOneTS := func() { | |
ch1 <- struct{}{} | |
<-ch | |
} | |
... | |
waitOneTS() |
re.NoError(err) | ||
tsoCluster.WaitForDefaultPrimaryServing(re) | ||
|
||
// Wait for TSO server to start and PD to detect it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have some ways to detect this instead of Sleep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a better idea.
tsoCluster.Destroy() | ||
|
||
// Wait for the configuration change to take effect | ||
time.Sleep(300 * time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) | ||
} | ||
|
||
func TestTSOServiceSwitch3(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference with the above two?
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
Signed-off-by: Ryan Leung <[email protected]>
@rleungx: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
Close it due to related PRs merged. |
What problem does this PR solve?
Issue Number: ref #8477
What is changed and how does it work?
Check List
Tests
Release note