Skip to content

Commit

Permalink
Merge branch 'master' into easyjson
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jul 27, 2023
2 parents 67c0ea2 + 7ac9e6b commit 412bc41
Show file tree
Hide file tree
Showing 39 changed files with 494 additions and 162 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

Expand Down
2 changes: 1 addition & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

Expand Down
3 changes: 2 additions & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.54.0
)

Expand Down
6 changes: 4 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -152,6 +152,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 h1:QLureRX3moex6NVu/Lr4MGakp9FdA7sBHGBmvRW7NaM=
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down
48 changes: 43 additions & 5 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"math"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -32,6 +33,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
Expand All @@ -56,8 +58,10 @@ const (
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
IsBackgroundRequest(ctx context.Context, resourceGroupName, requestResource string) bool
}

// ResourceGroupProvider provides some api to interact with resource manager server.
Expand Down Expand Up @@ -454,7 +458,6 @@ func (c *ResourceGroupsController) OnRequestWait(
) (*rmpb.Consumption, *rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, nil, err
}
return gc.onRequestWait(ctx, info)
Expand All @@ -472,6 +475,41 @@ func (c *ResourceGroupsController) OnResponse(
return tmp.(*groupCostController).onResponse(req, resp)
}

// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
func (c *ResourceGroupsController) IsBackgroundRequest(ctx context.Context,
resourceGroupName, requestResource string) bool {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}

return c.checkBackgroundSettings(ctx, gc.getMeta().BackgroundSettings, requestResource)
}

func (c *ResourceGroupsController) checkBackgroundSettings(ctx context.Context, bg *rmpb.BackgroundSettings, requestResource string) bool {
// fallback to default resource group.
if bg == nil {
resourceGroupName := "default"
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return false
}
bg = gc.getMeta().BackgroundSettings
}

if bg == nil || len(requestResource) == 0 || len(bg.JobTypes) == 0 {
return false
}

if idx := strings.LastIndex(requestResource, "_"); idx != -1 {
return slices.Contains(bg.JobTypes, requestResource[idx+1:])
}

return false
}

// GetResourceGroup returns the meta setting of the given resource group name.
func (c *ResourceGroupsController) GetResourceGroup(resourceGroupName string) (*rmpb.ResourceGroup, error) {
gc, err := c.tryGetResourceGroup(c.loopCtx, resourceGroupName)
Expand Down Expand Up @@ -518,7 +556,7 @@ type groupCostController struct {
lastRequestTime time.Time

// requestInProgress is set true when sending token bucket request.
// And it is set false when reciving token bucket response.
// And it is set false when receiving token bucket response.
// This triggers a retry attempt on the next tick.
requestInProgress bool

Expand Down Expand Up @@ -1045,8 +1083,8 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
}

func (gc *groupCostController) getMeta() *rmpb.ResourceGroup {
gc.metaLock.Lock()
defer gc.metaLock.Unlock()
gc.metaLock.RLock()
defer gc.metaLock.RUnlock()
return gc.meta
}

Expand Down
3 changes: 3 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
},
},
},
BackgroundSettings: &rmpb.BackgroundSettings{
JobTypes: []string{"lightning", "br"},
},
}
ch1 := make(chan struct{})
ch2 := make(chan *groupCostController)
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"os"
"time"

"github.com/cloudfoundry/gosigar"
sigar "github.com/cloudfoundry/gosigar"
"go.uber.org/zap"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
Expand Down
4 changes: 2 additions & 2 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ flag_management:
carryforward: true
statuses:
- type: project
target: 85%
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.
- type: patch
target: 85%
target: 74% # increase it if you want to enforce higher coverage for project, current setting as 74% is for do not let the error be reported and lose the meaning of warning.

