From 901951d8c058b8ec20c918ce0a3da9b8ab758fc8 Mon Sep 17 00:00:00 2001
From: yangxuan
Date: Wed, 8 May 2024 14:57:37 +0800
Subject: [PATCH] Fix ut

Signed-off-by: yangxuan
---
 internal/datanode/l0_compactor.go      | 29 ++++++++++++++++++--------
 internal/datanode/l0_compactor_test.go |  4 ++--
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go
index f5f839a689ab6..ae2bf19a7a653 100644
--- a/internal/datanode/l0_compactor.go
+++ b/internal/datanode/l0_compactor.go
@@ -341,6 +341,7 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag
 
 func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error {
 	allBlobs := make(map[string][]byte)
+	tmpResults := make(map[int64]*datapb.CompactionSegment)
 	for segID, dData := range alteredSegments {
 		if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
 			blobs, binlog, err := t.composeDeltalog(segID, dData)
@@ -349,22 +350,32 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec
 				return err
 			}
 			allBlobs = lo.Assign(blobs, allBlobs)
-			if _, ok := resultSegments[segID]; !ok {
-				resultSegments[segID] = &datapb.CompactionSegment{
-					SegmentID: segID,
-					Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
-					Channel: t.plan.GetChannel(),
-				}
-			} else {
-				resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
+			tmpResults[segID] = &datapb.CompactionSegment{
+				SegmentID: segID,
+				Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
+				Channel: t.plan.GetChannel(),
 			}
-			delete(alteredSegments, segID)
 		}
 	}
+
+	if len(allBlobs) == 0 {
+		return nil
+	}
+
 	if err := t.Upload(ctx, allBlobs); err != nil {
 		log.Warn("L0 compaction upload blobs fail", zap.Error(err))
 		return err
 	}
+
+	for segID, compSeg := range tmpResults {
+		if _, ok := resultSegments[segID]; !ok {
+			resultSegments[segID] = compSeg
+		} else {
+			binlog := compSeg.Deltalogs[0].Binlogs[0]
+			resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
+		}
+	}
+
 	return nil
 }
 
diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go
index ba7564b92ec0e..80b9241285c14 100644
--- a/internal/datanode/l0_compactor_test.go
+++ b/internal/datanode/l0_compactor_test.go
@@ -220,7 +220,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
 		RunAndReturn(func(paths ...string) string {
 			return path.Join(paths...)
 		}).Times(2)
-	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
 
 	s.Require().Equal(plan.GetPlanID(), s.task.getPlanID())
 	s.Require().Equal(plan.GetChannel(), s.task.getChannelName())
@@ -322,7 +322,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
 		RunAndReturn(func(paths ...string) string {
 			return path.Join(paths...)
 		}).Times(2)
-	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
+	s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
 
 	l0Segments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
 		return s.Level == datapb.SegmentLevel_L0
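
The sketch below is not part of the patch; it is a minimal, self-contained Go illustration of the control-flow change the diff makes to uploadByCheck, using simplified stand-in types (Segment, uploader, a hypothetical "deltalog/<segID>" key layout) rather than the real datapb/storage types or the BinlogIO interface. It shows the staged-merge pattern the patch introduces: compose everything first, skip the upload when nothing was composed, and merge staged results into resultSegments only after the upload succeeds, which is also why the tests now expect a single Upload call per compaction.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Segment stands in for datapb.CompactionSegment; Binlogs stands in for the
// nested FieldBinlog/Binlog structure.
type Segment struct {
	SegmentID int64
	Binlogs   []string
}

// uploader stands in for the task's BinlogIO.Upload dependency.
type uploader func(ctx context.Context, blobs map[string][]byte) error

// uploadByCheck mirrors the patched control flow: stage per-segment results in
// tmpResults, return early without uploading when nothing was composed, and
// merge staged results into the caller-visible map only after Upload succeeds.
func uploadByCheck(ctx context.Context, upload uploader, altered map[int64][]byte, resultSegments map[int64]*Segment) error {
	allBlobs := make(map[string][]byte)
	tmpResults := make(map[int64]*Segment)

	for segID, data := range altered {
		key := fmt.Sprintf("deltalog/%d", segID) // hypothetical key layout
		allBlobs[key] = data
		tmpResults[segID] = &Segment{SegmentID: segID, Binlogs: []string{key}}
	}

	// No blobs composed: skip the upload entirely.
	if len(allBlobs) == 0 {
		return nil
	}

	if err := upload(ctx, allBlobs); err != nil {
		return err // resultSegments is left untouched on failure
	}

	// Merge staged segments only after the upload succeeded.
	for segID, seg := range tmpResults {
		if existing, ok := resultSegments[segID]; ok {
			existing.Binlogs = append(existing.Binlogs, seg.Binlogs...)
		} else {
			resultSegments[segID] = seg
		}
	}
	return nil
}

func main() {
	results := make(map[int64]*Segment)

	failing := uploader(func(ctx context.Context, blobs map[string][]byte) error {
		return errors.New("object storage unavailable")
	})
	_ = uploadByCheck(context.Background(), failing, map[int64][]byte{100: []byte("d1")}, results)
	fmt.Println("segments after failed upload:", len(results)) // 0

	ok := uploader(func(ctx context.Context, blobs map[string][]byte) error { return nil })
	_ = uploadByCheck(context.Background(), ok, map[int64][]byte{100: []byte("d1")}, results)
	fmt.Println("segments after successful upload:", len(results)) // 1
}

Staging into a temporary map before mutating resultSegments keeps the caller-visible result consistent if the upload fails, which appears to be the point of the rework; the real function retries and error handling live in the actual l0_compactor.go shown in the diff above.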