Skip to content

Commit

Permalink
reduce more memory
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Jul 26, 2023
1 parent c501fd8 commit 2b1ab88
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 143 deletions.
112 changes: 47 additions & 65 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,50 +227,6 @@ func (s *RegionsInfo) Adjust() {
}
}

func (s *RegionsInfo) marshal(ctx context.Context) ([]byte, error) {
out := &jwriter.Writer{}
out.RawByte('{')

out.RawString("\"count\":")
out.Int(s.Count)

out.RawString(",\"regions\":")
if s.Regions == nil {
out.RawString("null")
} else {
out.RawByte('[')
for i, r := range s.Regions {
select {
case <-ctx.Done():
// Return early, avoid the unnecessary computation.
// See more details in https://github.com/tikv/pd/issues/6835
return nil, ctx.Err()
default:
}
if i > 0 {
out.RawByte(',')
}
// EasyJSON will not check anonymous struct pointer field and will panic if the field is nil.
// So we need to set the field to default value explicitly when the anonymous struct pointer is nil.
r.Leader.setDefaultIfNil()
for i := range r.Peers {
r.Peers[i].setDefaultIfNil()
}
for i := range r.PendingPeers {
r.PendingPeers[i].setDefaultIfNil()
}
for i := range r.DownPeers {
r.DownPeers[i].setDefaultIfNil()
}
r.MarshalEasyJSON(out)
}
out.RawByte(']')
}

out.RawByte('}')
return out.Buffer.BuildBytes(), out.Error
}

type regionHandler struct {
svr *server.Server
rd *render.Render
Expand Down Expand Up @@ -381,15 +337,48 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler {
}
}

func convertToAPIRegions(regions []*core.RegionInfo) *RegionsInfo {
regionInfos := make([]RegionInfo, len(regions))
// regionsToBytes converts regions to bytes, which is `RegionsInfo`'s json format.
// It is used to reduce the cost of json serialization.
func regionsToBytes(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) {
out := &jwriter.Writer{}
out.RawByte('{')

out.RawString("\"count\":")
out.Int(len(regions))

out.RawString(",\"regions\":")
out.RawByte('[')
region := &RegionInfo{}
for i, r := range regions {
InitRegion(r, &regionInfos[i])
}
return &RegionsInfo{
Count: len(regions),
Regions: regionInfos,
select {
case <-ctx.Done():
// Return early, avoid the unnecessary computation.
// See more details in https://github.com/tikv/pd/issues/6835
return nil, ctx.Err()
default:
}
if i > 0 {
out.RawByte(',')
}
InitRegion(r, region)
// EasyJSON will not check anonymous struct pointer field and will panic if the field is nil.
// So we need to set the field to default value explicitly when the anonymous struct pointer is nil.
region.Leader.setDefaultIfNil()
for i := range region.Peers {
region.Peers[i].setDefaultIfNil()
}
for i := range region.PendingPeers {
region.PendingPeers[i].setDefaultIfNil()
}
for i := range region.DownPeers {
region.DownPeers[i].setDefaultIfNil()
}
region.MarshalEasyJSON(out)
}
out.RawByte(']')

out.RawByte('}')
return out.Buffer.BuildBytes(), out.Error
}

// @Tags region
Expand All @@ -400,8 +389,7 @@ func convertToAPIRegions(regions []*core.RegionInfo) *RegionsInfo {
func (h *regionsHandler) GetRegions(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
regions := rc.GetRegions()
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -436,8 +424,7 @@ func (h *regionsHandler) ScanRegions(w http.ResponseWriter, r *http.Request) {
limit = maxRegionLimit
}
regions := rc.ScanRegions([]byte(startKey), []byte(endKey), limit)
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -473,8 +460,7 @@ func (h *regionsHandler) GetStoreRegions(w http.ResponseWriter, r *http.Request)
return
}
regions := rc.GetStoreRegions(uint64(id))
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -528,8 +514,7 @@ func (h *regionsHandler) GetKeyspaceRegions(w http.ResponseWriter, r *http.Reque
txnRegion := rc.ScanRegions(regionBound.TxnLeftBound, regionBound.TxnRightBound, limit-len(regions))
regions = append(regions, txnRegion...)
}
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -558,8 +543,7 @@ func (h *regionsHandler) getRegionsByType(
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -784,8 +768,7 @@ func (h *regionsHandler) GetRegionSiblings(w http.ResponseWriter, r *http.Reques
}

left, right := rc.GetAdjacentRegions(region)
regionsInfo := convertToAPIRegions([]*core.RegionInfo{left, right})
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), []*core.RegionInfo{left, right})
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -1008,8 +991,7 @@ func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request,
limit = maxRegionLimit
}
regions := TopNRegions(rc.GetRegions(), less, limit)
regionsInfo := convertToAPIRegions(regions)
b, err := regionsInfo.marshal(r.Context())
b, err := regionsToBytes(r.Context(), regions)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
Expand Down
154 changes: 76 additions & 78 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/url"
"sort"
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -454,6 +455,40 @@ func (suite *regionTestSuite) TestTopN() {
}
}

