Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-19.0] vtorc: require topo for Healthy: true in /debug/health (#17129) #17351

Draft
wants to merge 6 commits into
base: release-19.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@
)

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards() {
func RefreshAllKeyspacesAndShards(ctx context.Context) error {

Check warning on line 32 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L32

Added line #L32 was not covered by tests
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 35 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L35

Added line #L35 was not covered by tests
defer cancel()
var err error
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
if err != nil {
log.Error(err)
return
return err

Check warning on line 41 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L41

Added line #L41 was not covered by tests
}
} else {
// Parse input and build list of keyspaces
Expand All @@ -55,14 +54,14 @@
}
if len(keyspaces) == 0 {
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
return
return nil

Check warning on line 57 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L57

Added line #L57 was not covered by tests
}
}

// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 64 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L64

Added line #L64 was not covered by tests
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
Expand All @@ -83,6 +82,8 @@
}(keyspace)
}
wg.Wait()

return nil

Check warning on line 86 in go/vt/vtorc/logic/keyspace_shard_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/keyspace_shard_discovery.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
RefreshAllKeyspacesAndShards()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
Expand All @@ -108,7 +108,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
clustersToWatch = nil
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", "semi_sync")
RefreshAllKeyspacesAndShards()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that all the keyspaces are correctly reloaded
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")
Expand Down
30 changes: 18 additions & 12 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
"time"

"github.com/spf13/pflag"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -71,30 +70,36 @@
if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil {
log.Error(err)
}
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}

Check warning on line 79 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L75-L79

Added lines #L75 - L79 were not covered by tests
return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
func refreshAllTablets(ctx context.Context) error {
return refreshTabletsUsing(ctx, func(tabletAlias string) {

Check warning on line 85 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L84-L85

Added lines #L84 - L85 were not covered by tests
DiscoverInstance(tabletAlias, false /* forceDiscovery */)
}, false /* forceRefresh */)
}

func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {

Check warning on line 90 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L90

Added line #L90 was not covered by tests
if !IsLeaderOrActive() {
return
return nil

Check warning on line 92 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L92

Added line #L92 was not covered by tests
}
if len(clustersToWatch) == 0 { // all known clusters
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 95 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L95

Added line #L95 was not covered by tests
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
log.Error(err)
return
return err

Check warning on line 99 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L99

Added line #L99 was not covered by tests
}

refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 102 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L102

Added line #L102 was not covered by tests
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
Expand All @@ -115,7 +120,7 @@
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 123 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L123

Added line #L123 was not covered by tests
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
Expand All @@ -134,9 +139,9 @@
}
if len(keyspaceShards) == 0 {
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
return nil

Check warning on line 142 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L142

Added line #L142 was not covered by tests
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)

Check warning on line 144 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L144

Added line #L144 was not covered by tests
defer refreshCancel()
var wg sync.WaitGroup
for _, ks := range keyspaceShards {
Expand All @@ -148,6 +153,7 @@
}
wg.Wait()
}
return nil

Check warning on line 156 in go/vt/vtorc/logic/tablet_discovery.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/tablet_discovery.go#L156

Added line #L156 was not covered by tests
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
Expand Down
47 changes: 28 additions & 19 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package logic

import (
"context"
"os"
"os/signal"
"sync"
Expand All @@ -27,6 +28,7 @@
"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/sjmudd/stopwatch"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
Expand Down Expand Up @@ -415,27 +417,34 @@
}
}()
case <-tabletTopoTick:
// Create a wait group
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.Config.TopoInformationRefreshSeconds))
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
cancel()

Check warning on line 424 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L420-L424

Added lines #L420 - L424 were not covered by tests
}
}
}

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation(ctx context.Context) error {
// Create an errgroup
eg, ctx := errgroup.WithContext(ctx)

Check warning on line 432 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L430-L432

Added lines #L430 - L432 were not covered by tests

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
// Refresh all keyspace information.
eg.Go(func() error {
return RefreshAllKeyspacesAndShards(ctx)
})

Check warning on line 437 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L434-L437

Added lines #L434 - L437 were not covered by tests

// Wait for both the refreshes to complete
wg.Wait()
// We have completed one discovery cycle in the entirety of it. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
}
// Refresh all tablets.
eg.Go(func() error {
return refreshAllTablets(ctx)
})

Check warning on line 442 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L440-L442

Added lines #L440 - L442 were not covered by tests

// Wait for both the refreshes to complete
err := eg.Wait()
if err == nil {
process.FirstDiscoveryCycleComplete.Store(true)

Check warning on line 447 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L445-L447

Added lines #L445 - L447 were not covered by tests
}
return err

Check warning on line 449 in go/vt/vtorc/logic/vtorc.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtorc/logic/vtorc.go#L449

