enhance: refactor delete mvcc function (#38066) #39258

Merged
merged 1 commit on Jan 15, 2025
372 changes: 285 additions & 87 deletions internal/core/src/segcore/DeletedRecord.h

Large diffs are not rendered by default.
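The bulk of this PR is in DeletedRecord.h, whose diff is collapsed here. For orientation, the sketch below reconstructs the interface implied by the call sites changed elsewhere in this diff (StreamPush, LoadPush, Query, set_sealed_row_count, and a constructor taking the insert record plus the owning segment). Every name and signature is an assumption inferred from usage, not the actual header.

// Hypothetical sketch of the refactored DeletedRecord, inferred from how
// SegmentGrowingImpl and SegmentSealedImpl call it in this PR. Signatures are
// assumptions; see internal/core/src/segcore/DeletedRecord.h for the real ones.
template <bool is_sealed>
class DeletedRecord {
 public:
    DeletedRecord(InsertRecord<is_sealed>* insert_record,
                  SegmentInternalInterface* segment);

    // Streaming deletes arriving through Segment*Impl::Delete();
    // the caller passes pks/timestamps already sorted by timestamp.
    void
    StreamPush(const std::vector<PkType>& pks, const Timestamp* timestamps);

    // Bulk load of a persisted delete log via LoadDeletedRecord(); sorting
    // and filtering against existing pks now happens inside DeletedRecord.
    void
    LoadPush(const std::vector<PkType>& pks, const Timestamp* timestamps);

    // Marks rows deleted as of `timestamp` directly in `bitset`, replacing
    // the old get_deleted_bitmap() helper and its LRU bitmap cache.
    void
    Query(BitsetType& bitset, int64_t insert_barrier, Timestamp timestamp);

    // Sealed segments report their final row count once loading completes.
    void
    set_sealed_row_count(int64_t row_count);
};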

54 changes: 4 additions & 50 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
@@ -48,23 +48,7 @@ void
SegmentGrowingImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
@@ -391,7 +375,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
}

// step 2: fill delete record
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

@@ -411,38 +395,8 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}

if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}

// all record filtered
if (size == 0) {
return;
}

std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);

for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}

SpanBase
20 changes: 13 additions & 7 deletions internal/core/src/segcore/SegmentGrowingImpl.h
@@ -86,11 +86,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
return indexing_record_;
}

const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::shared_mutex&
get_chunk_mutex() const {
return chunk_mutex_;
@@ -229,7 +224,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
insert_record_(
*schema_, segcore_config.get_chunk_rows(), mmap_descriptor_),
indexing_record_(*schema_, index_meta_, segcore_config_),
id_(segment_id) {
id_(segment_id),
deleted_record_(&insert_record_, this) {
if (mmap_descriptor_ != nullptr) {
LOG_INFO("growing segment {} use mmap to hold raw data",
this->get_segment_id());
@@ -311,6 +307,16 @@ class SegmentGrowingImpl : public SegmentGrowing {
return false;
}

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const override {
return insert_record_.search_pk(pk, timestamp);
}

std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const override {
return insert_record_.search_pk(pk, insert_barrier);
}

protected:
int64_t
num_chunk() const override;
@@ -356,7 +362,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable std::shared_mutex chunk_mutex_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<false> deleted_record_;

int64_t id_;

7 changes: 6 additions & 1 deletion internal/core/src/segcore/SegmentInterface.h
@@ -19,7 +19,6 @@
#include <vector>
#include <index/ScalarIndex.h>

#include "DeletedRecord.h"
#include "FieldIndexing.h"
#include "common/Schema.h"
#include "common/Span.h"
@@ -351,6 +350,12 @@ class SegmentInternalInterface : public SegmentInterface {
virtual bool
is_mmap_field(FieldId field_id) const = 0;

virtual std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const = 0;

virtual std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const = 0;

protected:
// internal API: return chunk_data in span
virtual SpanBase
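Hoisting the two search_pk overloads into SegmentInternalInterface lets DeletedRecord resolve deleted pks to row offsets through the segment pointer passed to its constructor, rather than through the removed get_deleted_bitmap helper in Utils.h. A minimal sketch of that consumer side, under the assumption that search_pk(pk, timestamp) returns only offsets of rows inserted no later than the given timestamp:

// Illustrative only: one way a single delete entry could be applied to a
// query bitset through the new segment-level search_pk() hook. The helper
// name and exact visibility handling are assumptions, not code from this PR.
void
ApplyOneDelete(const SegmentInternalInterface& segment,
               const PkType& pk,
               Timestamp delete_ts,
               Timestamp query_ts,
               BitsetType& bitset) {
    if (delete_ts > query_ts) {
        return;  // delete is newer than the query snapshot, so not visible yet
    }
    // rows with this pk that were inserted before the delete get masked out
    for (auto seg_offset : segment.search_pk(pk, delete_ts)) {
        bitset.set(seg_offset.get());
    }
}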
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentSealed.h
@@ -43,6 +43,9 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0;

virtual InsertRecord<true>&
get_insert_record() = 0;

SegmentType
type() const override {
return SegmentType::Sealed;
58 changes: 6 additions & 52 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -654,38 +654,8 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}

if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}

// all record filtered
if (size == 0) {
return;
}

std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);

for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}

void
@@ -804,24 +774,7 @@ void
SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}

auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
@@ -1125,7 +1078,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
@@ -1642,7 +1596,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

23 changes: 17 additions & 6 deletions internal/core/src/segcore/SegmentSealedImpl.h
@@ -110,6 +110,11 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const;

InsertRecord<true>&
get_insert_record() override {
return insert_record_;
}

public:
int64_t
num_chunk_index(FieldId field_id) const override;
@@ -162,6 +167,16 @@ class SegmentSealedImpl : public SegmentSealed {
void
ClearData();

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const override {
return insert_record_.search_pk(pk, timestamp);
}

std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const override {
return insert_record_.search_pk(pk, insert_barrier);
}

protected:
// blob and row_count
SpanBase
@@ -250,6 +265,7 @@ class SegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}

void
@@ -274,11 +290,6 @@
return system_ready_count_ == 2;
}

const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;

@@ -319,7 +330,7 @@ class SegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;

LoadFieldDataInfo field_data_info_;

70 changes: 0 additions & 70 deletions internal/core/src/segcore/Utils.h
@@ -107,76 +107,6 @@ std::unique_ptr<DataArray>
MergeDataArray(std::vector<MergeBase>& merge_bases,
const FieldMeta& field_meta);

template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

std::unique_ptr<DataArray>
ReverseDataFromIndex(const index::IndexBase* index,
const int64_t* seg_offsets,
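The removed get_deleted_bitmap helper kept an LRU-cached bitmap keyed by the insert/delete barriers; after this PR the same visibility decision is made inside DeletedRecord::Query. The rule encoded by the deleted loop above is preserved below as a standalone predicate for reference; the function itself is illustrative and not part of the change.

#include <cstdint>

using Timestamp = uint64_t;

// A delete with timestamp `delete_ts` hides a row inserted at `insert_ts`
// from a query at `query_ts` only when both conditions hold:
//  1) the delete is visible to the query (delete_ts <= query_ts), and
//  2) the row was inserted before the delete, so a later insert that reuses
//     the same pk is unaffected (insert_ts < delete_ts).
inline bool
DeleteHidesRow(Timestamp insert_ts, Timestamp delete_ts, Timestamp query_ts) {
    return delete_ts <= query_ts && insert_ts < delete_ts;
}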
3 changes: 2 additions & 1 deletion internal/core/unittest/test_c_api.cpp
@@ -1137,6 +1137,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
auto sealed_segment = dynamic_cast<SegmentSealed*>(segment_interface);
SealedLoadFieldData(dataset, *sealed_segment);
sealed_segment->get_insert_record().seal_pks();

// delete data pks = {1, 2, 3}, timestamps = {4, 4, 4}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
@@ -5271,4 +5272,4 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {

TEST(CApiTest, IsLoadWithDisk) {
ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0));
}
}
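One behavioral consequence shows up in the InsertSamePkAfterDeleteOnSealedSegment hunk above: after SealedLoadFieldData the test now calls sealed_segment->get_insert_record().seal_pks() before issuing deletes, which suggests that a sealed segment's pk-to-offset lookup must be explicitly sealed once loading finishes so the refactored DeletedRecord can resolve delete pks against it.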