Skip to content

Commit

Permalink
require topo for Healthy: true
Browse files (browse the repository at this point in the history)
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Oct 31, 2024
1 parent 1b0a902 commit fed73a4
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 42 deletions.
13 changes: 7 additions & 6 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ import (
)

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards() {
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
// Get all the keyspaces
keyspaces, err = ts.GetKeyspaces(ctx)
if err != nil {
log.Error(err)
return
return err
}
} else {
// Parse input and build list of keyspaces
Expand All @@ -55,14 +54,14 @@ func RefreshAllKeyspacesAndShards() {
}
if len(keyspaces) == 0 {
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
return
return nil
}
}

// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for idx, keyspace := range keyspaces {
Expand All @@ -83,6 +82,8 @@ func RefreshAllKeyspacesAndShards() {
}(keyspace)
}
wg.Wait()

return nil
}

// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
RefreshAllKeyspacesAndShards()
RefreshAllKeyspacesAndShards(context.Background())

// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
Expand All @@ -107,7 +107,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
clustersToWatch = nil
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", "semi_sync")
RefreshAllKeyspacesAndShards()
RefreshAllKeyspacesAndShards(context.Background())

// Verify that all the keyspaces are correctly reloaded
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")
Expand Down
29 changes: 14 additions & 15 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/spf13/pflag"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand All @@ -39,7 +38,6 @@ import (
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -77,29 +75,29 @@ func OpenTabletDiscovery() <-chan time.Time {

// populateAllInformation initializes all the information for VTOrc to function.
func populateAllInformation() {
refreshAllInformation()
// We have completed one full discovery cycle. We should update the process health.
process.FirstDiscoveryCycleComplete.Store(true)
// We can wait forever (context.Background()) for this call.
if err := refreshAllInformation(context.Background()); err != nil {
log.Errorf("failed to initialize topology information: %+v", err)
}
}

// refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while
func refreshAllTablets() {
refreshTabletsUsing(func(tabletAlias string) {
func refreshAllTablets(ctx context.Context) error {
return refreshTabletsUsing(ctx, func(tabletAlias string) {
DiscoverInstance(tabletAlias, false /* forceDiscovery */)
}, false /* forceRefresh */)
}

func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error {
if len(clustersToWatch) == 0 { // all known clusters
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
cells, err := ts.GetKnownCells(ctx)
if err != nil {
log.Error(err)
return
return err
}

refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, cell := range cells {
Expand All @@ -120,7 +118,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
Expand All @@ -139,9 +137,9 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
if len(keyspaceShards) == 0 {
log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch)
return
return nil
}
refreshCtx, refreshCancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
for _, ks := range keyspaceShards {
Expand All @@ -153,6 +151,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}
wg.Wait()
}
return nil
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
Expand Down
5 changes: 2 additions & 3 deletions go/vt/vtorc/logic/tablet_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,10 @@ func TestProcessHealth(t *testing.T) {
_, discoveredOnce := process.HealthTest()
require.False(t, discoveredOnce)
ts = memorytopo.NewServer(context.Background(), cell1)
populateAllInformation()
require.True(t, process.FirstDiscoveryCycleComplete.Load())
require.False(t, process.FirstDiscoveryCycleComplete.Load())
// Verify after we populate all information, we have the first DiscoveredOnce field true.
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)
require.False(t, discoveredOnce)
}

func TestSetReadOnly(t *testing.T) {
Expand Down
38 changes: 23 additions & 15 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package logic

import (
"context"
"os"
"os/signal"
"sync"
Expand All @@ -26,6 +27,7 @@ import (

"github.com/patrickmn/go-cache"
"github.com/sjmudd/stopwatch"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand All @@ -35,6 +37,7 @@ import (
"vitess.io/vitess/go/vt/vtorc/discovery"
"vitess.io/vitess/go/vt/vtorc/inst"
ometrics "vitess.io/vitess/go/vt/vtorc/metrics"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vtorc/util"
)

Expand Down Expand Up @@ -328,30 +331,35 @@ func ContinuousDiscovery() {
go inst.SnapshotTopologies()
}()
case <-tabletTopoTick:
refreshAllInformation()
timeout := time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
if err := refreshAllInformation(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
cancel()
}
}
}

// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
func refreshAllInformation() {
// Create a wait group
var wg sync.WaitGroup
func refreshAllInformation(ctx context.Context) error {
// Create an errgroup
eg, ctx := errgroup.WithContext(ctx)

// Refresh all keyspace information.
wg.Add(1)
go func() {
defer wg.Done()
RefreshAllKeyspacesAndShards()
}()
eg.Go(func() error {
return RefreshAllKeyspacesAndShards(ctx)
})

// Refresh all tablets.
wg.Add(1)
go func() {
defer wg.Done()
refreshAllTablets()
}()
eg.Go(func() error {
return refreshAllTablets(ctx)
})

// Wait for both the refreshes to complete
wg.Wait()
err := eg.Wait()
if err == nil {
process.FirstDiscoveryCycleComplete.Store(true)
}
return err
}
2 changes: 1 addition & 1 deletion go/vt/vtorc/process/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func writeHealthToDatabase() bool {
func HealthTest() (health *NodeHealth, discoveredOnce bool) {
ThisNodeHealth.LastReported = time.Now()
discoveredOnce = FirstDiscoveryCycleComplete.Load()
ThisNodeHealth.Healthy = writeHealthToDatabase()
ThisNodeHealth.Healthy = discoveredOnce && writeHealthToDatabase()

return ThisNodeHealth, discoveredOnce
}

0 comments on commit fed73a4

Please sign in to comment.