
Commit 5b2db0b: Cherry-pick 0403d54 with conflicts

vitess-bot[bot] authored and vitess-bot committed Nov 11, 2024
1 parent 1dcbce6 commit 5b2db0b
Showing 12 changed files with 564 additions and 22 deletions.
85 changes: 85 additions & 0 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -19,7 +19,9 @@ package newfeaturetest
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

@@ -146,3 +148,86 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica")
require.NoError(t, err)
}
<<<<<<< HEAD
=======

// TestERSWithWriteInPromoteReplica tests that ERS doesn't fail even if there is a
// write that happens when PromoteReplica is called.
func TestERSWithWriteInPromoteReplica(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// Drop a table so that when sidecardb changes are checked, we run a DML query.
utils.RunSQLs(context.Background(), t, []string{
"set sql_log_bin=0",
`SET @@global.super_read_only=0`,
`DROP TABLE _vt.heartbeat`,
"set sql_log_bin=1",
}, tablets[3])
_, err := utils.Ers(clusterInstance, tablets[3], "60s", "30s")
require.NoError(t, err, "ERS should not fail even if there is a sidecardb change")
}

func TestBufferingWithMultipleDisruptions(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupShardedReparentCluster(t, "semi_sync")
defer utils.TeardownCluster(clusterInstance)

// Stop all VTOrc instances, so that they don't interfere with the test.
for _, vtorc := range clusterInstance.VTOrcProcesses {
err := vtorc.TearDown()
require.NoError(t, err)
}

// Start by reparenting all the shards to the first tablet.
keyspace := clusterInstance.Keyspaces[0]
shards := keyspace.Shards
for _, shard := range shards {
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias)
require.NoError(t, err)
}

// We simulate the start of an external reparent or a PRS where the healthcheck update from the tablet gets lost in transit
// to vtgate by simply setting the primary read-only. This is also why we needed to shut down all VTOrc instances, so that
// they don't fix this.
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0])
utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0])

wg := sync.WaitGroup{}
rowCount := 10
vtParams := clusterInstance.GetVTParams(keyspace.Name)
// We now spawn writes from a bunch of goroutines.
// The ones going to shard 1 and shard 2 should block, since
// those shards are in the midst of a reparenting operation (as seen by the buffering code).
for i := 1; i <= rowCount; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
conn, err := mysql.Connect(context.Background(), &vtParams)
if err != nil {
return
}
defer conn.Close()
_, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false)
require.NoError(t, err)
}(i)
}

// Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2
// since the disruption on the two shards hasn't stopped.
err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias)
require.NoError(t, err)
// We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate.
time.Sleep(1 * time.Second)
// Finally, we'll now make the 2 shards healthy again by running PRS.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias)
require.NoError(t, err)
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias)
require.NoError(t, err)
// Wait for all the writes to have succeeded.
wg.Wait()
}
>>>>>>> 0403d541c1 (Fix to prevent stopping buffering prematurely (#17013))
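Editorial aside, not part of this commit: once wg.Wait() returns, a natural follow-up assertion is that every buffered insert was eventually applied. A minimal sketch that could sit at the end of TestBufferingWithMultipleDisruptions, reusing the GetSelectionQuery helper this commit adds to the reparent utils package (the maxrows value of 100 is an arbitrary assumption):

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
// All rowCount buffered writes should be visible once the reparents have completed.
qr, err := conn.ExecuteFetch(utils.GetSelectionQuery(), 100, false)
require.NoError(t, err)
require.Len(t, qr.Rows, rowCount)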
47 changes: 47 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
@@ -75,6 +75,53 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro
return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync")
}

// SetupShardedReparentCluster is used to set up a sharded cluster for testing
func SetupShardedReparentCluster(t *testing.T, durability string) *cluster.LocalProcessCluster {
clusterInstance := cluster.NewCluster(cell1, Hostname)
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--lock_tables_timeout", "5s",
// Fast health checks help find corner cases.
"--health_check_interval", "1s",
"--track_schema_versions=true",
"--queryserver_enable_online_ddl=false")
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--enable_buffer",
// Long timeout in case failover is slow.
"--buffer_window", "10m",
"--buffer_max_failover_duration", "10m",
"--buffer_min_time_between_failovers", "20m",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: sqlSchema,
VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`,
DurabilityPolicy: durability,
}
err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false)
require.NoError(t, err)

// Start Vtgate
err = clusterInstance.StartVtgate()
require.NoError(t, err)
return clusterInstance
}

// GetInsertQuery returns a built insert query to insert a row.
func GetInsertQuery(idx int) string {
return fmt.Sprintf(insertSQL, idx, idx)
}

// GetSelectionQuery returns a built selection query to read the data.
func GetSelectionQuery() string {
return `select * from vt_insert_test`
}

// TeardownCluster is used to teardown the reparent cluster. When
// run in a CI environment -- which is considered true when the
// "CI" env variable is set to "true" -- the teardown also removes
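Editorial aside, not part of this commit: GetInsertQuery and GetSelectionQuery are thin wrappers around the package-level insertSQL format string and the vt_insert_test table declared in the VSchema above. A minimal round trip through vtgate, assuming the same mysql and require imports the tests in this commit already use and that KeyspaceName is the exported keyspace constant referenced above:

vtParams := clusterInstance.GetVTParams(utils.KeyspaceName)
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
// Insert one row keyed by id=1, then read it back with a scatter select.
_, err = conn.ExecuteFetch(utils.GetInsertQuery(1), 0, false)
require.NoError(t, err)
qr, err := conn.ExecuteFetch(utils.GetSelectionQuery(), 10, false)
require.NoError(t, err)
require.Len(t, qr.Rows, 1)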
15 changes: 15 additions & 0 deletions go/vt/discovery/fake_healthcheck.go
@@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType
item.ts.Target.TabletType = tabletType
}

// SetPrimaryTimestamp sets the primary timestamp for the given tablet
func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) {
if fhc.ch == nil {
return
}
fhc.mu.Lock()
defer fhc.mu.Unlock()
key := TabletToMapKey(tablet)
item, isPresent := fhc.items[key]
if !isPresent {
return
}
item.ts.PrimaryTermStartTime = timestamp
}

// Unsubscribe is not implemented.
func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}
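Editorial aside, not part of this commit: SetPrimaryTimestamp lets unit tests control the PrimaryTermStartTime that the keyspace event watcher compares against externallyReparented in the keyspace_events.go hunk below. A rough sketch of how a test might use it; the constructor argument and the AddTablet registration call are assumptions about the fake's existing API:

// Make the fake report a primary whose term started "now", i.e. after any
// previously observed external reparent, so the watcher can mark the shard serving again.
fhc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth, 8))
tablet := &topodatapb.Tablet{
	Alias:    &topodatapb.TabletAlias{Cell: "zone1", Uid: 100},
	Keyspace: "ks",
	Shard:    "-80",
	Type:     topodatapb.TabletType_PRIMARY,
}
fhc.AddTablet(tablet) // assumed: the fake's usual tablet-registration helper
fhc.SetPrimaryTimestamp(tablet, time.Now().Unix())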
115 changes: 112 additions & 3 deletions go/vt/discovery/keyspace_events.go
@@ -173,8 +173,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
}

type shardState struct {
target *querypb.Target
serving bool
target *querypb.Target
serving bool
// waitForReparent is used to tell the keyspace event watcher
// that this shard should be marked serving only after a reparent
// operation has succeeded.
waitForReparent bool
externallyReparented int64
currentPrimary *topodatapb.TabletAlias
}
@@ -355,8 +359,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
// if the shard went from serving to not serving, or the other way around, the keyspace
// is undergoing an availability event
if sstate.serving != th.Serving {
sstate.serving = th.Serving
kss.consistent = false
switch {
case th.Serving && sstate.waitForReparent:
// While waiting for a reparent, if we receive a serving primary,
// we should check if the primary term start time is greater than the externally reparented time.
// We mark the shard serving only if it is. This is required so that we don't prematurely stop
// buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the
// same old primary tablet that has already been turned read-only.
if th.PrimaryTermStartTime > sstate.externallyReparented {
sstate.waitForReparent = false
sstate.serving = true
}
case th.Serving && !sstate.waitForReparent:
sstate.serving = true
case !th.Serving:
sstate.serving = false
}
}
if !th.Serving {
// Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait
// for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop
// buffering when we receive a serving healthcheck from the primary that is being demoted.
// However, if we receive a non-serving check, then we know that we won't receive any more serving
// health checks until reparent finishes. Specifically, this helps us when PRS fails, but
// stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote
// the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop
// buffering for that case.
sstate.waitForReparent = false
}

// if the primary for this shard has been externally reparented, we're undergoing a failover,
@@ -651,3 +681,82 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
}
return servingKeyspaces
}
<<<<<<< HEAD
=======

// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
// We don't want to change the original keyspace list that we receive so we clone it
// before we empty its elements down below.
keyspaces := slices.Clone(ksList)
for {
// We empty keyspaces as we find them to be consistent.
allConsistent := true
for i, ks := range keyspaces {
if ks == "" {
continue
}

// Get the keyspace status and see whether it is consistent yet or not.
kss := kew.getKeyspaceStatus(ctx, ks)
// If kss is nil, then the keyspace must have been deleted. In that case too, it is fine for us
// to consider it consistent, since the keyspace has been deleted.
if kss == nil || kss.isConsistent() {
keyspaces[i] = ""
} else {
allConsistent = false
}
}

if allConsistent {
// all the keyspaces are consistent.
return nil
}

// Unblock after the sleep or when the context has expired.
select {
case <-ctx.Done():
for _, ks := range keyspaces {
if ks != "" {
log.Infof("keyspace %v didn't become consistent", ks)
}
}
return ctx.Err()
case <-time.After(waitConsistentKeyspacesCheck):
}
}
}

// MarkShardNotServing marks the given shard not serving.
// We use this when we start buffering for a given shard. This helps
// coordinate between the sharding logic and the keyspace event watcher.
// We take in a boolean as well to tell us whether this error is because
// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent.
// The return value indicates whether the shard was found and successfully marked not serving.
func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool {
kss := kew.getKeyspaceStatus(ctx, keyspace)
if kss == nil {
// Only happens if the keyspace was deleted.
return false
}
kss.mu.Lock()
defer kss.mu.Unlock()
sstate := kss.shards[shard]
if sstate == nil {
// This only happens if the shard is deleted, or if
// the keyspace event watcher hasn't seen the shard at all.
return false
}
// Mark the keyspace inconsistent and the shard not serving.
kss.consistent = false
sstate.serving = false
if isReparentErr {
// If the error was triggered because a reparent operation has started, we mark the shard
// to wait for the reparent to finish before marking it serving again.
// This is required to prevent premature stopping of buffering if we receive
// a serving healthcheck from a primary that is being demoted.
sstate.waitForReparent = true
}
return true
}
>>>>>>> 0403d541c1 (Fix to prevent stopping buffering prematurely (#17013))
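Editorial aside, not part of this commit: taken together, the pieces above are meant to be driven from vtgate's buffering path. MarkShardNotServing is called when buffering starts (passing isReparentErr=true for reparent-triggered errors, which also sets waitForReparent), onHealthCheck clears waitForReparent only once it sees a serving primary with a newer PrimaryTermStartTime, and WaitForConsistentKeyspaces lets callers block until the keyspace settles. An illustrative sketch of that coordination, not the actual vtgate call sites:

// onReparentBuffering is a hypothetical helper showing how the two new
// KeyspaceEventWatcher methods fit together around a buffering window.
func onReparentBuffering(ctx context.Context, kew *discovery.KeyspaceEventWatcher, keyspace, shard string, isReparentErr bool) error {
	// Mark the shard not serving; for reparent errors, also require a fresher
	// primary term before the watcher will consider the shard serving again.
	if !kew.MarkShardNotServing(ctx, keyspace, shard, isReparentErr) {
		return fmt.Errorf("shard %s/%s is unknown to the keyspace event watcher", keyspace, shard)
	}
	// Block (bounded by ctx) until the keyspace is consistent again, at which
	// point buffered queries can safely be retried.
	return kew.WaitForConsistentKeyspaces(ctx, []string{keyspace})
}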