Skip to content

Commit

Permalink
srvtopo: Setup metrics in init() function (#15304)
Browse files Browse the repository at this point in the history
Signed-off-by: Dirkjan Bussink <[email protected]>
  • Loading branch information
dbussink authored Feb 23, 2024
1 parent a78ccbf commit 229bbaa
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 81 deletions.
10 changes: 7 additions & 3 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -84,6 +85,8 @@ In particular, it contains:
tabletTypesToWait []topodatapb.TabletType

env *vtenv.Environment

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
Expand Down Expand Up @@ -131,6 +134,7 @@ func init() {
if err != nil {
log.Fatalf("unable to initialize env: %v", err)
}
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")
}

func startMysqld(uid uint32) (mysqld *mysqlctl.Mysqld, cnf *mysqlctl.Mycnf, err error) {
Expand Down Expand Up @@ -234,7 +238,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
// to be the "internal" protocol that InitTabletMap registers.
cmd.Flags().Set("tablet_manager_protocol", "internal")
cmd.Flags().Set("tablet_protocol", "internal")
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql)
uid, err := vtcombo.InitTabletMap(env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, startMysql, srvTopoCounts)
if err != nil {
// ensure we start mysql in the event we fail here
if startMysql {
Expand All @@ -260,7 +264,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr, srvTopoCounts)
if err != nil {
return err
}
Expand Down Expand Up @@ -297,7 +301,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}

// vtgate configuration and init
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
if len(tabletTypesToWait) != 0 {
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtexplain/cli/vtexplain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -186,7 +187,8 @@ func parseAndRun() error {
}
ctx := context.Background()
ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts)
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, env, ts, vschema, schema, ksShardMap, opts, srvTopoCounts)
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion go/cmd/vtgate/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -73,8 +74,14 @@ var (
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("ResilientSrvTopoServer", "Resilient srvtopo server operations", "type")
}

