Skip to content

Commit

Permalink
Fix ut
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed May 8, 2024
1 parent 321ee79 commit 901951d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
29 changes: 20 additions & 9 deletions internal/datanode/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag

func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error {
allBlobs := make(map[string][]byte)
tmpResults := make(map[int64]*datapb.CompactionSegment)
for segID, dData := range alteredSegments {
if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
blobs, binlog, err := t.composeDeltalog(segID, dData)
Expand All @@ -349,22 +350,32 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec
return err
}
allBlobs = lo.Assign(blobs, allBlobs)
if _, ok := resultSegments[segID]; !ok {
resultSegments[segID] = &datapb.CompactionSegment{
SegmentID: segID,
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
Channel: t.plan.GetChannel(),
}
} else {
resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
tmpResults[segID] = &datapb.CompactionSegment{
SegmentID: segID,
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
Channel: t.plan.GetChannel(),
}

delete(alteredSegments, segID)
}
}

if len(allBlobs) == 0 {
return nil
}

if err := t.Upload(ctx, allBlobs); err != nil {
log.Warn("L0 compaction upload blobs fail", zap.Error(err))
return err
}

for segID, compSeg := range tmpResults {
if _, ok := resultSegments[segID]; !ok {
resultSegments[segID] = compSeg
} else {
binlog := compSeg.Deltalogs[0].Binlogs[0]
resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
}
}

return nil
}
4 changes: 2 additions & 2 deletions internal/datanode/l0_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()

s.Require().Equal(plan.GetPlanID(), s.task.getPlanID())
s.Require().Equal(plan.GetChannel(), s.task.getChannelName())
Expand Down Expand Up @@ -322,7 +322,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()

l0Segments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L0
Expand Down

0 comments on commit 901951d

Please sign in to comment.