Skip to content

Commit

Permalink
allow tablet picker to exclude specified tablets from its candidate l…
Browse files Browse the repository at this point in the history
…ist (#14224)

Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra authored Oct 23, 2023
1 parent ad26d15 commit 0f21a0b
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 45 deletions.
23 changes: 19 additions & 4 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type TabletPicker struct {
inOrder bool
cellPref TabletPickerCellPreference
localCellInfo localCellInfo
// This map is keyed on the results of TabletAlias.String().
ignoreTablets map[string]struct{}
}

// NewTabletPicker returns a TabletPicker.
Expand All @@ -144,6 +146,7 @@ func NewTabletPicker(
cells []string,
localCell, keyspace, shard, tabletTypesStr string,
options TabletPickerOptions,
ignoreTablets ...*topodatapb.TabletAlias,
) (*TabletPicker, error) {
// Keep inOrder parsing here for backward compatability until TabletPickerTabletOrder is fully adopted.
if tabletTypesStr == "" {
Expand Down Expand Up @@ -219,7 +222,7 @@ func NewTabletPicker(
}
}

return &TabletPicker{
tp := &TabletPicker{
ts: ts,
cells: dedupeCells(cells),
localCellInfo: localCellInfo{localCell: localCell, cellsInAlias: aliasCellMap},
Expand All @@ -228,7 +231,15 @@ func NewTabletPicker(
tabletTypes: tabletTypes,
inOrder: inOrder,
cellPref: cellPref,
}, nil
ignoreTablets: make(map[string]struct{}, len(ignoreTablets)),
}

for _, ignoreTablet := range ignoreTablets {
tp.ignoreTablets[ignoreTablet.String()] = struct{}{}
}

return tp, nil

}

// dedupeCells is used to remove duplicates in the cell list in case it is passed in
Expand Down Expand Up @@ -358,7 +369,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err)
return nil
}
aliases = append(aliases, si.PrimaryAlias)
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
aliases = append(aliases, si.PrimaryAlias)
}
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
Expand Down Expand Up @@ -394,7 +407,9 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
continue
}
for _, node := range sri.Nodes {
aliases = append(aliases, node.TabletAlias)
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
}
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,29 @@ func TestPickUsingCellAsAlias(t *testing.T) {
}
}

func TestPickWithIgnoreList(t *testing.T) {
ctx := utils.LeakCheckContext(t)

te := newPickerTestEnv(t, ctx, []string{"cell1", "cell2"})

want := addTablet(ctx, te, 101, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, want)

dontWant := addTablet(ctx, te, 102, topodatapb.TabletType_REPLICA, "cell1", true, true)
defer deleteTablet(t, te, dontWant)

// Specify the alias as the cell.
tp, err := NewTabletPicker(ctx, te.topoServ, []string{"cella"}, "cell1", te.keyspace, te.shard, "replica", TabletPickerOptions{}, dontWant.GetAlias())
require.NoError(t, err)

// Try it many times to be sure we don't ever pick from the ignore list.
for i := 0; i < 100; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
require.False(t, proto.Equal(dontWant, tablet), "Picked the tablet we shouldn't have: %v", dontWant)
}
}

func TestPickUsingCellAliasOnlySpecified(t *testing.T) {
ctx := utils.LeakCheckContextTimeout(t, 200*time.Millisecond)

Expand Down
58 changes: 51 additions & 7 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

// vstreamManager manages vstream requests.
Expand All @@ -53,6 +53,10 @@ type vstreamManager struct {
// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
const maxSkewTimeoutSeconds = 10 * 60

// tabletPickerContextTimeout is the timeout for the child context used to select candidate tablets
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -124,6 +128,7 @@ type journalEvent struct {

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")

return &vstreamManager{
resolver: resolver,
toposerv: serv,
Expand Down Expand Up @@ -473,6 +478,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// journalDone is assigned a channel when a journal event is encountered.
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)

errCount := 0
for {
Expand All @@ -490,12 +496,19 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
var eventss [][]*binlogdatapb.VEvent
var err error
cells := vs.getCells()
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions)

tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
if err != nil {
log.Errorf(err.Error())
return err
}
tablet, err := tp.PickForStreaming(ctx)

// Create a child context with a stricter timeout when picking a tablet.
// This will prevent hanging in the case no tablets are found.
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
defer tpCancel()

tablet, err := tp.PickForStreaming(tpCtx)
if err != nil {
log.Errorf(err.Error())
return err
Expand Down Expand Up @@ -670,11 +683,18 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
err = vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "vstream ended unexpectedly")
}
if vterrors.Code(err) != vtrpcpb.Code_FAILED_PRECONDITION && vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE {

retry, ignoreTablet := vs.shouldRetry(err)
if !retry {
log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
}
if ignoreTablet {
ignoreTablets = append(ignoreTablets, tablet.GetAlias())
}

errCount++
// Retry, at most, 3 times if the error can be retried.
if errCount >= 3 {
log.Errorf("vstream for %s/%s had three consecutive failures: %v", sgtid.Keyspace, sgtid.Shard, err)
return err
Expand All @@ -683,6 +703,30 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
}

// shouldRetry determines whether we should exit immediately or retry the vstream.
// The first return value determines if the error can be retried, while the second
// indicates whether the tablet with which the error occurred should be ommitted
// from the candidate list of tablets to choose from on the retry.
//
// An error should be retried if it is expected to be transient.
// A tablet should be ignored upon retry if it's likely another tablet will not
// produce the same error.
func (vs *vstream) shouldRetry(err error) (bool, bool) {
errCode := vterrors.Code(err)

if errCode == vtrpcpb.Code_FAILED_PRECONDITION || errCode == vtrpcpb.Code_UNAVAILABLE {
return true, false
}

// If there is a GTIDSet Mismatch on the tablet, omit it from the candidate
// list in the TabletPicker on retry.
if errCode == vtrpcpb.Code_INVALID_ARGUMENT && strings.Contains(err.Error(), "GTIDSet Mismatch") {
return true, true
}

return false, false
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
Expand Down
Loading

0 comments on commit 0f21a0b

Please sign in to comment.