Skip to content

Commit

Permalink
enhance: refactor delete mvcc function (#38066)
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
zhagnlu authored and luzhang committed Jan 14, 2025
1 parent 8da3ec8 commit 971b7d9
Show file tree
Hide file tree
Showing 13 changed files with 778 additions and 307 deletions.
371 changes: 284 additions & 87 deletions internal/core/src/segcore/DeletedRecord.h

Large diffs are not rendered by default.

54 changes: 4 additions & 50 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,7 @@ void
SegmentGrowingImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}
auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
Expand Down Expand Up @@ -391,7 +375,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin,
}

// step 2: fill delete record
deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

Expand All @@ -411,38 +395,8 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}

if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}

// all record filtered
if (size == 0) {
return;
}

std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);

for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}

SpanBase
Expand Down
20 changes: 13 additions & 7 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
return indexing_record_;
}

const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::shared_mutex&
get_chunk_mutex() const {
return chunk_mutex_;
Expand Down Expand Up @@ -229,7 +224,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
insert_record_(
*schema_, segcore_config.get_chunk_rows(), mmap_descriptor_),
indexing_record_(*schema_, index_meta_, segcore_config_),
id_(segment_id) {
id_(segment_id),
deleted_record_(&insert_record_, this) {
if (mmap_descriptor_ != nullptr) {
LOG_INFO("growing segment {} use mmap to hold raw data",
this->get_segment_id());
Expand Down Expand Up @@ -311,6 +307,16 @@ class SegmentGrowingImpl : public SegmentGrowing {
return false;
}

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const override {
return insert_record_.search_pk(pk, timestamp);
}

std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const override {
return insert_record_.search_pk(pk, insert_barrier);
}

protected:
int64_t
num_chunk() const override;
Expand Down Expand Up @@ -356,7 +362,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
mutable std::shared_mutex chunk_mutex_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<false> deleted_record_;

int64_t id_;

Expand Down
7 changes: 6 additions & 1 deletion internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <vector>
#include <index/ScalarIndex.h>

#include "DeletedRecord.h"
#include "FieldIndexing.h"
#include "common/Schema.h"
#include "common/Span.h"
Expand Down Expand Up @@ -351,6 +350,12 @@ class SegmentInternalInterface : public SegmentInterface {
virtual bool
is_mmap_field(FieldId field_id) const = 0;

virtual std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const = 0;

virtual std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const = 0;

protected:
// internal API: return chunk_data in span
virtual SpanBase
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/segcore/SegmentSealed.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0;

virtual InsertRecord<true>&
get_insert_record() = 0;

SegmentType
type() const override {
return SegmentType::Sealed;
Expand Down
58 changes: 6 additions & 52 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,38 +654,8 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);

std::vector<std::tuple<Timestamp, PkType>> ordering(size);
for (int i = 0; i < size; i++) {
ordering[i] = std::make_tuple(timestamps[i], pks[i]);
}

if (!insert_record_.empty_pks()) {
auto end = std::remove_if(
ordering.begin(),
ordering.end(),
[&](const std::tuple<Timestamp, PkType>& record) {
return !insert_record_.contain(std::get<1>(record));
});
size = end - ordering.begin();
ordering.resize(size);
}

// all record filtered
if (size == 0) {
return;
}

std::sort(ordering.begin(), ordering.end());
std::vector<PkType> sort_pks(size);
std::vector<Timestamp> sort_timestamps(size);

for (int i = 0; i < size; i++) {
auto [t, pk] = ordering[i];
sort_timestamps[i] = t;
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
// step 2: push delete info to delete_record
deleted_record_.LoadPush(pks, timestamps);
}

void
Expand Down Expand Up @@ -804,24 +774,7 @@ void
SegmentSealedImpl::mask_with_delete(BitsetType& bitset,
int64_t ins_barrier,
Timestamp timestamp) const {
auto del_barrier = get_barrier(get_deleted_record(), timestamp);
if (del_barrier == 0) {
return;
}

auto bitmap_holder = get_deleted_bitmap(
del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp);
if (!bitmap_holder || !bitmap_holder->bitmap_ptr) {
return;
}
auto& delete_bitset = *bitmap_holder->bitmap_ptr;
AssertInfo(
delete_bitset.size() == bitset.size(),
fmt::format(
"Deleted bitmap size:{} not equal to filtered bitmap size:{}",
delete_bitset.size(),
bitset.size()));
bitset |= delete_bitset;
deleted_record_.Query(bitset, ins_barrier, timestamp);
}

void
Expand Down Expand Up @@ -1125,7 +1078,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve),
deleted_record_(&insert_record_, this) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
Expand Down Expand Up @@ -1642,7 +1596,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated
sort_pks[i] = pk;
}

deleted_record_.push(sort_pks, sort_timestamps.data());
deleted_record_.StreamPush(sort_pks, sort_timestamps.data());
return SegcoreError::success();
}

Expand Down
23 changes: 17 additions & 6 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class SegmentSealedImpl : public SegmentSealed {
std::unique_ptr<DataArray>
get_vector(FieldId field_id, const int64_t* ids, int64_t count) const;

InsertRecord<true>&
get_insert_record() override {
return insert_record_;
}

public:
int64_t
num_chunk_index(FieldId field_id) const override;
Expand Down Expand Up @@ -162,6 +167,16 @@ class SegmentSealedImpl : public SegmentSealed {
void
ClearData();

std::vector<SegOffset>
search_pk(const PkType& pk, Timestamp timestamp) const override {
return insert_record_.search_pk(pk, timestamp);
}

std::vector<SegOffset>
search_pk(const PkType& pk, int64_t insert_barrier) const override {
return insert_record_.search_pk(pk, insert_barrier);
}

protected:
// blob and row_count
SpanBase
Expand Down Expand Up @@ -250,6 +265,7 @@ class SegmentSealedImpl : public SegmentSealed {
// } else {
num_rows_ = row_count;
// }
deleted_record_.set_sealed_row_count(row_count);
}

void
Expand All @@ -274,11 +290,6 @@ class SegmentSealedImpl : public SegmentSealed {
return system_ready_count_ == 2;
}

const DeletedRecord&
get_deleted_record() const {
return deleted_record_;
}

std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
search_ids(const IdArray& id_array, Timestamp timestamp) const override;

Expand Down Expand Up @@ -319,7 +330,7 @@ class SegmentSealedImpl : public SegmentSealed {
InsertRecord<true> insert_record_;

// deleted pks
mutable DeletedRecord deleted_record_;
mutable DeletedRecord<true> deleted_record_;

LoadFieldDataInfo field_data_info_;

Expand Down
70 changes: 0 additions & 70 deletions internal/core/src/segcore/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,76 +107,6 @@ std::unique_ptr<DataArray>
MergeDataArray(std::vector<MergeBase>& merge_bases,
const FieldMeta& field_meta);

template <bool is_sealed>
std::shared_ptr<DeletedRecord::TmpBitmap>
get_deleted_bitmap(int64_t del_barrier,
int64_t insert_barrier,
DeletedRecord& delete_record,
const InsertRecord<is_sealed>& insert_record,
Timestamp query_timestamp) {
// if insert_barrier and del_barrier have not changed, use cache data directly
bool hit_cache = false;
int64_t old_del_barrier = 0;
auto current = delete_record.clone_lru_entry(
insert_barrier, del_barrier, old_del_barrier, hit_cache);
if (hit_cache) {
return current;
}

auto bitmap = current->bitmap_ptr;

int64_t start, end;
if (del_barrier < old_del_barrier) {
// in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp
// so these deletion records do not take effect in query/search
// so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0
// for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0]
start = del_barrier;
end = old_del_barrier;
} else {
// the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier]
// for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0]
start = old_del_barrier;
end = del_barrier;
}

// Avoid invalid calculations when there are a lot of repeated delete pks
std::unordered_map<PkType, Timestamp> delete_timestamps;
for (auto del_index = start; del_index < end; ++del_index) {
auto pk = delete_record.pks()[del_index];
auto timestamp = delete_record.timestamps()[del_index];

delete_timestamps[pk] = timestamp > delete_timestamps[pk]
? timestamp
: delete_timestamps[pk];
}

for (auto& [pk, timestamp] : delete_timestamps) {
auto segOffsets = insert_record.search_pk(pk, insert_barrier);
for (auto offset : segOffsets) {
int64_t insert_row_offset = offset.get();

// The deletion record do not take effect in search/query,
// and reset bitmap to 0
if (timestamp > query_timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// Insert after delete with same pk, delete will not task effect on this insert record,
// and reset bitmap to 0
if (insert_record.timestamps_[insert_row_offset] >= timestamp) {
bitmap->reset(insert_row_offset);
continue;
}
// insert data corresponding to the insert_row_offset will be ignored in search/query
bitmap->set(insert_row_offset);
}
}

delete_record.insert_lru_entry(current);
return current;
}

std::unique_ptr<DataArray>
ReverseDataFromIndex(const index::IndexBase* index,
const int64_t* seg_offsets,
Expand Down
3 changes: 2 additions & 1 deletion internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) {
auto segment_interface = reinterpret_cast<SegmentInterface*>(segment);
auto sealed_segment = dynamic_cast<SegmentSealed*>(segment_interface);
SealedLoadFieldData(dataset, *sealed_segment);
sealed_segment->get_insert_record().seal_pks();

// delete data pks = {1, 2, 3}, timestamps = {4, 4, 4}
std::vector<int64_t> delete_row_ids = {1, 2, 3};
Expand Down Expand Up @@ -5271,4 +5272,4 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {

TEST(CApiTest, IsLoadWithDisk) {
ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0));
}
}
Loading

0 comments on commit 971b7d9

Please sign in to comment.