// CheckCellFlags will check validation of cell and cells_to_watch flag
// it will help to avoid strange behaviors when vtgate runs but actually does not work
func CheckCellFlags(ctx context.Context, serv srvtopo.Server, cell string, cellsToWatch string) error {
Expand Down Expand Up @@ -139,7 +146,7 @@ func run(cmd *cobra.Command, args []string) error {
ts := topo.Open()
defer ts.Close()

resilientServer = srvtopo.NewResilientServer(context.Background(), ts, "ResilientSrvTopoServer")
resilientServer = srvtopo.NewResilientServer(context.Background(), ts, srvTopoCounts)

tabletTypes := make([]topodatapb.TabletType, 0, 1)
for _, tt := range tabletTypesToWait {
Expand Down
13 changes: 10 additions & 3 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -102,8 +103,14 @@ vttablet \
PreRunE: servenv.CobraPreRunE,
RunE: run,
}

srvTopoCounts *stats.CountersWithSingleLabel
)

func init() {
srvTopoCounts = stats.NewCountersWithSingleLabel("TabletSrvTopo", "Resilient srvtopo server operations", "type")
}

func run(cmd *cobra.Command, args []string) error {
servenv.Init()

Expand All @@ -129,7 +136,7 @@ func run(cmd *cobra.Command, args []string) error {
}

ts := topo.Open()
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias)
qsc, err := createTabletServer(context.Background(), env, config, ts, tabletAlias, srvTopoCounts)
if err != nil {
ts.Close()
return err
Expand Down Expand Up @@ -249,7 +256,7 @@ func extractOnlineDDL() error {
return nil
}

func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias) (*tabletserver.TabletServer, error) {
func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tabletenv.TabletConfig, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, srvTopoCounts *stats.CountersWithSingleLabel) (*tabletserver.TabletServer, error) {
if tableACLConfig != "" {
// To override default simpleacl, other ACL plugins must set themselves to be default ACL factory
tableacl.Register("simpleacl", &simpleacl.Factory{})
Expand All @@ -258,7 +265,7 @@ func createTabletServer(ctx context.Context, env *vtenv.Environment, config *tab
}

// creates and registers the query service
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias)
qsc := tabletserver.NewTabletServer(ctx, env, "", config, ts, tabletAlias, srvTopoCounts)
servenv.OnRun(func() {
qsc.Register()
addStatusParts(qsc)
Expand Down
4 changes: 3 additions & 1 deletion go/vt/srvtopo/discover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -59,7 +60,8 @@ func TestFindAllTargets(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second

}()
rs := NewResilientServer(ctx, ts, "TestFindAllKeyspaceShards")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// No keyspace / shards.
ks, err := FindAllTargets(ctx, rs, "cell1", []topodatapb.TabletType{topodatapb.TabletType_PRIMARY})
Expand Down
10 changes: 1 addition & 9 deletions go/vt/srvtopo/resilient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const (
// - return the last known value of the data if there is an error
type ResilientServer struct {
topoServer *topo.Server
counts *stats.CountersWithSingleLabel

*SrvKeyspaceWatcher
*SrvVSchemaWatcher
Expand All @@ -79,20 +78,13 @@ type ResilientServer struct {

// NewResilientServer creates a new ResilientServer
// based on the provided topo.Server.
func NewResilientServer(ctx context.Context, base *topo.Server, counterPrefix string) *ResilientServer {
func NewResilientServer(ctx context.Context, base *topo.Server, counts *stats.CountersWithSingleLabel) *ResilientServer {
if srvTopoCacheRefresh > srvTopoCacheTTL {
log.Fatalf("srv_topo_cache_refresh must be less than or equal to srv_topo_cache_ttl")
}

metric := ""
if counterPrefix != "" {
metric = counterPrefix + "Counts"
}
counts := stats.NewCountersWithSingleLabel(metric, "Resilient srvtopo server operations", "type")

return &ResilientServer{
topoServer: base,
counts: counts,
SrvKeyspaceWatcher: NewSrvKeyspaceWatcher(ctx, base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvVSchemaWatcher: NewSrvVSchemaWatcher(ctx, base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
SrvKeyspaceNamesQuery: NewSrvKeyspaceNamesQuery(base, counts, srvTopoCacheRefresh, srvTopoCacheTTL),
Expand Down
31 changes: 19 additions & 12 deletions go/vt/srvtopo/resilient_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/google/safehtml/template"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"

"github.com/stretchr/testify/assert"
Expand All @@ -53,7 +54,8 @@ func TestGetSrvKeyspace(t *testing.T) {
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspace")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Ask for a not-yet-created keyspace
_, err := rs.GetSrvKeyspace(context.Background(), "test_cell", "test_ks")
Expand Down Expand Up @@ -175,7 +177,7 @@ func TestGetSrvKeyspace(t *testing.T) {
// Now simulate a topo service error and see that the last value is
// cached for at least half of the expected ttl.
errorTestStart := time.Now()
errorReqsBefore := rs.counts.Counts()[errorCategory]
errorReqsBefore := counts.Counts()[errorCategory]
forceErr := topo.NewError(topo.Timeout, "test topo error")
factory.SetError(forceErr)

Expand Down Expand Up @@ -271,7 +273,7 @@ func TestGetSrvKeyspace(t *testing.T) {

// Check that the expected number of errors were counted during the
// interval
errorReqs := rs.counts.Counts()[errorCategory]
errorReqs := counts.Counts()[errorCategory]
expectedErrors := int64(time.Since(errorTestStart) / srvTopoCacheRefresh)
if errorReqs-errorReqsBefore > expectedErrors {
t.Errorf("expected <= %v error requests got %d", expectedErrors, errorReqs-errorReqsBefore)
Expand Down Expand Up @@ -370,7 +372,8 @@ func TestSrvKeyspaceCachedError(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()
rs := NewResilientServer(ctx, ts, "TestSrvKeyspaceCachedErrors")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Ask for an unknown keyspace, should get an error.
_, err := rs.GetSrvKeyspace(ctx, "test_cell", "unknown_ks")
Expand Down Expand Up @@ -401,7 +404,8 @@ func TestGetSrvKeyspaceCreated(t *testing.T) {
defer cancel()
ts := memorytopo.NewServer(ctx, "test_cell")
defer ts.Close()
rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceCreated")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Set SrvKeyspace with value.
want := &topodatapb.SrvKeyspace{}
Expand Down Expand Up @@ -435,7 +439,8 @@ func TestWatchSrvVSchema(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := memorytopo.NewServer(ctx, "test_cell")
rs := NewResilientServer(ctx, ts, "TestWatchSrvVSchema")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// mu protects watchValue and watchErr.
mu := sync.Mutex{}
Expand Down Expand Up @@ -529,7 +534,8 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()
rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceNames")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Set SrvKeyspace with value
want := &topodatapb.SrvKeyspace{}
Expand Down Expand Up @@ -614,7 +620,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {

// Check that we only checked the topo service 1 or 2 times during the
// period where we got the cached error.
cachedReqs, ok := rs.counts.Counts()[cachedCategory]
cachedReqs, ok := counts.Counts()[cachedCategory]
if !ok || cachedReqs > 2 {
t.Errorf("expected <= 2 cached requests got %v", cachedReqs)
}
Expand All @@ -640,7 +646,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
t.Errorf("GetSrvKeyspaceNames got %v want %v", names, wantNames)
}

errorReqs, ok := rs.counts.Counts()[errorCategory]
errorReqs, ok := counts.Counts()[errorCategory]
if !ok || errorReqs == 0 {
t.Errorf("expected non-zero error requests got %v", errorReqs)
}
Expand Down Expand Up @@ -684,8 +690,8 @@ func TestSrvKeyspaceWatcher(t *testing.T) {
srvTopoCacheTTL = 1 * time.Second
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceWatcher")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

var wmu sync.Mutex
var wseen []watched
Expand Down Expand Up @@ -811,7 +817,8 @@ func TestSrvKeyspaceListener(t *testing.T) {
srvTopoCacheRefresh = 1 * time.Second
}()

rs := NewResilientServer(ctx, ts, "TestGetSrvKeyspaceListener")
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

cancelCtx, cancelFunc := context.WithCancel(context.Background())
var callbackCount atomic.Int32
Expand Down
8 changes: 5 additions & 3 deletions go/vt/srvtopo/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand All @@ -33,10 +34,11 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func initResolver(t *testing.T, ctx context.Context, name string) *Resolver {
func initResolver(t *testing.T, ctx context.Context) *Resolver {
cell := "cell1"
ts := memorytopo.NewServer(ctx, cell)
rs := NewResilientServer(ctx, ts, name)
counts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
rs := NewResilientServer(ctx, ts, counts)

// Create sharded keyspace and shards.
if err := ts.CreateKeyspace(ctx, "sks", &topodatapb.Keyspace{}); err != nil {
Expand Down Expand Up @@ -97,7 +99,7 @@ func initResolver(t *testing.T, ctx context.Context, name string) *Resolver {
func TestResolveDestinations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resolver := initResolver(t, ctx, "TestResolveDestinations")
resolver := initResolver(t, ctx)

id1 := &querypb.Value{
Type: sqltypes.VarChar,
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtadmin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/gorilla/mux"
"github.com/patrickmn/go-cache"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/sets"
Expand Down Expand Up @@ -2384,7 +2385,8 @@ func (api *API) VTExplain(ctx context.Context, req *vtadminpb.VTExplainRequest)
}

ts := memorytopo.NewServer(ctx, vtexplain.Cell)
vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"})
srvTopoCounts := stats.NewCountersWithSingleLabel("", "Resilient srvtopo server operations", "type")
vte, err := vtexplain.Init(ctx, api.env, ts, srvVSchema, schema, shardMap, &vtexplain.Options{ReplicationMode: "ROW"}, srvTopoCounts)
if err != nil {
return nil, fmt.Errorf("error initilaizing vtexplain: %w", err)
}
Expand Down
Loading

0 comments on commit 229bbaa

Please sign in to comment.