From 0dfe4d9ff08b4114bbdc209208090fd1a5f4605b Mon Sep 17 00:00:00 2001
From: meiyi
Date: Thu, 19 Dec 2024 19:32:39 +0800
Subject: [PATCH] fix segment compaction

---
 be/src/common/config.cpp                      |   2 +-
 be/src/common/config.h                        |   2 +-
 be/src/olap/base_tablet.cpp                   |   6 +-
 be/src/olap/base_tablet.h                     |   2 +-
 be/src/olap/rowset/segcompaction.cpp          |  33 ++-
 be/src/olap/rowset/segcompaction.h            |   2 +-
 be/src/olap/rowset_builder.cpp                |   4 +-
 be/src/service/backend_service.cpp            |   4 +-
 .../unique_with_mow_c_p0/test_compact_seg.out |  19 ++
 .../test_compact_seg.groovy                   | 188 ++++++++++++++++++
 10 files changed, 244 insertions(+), 18 deletions(-)
 create mode 100644 regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
 create mode 100644 regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 95a3e61fb5517a..0b788ddc089b67 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -990,7 +990,7 @@ DEFINE_Bool(hide_webserver_config_page, "false");
 
 DEFINE_Bool(enable_segcompaction, "true");
 // Max number of segments allowed in a single segcompaction task.
-DEFINE_Int32(segcompaction_batch_size, "10");
+DEFINE_mInt32(segcompaction_batch_size, "10");
 // Max row count allowed in a single source segment, bigger segments will be skipped.
 DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8a9c3f7480b33..d39e488ff141f9 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1040,7 +1040,7 @@ DECLARE_Bool(hide_webserver_config_page);
 
 DECLARE_Bool(enable_segcompaction);
 // Max number of segments allowed in a single segcompaction task.
-DECLARE_Int32(segcompaction_batch_size);
+DECLARE_mInt32(segcompaction_batch_size);
 // Max row count allowed in a single source segment, bigger segments will be skipped.
 DECLARE_Int32(segcompaction_candidate_max_rows);
 
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 82dc122e19f5ef..45c689143237f9 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -361,7 +361,7 @@ void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta
 }
 
 Status BaseTablet::calc_delete_bitmap_between_segments(
-        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+        RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
         DeleteBitmapPtr delete_bitmap) {
     size_t const num_segments = segments.size();
     if (num_segments < 2) {
@@ -369,7 +369,6 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
     }
 
     OlapStopWatch watch;
-    auto const rowset_id = rowset->rowset_id();
     size_t seq_col_length = 0;
     if (_tablet_meta->tablet_schema()->has_sequence_col()) {
         auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
@@ -1687,7 +1686,8 @@ Status BaseTablet::update_delete_bitmap_without_lock(
 
     // calculate delete bitmap between segments if necessary.
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
-    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments,
+                                                              delete_bitmap));
 
     // get all base rowsets to calculate on
     std::vector<RowsetSharedPtr> specified_rowsets;
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index c6de447200f87c..9c6b63a00fe5bb 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -181,7 +181,7 @@ class BaseTablet {
             DeleteBitmapPtr tablet_delete_bitmap = nullptr);
 
     Status calc_delete_bitmap_between_segments(
-            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            RowsetId rowset_id, const std::vector<segment_v2::SegmentSharedPtr>& segments,
             DeleteBitmapPtr delete_bitmap);
 
     static Status commit_phase_update_delete_bitmap(
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index f65e00b8b93791..bf12ce8cbbc366 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -95,6 +95,19 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     read_options.use_page_cache = false;
     read_options.tablet_schema = ctx.tablet_schema;
     read_options.record_rowids = record_rowids;
+    if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
+        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
+        RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments,
+                                                                    delete_bitmap));
+        for (auto& seg_ptr : *segments) {
+            auto d = delete_bitmap->get_agg(
+                    {ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON});
+            if (d->isEmpty()) {
+                continue; // Empty delete bitmap for the segment
+            }
+            read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d));
+        }
+    }
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
     std::map<uint32_t, uint32_t> segment_rows;
     for (auto& seg_ptr : *segments) {
@@ -191,8 +204,9 @@ Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t e
 
 Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
                                                Merger::Statistics& merger_stat, uint32_t begin,
-                                               uint32_t end) {
+                                               uint32_t end, bool is_mow_with_cluster_keys) {
     uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
+    uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
     uint64_t sum_src_row = 0;     /* sum of rows in each involved source segments */
     uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
     uint64_t output_rows = merger_stat.output_rows;     /* rows after merge */
@@ -206,11 +220,15 @@
     }
     DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row",
                     { sum_src_row++; });
-    if (raw_rows_read != sum_src_row) {
+    uint64_t raw_rows = raw_rows_read;
+    if (is_mow_with_cluster_keys) {
+        raw_rows += rows_del_by_bitmap;
+    }
+    if (raw_rows != sum_src_row) {
         return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction read row num does not match source. expect read row:{}, actual read "
-                "row:{}",
-                sum_src_row, raw_rows_read);
+                "row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
+                sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
     }
     DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows",
                     { merged_rows++; });
@@ -305,9 +323,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     }
 
     /* check row num after merge/aggregation */
-    RETURN_NOT_OK_STATUS_WITH_WARN(
-            _check_correctness(key_reader_stats, key_merger_stats, begin, end),
-            "check correctness failed");
+    bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
+    RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
+                                                      end, is_mow_with_cluster_keys),
+                                   "check correctness failed");
     {
         std::lock_guard lock(_writer->_segid_statistics_map_mutex);
         _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h
index ac9ba66f7fb017..0279b5bb653c6e 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -93,7 +93,7 @@ class SegcompactionWorker {
                                     uint32_t end);
     Status _delete_original_segments(uint32_t begin, uint32_t end);
     Status _check_correctness(OlapReaderStatistics& reader_stat, Merger::Statistics& merger_stat,
-                              uint32_t begin, uint32_t end);
+                              uint32_t begin, uint32_t end, bool is_mow_with_cluster_keys);
     Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);
 
 private:
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index ec7463d5b9d75d..cc6221a5bf6ed9 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -286,8 +286,8 @@ Status BaseRowsetBuilder::submit_calc_delete_bitmap_task() {
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
-        RETURN_IF_ERROR(
-                _tablet->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
+        RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_rowset->rowset_id(), segments,
+                                                                     _delete_bitmap));
     }
 
     // For partial update, we need to fill in the entire row of data, during the calculation
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index d74a9cd2e0b181..55e18b4deb8544 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -551,8 +551,8 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
         }
         if (segments.size() > 1) {
             // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
-                                                                       delete_bitmap);
+            status = local_tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(),
+                                                                       segments, delete_bitmap);
             if (!status) {
                 LOG(WARNING) << "failed to calculate delete bitmap"
                              << ". tablet_id: " << local_tablet->tablet_id()
diff --git a/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out b/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
new file mode 100644
index 00000000000000..17c30d14aa5b43
--- /dev/null
+++ b/regression-test/data/unique_with_mow_c_p0/test_compact_seg.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+12345	23083	30920	40410
+
+-- !select2 --
+17320	24209	30795	40000
+
+-- !select3 --
+59832	36673	30343	40299
+
+-- !select1 --
+12345	23083	30920	40410
+
+-- !select2 --
+17320	24209	30795	40782
+
+-- !select3 --
+59832	36673	30586	40739
+
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy b/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy
new file mode 100644
index 00000000000000..230653bcf3bd57
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_c_p0/test_compact_seg.groovy
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_compact_seg", "nonConcurrent") {
+    def tableName = "test_compact_seg"
+
+    def getTabletStatus = { rowsetNum, lastRowsetSegmentNum ->
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+        logger.info("tablets: ${tablets}")
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String compactionUrl = tablet["CompactionStatus"]
+        def retry = 15
+        for (int i = 0; i < retry; i++) {
+            def (code, out, err) = curl("GET", compactionUrl)
+            logger.info("Show tablets " + tablet.TabletId + " status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            assertTrue(tabletJson.rowsets.size() >= rowsetNum)
+            def rowset = tabletJson.rowsets.get(rowsetNum - 1)
+            logger.info("rowset: ${rowset}")
+            int start_index = rowset.indexOf("]")
+            int end_index = rowset.indexOf("DATA")
+            def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+            logger.info("segmentNumStr: ${segmentNumStr}")
+            if (Integer.parseInt(segmentNumStr) == lastRowsetSegmentNum) {
+                break
+            }
+            if (i == retry - 1) {
+                // assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+                logger.warn("expected segmentNum: ${lastRowsetSegmentNum}, but got ${segmentNumStr} after ${retry} retries")
+            }
+            sleep(2000)
+        }
+    }
+
+    // batch_size is 4164 in csv_reader.cpp
+    // _batch_size is 8192 in vtablet_writer.cpp
+    def doris_scanner_row_bytes_params = get_be_param("doris_scanner_row_bytes")
+    def segcompaction_batch_size_params = get_be_param("segcompaction_batch_size")
+    onFinish {
+        GetDebugPoint().disableDebugPointForAllBEs("MemTable.need_flush")
+        set_original_be_param("doris_scanner_row_bytes", doris_scanner_row_bytes_params)
+        set_original_be_param('segcompaction_batch_size', segcompaction_batch_size_params)
+    }
+    GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+    set_be_param.call("doris_scanner_row_bytes", "1")
+    set_be_param.call('segcompaction_batch_size', 5)
+
+    for (int j = 0; j < 2; j++) {
+        tableName = "test_compact_seg_" + j
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL,
+                `v3` int(11) NULL,
+                `v4` int(11) NULL
+            ) unique KEY(`k1`, `k2`)
+            cluster by(`v3`, `v4`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            """ + (j == 1 ? "\"function_column.sequence_col\"='v4', " : "") +
+            """
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            );
+        """
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            file 'test_schema_change_add_key_column.csv'
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(8192, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+        // check generate 3 segments
+        getTabletStatus(2, 3)
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            file 'test_schema_change_add_key_column1.csv'
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(20480, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+        // check generate 2 segments(6 -> 2)
+        getTabletStatus(3, 2)
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            file 'test_schema_change_add_key_column2.csv'
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(20480, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+        // check generate 2 segments(6 -> 2)
+        getTabletStatus(4, 2)
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            file 'test_schema_change_add_key_column3.csv'
+            time 10000 // limit inflight 10s
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(20480, json.NumberTotalRows)
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+        // check generate 2 segments(6 -> 2)
+        getTabletStatus(5, 2)
+
+        def rowCount1 = sql """ select count() from ${tableName}; """
+        logger.info("rowCount1: ${rowCount1}")
+
+        // get be info
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+        for (def tablet in tablets) {
+            def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+        }
+
+        // check generate 1 segment
+        // getTabletStatus(2, 1) // [2-5]
+
+        // check row count
+        def rowCount2 = sql """ select count() from ${tableName}; """
+        logger.info("rowCount2: ${rowCount2}")
+        assertEquals(rowCount1[0][0], rowCount2[0][0])
+        // check no duplicated key
+        def result = sql """ select `k1`, `k2`, count(*) a from ${tableName} group by `k1`, `k2` having a > 1; """
+        logger.info("result: ${result}")
+        assertEquals(0, result.size())
+        // check one row value
+        order_qt_select1 """ select * from ${tableName} where `k1` = 12345; """
+        order_qt_select2 """ select * from ${tableName} where `k1` = 17320; """
+        order_qt_select3 """ select * from ${tableName} where `k1` = 59832 and `k2` = 36673; """
+    }
+}
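
Note on the adjusted correctness check in segcompaction.cpp above: for a merge-on-write table with cluster keys, the segcompaction reader now opens the source segments with the freshly computed inter-segment delete bitmap, so rows masked by that bitmap never reach the merger and raw_rows_read alone no longer equals the summed row count of the source segments; the patch adds rows_del_by_bitmap back before comparing. The following self-contained sketch models only that accounting step; the struct and function names are local to the sketch, not the Doris API:

    #include <cassert>
    #include <cstdint>

    // Simplified stand-in for the two reader counters the patch consults.
    struct ReaderStats {
        uint64_t raw_rows_read;      // rows actually returned by the reader
        uint64_t rows_del_by_bitmap; // rows skipped via the delete bitmap
    };

    // Mirrors the read-side check in _check_correctness: the reader's row
    // count must equal the sum of source-segment rows once bitmap-deleted
    // rows are added back (only done for mow tables with cluster keys).
    bool read_rows_match(const ReaderStats& stats, uint64_t sum_src_row,
                         bool is_mow_with_cluster_keys) {
        uint64_t raw_rows = stats.raw_rows_read;
        if (is_mow_with_cluster_keys) {
            raw_rows += stats.rows_del_by_bitmap;
        }
        return raw_rows == sum_src_row;
    }

    int main() {
        // Three source segments of 100 rows each; the delete bitmap masks
        // 12 rows, so the reader returns only 288 of the 300 source rows.
        ReaderStats stats{288, 12};
        assert(!read_rows_match(stats, 300, /*is_mow_with_cluster_keys=*/false));
        assert(read_rows_match(stats, 300, /*is_mow_with_cluster_keys=*/true));
        return 0;
    }

Without the added term, every segcompaction that actually dropped a bitmap-masked row would trip the row-count check and fail, which is the spurious mismatch this patch avoids for mow tables with cluster keys.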