Skip to content

Commit

Permalink
Improve efficiency of vtorc topo calls (#17071)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
timvaillancourt and mattlord authored Nov 6, 2024
1 parent 5b699d8 commit a575982
Show file tree
Hide file tree
Showing 6 changed files with 410 additions and 55 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
--topo_read_concurrency int Concurrency of topo reads. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
Expand Down
1 change: 1 addition & 0 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
Expand Down
86 changes: 77 additions & 9 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,27 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"path"
"slices"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -553,7 +552,6 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
defer span.Finish()
ctx = trace.NewContext(ctx, span)
var err error

// The caller intents to all cells
Expand Down Expand Up @@ -597,7 +595,7 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
case IsErrType(err, NoNode):
// There is no shard replication for this shard in this cell. NOOP
default:
rec.RecordError(vterrors.Wrap(err, fmt.Sprintf("GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard)))
rec.RecordError(vterrors.Wrapf(err, "GetShardReplication(%v, %v, %v) failed.", cell, keyspace, shard))
return
}
}(cell)
Expand All @@ -616,6 +614,76 @@ func (ts *Server) FindAllTabletAliasesInShardByCell(ctx context.Context, keyspac
return result, err
}

// GetTabletsByShard returns the tablets in the given shard using all cells.
// It can return ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShard(ctx context.Context, keyspace, shard string) ([]*TabletInfo, error) {
return ts.GetTabletsByShardCell(ctx, keyspace, shard, nil)
}

// GetTabletsByShardCell returns the tablets in the given shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all the individual
// tablets, in which case the result is valid, but partial.
func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard string, cells []string) ([]*TabletInfo, error) {
span, ctx := trace.NewSpan(ctx, "topo.GetTabletsByShardCell")
span.Annotate("keyspace", keyspace)
span.Annotate("shard", shard)
span.Annotate("num_cells", len(cells))
defer span.Finish()
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 { // Nothing to do
return nil, nil
}
}

// Divide the concurrency limit by the number of cells. If there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
if keyspace != "" {
kss = &KeyspaceShard{
Keyspace: keyspace,
Shard: shard,
}
}
options := &GetTabletsByCellOptions{
Concurrency: cellConcurrency,
KeyspaceShard: kss,
}
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, options)
if err != nil {
return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell)
}
mu.Lock()
defer mu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
if err := eg.Wait(); err != nil {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
return tablets, NewError(PartialResult, shard)
}
return tablets, nil
}

// GetTabletMapForShard returns the tablets for a shard. It can return
// ErrPartialResult if it couldn't read all the cells, or all
// the individual tablets, in which case the map is valid, but partial.
Expand Down
21 changes: 18 additions & 3 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
type GetTabletsByCellOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetTablet.
Concurrency int
// KeyspaceShard is the optional keyspace/shard that tablets must match.
// An empty shard value will match all shards in the keyspace.
KeyspaceShard *KeyspaceShard
}

// GetTabletsByCell returns all the tablets in the cell.
Expand Down Expand Up @@ -263,15 +266,27 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *G
return nil, err
}

tablets := make([]*TabletInfo, len(listResults))
var capHint int
if opt != nil && opt.KeyspaceShard == nil {
capHint = len(listResults)
}

tablets := make([]*TabletInfo, 0, capHint)
for n := range listResults {
tablet := &topodatapb.Tablet{}
if err := tablet.UnmarshalVT(listResults[n].Value); err != nil {
return nil, err
}
tablets[n] = &TabletInfo{Tablet: tablet, version: listResults[n].Version}
if opt != nil && opt.KeyspaceShard != nil && opt.KeyspaceShard.Keyspace != "" {
if opt.KeyspaceShard.Keyspace != tablet.Keyspace {
continue
}
if opt.KeyspaceShard.Shard != "" && opt.KeyspaceShard.Shard != tablet.Shard {
continue
}
}
tablets = append(tablets, &TabletInfo{Tablet: tablet, version: listResults[n].Version})
}

return tablets, nil
}

Expand Down
Loading

0 comments on commit a575982

Please sign in to comment.