diff --git a/internal/core/src/segcore/DeletedRecord.h b/internal/core/src/segcore/DeletedRecord.h index f2f0e2d8a0d0e..28ef9ef91aacf 100644 --- a/internal/core/src/segcore/DeletedRecord.h +++ b/internal/core/src/segcore/DeletedRecord.h @@ -17,111 +17,296 @@ #include #include #include +#include #include "AckResponder.h" #include "common/Schema.h" #include "common/Types.h" #include "segcore/Record.h" +#include "segcore/InsertRecord.h" +#include "segcore/SegmentInterface.h" #include "ConcurrentVector.h" namespace milvus::segcore { -struct DeletedRecord { - struct TmpBitmap { - // Just for query - int64_t del_barrier = 0; - BitsetTypePtr bitmap_ptr; - - std::shared_ptr - clone(int64_t capacity); - }; - static constexpr int64_t deprecated_size_per_chunk = 32 * 1024; - DeletedRecord() - : lru_(std::make_shared()), - timestamps_(deprecated_size_per_chunk), - pks_(deprecated_size_per_chunk) { - lru_->bitmap_ptr = std::make_shared(); +using Offset = int32_t; + +struct Comparator { + bool + operator()(const std::pair& left, + const std::pair& right) const { + if (left.first == right.first) { + return left.second < right.second; + } + return left.first < right.first; + } +}; + +// a lock-free list for multi-thread insert && read +using SortedDeleteList = + folly::ConcurrentSkipList, Comparator>; + +static int32_t DUMP_BATCH_SIZE = 100000; +static int32_t DELETE_PAIR_SIZE = sizeof(std::pair); + +template +class DeletedRecord { + public: + DeletedRecord(InsertRecord* insert_record, + SegmentInternalInterface* segment) + : insert_record_(insert_record), + segment_(segment), + deleted_lists_(SortedDeleteList::createInstance()) { + } + + // not binding segment, only for testing purposes + DeletedRecord(InsertRecord* insert_record) + : insert_record_(insert_record), + segment_(nullptr), + deleted_lists_(SortedDeleteList::createInstance()) { } - auto - get_lru_entry() { - std::shared_lock lck(shared_mutex_); - return lru_; + ~DeletedRecord() { } - std::shared_ptr - 
clone_lru_entry(int64_t insert_barrier, - int64_t del_barrier, - int64_t& old_del_barrier, - bool& hit_cache) { - std::shared_lock lck(shared_mutex_); - auto res = lru_->clone(insert_barrier); - old_del_barrier = lru_->del_barrier; - - if (lru_->bitmap_ptr->size() == insert_barrier && - lru_->del_barrier == del_barrier) { - hit_cache = true; - } else { - res->del_barrier = del_barrier; + DeletedRecord(DeletedRecord&& delete_record) = delete; + + DeletedRecord& + operator=(DeletedRecord&& delete_record) = delete; + + void + LoadPush(const std::vector& pks, const Timestamp* timestamps) { + if (pks.empty()) { + return; + } + + auto max_deleted_ts = InternalPush(pks, timestamps); + + if (max_deleted_ts > max_load_timestamp_) { + max_load_timestamp_ = max_deleted_ts; } - return res; + //TODO: add support for dump snapshot when load finished } + // stream push delete timestamps should be sorted outside of the interface + // considering concurrent query and push void - insert_lru_entry(std::shared_ptr new_entry, bool force = false) { - std::lock_guard lck(shared_mutex_); - if (new_entry->del_barrier <= lru_->del_barrier) { - if (!force || - new_entry->bitmap_ptr->size() <= lru_->bitmap_ptr->size()) { - // DO NOTHING - return; + StreamPush(const std::vector& pks, const Timestamp* timestamps) { + if (pks.empty()) { + return; + } + + InternalPush(pks, timestamps); + + bool can_dump = timestamps[0] >= max_load_timestamp_; + if (can_dump) { + DumpSnapshot(); + } + } + + Timestamp + InternalPush(const std::vector& pks, const Timestamp* timestamps) { + int64_t removed_num = 0; + int64_t mem_add = 0; + Timestamp max_timestamp = 0; + + SortedDeleteList::Accessor accessor(deleted_lists_); + for (size_t i = 0; i < pks.size(); ++i) { + auto deleted_pk = pks[i]; + auto deleted_ts = timestamps[i]; + if (deleted_ts > max_timestamp) { + max_timestamp = deleted_ts; + } + std::vector offsets; + if (segment_) { + offsets = + std::move(segment_->search_pk(deleted_pk, deleted_ts)); + } else { 
+ // only for testing + offsets = std::move( + insert_record_->search_pk(deleted_pk, deleted_ts)); + } + for (auto& offset : offsets) { + auto row_id = offset.get(); + // if alreay deleted, no need to add new record + if (deleted_mask_.size() > row_id && deleted_mask_[row_id]) { + continue; + } + // if insert record and delete record is same timestamp, + // delete not take effect on this record. + if (deleted_ts == insert_record_->timestamps_[row_id]) { + continue; + } + accessor.insert(std::make_pair(deleted_ts, row_id)); + if constexpr (is_sealed) { + Assert(deleted_mask_.size() > 0); + deleted_mask_.set(row_id); + } else { + // need to add mask size firstly for growing segment + deleted_mask_.resize(insert_record_->size()); + deleted_mask_.set(row_id); + } + removed_num++; + mem_add += DELETE_PAIR_SIZE; } } - lru_ = std::move(new_entry); + + n_.fetch_add(removed_num); + mem_size_.fetch_add(mem_add); + return max_timestamp; } void - push(const std::vector& pks, const Timestamp* timestamps) { - std::lock_guard lck(buffer_mutex_); - - auto size = pks.size(); - ssize_t divide_point = 0; - auto n = n_.load(); - // Truncate the overlapping prefix - if (n > 0) { - auto last = timestamps_[n - 1]; - divide_point = - std::lower_bound(timestamps, timestamps + size, last + 1) - - timestamps; + Query(BitsetType& bitset, + int64_t insert_barrier, + Timestamp query_timestamp) { + Assert(bitset.size() == insert_barrier); + + SortedDeleteList::Accessor accessor(deleted_lists_); + if (accessor.size() == 0) { + return; } - // All these delete records have been applied - if (divide_point == size) { + // try use snapshot to skip iterations + bool hit_snapshot = false; + SortedDeleteList::iterator next_iter; + if (!snapshots_.empty()) { + int loc = snapshots_.size() - 1; + // find last meeted snapshot + { + std::shared_lock lock(snap_lock_); + while (snapshots_[loc].first > query_timestamp && loc >= 0) { + loc--; + } + if (loc >= 0) { + next_iter = snap_next_iter_[loc]; + auto 
or_size = + std::min(snapshots_[loc].second.size(), bitset.size()); + bitset.inplace_or_with_count(snapshots_[loc].second, + or_size); + hit_snapshot = true; + } + } + } + + auto start_iter = hit_snapshot ? next_iter : accessor.begin(); + auto end_iter = + accessor.lower_bound(std::make_pair(query_timestamp, 0)); + + auto it = start_iter; + + // when end_iter point to skiplist end, concurrent delete may append new value + // after lower_bound() called, so end_iter is not logical valid. + if (end_iter == accessor.end()) { + while (it != accessor.end() && it->first <= query_timestamp) { + if (it->second < insert_barrier) { + bitset.set(it->second); + } + it++; + } return; } - size -= divide_point; - pks_.set_data_raw(n, pks.data() + divide_point, size); - timestamps_.set_data_raw(n, timestamps + divide_point, size); - n_ += size; - mem_size_ += sizeof(Timestamp) * size + - CalcPksSize(pks.data() + divide_point, size); + while (it != accessor.end() && it != end_iter) { + if (it->second < insert_barrier) { + bitset.set(it->second); + } + it++; + } + while (it != accessor.end() && it->first == query_timestamp) { + if (it->second < insert_barrier) { + bitset.set(it->second); + } + it++; + } } - const ConcurrentVector& - timestamps() const { - return timestamps_; + size_t + GetSnapshotBitsSize() const { + auto all_dump_bits = 0; + auto next_dump_ts = 0; + std::shared_lock lock(snap_lock_); + int loc = snapshots_.size() - 1; + while (loc >= 0) { + if (next_dump_ts != snapshots_[loc].first) { + all_dump_bits += snapshots_[loc].second.size(); + } + loc--; + } + return all_dump_bits; } - const ConcurrentVector& - pks() const { - return pks_; + void + DumpSnapshot() { + SortedDeleteList::Accessor accessor(deleted_lists_); + int total_size = accessor.size(); + int dumped_size = snapshots_.empty() ? 
0 : GetSnapshotBitsSize(); + + while (total_size - dumped_size > DUMP_BATCH_SIZE) { + int32_t bitsize = 0; + if constexpr (is_sealed) { + bitsize = sealed_row_count_; + } else { + bitsize = insert_record_->size(); + } + BitsetType bitmap(bitsize, false); + + auto it = accessor.begin(); + Timestamp last_dump_ts = 0; + if (!snapshots_.empty()) { + it = snap_next_iter_.back(); + last_dump_ts = snapshots_.back().first; + bitmap.inplace_or_with_count(snapshots_.back().second, + snapshots_.back().second.size()); + } + + while (total_size - dumped_size > DUMP_BATCH_SIZE && + it != accessor.end()) { + Timestamp dump_ts = 0; + + for (auto size = 0; size < DUMP_BATCH_SIZE; ++it, ++size) { + bitmap.set(it->second); + if (size == DUMP_BATCH_SIZE - 1) { + dump_ts = it->first; + } + } + + { + std::unique_lock lock(snap_lock_); + if (dump_ts == last_dump_ts) { + // only update + snapshots_.back().second = std::move(bitmap.clone()); + snap_next_iter_.back() = it; + } else { + // add new snapshot + snapshots_.push_back( + std::make_pair(dump_ts, std::move(bitmap.clone()))); + Assert(it != accessor.end() && it.good()); + snap_next_iter_.push_back(it); + } + + LOG_INFO( + "dump delete record snapshot at ts: {}, cursor: {}, " + "total size:{} " + "current snapshot size: {} for segment: {}", + dump_ts, + dumped_size + DUMP_BATCH_SIZE, + total_size, + snapshots_.size(), + segment_ ? 
segment_->get_segment_id() : 0); + last_dump_ts = dump_ts; + } + + dumped_size += DUMP_BATCH_SIZE; + } + } } int64_t size() const { - return n_.load(); + SortedDeleteList::Accessor accessor(deleted_lists_); + return accessor.size(); } size_t @@ -129,27 +314,39 @@ struct DeletedRecord { return mem_size_.load(); } - private: - std::shared_ptr lru_; - std::shared_mutex shared_mutex_; + void + set_sealed_row_count(size_t row_count) { + sealed_row_count_ = row_count; + deleted_mask_.resize(row_count); + } - std::shared_mutex buffer_mutex_; + std::vector> + get_snapshots() const { + std::shared_lock lock(snap_lock_); + std::vector> snapshots; + for (const auto& snap : snapshots_) { + snapshots.emplace_back(snap.first, snap.second.clone()); + } + return std::move(snapshots); + } + + public: std::atomic n_ = 0; std::atomic mem_size_ = 0; - ConcurrentVector timestamps_; - ConcurrentVector pks_; -}; + InsertRecord* insert_record_; + SegmentInternalInterface* segment_; + std::shared_ptr deleted_lists_; + // max timestamp of deleted records which replayed in load process + Timestamp max_load_timestamp_{0}; + int32_t sealed_row_count_; + // used to remove duplicated deleted records for fast access + BitsetType deleted_mask_; -inline auto -DeletedRecord::TmpBitmap::clone(int64_t capacity) - -> std::shared_ptr { - auto res = std::make_shared(); - res->del_barrier = this->del_barrier; - // res->bitmap_ptr = std::make_shared(); - // *(res->bitmap_ptr) = *(this->bitmap_ptr); - res->bitmap_ptr = std::make_shared(this->bitmap_ptr->clone()); - res->bitmap_ptr->resize(capacity, false); - return res; -} + // dump snapshot low frequency + mutable std::shared_mutex snap_lock_; + std::vector> snapshots_; + // next delete record iterator that follows every snapshot + std::vector snap_next_iter_; +}; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 19c2e95fe8dd6..d93b97f2ca820 100644 --- 
a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -48,23 +48,7 @@ void SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const { - auto del_barrier = get_barrier(get_deleted_record(), timestamp); - if (del_barrier == 0) { - return; - } - auto bitmap_holder = get_deleted_bitmap( - del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); - if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { - return; - } - auto& delete_bitset = *bitmap_holder->bitmap_ptr; - AssertInfo( - delete_bitset.size() == bitset.size(), - fmt::format( - "Deleted bitmap size:{} not equal to filtered bitmap size:{}", - delete_bitset.size(), - bitset.size())); - bitset |= delete_bitset; + deleted_record_.Query(bitset, ins_barrier, timestamp); } void @@ -391,7 +375,7 @@ SegmentGrowingImpl::Delete(int64_t reserved_begin, } // step 2: fill delete record - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); return SegcoreError::success(); } @@ -411,38 +395,8 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys); auto timestamps = reinterpret_cast(info.timestamps); - std::vector> ordering(size); - for (int i = 0; i < size; i++) { - ordering[i] = std::make_tuple(timestamps[i], pks[i]); - } - - if (!insert_record_.empty_pks()) { - auto end = std::remove_if( - ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); - } - - // all record filtered - if (size == 0) { - return; - } - - std::sort(ordering.begin(), ordering.end()); - std::vector sort_pks(size); - std::vector sort_timestamps(size); - - for (int i = 0; i < size; i++) { - auto [t, pk] = ordering[i]; - sort_timestamps[i] = t; - sort_pks[i] 
= pk; - } - - deleted_record_.push(sort_pks, sort_timestamps.data()); + // step 2: push delete info to delete_record + deleted_record_.LoadPush(pks, timestamps); } SpanBase diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 6ccbe8d3637ef..db98f2be011bf 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -86,11 +86,6 @@ class SegmentGrowingImpl : public SegmentGrowing { return indexing_record_; } - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - std::shared_mutex& get_chunk_mutex() const { return chunk_mutex_; @@ -229,7 +224,8 @@ class SegmentGrowingImpl : public SegmentGrowing { insert_record_( *schema_, segcore_config.get_chunk_rows(), mmap_descriptor_), indexing_record_(*schema_, index_meta_, segcore_config_), - id_(segment_id) { + id_(segment_id), + deleted_record_(&insert_record_, this) { if (mmap_descriptor_ != nullptr) { LOG_INFO("growing segment {} use mmap to hold raw data", this->get_segment_id()); @@ -311,6 +307,16 @@ class SegmentGrowingImpl : public SegmentGrowing { return false; } + std::vector + search_pk(const PkType& pk, Timestamp timestamp) const override { + return insert_record_.search_pk(pk, timestamp); + } + + std::vector + search_pk(const PkType& pk, int64_t insert_barrier) const override { + return insert_record_.search_pk(pk, insert_barrier); + } + protected: int64_t num_chunk() const override; @@ -356,7 +362,7 @@ class SegmentGrowingImpl : public SegmentGrowing { mutable std::shared_mutex chunk_mutex_; // deleted pks - mutable DeletedRecord deleted_record_; + mutable DeletedRecord deleted_record_; int64_t id_; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index f0b3347c3e949..354199e59d44a 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -19,7 +19,6 @@ 
#include #include -#include "DeletedRecord.h" #include "FieldIndexing.h" #include "common/Schema.h" #include "common/Span.h" @@ -351,6 +350,12 @@ class SegmentInternalInterface : public SegmentInterface { virtual bool is_mmap_field(FieldId field_id) const = 0; + virtual std::vector + search_pk(const PkType& pk, Timestamp timestamp) const = 0; + + virtual std::vector + search_pk(const PkType& pk, int64_t insert_barrier) const = 0; + protected: // internal API: return chunk_data in span virtual SpanBase diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 2df9833680364..1ef966d528b5b 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -43,6 +43,9 @@ class SegmentSealed : public SegmentInternalInterface { virtual void WarmupChunkCache(const FieldId field_id, bool mmap_enabled) = 0; + virtual InsertRecord& + get_insert_record() = 0; + SegmentType type() const override { return SegmentType::Sealed; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 04e0d448a8330..8c8673d20ecc9 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -654,38 +654,8 @@ SegmentSealedImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys); auto timestamps = reinterpret_cast(info.timestamps); - std::vector> ordering(size); - for (int i = 0; i < size; i++) { - ordering[i] = std::make_tuple(timestamps[i], pks[i]); - } - - if (!insert_record_.empty_pks()) { - auto end = std::remove_if( - ordering.begin(), - ordering.end(), - [&](const std::tuple& record) { - return !insert_record_.contain(std::get<1>(record)); - }); - size = end - ordering.begin(); - ordering.resize(size); - } - - // all record filtered - if (size == 0) { - return; - } - - std::sort(ordering.begin(), ordering.end()); - 
std::vector sort_pks(size); - std::vector sort_timestamps(size); - - for (int i = 0; i < size; i++) { - auto [t, pk] = ordering[i]; - sort_timestamps[i] = t; - sort_pks[i] = pk; - } - - deleted_record_.push(sort_pks, sort_timestamps.data()); + // step 2: push delete info to delete_record + deleted_record_.LoadPush(pks, timestamps); } void @@ -804,24 +774,7 @@ void SegmentSealedImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const { - auto del_barrier = get_barrier(get_deleted_record(), timestamp); - if (del_barrier == 0) { - return; - } - - auto bitmap_holder = get_deleted_bitmap( - del_barrier, ins_barrier, deleted_record_, insert_record_, timestamp); - if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { - return; - } - auto& delete_bitset = *bitmap_holder->bitmap_ptr; - AssertInfo( - delete_bitset.size() == bitset.size(), - fmt::format( - "Deleted bitmap size:{} not equal to filtered bitmap size:{}", - delete_bitset.size(), - bitset.size())); - bitset |= delete_bitset; + deleted_record_.Query(bitset, ins_barrier, timestamp); } void @@ -1125,7 +1078,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, schema_(schema), id_(segment_id), col_index_meta_(index_meta), - TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) { + TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve), + deleted_record_(&insert_record_, this) { mmap_descriptor_ = std::shared_ptr( new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed})); auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); @@ -1642,7 +1596,7 @@ SegmentSealedImpl::Delete(int64_t reserved_offset, // deprecated sort_pks[i] = pk; } - deleted_record_.push(sort_pks, sort_timestamps.data()); + deleted_record_.StreamPush(sort_pks, sort_timestamps.data()); return SegcoreError::success(); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 49ef7c15ffe41..e206a59a52f60 100644 --- 
a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -110,6 +110,11 @@ class SegmentSealedImpl : public SegmentSealed { std::unique_ptr get_vector(FieldId field_id, const int64_t* ids, int64_t count) const; + InsertRecord& + get_insert_record() override { + return insert_record_; + } + public: int64_t num_chunk_index(FieldId field_id) const override; @@ -162,6 +167,16 @@ class SegmentSealedImpl : public SegmentSealed { void ClearData(); + std::vector + search_pk(const PkType& pk, Timestamp timestamp) const override { + return insert_record_.search_pk(pk, timestamp); + } + + std::vector + search_pk(const PkType& pk, int64_t insert_barrier) const override { + return insert_record_.search_pk(pk, insert_barrier); + } + protected: // blob and row_count SpanBase @@ -250,6 +265,7 @@ class SegmentSealedImpl : public SegmentSealed { // } else { num_rows_ = row_count; // } + deleted_record_.set_sealed_row_count(row_count); } void @@ -274,11 +290,6 @@ class SegmentSealedImpl : public SegmentSealed { return system_ready_count_ == 2; } - const DeletedRecord& - get_deleted_record() const { - return deleted_record_; - } - std::pair, std::vector> search_ids(const IdArray& id_array, Timestamp timestamp) const override; @@ -319,7 +330,7 @@ class SegmentSealedImpl : public SegmentSealed { InsertRecord insert_record_; // deleted pks - mutable DeletedRecord deleted_record_; + mutable DeletedRecord deleted_record_; LoadFieldDataInfo field_data_info_; diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index dee98b668d1cd..11bc53079838e 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -107,76 +107,6 @@ std::unique_ptr MergeDataArray(std::vector& merge_bases, const FieldMeta& field_meta); -template -std::shared_ptr -get_deleted_bitmap(int64_t del_barrier, - int64_t insert_barrier, - DeletedRecord& delete_record, - const InsertRecord& insert_record, - Timestamp 
query_timestamp) { - // if insert_barrier and del_barrier have not changed, use cache data directly - bool hit_cache = false; - int64_t old_del_barrier = 0; - auto current = delete_record.clone_lru_entry( - insert_barrier, del_barrier, old_del_barrier, hit_cache); - if (hit_cache) { - return current; - } - - auto bitmap = current->bitmap_ptr; - - int64_t start, end; - if (del_barrier < old_del_barrier) { - // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp - // so these deletion records do not take effect in query/search - // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] will be reset to 0 - // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] - start = del_barrier; - end = old_del_barrier; - } else { - // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] - // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] - start = old_del_barrier; - end = del_barrier; - } - - // Avoid invalid calculations when there are a lot of repeated delete pks - std::unordered_map delete_timestamps; - for (auto del_index = start; del_index < end; ++del_index) { - auto pk = delete_record.pks()[del_index]; - auto timestamp = delete_record.timestamps()[del_index]; - - delete_timestamps[pk] = timestamp > delete_timestamps[pk] - ? 
timestamp - : delete_timestamps[pk]; - } - - for (auto& [pk, timestamp] : delete_timestamps) { - auto segOffsets = insert_record.search_pk(pk, insert_barrier); - for (auto offset : segOffsets) { - int64_t insert_row_offset = offset.get(); - - // The deletion record do not take effect in search/query, - // and reset bitmap to 0 - if (timestamp > query_timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // Insert after delete with same pk, delete will not task effect on this insert record, - // and reset bitmap to 0 - if (insert_record.timestamps_[insert_row_offset] >= timestamp) { - bitmap->reset(insert_row_offset); - continue; - } - // insert data corresponding to the insert_row_offset will be ignored in search/query - bitmap->set(insert_row_offset); - } - } - - delete_record.insert_lru_entry(current); - return current; -} - std::unique_ptr ReverseDataFromIndex(const index::IndexBase* index, const int64_t* seg_offsets, diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 834c55d996903..268ba4eac9cc0 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -1137,6 +1137,7 @@ TEST(CApiTest, InsertSamePkAfterDeleteOnSealedSegment) { auto segment_interface = reinterpret_cast(segment); auto sealed_segment = dynamic_cast(segment_interface); SealedLoadFieldData(dataset, *sealed_segment); + sealed_segment->get_insert_record().seal_pks(); // delete data pks = {1, 2, 3}, timestamps = {4, 4, 4} std::vector delete_row_ids = {1, 2, 3}; @@ -5271,4 +5272,4 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) { TEST(CApiTest, IsLoadWithDisk) { ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0)); -} \ No newline at end of file +} diff --git a/internal/core/unittest/test_delete_record.cpp b/internal/core/unittest/test_delete_record.cpp new file mode 100644 index 0000000000000..693e66b7ac32d --- /dev/null +++ b/internal/core/unittest/test_delete_record.cpp @@ 
-0,0 +1,434 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "segcore/DeletedRecord.h" +#include "segcore/SegmentGrowingImpl.h" +#include "segcore/SegmentSealedImpl.h" +#include "segcore/SegmentGrowingImpl.h" +#include "test_utils/DataGen.h" + +using namespace milvus; +using namespace milvus::segcore; + +TEST(DeleteMVCC, common_case) { + auto schema = std::make_shared(); + auto pk = schema->AddDebugField("pk", DataType::INT64); + schema->set_primary_field_id(pk); + auto segment = CreateSealedSegment(schema); + ASSERT_EQ(0, segment->get_real_count()); + + // load insert: pk (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + // with timestamp ts (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + int64_t c = 10; + auto dataset = DataGen(schema, c); + auto pks = dataset.get_col(pk); + SealedLoadFieldData(dataset, *segment); + ASSERT_EQ(c, segment->get_real_count()); + auto& insert_record = segment->get_insert_record(); + DeletedRecord delete_record(&insert_record); + delete_record.set_sealed_row_count(c); + + // delete pk(1) at ts(10); + std::vector delete_ts = {10}; + std::vector delete_pk = {1}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(1, delete_record.size()); + + { + BitsetType bitsets(c); + auto insert_barrier = c; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets, 
insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + { + BitsetType bitsets(c); + auto insert_barrier = c; + // query at ts (11) + Timestamp query_timestamp = 11; + // query at ts (11) + query_timestamp = 11; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + + // delete pk(5) at ts(12) + delete_ts = {12}; + delete_pk = {5}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + auto insert_barrier = c; + // query at ts (12) + Timestamp query_timestamp = 12; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + + // delete at pk(1) at ts(13) again + delete_ts = {13}; + delete_pk = {1}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + // not add new record, because already deleted. + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + auto insert_barrier = c; + // query at ts (14) + Timestamp query_timestamp = 14; + + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + + // delete pk(9) at ts(9) + delete_ts = {9}; + delete_pk = {9}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + // not add new record, because insert also at ts(9) same as deleted + // delete not take effect. 
+ ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(c); + auto insert_barrier = c; + // query at ts (14) + Timestamp query_timestamp = 14; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 1, 0, 0, 0, 1, 0, 0, 0, 0}; + for (int i = 0; i < c; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } +} + +TEST(DeleteMVCC, delete_exist_duplicate_pks) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 10; + uint64_t seg_id = 101; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record(&insert_record); + + // insert pk: (0, 1, 1, 2, 2, 3, 4, 3, 2, 5) + // at ts: (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) + std::vector age_data = {0, 1, 1, 2, 2, 3, 4, 3, 2, 5}; + std::vector tss(N); + for (int i = 0; i < N; ++i) { + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + // delete pk(2) at ts(5) + std::vector delete_ts = {5}; + std::vector delete_pk = {2}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(2, delete_record.size()); + + { + BitsetType bitsets(N); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 0, 0, 0, 0, 0}; + // two pk 2 at ts(3, 4) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets[i], 
expected[i]); + } + } + + // delete pk(3) at ts(6) + delete_ts = {6}; + delete_pk = {3}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(3, delete_record.size()); + + { + BitsetType bitsets(N); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 0, 0, 0}; + // one pk 3 in ts(5) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + + // delete pk(3) at ts(9) again + delete_ts = {9}; + delete_pk = {3}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(4, delete_record.size()); + + { + BitsetType bitsets(N); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 1, 0, 0}; + // pk 3 in ts(7) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } + + // delete pk(2) at ts(9) again + delete_ts = {9}; + delete_pk = {2}; + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(5, delete_record.size()); + + { + BitsetType bitsets(N); + int64_t insert_barrier = N; + // query at ts (10) + Timestamp query_timestamp = 10; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + std::vector expected = {0, 0, 0, 1, 1, 1, 0, 1, 1, 0}; + // pk 2 in ts(8) was deleted + for (int i = 0; i < N; i++) { + ASSERT_EQ(bitsets[i], expected[i]); + } + } +} + +TEST(DeleteMVCC, snapshot) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 500000; + uint64_t seg_id = 101; + 
InsertRecord insert_record(*schema, N); + DeletedRecord delete_record(&insert_record); + + std::vector age_data(N); + std::vector tss(N); + for (int i = 0; i < N; ++i) { + age_data[i] = i; + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + auto DN = 400000; + std::vector delete_ts(DN); + std::vector delete_pk(DN); + for (int i = 0; i < DN; ++i) { + delete_pk[i] = age_data[i]; + delete_ts[i] = i + 1; + } + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(DN, delete_record.size()); + + auto snapshots = std::move(delete_record.get_snapshots()); + ASSERT_EQ(3, snapshots.size()); + ASSERT_EQ(snapshots[2].second.count(), 300000); +} + +TEST(DeleteMVCC, insert_after_snapshot) { + using namespace milvus; + using namespace milvus::query; + using namespace milvus::segcore; + + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i64_fid = schema->AddDebugField("age", DataType::INT64); + schema->set_primary_field_id(i64_fid); + auto N = 110000; + uint64_t seg_id = 101; + InsertRecord insert_record(*schema, N); + DeletedRecord delete_record(&insert_record); + + // insert (0, 0), (1, 1) .... 
(N - 1, N - 1) + std::vector age_data(N); + std::vector tss(N); + for (int i = 0; i < N; ++i) { + age_data[i] = i; + tss[i] = i; + insert_record.insert_pk(age_data[i], i); + } + auto insert_offset = insert_record.reserved.fetch_add(N); + insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); + auto field_data = insert_record.get_data_base(i64_fid); + field_data->set_data_raw(insert_offset, age_data.data(), N); + insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); + + // delete (0, 1), (1, 2) .... (DN, DN + 1) + auto DN = 101000; + std::vector delete_ts(DN); + std::vector delete_pk(DN); + for (int i = 0; i < DN; ++i) { + delete_pk[i] = age_data[i]; + delete_ts[i] = i + 1; + } + delete_record.StreamPush(delete_pk, delete_ts.data()); + ASSERT_EQ(DN, delete_record.size()); + + auto snapshots = std::move(delete_record.get_snapshots()); + ASSERT_EQ(1, snapshots.size()); + ASSERT_EQ(snapshots[0].second.count(), 100000); + + // Query at N+1 ts + { + BitsetType bitsets(N); + int64_t insert_barrier = N; + Timestamp query_timestamp = N + 1; + delete_record.Query(bitsets, insert_barrier, query_timestamp); + for (auto i = 0; i < DN; i++) { + ASSERT_EQ(bitsets[i], true) << i; + } + for (auto i = DN; i < N; i++) { + ASSERT_EQ(bitsets[i], false) << i; + } + } + + // insert (N, N), (N + 1, N + 1).... 
(N + AN - 1, N + AN - 1) again
+    auto AN = 1000;
+    std::vector age_data_new(AN);
+    std::vector tss_new(AN);
+    for (int i = 0; i < AN; ++i) {
+        age_data_new[i] = N + i;
+        tss_new[i] = N + i;
+        insert_record.insert_pk(age_data_new[i], i + N);
+    }
+    insert_offset = insert_record.reserved.fetch_add(AN);
+    insert_record.timestamps_.set_data_raw(insert_offset, tss_new.data(), AN);
+    field_data = insert_record.get_data_base(i64_fid);
+    field_data->set_data_raw(insert_offset, age_data_new.data(), AN);  // fix: second batch's field values come from age_data_new (N..N+AN-1); age_data would desync field data from the pks inserted above
+    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + AN);
+
+    // Query at N + AN + 1 ts
+    {
+        BitsetType bitsets(N + AN);
+        int64_t insert_barrier = N + AN;
+        Timestamp query_timestamp = N + AN + 1;
+        delete_record.Query(bitsets, insert_barrier, query_timestamp);
+        for (auto i = 0; i < DN; i++) {
+            ASSERT_EQ(bitsets[i], true);
+        }
+        for (auto i = DN; i < N + AN; i++) {
+            ASSERT_EQ(bitsets[i], false);
+        }
+    }
+}
+
+TEST(DeleteMVCC, perform) {
+    using namespace milvus;
+    using namespace milvus::query;
+    using namespace milvus::segcore;
+
+    auto schema = std::make_shared();
+    auto vec_fid = schema->AddDebugField(
+        "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2);
+    auto i64_fid = schema->AddDebugField("age", DataType::INT64);
+    schema->set_primary_field_id(i64_fid);
+    auto N = 1000000;
+    uint64_t seg_id = 101;
+    InsertRecord insert_record(*schema, N);
+    DeletedRecord delete_record(&insert_record);
+
+    std::vector age_data(N);
+    std::vector tss(N);
+    for (int i = 0; i < N; ++i) {
+        age_data[i] = i;
+        tss[i] = i;
+        insert_record.insert_pk(i, i);
+    }
+    auto insert_offset = insert_record.reserved.fetch_add(N);
+    insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N);
+    auto field_data = insert_record.get_data_base(i64_fid);
+    field_data->set_data_raw(insert_offset, age_data.data(), N);
+    insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N);
+
+    auto DN = N / 2;
+    std::vector delete_ts(DN);
+    std::vector delete_pk(DN);
+    
for (int i = 0; i < DN; ++i) { + delete_ts[i] = N + i; + delete_pk[i] = i; + } + auto start = std::chrono::steady_clock::now(); + delete_record.StreamPush(delete_pk, delete_ts.data()); + auto end = std::chrono::steady_clock::now(); + std::cout << "push cost:" + << std::chrono::duration_cast(end - + start) + .count() + << std::endl; + std::cout << delete_record.size() << std::endl; + + auto query_timestamp = delete_ts[DN - 1]; + auto insert_barrier = get_barrier(insert_record, query_timestamp); + BitsetType res_bitmap(insert_barrier); + start = std::chrono::steady_clock::now(); + delete_record.Query(res_bitmap, insert_barrier, query_timestamp); + end = std::chrono::steady_clock::now(); + std::cout << "query cost:" + << std::chrono::duration_cast(end - + start) + .count() + << std::endl; +} diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index 3a15dbbf3135c..24c691a3186c6 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -89,7 +89,7 @@ TEST(Growing, RealCount) { // delete all. 
auto del_offset3 = segment->get_deleted_count(); - ASSERT_EQ(del_offset3, half * 2); + ASSERT_EQ(del_offset3, half); auto del_ids3 = GenPKs(pks.begin(), pks.end()); auto del_tss3 = GenTss(c, c + half * 2); status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data()); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index dbfc0bceb280f..857bbd8825f71 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1238,6 +1238,7 @@ TEST(Sealed, DeleteCount) { auto pk = schema->AddDebugField("pk", DataType::INT64); schema->set_primary_field_id(pk); auto segment = CreateSealedSegment(schema); + segment->get_insert_record().seal_pks(); int64_t c = 10; auto offset = segment->get_deleted_count(); @@ -1249,9 +1250,8 @@ TEST(Sealed, DeleteCount) { auto status = segment->Delete(offset, c, pks.get(), tss.data()); ASSERT_TRUE(status.ok()); - // shouldn't be filtered for empty segment. auto cnt = segment->get_deleted_count(); - ASSERT_EQ(cnt, 10); + ASSERT_EQ(cnt, 0); } { auto schema = std::make_shared(); @@ -1318,7 +1318,7 @@ TEST(Sealed, RealCount) { // delete all. auto del_offset3 = segment->get_deleted_count(); - ASSERT_EQ(del_offset3, half * 2); + ASSERT_EQ(del_offset3, half); auto del_ids3 = GenPKs(pks.begin(), pks.end()); auto del_tss3 = GenTss(c, c + half * 2); status = segment->Delete(del_offset3, c, del_ids3.get(), del_tss3.data()); diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index 2c8e30877bbd6..9f5c44976077f 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -64,7 +64,7 @@ TEST(Util, GetDeleteBitmap) { auto N = 10; uint64_t seg_id = 101; InsertRecord insert_record(*schema, N); - DeletedRecord delete_record; + DeletedRecord delete_record(&insert_record); // fill insert record, all insert records has same pk = 1, timestamps= {1 ... 
N} std::vector age_data(N); @@ -83,37 +83,13 @@ TEST(Util, GetDeleteBitmap) { // test case delete pk1(ts = 0) -> insert repeated pk1 (ts = {1 ... N}) -> query (ts = N) std::vector delete_ts = {0}; std::vector delete_pk = {1}; - delete_record.push(delete_pk, delete_ts.data()); + delete_record.StreamPush(delete_pk, delete_ts.data()); auto query_timestamp = tss[N - 1]; - auto del_barrier = get_barrier(delete_record, query_timestamp); auto insert_barrier = get_barrier(insert_record, query_timestamp); - auto res_bitmap = get_deleted_bitmap(del_barrier, - insert_barrier, - delete_record, - insert_record, - query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); - - // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N) - delete_ts = {uint64_t(N)}; - delete_pk = {1}; - delete_record.push(delete_pk, delete_ts.data()); - - del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = get_deleted_bitmap(del_barrier, - insert_barrier, - delete_record, - insert_record, - query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), N - 1); - - // test case insert repeated pk1 (ts = {1 ... N}) -> delete pk1 (ts = N) -> query (ts = N/2) - query_timestamp = tss[N - 1] / 2; - del_barrier = get_barrier(delete_record, query_timestamp); - res_bitmap = get_deleted_bitmap( - del_barrier, N, delete_record, insert_record, query_timestamp); - ASSERT_EQ(res_bitmap->bitmap_ptr->count(), 0); + BitsetType res_bitmap(insert_barrier); + delete_record.Query(res_bitmap, insert_barrier, query_timestamp); + ASSERT_EQ(res_bitmap.count(), 0); } TEST(Util, OutOfRange) {