-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client/http: implement more HTTP APIs #7371
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,14 @@ | |
package http | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"net/url" | ||
"strings" | ||
"time" | ||
|
||
|
@@ -43,12 +45,17 @@ | |
GetRegionByID(context.Context, uint64) (*RegionInfo, error) | ||
GetRegionByKey(context.Context, []byte) (*RegionInfo, error) | ||
GetRegions(context.Context) (*RegionsInfo, error) | ||
GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error) | ||
GetRegionsByKeyRange(context.Context, []byte, []byte, int) (*RegionsInfo, error) | ||
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) | ||
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) | ||
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) | ||
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error) | ||
GetStores(context.Context) (*StoresInfo, error) | ||
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error) | ||
SetPlacementRule(context.Context, *Rule) error | ||
DeletePlacementRule(context.Context, string, string) error | ||
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) | ||
AccelerateSchedule(context.Context, []byte, []byte) error | ||
Close() | ||
} | ||
|
||
|
@@ -154,16 +161,16 @@ | |
// it consistent with the current implementation of some clients (e.g. TiDB). | ||
func (c *client) requestWithRetry( | ||
ctx context.Context, | ||
name, uri string, | ||
res interface{}, | ||
name, uri, method string, | ||
body io.Reader, res interface{}, | ||
) error { | ||
var ( | ||
err error | ||
addr string | ||
) | ||
for idx := 0; idx < len(c.pdAddrs); idx++ { | ||
addr = c.pdAddrs[idx] | ||
err = c.request(ctx, name, addr, uri, res) | ||
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res) | ||
if err == nil { | ||
break | ||
} | ||
|
@@ -175,16 +182,15 @@ | |
|
||
func (c *client) request( | ||
ctx context.Context, | ||
name, addr, uri string, | ||
res interface{}, | ||
name, url, method string, | ||
body io.Reader, res interface{}, | ||
) error { | ||
reqURL := fmt.Sprintf("%s%s", addr, uri) | ||
logFields := []zap.Field{ | ||
zap.String("name", name), | ||
zap.String("url", reqURL), | ||
zap.String("url", url), | ||
} | ||
log.Debug("[pd] request the http url", logFields...) | ||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) | ||
req, err := http.NewRequestWithContext(ctx, method, url, body) | ||
if err != nil { | ||
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) | ||
return errors.Trace(err) | ||
|
@@ -219,6 +225,10 @@ | |
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status) | ||
} | ||
|
||
if res == nil { | ||
return nil | ||
} | ||
|
||
err = json.NewDecoder(resp.Body).Decode(res) | ||
if err != nil { | ||
return errors.Trace(err) | ||
|
@@ -229,7 +239,9 @@ | |
// GetRegionByID gets the region info by ID. | ||
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { | ||
var region RegionInfo | ||
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), ®ion) | ||
err := c.requestWithRetry(ctx, | ||
"GetRegionByID", RegionByID(regionID), | ||
http.MethodGet, nil, ®ion) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -239,7 +251,9 @@ | |
// GetRegionByKey gets the region info by key. | ||
func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) { | ||
var region RegionInfo | ||
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), ®ion) | ||
err := c.requestWithRetry(ctx, | ||
"GetRegionByKey", RegionByKey(key), | ||
http.MethodGet, nil, ®ion) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -249,17 +263,21 @@ | |
// GetRegions gets the regions info. | ||
func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { | ||
var regions RegionsInfo | ||
err := c.requestWithRetry(ctx, "GetRegions", Regions, ®ions) | ||
err := c.requestWithRetry(ctx, | ||
"GetRegions", Regions, | ||
http.MethodGet, nil, ®ions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return ®ions, nil | ||
} | ||
|
||
// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range. | ||
func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) { | ||
// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range. | ||
func (c *client) GetRegionsByKeyRange(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) { | ||
var regions RegionsInfo | ||
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), ®ions) | ||
err := c.requestWithRetry(ctx, | ||
"GetRegionsByKeyRange", RegionsByKey(startKey, endKey, limit), | ||
http.MethodGet, nil, ®ions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -269,7 +287,9 @@ | |
// GetRegionsByStoreID gets the regions info by store ID. | ||
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) { | ||
var regions RegionsInfo | ||
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), ®ions) | ||
err := c.requestWithRetry(ctx, | ||
"GetRegionsByStoreID", RegionsByStoreID(storeID), | ||
http.MethodGet, nil, ®ions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -279,7 +299,9 @@ | |
// GetHotReadRegions gets the hot read region statistics info. | ||
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { | ||
var hotReadRegions StoreHotPeersInfos | ||
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions) | ||
err := c.requestWithRetry(ctx, | ||
"GetHotReadRegions", HotRead, | ||
http.MethodGet, nil, &hotReadRegions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -289,23 +311,70 @@ | |
// GetHotWriteRegions gets the hot write region statistics info. | ||
func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { | ||
var hotWriteRegions StoreHotPeersInfos | ||
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions) | ||
err := c.requestWithRetry(ctx, | ||
"GetHotWriteRegions", HotWrite, | ||
http.MethodGet, nil, &hotWriteRegions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &hotWriteRegions, nil | ||
} | ||
|
||
// GetRegionStatusByKeyRange gets the region status by key range. | ||
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) { | ||
var regionStats RegionStats | ||
err := c.requestWithRetry(ctx, | ||
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(startKey, endKey), | ||
http.MethodGet, nil, ®ionStats, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return ®ionStats, nil | ||
} | ||
|
||
// GetStores gets the stores info. | ||
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { | ||
var stores StoresInfo | ||
err := c.requestWithRetry(ctx, "GetStores", Stores, &stores) | ||
err := c.requestWithRetry(ctx, | ||
"GetStores", Stores, | ||
http.MethodGet, nil, &stores) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &stores, nil | ||
} | ||
|
||
// GetPlacementRulesByGroup gets the placement rules by group. | ||
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) { | ||
var rules []*Rule | ||
err := c.requestWithRetry(ctx, | ||
"GetPlacementRulesByGroup", PlacementRulesByGroup(group), | ||
http.MethodGet, nil, &rules) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return rules, nil | ||
} | ||
|
||
// SetPlacementRule sets the placement rule. | ||
func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error { | ||
ruleJSON, err := json.Marshal(rule) | ||
if err != nil { | ||
return errors.Trace(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure why needs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One is to be consistent with the original HTTP call code of TiDB, and the other is that because it is a general library, adding trace information can help us better locate where the call is wrong I think. |
||
} | ||
return c.requestWithRetry(ctx, | ||
"SetPlacementRule", PlacementRule, | ||
http.MethodPost, bytes.NewBuffer(ruleJSON), nil) | ||
} | ||
|
||
// DeletePlacementRule deletes the placement rule. | ||
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error { | ||
return c.requestWithRetry(ctx, | ||
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id), | ||
http.MethodDelete, nil, nil) | ||
} | ||
|
||
// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. | ||
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { | ||
uri := MinResolvedTSPrefix | ||
|
@@ -326,7 +395,9 @@ | |
IsRealTime bool `json:"is_real_time,omitempty"` | ||
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` | ||
}{} | ||
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp) | ||
err := c.requestWithRetry(ctx, | ||
"GetMinResolvedTSByStoresIDs", uri, | ||
http.MethodGet, nil, &resp) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
|
@@ -335,3 +406,18 @@ | |
} | ||
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil | ||
} | ||
|
||
// AccelerateSchedule accelerates the scheduling of the regions within the given key range. | ||
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error { | ||
JmPotato marked this conversation as resolved.
Show resolved
Hide resolved
|
||
input := map[string]string{ | ||
"start_key": url.QueryEscape(string(startKey)), | ||
"end_key": url.QueryEscape(string(endKey)), | ||
} | ||
inputJSON, err := json.Marshal(input) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return c.requestWithRetry(ctx, | ||
"AccelerateSchedule", accelerateSchedule, | ||
http.MethodPost, bytes.NewBuffer(inputJSON), nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to customize the retry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will provide the function of custom backoffer in the future, but at present, the interface we are about to replace in TiDB uses this simple retry method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still prefer to leave it to the caller even if the retry method is simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense to me though I'd like to implement it in an independent PR later.