Added line #L449 was not covered by tests
}
52 changes: 52 additions & 0 deletions go/vt/vtorc/logic/vtorc_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package logic

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/process"
)

func TestWaitForLocksRelease(t *testing.T) {
Expand Down Expand Up @@ -54,3 +60,49 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
waitForLocksRelease()
return time.Since(start)
}

// TestRefreshAllInformation verifies that refreshAllInformation marks the
// first discovery cycle as complete only when the refresh succeeds, and that
// process.HealthTest reports DiscoveredOnce/Healthy accordingly.
func TestRefreshAllInformation(t *testing.T) {
	// Restore the process-level health state on exit. The success path below
	// sets FirstDiscoveryCycleComplete to true; leaving it set would leak into
	// any test that runs afterwards and expects a fresh process.
	defer func() {
		process.FirstDiscoveryCycleComplete.Store(false)
		process.ResetLastHealthCheckCache()
	}()

	// Store the old topo server and restore it on test completion.
	oldTs := ts
	defer func() {
		ts = oldTs
	}()

	// Clear the database after the test. The easiest way to do that is to run all the initialization commands again.
	defer func() {
		db.ClearVTOrcDatabase()
	}()

	// Verify that, in the beginning, DiscoveredOnce is false.
	health, err := process.HealthTest()
	require.NoError(t, err)
	require.False(t, health.DiscoveredOnce)

	// Create a memory topo-server and create the keyspace and shard records.
	ts = memorytopo.NewServer(context.Background(), cell1)
	_, err = ts.GetOrCreateShard(context.Background(), keyspace, shard)
	require.NoError(t, err)

	// Test error: an already-cancelled context must make the refresh fail and
	// must not mark the first discovery cycle as complete.
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel context to simulate timeout
	require.Error(t, refreshAllInformation(ctx))
	require.False(t, process.FirstDiscoveryCycleComplete.Load())
	health, err = process.HealthTest()
	require.NoError(t, err)
	require.False(t, health.DiscoveredOnce)
	require.False(t, health.Healthy)
	process.ResetLastHealthCheckCache()

	// Test success: with a live context the refresh completes and the health
	// status flips to discovered and healthy.
	ctx2, cancel2 := context.WithCancel(context.Background())
	defer cancel2()
	require.NoError(t, refreshAllInformation(ctx2))
	require.True(t, process.FirstDiscoveryCycleComplete.Load())
	health, err = process.HealthTest()
	require.NoError(t, err)
	require.True(t, health.DiscoveredOnce)
	require.True(t, health.Healthy)
	process.ResetLastHealthCheckCache()
}
4 changes: 3 additions & 1 deletion go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var FirstDiscoveryCycleComplete atomic.Bool

var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second)

// ResetLastHealthCheckCache flushes lastHealthCheckCache. Tests call it
// between HealthTest invocations and when restoring package state.
func ResetLastHealthCheckCache() {
	lastHealthCheckCache.Flush()
}

type NodeHealth struct {
Hostname string
Token string
Expand Down Expand Up @@ -120,8 +122,8 @@ func HealthTest() (health *HealthStatus, err error) {
log.Error(err)
return health, err
}
health.Healthy = healthy
health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load()
health.Healthy = healthy && health.DiscoveredOnce

if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil {
health.Error = err
Expand Down
51 changes: 51 additions & 0 deletions go/vt/vtorc/process/health_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package process

import (
"testing"

"github.com/stretchr/testify/require"
_ "modernc.org/sqlite"
)

// TestHealthTest exercises HealthTest before and after the first discovery
// cycle is marked complete. It expects Healthy and DiscoveredOnce to be false
// in the first phase and true in the second, and ThisNodeHealth.LastReported
// to be non-zero after each call.
func TestHealthTest(t *testing.T) {
	// Put the package-level state back once the test is done.
	defer func() {
		FirstDiscoveryCycleComplete.Store(false)
		ThisNodeHealth = &NodeHealth{}
		ResetLastHealthCheckCache()
	}()

	require.Zero(t, ThisNodeHealth.LastReported)

	phases := []struct {
		markDiscovered bool
		expect         require.BoolAssertionFunc
	}{
		{markDiscovered: false, expect: require.False},
		{markDiscovered: true, expect: require.True},
	}

	for _, phase := range phases {
		// Start every phase from a blank node-health record.
		ThisNodeHealth = &NodeHealth{}
		if phase.markDiscovered {
			FirstDiscoveryCycleComplete.Store(true)
		}

		status, healthErr := HealthTest()
		require.NoError(t, healthErr)
		phase.expect(t, status.Healthy)
		phase.expect(t, status.DiscoveredOnce)
		require.NotZero(t, ThisNodeHealth.LastReported)

		// Flush the health-check cache before moving on.
		ResetLastHealthCheckCache()
	}
}
Loading