Skip to content

Commit

Permalink
fix segment compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Dec 19, 2024
1 parent 8b3cc23 commit 0dfe4d9
Show file tree
Hide file tree
Showing 10 changed files with 244 additions and 18 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");

// Max number of segments allowed in a single segcompaction task.
DEFINE_Int32(segcompaction_batch_size, "10");
DEFINE_mInt32(segcompaction_batch_size, "10");

// Max row count allowed in a single source segment, bigger segments will be skipped.
DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);

// Max number of segments allowed in a single segcompaction task.
DECLARE_Int32(segcompaction_batch_size);
DECLARE_mInt32(segcompaction_batch_size);

// Max row count allowed in a single source segment, bigger segments will be skipped.
DECLARE_Int32(segcompaction_candidate_max_rows);
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,15 +361,14 @@ void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta)
}

Status BaseTablet::calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap) {
size_t const num_segments = segments.size();
if (num_segments < 2) {
return Status::OK();
}

OlapStopWatch watch;
auto const rowset_id = rowset->rowset_id();
size_t seq_col_length = 0;
if (_tablet_meta->tablet_schema()->has_sequence_col()) {
auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
Expand Down Expand Up @@ -1687,7 +1686,8 @@ Status BaseTablet::update_delete_bitmap_without_lock(

// calculate delete bitmap between segments if necessary.
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments,
delete_bitmap));

// get all base rowsets to calculate on
std::vector<RowsetSharedPtr> specified_rowsets;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class BaseTablet {
DeleteBitmapPtr tablet_delete_bitmap = nullptr);

Status calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap);

static Status commit_phase_update_delete_bitmap(
Expand Down
33 changes: 26 additions & 7 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ Status SegcompactionWorker::_get_segcompaction_reader(
read_options.use_page_cache = false;
read_options.tablet_schema = ctx.tablet_schema;
read_options.record_rowids = record_rowids;
if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments,
delete_bitmap));
for (auto& seg_ptr : *segments) {
auto d = delete_bitmap->get_agg(
{ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON});
if (d->isEmpty()) {
continue; // Empty delete bitmap for the segment
}
read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d));
}
}
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
std::map<uint32_t, uint32_t> segment_rows;
for (auto& seg_ptr : *segments) {
Expand Down Expand Up @@ -191,8 +204,9 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e

Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
Merger::Statistics& merger_stat, uint32_t begin,
uint32_t end) {
uint32_t end, bool is_mow_with_cluster_keys) {
uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
uint64_t output_rows = merger_stat.output_rows; /* rows after merge */
Expand All @@ -206,11 +220,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
if (raw_rows_read != sum_src_row) {
uint64_t raw_rows = raw_rows_read;
if (is_mow_with_cluster_keys) {
raw_rows += rows_del_by_bitmap;
}
if (raw_rows != sum_src_row) {
return Status::Error<CHECK_LINES_ERROR>(
"segcompaction read row num does not match source. expect read row:{}, actual read "
"row:{}",
sum_src_row, raw_rows_read);
"row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
}

DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
Expand Down Expand Up @@ -305,9 +323,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}

/* check row num after merge/aggregation */
RETURN_NOT_OK_STATUS_WITH_WARN(
_check_correctness(key_reader_stats, key_merger_stats, begin, end),
"check correctness failed");
bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
end, is_mow_with_cluster_keys),
"check correctness failed");
{
std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
_writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SegcompactionWorker {
uint32_t end);
Status _delete_original_segments(uint32_t begin, uint32_t end);
Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat,
uint32_t begin, uint32_t end);
uint32_t begin, uint32_t end, bool is_mow_with_cluster_keys);
Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
if (segments.size() > 1) {
// calculate delete bitmap between segments
RETURN_IF_ERROR(
_tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_rowset->rowset_id(), segments,
_delete_bitmap));
}

