Skip to content

Commit

Permalink
Ensure all topo read calls consider --topo_read_concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Dec 4, 2024
1 parent 0b51839 commit 7d318cf
Show file tree
Hide file tree
Showing 16 changed files with 86 additions and 100 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Maximum concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
29 changes: 13 additions & 16 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,13 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/topo/topoproto"

"vitess.io/vitess/go/vt/key"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"

"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
)

const (
Expand All @@ -56,7 +53,7 @@ var (
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
tablet *topodata.Tablet
tablet *topodatapb.Tablet
}

// TopologyWatcher polls the topology periodically for changes to
Expand All @@ -70,7 +67,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
concurrency int
concurrency int64
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -92,7 +89,7 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
Expand All @@ -112,7 +109,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}

func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -271,14 +268,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
IsIncluded(tablet *topodata.Tablet) bool
IsIncluded(tablet *topodatapb.Tablet) bool
}

// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter

// IsIncluded returns true if a tablet passes all filters.
func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
Expand All @@ -299,7 +296,7 @@ type FilterByShard struct {
type filterShard struct {
keyspace string
shard string
keyRange *topodata.KeyRange // only set if shard is also a KeyRange
keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
}

// NewFilterByShard creates a new FilterByShard for use by a
Expand Down Expand Up @@ -344,7 +341,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}

// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
Expand Down Expand Up @@ -384,7 +381,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}

// IsIncluded returns true if the tablet's keyspace matches what we have.
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
Expand All @@ -403,7 +400,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
}

// IsIncluded returns true if the tablet's tags match what we expect.
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool {
if fbtg.tags == nil {
return true
}
Expand Down
28 changes: 5 additions & 23 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,26 @@ import (
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/events"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/vterrors"
)

// This file contains keyspace utility functions.

// Default concurrency to use in order to avoid overhwelming the topo server.
var DefaultConcurrency = 32

// shardKeySuffix is the suffix of a shard key.
// The full key looks like this:
// /vitess/global/keyspaces/customer/shards/80-/Shard
const shardKeySuffix = "Shard"

func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
}

func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
// with a keyspace.
Expand Down Expand Up @@ -210,7 +192,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
type FindAllShardsInKeyspaceOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetShard.
// If <= 0, Concurrency is set to 1.
Concurrency int
Concurrency int64
}

// FindAllShardsInKeyspace reads and returns all the existing shards in a
Expand All @@ -228,7 +210,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = DefaultConcurrency
opt.Concurrency = DefaultReadConcurrency
}

// Unescape the keyspace name as this can e.g. come from the VSchema where
Expand Down
17 changes: 14 additions & 3 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -147,6 +148,10 @@ type Server struct {
// will read the list of addresses for that cell from the
// global cluster and create clients as needed.
cellConns map[string]cellConn

// cellReadSem is a semaphore limiting the number of concurrent read
// operations to all cell-based topos.
cellReadSem *semaphore.Weighted
}

type cellConn struct {
Expand Down Expand Up @@ -181,6 +186,9 @@ var (

FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate",
"vtorc", "vtbackup"}

// Default read concurrency to use in order to avoid overhwelming the topo server.
DefaultReadConcurrency int64 = 32
)

func init() {
Expand All @@ -193,6 +201,7 @@ func registerTopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads.")
}

// RegisterFactory registers a Factory for an implementation for a Server.
Expand All @@ -208,19 +217,20 @@ func RegisterFactory(name string, factory Factory) {
// NewWithFactory creates a new Server based on the given Factory.
// It also opens the global cell connection.
func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) {
globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
conn, err := factory.Create(GlobalCell, serverAddress, root)
if err != nil {
return nil, err
}
conn = NewStatsConn(GlobalCell, conn)
conn = NewStatsConn(GlobalCell, conn, globalReadSem)

var connReadOnly Conn
if factory.HasGlobalReadOnlyCell(serverAddress, root) {
connReadOnly, err = factory.Create(GlobalReadOnlyCell, serverAddress, root)
if err != nil {
return nil, err
}
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly)
connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem)
} else {
connReadOnly = conn
}
Expand All @@ -230,6 +240,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error
globalReadOnlyCell: connReadOnly,
factory: factory,
cellConns: make(map[string]cellConn),
cellReadSem: semaphore.NewWeighted(DefaultReadConcurrency),
}, nil
}

Expand Down Expand Up @@ -302,7 +313,7 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root)
switch {
case err == nil:
conn = NewStatsConn(cell, conn)
conn = NewStatsConn(cell, conn, ts.cellReadSem)
ts.cellConns[cell] = cellConn{ci, conn}
return conn, nil
case IsErrType(err, NoNode):
Expand Down
9 changes: 0 additions & 9 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,16 +658,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}

// Divide the concurrency limit by the number of cells. If there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
Expand All @@ -678,7 +670,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}
options := &GetTabletsByCellOptions{
Concurrency: cellConcurrency,
KeyspaceShard: kss,
}
for _, cell := range cells {
Expand Down
Loading

0 comments on commit 7d318cf

Please sign in to comment.