ignore:
- tests/** # integration test cases or tools.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8=
github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
18 changes: 16 additions & 2 deletions pkg/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@ import (
"math"
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
)

var blockGCSafePointErrmsg = "don't allow update gc safe point v1."
var blockServiceSafepointErrmsg = "don't allow update service safe point v1."

// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
cfg config.PDServerConfig
}

// NewSafePointManager creates a SafePointManager of GC and services.
func NewSafePointManager(store endpoint.GCSafePointStorage) *SafePointManager {
return &SafePointManager{store: store}
func NewSafePointManager(store endpoint.GCSafePointStorage, cfg config.PDServerConfig) *SafePointManager {
return &SafePointManager{store: store, cfg: cfg}
}

// LoadGCSafePoint loads current GC safe point from storage.
Expand All @@ -49,6 +55,11 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
if err != nil {
return
}
if manager.cfg.BlockSafePointV1 {
err = errors.Errorf(blockGCSafePointErrmsg)
return
}

if oldSafePoint >= newSafePoint {
return
}
Expand All @@ -58,6 +69,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe

// UpdateServiceGCSafePoint update the safepoint for a specific service.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
if manager.cfg.BlockSafePointV1 {
return nil, false, errors.Errorf(blockServiceSafepointErrmsg)
}
manager.serviceGCLock.Lock()
defer manager.serviceGCLock.Unlock()
minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
Expand Down
26 changes: 23 additions & 3 deletions pkg/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server/config"
)

func newGCStorage() endpoint.GCSafePointStorage {
return endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := NewSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -83,7 +84,7 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {

func TestServiceGCSafePointUpdate(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage())
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{})
gcworkerServiceID := "gc_worker"
cdcServiceID := "cdc"
brServiceID := "br"
Expand Down Expand Up @@ -162,3 +163,22 @@ func TestServiceGCSafePointUpdate(t *testing.T) {
re.NoError(err)
re.True(updated)
}

func TestBlockUpdateSafePointV1(t *testing.T) {
re := require.New(t)
manager := NewSafePointManager(newGCStorage(), config.PDServerConfig{BlockSafePointV1: true})
gcworkerServiceID := "gc_worker"
gcWorkerSafePoint := uint64(8)

min, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
re.Error(err, blockServiceSafepointErrmsg)
re.Equal(err.Error(), blockServiceSafepointErrmsg)
re.False(updated)
re.Nil(min)

oldSafePoint, err := manager.UpdateGCSafePoint(gcWorkerSafePoint)
re.Error(err)
re.Equal(err.Error(), blockGCSafePointErrmsg)

re.Equal(uint64(0), oldSafePoint)
}
7 changes: 6 additions & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,15 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
continue
}
// Send the consumption to update the metrics.
isBackground := req.GetIsBackground()
s.manager.consumptionDispatcher <- struct {
resourceGroupName string
*rmpb.Consumption
}{resourceGroupName, req.GetConsumptionSinceLastRequest()}
isBackground bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground}
if isBackground {
continue
}
now := time.Now()
resp := &rmpb.TokenBucketResponse{
ResourceGroupName: rg.Name,
Expand Down
19 changes: 13 additions & 6 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Manager struct {
consumptionDispatcher chan struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
}
// record update time of each resource group
consumptionRecord map[string]time.Time
Expand All @@ -76,6 +77,7 @@ func NewManager[T ResourceManagerConfigProvider](srv bs.Server) *Manager {
consumptionDispatcher: make(chan struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
}, defaultConsumptionChanSize),
consumptionRecord: make(map[string]time.Time),
}
Expand Down Expand Up @@ -300,15 +302,20 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
if consumption == nil {
continue
}
backgroundType := ""
if consumptionInfo.isBackground {
backgroundType = backgroundTypeLabel
}

var (
name = consumptionInfo.resourceGroupName
rruMetrics = readRequestUnitCost.WithLabelValues(name)
wruMetrics = writeRequestUnitCost.WithLabelValues(name)
rruMetrics = readRequestUnitCost.WithLabelValues(name, backgroundType)
wruMetrics = writeRequestUnitCost.WithLabelValues(name, backgroundType)
sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name)
readByteMetrics = readByteCost.WithLabelValues(name)
writeByteMetrics = writeByteCost.WithLabelValues(name)
kvCPUMetrics = kvCPUCost.WithLabelValues(name)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name)
readByteMetrics = readByteCost.WithLabelValues(name, backgroundType)
writeByteMetrics = writeByteCost.WithLabelValues(name, backgroundType)
kvCPUMetrics = kvCPUCost.WithLabelValues(name, backgroundType)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, backgroundType)
readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
)
Expand Down
Loading

0 comments on commit 412bc41

Please sign in to comment.