// For partial update, we need to fill in the entire row of data, during the calculation
Expand Down
4 changes: 2 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
}
if (segments.size() > 1) {
// calculate delete bitmap between segments
status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap);
status = local_tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
segments, delete_bitmap);
if (!status) {
LOG(WARNING) << "failed to calculate delete bitmap"
<< ". tablet_id: " << local_tablet->tablet_id()
Expand Down
19 changes: 19 additions & 0 deletions regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
12345 23083 30920 40410

-- !select2 --
17320 24209 30795 40000

-- !select3 --
59832 36673 30343 40299

-- !select1 --
12345 23083 30920 40410

-- !select2 --
17320 24209 30795 40782

-- !select3 --
59832 36673 30586 40739

188 changes: 188 additions & 0 deletions regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test for segment compaction on a merge-on-write unique table with
// cluster keys: loads several multi-segment rowsets, relies on segcompaction
// (batch size forced to 5) to merge segments, then verifies row count is stable,
// no duplicate keys survive, and spot-checked rows have the expected values.
suite("test_compact_seg", "nonConcurrent") {
    def tableName = "test_compact_seg"

    // Polls the tablet's CompactionStatus endpoint until the rowset at index
    // (rowsetNum - 1) reports lastRowsetSegmentNum segments, retrying up to 15
    // times with a 2s sleep. Only warns (does not fail) if the count never
    // converges, since segcompaction timing is not deterministic.
    def getTabletStatus = { rowsetNum, lastRowsetSegmentNum ->
        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
        logger.info("tablets: ${tablets}")
        assertEquals(1, tablets.size())
        def tablet = tablets[0]
        String compactionUrl = tablet["CompactionStatus"]
        def retry = 15
        for (int i = 0; i < retry; i++) {
            def (code, out, err) = curl("GET", compactionUrl)
            logger.info("Show tablets " + tablet.TabletId + " status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            assertTrue(tabletJson.rowsets.size() >= rowsetNum)
            def rowset = tabletJson.rowsets.get(rowsetNum - 1)
            logger.info("rowset: ${rowset}")
            // The rowset status line embeds the segment count between the
            // closing "]" of the version range and the "DATA" marker.
            int start_index = rowset.indexOf("]")
            int end_index = rowset.indexOf("DATA")
            def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
            logger.info("segmentNumStr: ${segmentNumStr}")
            if (Integer.parseInt(segmentNumStr) == lastRowsetSegmentNum) {
                break
            }
            if (i == retry - 1) {
                // assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
                // NOTE: expected/actual were previously swapped in this message.
                logger.warn("expected segmentNum: ${lastRowsetSegmentNum}, but got ${segmentNumStr} after ${retry} retries")
            }
            sleep(2000)
        }
    }

    // batch_size is 4164 in csv_reader.cpp
    // _batch_size is 8192 in vtablet_writer.cpp
    // Save current BE params so they can be restored after the test.
    def doris_scanner_row_bytes_params = get_be_param("doris_scanner_row_bytes")
    def segcompaction_batch_size_params = get_be_param("segcompaction_batch_size")
    onFinish {
        GetDebugPoint().disableDebugPointForAllBEs("MemTable.need_flush")
        set_original_be_param("doris_scanner_row_bytes", doris_scanner_row_bytes_params)
        set_original_be_param('segcompaction_batch_size', segcompaction_batch_size_params)
    }
    // Force frequent memtable flushes so each load produces multiple segments,
    // and lower the segcompaction batch size so compaction triggers quickly.
    GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
    set_be_param.call("doris_scanner_row_bytes", "1")
    set_be_param.call('segcompaction_batch_size', 5)

    // Run the scenario twice: j == 0 without a sequence column, j == 1 with
    // function_column.sequence_col = v4.
    for (int j = 0; j < 2; j++) {
        tableName = "test_compact_seg_" + j
        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `k1` int(11) NULL,
                `k2` int(11) NULL,
                `v3` int(11) NULL,
                `v4` int(11) NULL
            ) unique KEY(`k1`, `k2`)
            cluster by(`v3`, `v4`)
            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
            PROPERTIES (
            """ + (j == 1 ? "\"function_column.sequence_col\"='v4', " : "") +
            """
                "replication_num" = "1",
                "disable_auto_compaction" = "true"
            );
        """

        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            file 'test_schema_change_add_key_column.csv'
            time 10000 // limit inflight 10s

            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(8192, json.NumberTotalRows)
                assertEquals(0, json.NumberFilteredRows)
            }
        }
        // check generate 3 segments
        getTabletStatus(2, 3)

        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            file 'test_schema_change_add_key_column1.csv'
            time 10000 // limit inflight 10s

            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(20480, json.NumberTotalRows)
                assertEquals(0, json.NumberFilteredRows)
            }
        }
        // check generate 2 segments(6 -> 2)
        getTabletStatus(3, 2)

        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            file 'test_schema_change_add_key_column2.csv'
            time 10000 // limit inflight 10s

            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(20480, json.NumberTotalRows)
                assertEquals(0, json.NumberFilteredRows)
            }
        }
        // check generate 2 segments(6 -> 2)
        getTabletStatus(4, 2)

        streamLoad {
            table "${tableName}"
            set 'column_separator', ','
            file 'test_schema_change_add_key_column3.csv'
            time 10000 // limit inflight 10s

            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(20480, json.NumberTotalRows)
                assertEquals(0, json.NumberFilteredRows)
            }
        }
        // check generate 2 segments(6 -> 2)
        getTabletStatus(5, 2)

        def rowCount1 = sql """ select count() from ${tableName}; """
        logger.info("rowCount1: ${rowCount1}")

        // get be info
        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
        for (def tablet in tablets) {
            def (code, out, err) = curl("GET", tablet.CompactionStatus)
            logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
        }

        // check generate 1 segments
        // getTabletStatus(2, 1) // [2-5]

        // check row count
        def rowCount2 = sql """ select count() from ${tableName}; """
        logger.info("rowCount2: ${rowCount2}")
        assertEquals(rowCount1[0][0], rowCount2[0][0])
        // check no duplicated key
        def result = sql """ select `k1`, `k2`, count(*) a from ${tableName} group by `k1`, `k2` having a > 1; """
        logger.info("result: ${result}")
        assertEquals(0, result.size())
        // check one row value
        order_qt_select1 """ select * from ${tableName} where `k1` = 12345; """
        order_qt_select2 """ select * from ${tableName} where `k1` = 17320; """
        order_qt_select3 """ select * from ${tableName} where `k1` = 59832 and `k2` = 36673; """
    }
}

0 comments on commit 0dfe4d9

Please sign in to comment.