Skip to content

Commit

Permalink
Restore the MVCC functionality.
Browse files Browse the repository at this point in the history
When the TimeTravel functionality was previously removed, it inadvertently
affected the MVCC functionality within the system.
This PR aims to reintroduce the internal MVCC functionality as follows:

 1. Add MvccTimestamp to the requests of Search/Query and the results of Search internally.
 2. When the delegator receives a Query/Search request and there is no MVCC timestamp set
    in the request, set the delegator's current tsafe as the MVCC timestamp of the request.
    If the request already has an MVCC timestamp, do not modify it.
 3. When the Proxy handles Search and triggers the second phase ReQuery, divide the ReQuery
    into different shards and pass the MVCC timestamp to the corresponding Query requests.

Signed-off-by: zhenshan.cao <[email protected]>
  • Loading branch information
czs007 committed Jan 8, 2024
1 parent 97e4ec5 commit b9d76f7
Show file tree
Hide file tree
Showing 35 changed files with 2,722 additions and 329 deletions.
5 changes: 3 additions & 2 deletions internal/core/src/segcore/SegmentInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ SegmentInternalInterface::FillTargetEntry(const query::Plan* plan,
std::unique_ptr<SearchResult>
SegmentInternalInterface::Search(
const query::Plan* plan,
const query::PlaceholderGroup* placeholder_group) const {
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp) const {
std::shared_lock lck(mutex_);
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
check_search(plan);
query::ExecPlanNodeVisitor visitor(*this, 1L << 63, placeholder_group);
query::ExecPlanNodeVisitor visitor(*this, timestamp, placeholder_group);
auto results = std::make_unique<SearchResult>();
*results = visitor.get_moved_result(*plan->plan_node_);
results->segment_ = (void*)this;
Expand Down
6 changes: 4 additions & 2 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class SegmentInterface {

virtual std::unique_ptr<SearchResult>
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group) const = 0;
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp) const = 0;

virtual std::unique_ptr<proto::segcore::RetrieveResults>
Retrieve(const query::RetrievePlan* Plan,
Expand Down Expand Up @@ -136,7 +137,8 @@ class SegmentInternalInterface : public SegmentInterface {

std::unique_ptr<SearchResult>
Search(const query::Plan* Plan,
const query::PlaceholderGroup* placeholder_group) const override;
const query::PlaceholderGroup* placeholder_group,
Timestamp timestamp) const override;

void
FillPrimaryKeys(const query::Plan* plan,
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/segment_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Search(CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
CTraceContext c_trace,
uint64_t timestamp,
CSearchResult* result) {
try {
auto segment = (milvus::segcore::SegmentInterface*)c_segment;
Expand All @@ -90,7 +91,7 @@ Search(CSegmentInterface c_segment,
c_trace.traceID, c_trace.spanID, c_trace.flag};
auto span = milvus::tracer::StartSpan("SegCoreSearch", &ctx);
milvus::tracer::SetRootSpan(span);
auto search_result = segment->Search(plan, phg_ptr);
auto search_result = segment->Search(plan, phg_ptr, timestamp);
if (!milvus::PositivelyRelated(
plan->plan_node_->search_info_.metric_type_)) {
for (auto& dis : search_result->distances_) {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/segcore/segment_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Search(CSegmentInterface c_segment,
CSearchPlan c_plan,
CPlaceholderGroup c_placeholder_group,
CTraceContext c_trace,
uint64_t timestamp,
CSearchResult* result);

void
Expand Down
12 changes: 9 additions & 3 deletions internal/core/unittest/bench/bench_search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ Search_GrowingIndex(benchmark::State& state) {
dataset_.timestamps_.data(),
dataset_.raw_);

Timestamp ts = 10000000;

for (auto _ : state) {
auto qr = segment->Search(search_plan.get(), ph_group.get());
auto qr = segment->Search(search_plan.get(), ph_group.get(), ts);
}
}

Expand All @@ -114,7 +116,8 @@ Search_Sealed(benchmark::State& state) {
} else if (choice == 1) {
// hnsw
auto vec = dataset_.get_col<float>(milvus::FieldId(100));
auto indexing = GenVecIndexing(N, dim, vec.data(), knowhere::IndexEnum::INDEX_HNSW);
auto indexing =
GenVecIndexing(N, dim, vec.data(), knowhere::IndexEnum::INDEX_HNSW);
segcore::LoadIndexInfo info;
info.index = std::move(indexing);
info.field_id = (*schema)[FieldName("fakevec")].get_id().get();
Expand All @@ -123,8 +126,11 @@ Search_Sealed(benchmark::State& state) {
segment->DropFieldData(milvus::FieldId(100));
segment->LoadIndex(info);
}

Timestamp ts = 10000000;

for (auto _ : state) {
auto qr = segment->Search(search_plan.get(), ph_group.get());
auto qr = segment->Search(search_plan.get(), ph_group.get(), ts);
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/core/unittest/test_binlog_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ TEST_P(BinlogIndexTest, Accuracy) {
std::vector<const milvus::query::PlaceholderGroup*> ph_group_arr = {
ph_group.get()};
auto nlist = segcore_config.get_nlist();
auto binlog_index_sr = segment->Search(plan.get(), ph_group.get());
auto binlog_index_sr =
segment->Search(plan.get(), ph_group.get(), 1L << 63);
ASSERT_EQ(binlog_index_sr->total_nq_, num_queries);
EXPECT_EQ(binlog_index_sr->unity_topK_, topk);
EXPECT_EQ(binlog_index_sr->distances_.size(), num_queries * topk);
Expand Down Expand Up @@ -226,7 +227,7 @@ TEST_P(BinlogIndexTest, Accuracy) {
EXPECT_TRUE(segment->HasIndex(vec_field_id));
EXPECT_EQ(segment->get_row_count(), data_n);
EXPECT_FALSE(segment->HasFieldData(vec_field_id));
auto ivf_sr = segment->Search(plan.get(), ph_group.get());
auto ivf_sr = segment->Search(plan.get(), ph_group.get(), 1L << 63);
auto similary = GetKnnSearchRecall(num_queries,
binlog_index_sr->seg_offsets_.data(),
topk,
Expand Down Expand Up @@ -312,4 +313,4 @@ TEST_P(BinlogIndexTest, LoadBinlogWithoutIndexMeta) {
EXPECT_FALSE(segment->HasIndex(vec_field_id));
EXPECT_EQ(segment->get_row_count(), data_n);
EXPECT_TRUE(segment->HasFieldData(vec_field_id));
}
}
Loading

0 comments on commit b9d76f7

Please sign in to comment.