enhance: improve L0 compaction performance by bypassing serialization
Signed-off-by: yangxuan <[email protected]>
XuanYang-cn committed Jun 24, 2024
1 parent d08cb88 commit dc4e3a6
Showing 6 changed files with 296 additions and 84 deletions.
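
The gist of the change, as reflected in the diffs below: when L0 compaction splits loaded delete records across target segments, it no longer re-serializes decoded PK/timestamp pairs per segment; the decoded values are kept only for bloom-filter routing and stats, while each record's original serialized form is copied through untouched. The following is a minimal, hypothetical Go sketch of that pass-through idea using stand-in types (none of these names come from the Milvus codebase).

    package main

    import "fmt"

    // deleteRecord pairs the decoded key/timestamp (needed only for routing and
    // statistics) with the row's original serialized form.
    type deleteRecord struct {
        pk         int64
        ts         uint64
        serialized string // bytes exactly as loaded from the source deltalog
    }

    // segmentBuffer stands in for a per-segment delta writer that accepts
    // already-serialized rows, so no re-encoding happens on the write path.
    type segmentBuffer struct {
        segmentID int64
        rows      []string
    }

    func (b *segmentBuffer) writeSerialized(row string) {
        b.rows = append(b.rows, row)
    }

    func main() {
        records := []deleteRecord{
            {pk: 1, ts: 100, serialized: "row-1"},
            {pk: 3, ts: 101, serialized: "row-3"},
            {pk: 4, ts: 102, serialized: "row-4"},
        }
        // route picks the target segment for a primary key; a trivial rule here.
        route := func(pk int64) int64 { return 100 + pk%2 }

        buffers := make(map[int64]*segmentBuffer)
        for _, r := range records {
            segID := route(r.pk)
            buf, ok := buffers[segID]
            if !ok {
                buf = &segmentBuffer{segmentID: segID}
                buffers[segID] = buf
            }
            // The serialized row is copied as-is: no decode/encode round trip.
            buf.writeSerialized(r.serialized)
        }
        for segID, buf := range buffers {
            fmt.Printf("segment %d gets %d rows\n", segID, len(buf.rows))
        }
    }
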
40 changes: 31 additions & 9 deletions internal/datanode/compaction/l0_compactor.go
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
@@ -201,11 +202,11 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr

blobKey, _ := binlog.BuildLogPath(storage.DeleteBinlog, writer.collectionID, writer.partitionID, writer.segmentID, -1, logID)

allBlobs[blobKey] = blob.GetValue()
allBlobs[blobKey] = blob
deltalog := &datapb.Binlog{
EntriesNum: writer.GetRowNum(),
LogSize: int64(len(blob.GetValue())),
MemorySize: blob.GetMemorySize(),
LogSize: int64(len(blob)),
MemorySize: writer.GetMemorySize(),
LogPath: blobKey,
LogID: logID,
TimestampFrom: tr.GetMinTimestamp(),
@@ -235,7 +236,7 @@ func (t *LevelZeroCompactionTask) splitDelta(
ctx context.Context,
allDelta *storage.DeleteData,
segmentBfs map[int64]*metacache.BloomFilterSet,
) map[int64]*SegmentDeltaWriter {
) (map[int64]*SegmentDeltaWriter, error) {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()

@@ -247,11 +248,16 @@ func (t *LevelZeroCompactionTask) splitDelta(

retMap := t.applyBFInParallel(traceCtx, allDelta, io.GetBFApplyPool(), segmentBfs)

var err error
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits

pks := allDelta.Pks
tss := allDelta.Tss
serialized := allDelta.Serialized

for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
@@ -261,13 +267,20 @@
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
targetSegBuffer[segmentID] = writer
}
writer.Write(allDelta.Pks[startIdx+i], allDelta.Tss[startIdx+i])
err = writer.WriteSerialized(serialized[startIdx+i], pks[startIdx+i], tss[startIdx+i])
if err != nil {
return false
}
}
}
}
return true
})
return targetSegBuffer
if err != nil {
return nil, err
}

return targetSegBuffer, nil
}
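
splitDelta now returns an error, but the per-batch results come back through a Range-style callback that can only signal stop/continue. The pattern used above — capture the first error in an outer variable, return false to stop iteration, then check the variable after Range returns — is sketched below with stand-in types (not the actual retMap or BatchApplyRet types).

    package main

    import (
        "errors"
        "fmt"
    )

    // callbackMap mimics a sync.Map-style container: Range stops as soon as the
    // callback returns false, and the callback itself cannot return an error.
    type callbackMap struct {
        items map[int]string
    }

    func (m *callbackMap) Range(fn func(key int, value string) bool) {
        for k, v := range m.items {
            if !fn(k, v) {
                return
            }
        }
    }

    // collect propagates the first callback error to its own return value.
    func collect(m *callbackMap) (map[int]string, error) {
        var err error
        out := make(map[int]string)
        m.Range(func(key int, value string) bool {
            if value == "" {
                err = errors.New("empty value") // capture the failure...
                return false                    // ...and stop iterating
            }
            out[key] = value
            return true
        })
        if err != nil { // surface the captured error to the caller
            return nil, err
        }
        return out, nil
    }

    func main() {
        m := &callbackMap{items: map[int]string{1: "a", 2: ""}}
        if _, err := collect(m); err != nil {
            fmt.Println("collect failed:", err)
        }
    }
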

type BatchApplyRet = struct {
@@ -338,18 +351,25 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
}

for i := 0; i < batch; i++ {
batchStart := time.Now()
left, right := i*batchSize, (i+1)*batchSize
if right >= len(targetSegments) {
right = len(targetSegments)
}
batchSegments := targetSegments[left:right]

segmentBFs, err := t.loadBF(ctx, batchSegments)
if err != nil {
log.Warn("L0 compaction loadBF fail", zap.Error(err))
return nil, err
}

batchSegWriter := t.splitDelta(ctx, allDelta, segmentBFs)
batchSegWriter, err := t.splitDelta(ctx, allDelta, segmentBFs)
if err != nil {
log.Warn("L0 compaction splitDelta fail", zap.Error(err))
return nil, err
}

batchResults, err := t.serializeUpload(ctx, batchSegWriter)
if err != nil {
log.Warn("L0 compaction serialize upload fail", zap.Error(err))
@@ -358,8 +378,10 @@

log.Info("L0 compaction finished one batch",
zap.Int("batch no.", i),
zap.Int("batch segment count", len(batchResults)),
zap.Int("total deltaRowCount", int(allDelta.RowCount)),
zap.Int("batch segment count", len(batchResults)))
zap.Duration("batch elapse", time.Since(batchStart)),
)
results = append(results, batchResults...)
}

@@ -379,7 +401,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
_, _, dData, err := storage.NewDeleteCodec().DeserializeWithSerialized(blobs)
if err != nil {
return nil, err
}
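loadDelta now calls storage.NewDeleteCodec().DeserializeWithSerialized instead of Deserialize. The storage-side half of the commit is not part of the excerpt above, so the sketch below is only an assumption about the shape the compactor appears to rely on: decoded Pks/Tss for routing plus a parallel Serialized slice holding each row's original encoding. All names here are illustrative stand-ins, not the real storage API.

    package main

    import "fmt"

    // deleteData mirrors how splitDelta indexes the loaded deletes: Pks, Tss and
    // Serialized are parallel slices, one entry per delete record.
    type deleteData struct {
        Pks        []int64
        Tss        []uint64
        Serialized []string
        RowCount   int64
    }

    // deserializeWithSerialized is a hypothetical decoder: it parses each row far
    // enough to recover pk/ts, while also keeping the row's raw form verbatim.
    func deserializeWithSerialized(rows []string, parse func(string) (int64, uint64)) *deleteData {
        d := &deleteData{}
        for _, row := range rows {
            pk, ts := parse(row)
            d.Pks = append(d.Pks, pk)
            d.Tss = append(d.Tss, ts)
            d.Serialized = append(d.Serialized, row) // kept for later pass-through
            d.RowCount++
        }
        return d
    }

    func main() {
        // A toy "format": the row itself encodes pk and ts; parse just recovers them.
        parse := func(row string) (int64, uint64) {
            var pk int64
            var ts uint64
            fmt.Sscanf(row, "%d,%d", &pk, &ts)
            return pk, ts
        }
        d := deserializeWithSerialized([]string{"1,100", "3,101"}, parse)
        fmt.Println(d.RowCount, d.Pks, d.Tss, d.Serialized)
    }
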
38 changes: 27 additions & 11 deletions internal/datanode/compaction/l0_compactor_test.go
@@ -68,15 +68,18 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
3: 20002,
}

s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []typeutil.Timestamp{})
dData := storage.NewEmptyDeleteData()
for pk, ts := range pk2ts {
s.dData.Append(storage.NewInt64PrimaryKey(pk), ts)
dData.Append(storage.NewInt64PrimaryKey(pk), ts)
}

dataCodec := storage.NewDeleteCodec()
blob, err := dataCodec.Serialize(0, 0, 0, s.dData)
blob, err := storage.NewDeleteCodec().Serialize(0, 0, 0, dData)
s.Require().NoError(err)
s.dBlob = blob.GetValue()

_, _, serializedData, err := storage.NewDeleteCodec().DeserializeWithSerialized([]*storage.Blob{{Value: s.dBlob}})
s.Require().NoError(err)
s.dData = serializedData
}

func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
@@ -423,7 +426,10 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
s.task.allocator = mockAlloc

writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)

for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}

result, err := s.task.serializeUpload(ctx, writers)
@@ -436,7 +442,9 @@
s.task.plan = plan
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed"))
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)

@@ -452,7 +460,9 @@

s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
for i := range s.dData.Pks {
writer.WriteSerialized(s.dData.Serialized[i], s.dData.Pks[i], s.dData.Tss[i])
}
writers := map[int64]*SegmentDeltaWriter{100: writer}

results, err := s.task.serializeUpload(ctx, writers)
@@ -480,17 +490,23 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
101: bfs2,
102: bfs3,
}
deltaWriters := s.task.splitDelta(context.TODO(), s.dData, segmentBFs)
deltaWriters, err := s.task.splitDelta(context.TODO(), s.dData, segmentBFs)
s.NoError(err)

s.NotEmpty(deltaWriters)
s.ElementsMatch(predicted, lo.Keys(deltaWriters))
s.EqualValues(2, deltaWriters[100].GetRowNum())
s.EqualValues(1, deltaWriters[101].GetRowNum())
s.EqualValues(1, deltaWriters[102].GetRowNum())

s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, deltaWriters[100].deleteData.Pks)
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[101].deleteData.Pks[0])
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[102].deleteData.Pks[0])
for segID, writer := range deltaWriters {
gotBytes, _, err := writer.Finish()
s.NoError(err)

_, _, gotData, err := storage.NewDeleteCodec().Deserialize([]*storage.Blob{{Value: gotBytes}})
s.NoError(err)
s.ElementsMatch(expectedSegPK[segID], lo.Map(gotData.Pks, func(pk storage.PrimaryKey, _ int) int64 { return pk.(*storage.Int64PrimaryKey).Value }))
}
}

func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
38 changes: 20 additions & 18 deletions internal/datanode/compaction/segment_writer.go
@@ -21,23 +21,27 @@ import (

func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
return &SegmentDeltaWriter{
deleteData: &storage.DeleteData{},
segmentID: segmentID,
partitionID: partitionID,
collectionID: collectionID,
tsFrom: math.MaxUint64,
tsTo: 0,

writer: storage.NewDeleteSerializedWriter(collectionID, partitionID, segmentID),
}
}

type SegmentDeltaWriter struct {
deleteData *storage.DeleteData
segmentID int64
partitionID int64
collectionID int64

tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
writer *storage.DeleteSerializedWriter

rowCount int
memSize int64
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
}

func (w *SegmentDeltaWriter) GetCollectionID() int64 {
@@ -53,7 +57,11 @@ func (w *SegmentDeltaWriter) GetSegmentID() int64 {
}

func (w *SegmentDeltaWriter) GetRowNum() int64 {
return w.deleteData.RowCount
return int64(w.rowCount)
}

func (w *SegmentDeltaWriter) GetMemorySize() int64 {
return w.memSize
}

func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange {
@@ -69,25 +77,19 @@ func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) {
}
}

func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) {
w.deleteData.Append(pk, ts)
func (w *SegmentDeltaWriter) WriteSerialized(serializedRow string, pk storage.PrimaryKey, ts typeutil.Timestamp) error {
w.updateRange(ts)
w.memSize += pk.Size() + int64(8)
w.rowCount += 1
return w.writer.Write(serializedRow)
}

func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) {
w.deleteData.AppendBatch(pks, tss)

for _, ts := range tss {
w.updateRange(ts)
}
}

func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) {
blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData)
// Finish returns the serialized delete data and its timestamp range
func (w *SegmentDeltaWriter) Finish() ([]byte, *writebuffer.TimeRange, error) {
blob, err := w.writer.Finish(w.tsFrom, w.tsTo)
if err != nil {
return nil, nil, err
}

return blob, w.GetTimeRange(), nil
}

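A hypothetical usage sketch of the reworked writer flow (stand-in types, not the real SegmentDeltaWriter or DeleteSerializedWriter API): rows arrive already serialized, the writer only tracks row count, an approximate memory size, and the timestamp range, and Finish returns the encoded bytes together with that range, which serializeUpload then attaches to the deltalog metadata.

    package main

    import (
        "fmt"
        "math"
        "strings"
    )

    // deltaWriter mimics the bookkeeping visible in the diff: per-row memory size
    // is estimated as pk size plus an 8-byte timestamp, and tsFrom/tsTo track the
    // range of written timestamps.
    type deltaWriter struct {
        rows     []string
        rowCount int
        memSize  int64
        tsFrom   uint64
        tsTo     uint64
    }

    func newDeltaWriter() *deltaWriter {
        return &deltaWriter{tsFrom: math.MaxUint64}
    }

    func (w *deltaWriter) WriteSerialized(row string, pkSize int64, ts uint64) {
        w.rows = append(w.rows, row)
        w.rowCount++
        w.memSize += pkSize + 8
        if ts < w.tsFrom {
            w.tsFrom = ts
        }
        if ts > w.tsTo {
            w.tsTo = ts
        }
    }

    // Finish returns the concatenated rows plus the observed timestamp range; the
    // real writer delegates the encoding to the delete codec instead.
    func (w *deltaWriter) Finish() ([]byte, [2]uint64) {
        return []byte(strings.Join(w.rows, "\n")), [2]uint64{w.tsFrom, w.tsTo}
    }

    func main() {
        w := newDeltaWriter()
        w.WriteSerialized(`{"pk":1,"ts":100}`, 8, 100)
        w.WriteSerialized(`{"pk":3,"ts":101}`, 8, 101)
        blob, tr := w.Finish()
        fmt.Printf("rows=%d memSize=%d bytes=%d tsRange=%v\n", w.rowCount, w.memSize, len(blob), tr)
    }
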
(Diffs for the remaining 3 changed files are not shown here.)