func TestRegionsWithKillRequest(t *testing.T) {
re := require.New(t)
svr, cleanup := mustNewServer(re)
defer cleanup()
server.MustWaitLeader(re, []*server.Server{svr})

addr := svr.GetAddr()
url := fmt.Sprintf("%s%s/api/v1/regions", addr, apiPrefix)
mustBootstrapCluster(re, svr)
regionCount := 100000
for i := 0; i < regionCount; i++ {
r := core.NewTestRegionInfo(uint64(i+2), 1,
[]byte(fmt.Sprintf("%09d", i)),
[]byte(fmt.Sprintf("%09d", i+1)),
core.SetApproximateKeys(10), core.SetApproximateSize(10))
mustRegionHeartbeat(re, svr, r)
}

ctx, cancel := context.WithCancel(context.Background())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
re.NoError(err)
respCh := make(chan *http.Response)
go func() {
resp, err := testDialClient.Do(req) // nolint:bodyclose
re.Error(err)
re.Contains(err.Error(), "context canceled")
respCh <- resp
}()
time.Sleep(100 * time.Millisecond) // wait for the request to be sent
cancel() // close the request
resp := <-respCh
re.Nil(resp)
}

type getRegionTestSuite struct {
suite.Suite
svr *server.Server
Expand Down Expand Up @@ -738,95 +773,58 @@ func (suite *regionsReplicatedTestSuite) TestCheckRegionsReplicated() {

func TestRegionsInfoMarshal(t *testing.T) {
re := require.New(t)
cases := []*RegionsInfo{
regionWithNilPeer := core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1})
core.SetPeers([]*metapb.Peer{{Id: 2}, nil})(regionWithNilPeer)
cases := [][]*core.RegionInfo{
{},
{
Count: 0,
// leader is nil
core.NewRegionInfo(&metapb.Region{Id: 1}, nil),
// Peers is empty
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.SetPeers([]*metapb.Peer{})),
// There is nil peer in peers.
regionWithNilPeer,
},
{
Count: 0,
Regions: []RegionInfo{},
// PendingPeers is empty
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.WithPendingPeers([]*metapb.Peer{})),
// There is nil peer in peers.
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.WithPendingPeers([]*metapb.Peer{nil})),
},
{
Count: 1,
Regions: []RegionInfo{
{
ID: 1,
Leader: MetaPeer{},
},
},
// DownPeers is empty
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.WithDownPeers([]*pdpb.PeerStats{})),
// There is nil peer in peers.
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.WithDownPeers([]*pdpb.PeerStats{{Peer: nil}})),
},
{
Count: 2,
Regions: []RegionInfo{
{
ID: 1,
Peers: []MetaPeer{},
PendingPeers: []MetaPeer{},
DownPeers: []PDPeerStats{},
},
{
ID: 2,
Peers: []MetaPeer{{}},
PendingPeers: []MetaPeer{{}},
DownPeers: []PDPeerStats{{}},
},
},
// Buckets is nil
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.SetBuckets(nil)),
// Buckets is empty
core.NewRegionInfo(&metapb.Region{Id: 1}, &metapb.Peer{Id: 1},
core.SetBuckets(&metapb.Buckets{})),
},
{
core.NewRegionInfo(&metapb.Region{Id: 1, StartKey: []byte{}, EndKey: []byte{},
RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}},
&metapb.Peer{Id: 1}, core.SetCPUUsage(10),
core.SetApproximateKeys(10), core.SetApproximateSize(10),
core.SetWrittenBytes(10), core.SetReadBytes(10),
core.SetReadKeys(10), core.SetWrittenKeys(10)),
},
}
regionsInfo := &RegionsInfo{}
for _, regions := range cases {
_, err := regions.marshal(context.Background())
b, err := regionsToBytes(context.Background(), regions)
re.NoError(err)
err = json.Unmarshal(b, regionsInfo)
re.NoError(err)
}
}

// Create n regions (0..n) of n stores (0..n).
// Each region contains np peers, the first peer is the leader.
// (copied from server/cluster_test.go)
func newTestRegions() []*core.RegionInfo {
n := uint64(10000)
np := uint64(3)

regions := make([]*core.RegionInfo, 0, n)
for i := uint64(0); i < n; i++ {
peers := make([]*metapb.Peer, 0, np)
for j := uint64(0); j < np; j++ {
peer := &metapb.Peer{
Id: i*np + j,
}
peer.StoreId = (i + j) % n
peers = append(peers, peer)
}
region := &metapb.Region{
Id: i,
Peers: peers,
StartKey: []byte(fmt.Sprintf("%d", i)),
EndKey: []byte(fmt.Sprintf("%d", i+1)),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
}
regions = append(regions, core.NewRegionInfo(region, peers[0]))
}
return regions
}

func BenchmarkRenderJSON(b *testing.B) {
regionInfos := newTestRegions()
rd := createStreamingRender()
regions := convertToAPIRegions(regionInfos)

b.ResetTimer()
for i := 0; i < b.N; i++ {
var buffer bytes.Buffer
rd.JSON(&buffer, 200, regions)
}
}

func BenchmarkConvertToAPIRegions(b *testing.B) {
regionInfos := newTestRegions()

b.ResetTimer()
for i := 0; i < b.N; i++ {
regions := convertToAPIRegions(regionInfos)
_ = regions.Count
}
}

Expand Down

0 comments on commit 2b1ab88

Please sign in to comment.