Skip to content

Commit

Permalink
add vector index mgr to remove vector index type dependency
Browse files Browse the repository at this point in the history
Signed-off-by: xianliang.li <[email protected]>
  • Loading branch information
foxspy committed Oct 15, 2024
1 parent f0f5147 commit ef763a9
Show file tree
Hide file tree
Showing 26 changed files with 606 additions and 67 deletions.
4 changes: 2 additions & 2 deletions cmd/tools/migration/mmap/mmap_230_240.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

// In Milvus 2.3.x, querynode.MmapDirPath is used to enable mmap and save mmap files.
Expand Down Expand Up @@ -84,7 +84,7 @@ func (m *MmapMigration) MigrateIndexCoordCollection(ctx context.Context) {

alteredIndexes := make([]*model.Index, 0)
for _, index := range fieldIndexes {
if !indexparamcheck.IsVectorMmapIndex(getIndexType(index.IndexParams)) {
if !vecindexmgr.GetVecIndexMgrInstance().IsMMapSupported(getIndexType(index.IndexParams)) {
continue
}
fmt.Printf("migrate index, collection:%v, indexId: %v, indexName: %s\n", index.CollectionID, index.IndexID, index.IndexName)
Expand Down
20 changes: 3 additions & 17 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/EasyAssert.h"
#include "knowhere/comp/index_param.h"
#include "knowhere/dataset.h"
#include "knowhere/index/index_factory.h"
#include "common/Tracer.h"
#include "common/Types.h"
#include "index/Meta.h"
Expand Down Expand Up @@ -62,23 +63,8 @@ class IndexBase {
virtual const bool
HasRawData() const = 0;

bool
IsMmapSupported() const {
return index_type_ == knowhere::IndexEnum::INDEX_HNSW ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFPQ ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IVFSQ8 ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IVFFLAT ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_IDMAP ||
index_type_ == knowhere::IndexEnum::INDEX_FAISS_BIN_IDMAP ||
index_type_ ==
knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX ||
index_type_ == knowhere::IndexEnum::INDEX_SPARSE_WAND ||
// support mmap for bitmap/hybrid index
index_type_ == milvus::index::BITMAP_INDEX_TYPE ||
index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}
virtual bool
IsMmapSupported() const = 0;

const IndexType&
Type() const {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/ScalarIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class ScalarIndex : public IndexBase {
PanicInfo(Unsupported, "pattern match is not supported");
}

virtual bool
IsMmapSupported() const {
return index_type_ == milvus::index::BITMAP_INDEX_TYPE ||
index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}

virtual int64_t
Size() = 0;

Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ class VectorIndex : public IndexBase {
err_msg);
}

virtual bool
IsMmapSupported() const {
return knowhere::IndexFactory::Instance().FeatureCheck(
index_type_, knowhere::feature::MMAP);
}

knowhere::Json
PrepareSearchParams(const SearchInfo& search_info) const {
knowhere::Json search_cfg = search_info.search_params_;
Expand Down
40 changes: 40 additions & 0 deletions internal/core/src/segcore/vector_index_c.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include "segcore/vector_index_c.h"

#include "common/Types.h"
#include "common/EasyAssert.h"
#include "knowhere/utils.h"
#include "knowhere/config.h"
#include "knowhere/version.h"
#include "index/Meta.h"
#include "index/IndexFactory.h"
#include "pb/index_cgo_msg.pb.h"

int
GetIndexListSize() {
return knowhere::IndexFactory::Instance().GetIndexFeatures().size();
}

void
GetIndexFeatures(void* index_key_list, uint64_t* index_feature_list) {
auto features = knowhere::IndexFactory::Instance().GetIndexFeatures();
int idx = 0;

const char** index_keys = (const char**)index_key_list;
uint64_t* index_features = (uint64_t*)index_feature_list;
for (auto it = features.begin(); it != features.end(); ++it) {
index_keys[idx] = it->first.c_str();
index_features[idx] = it->second;
idx++;
}
}
28 changes: 28 additions & 0 deletions internal/core/src/segcore/vector_index_c.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include "common/type_c.h"

int
GetIndexListSize();

void
GetIndexFeatures(void* index_key_list, uint64_t* index_feature_list);

#ifdef __cplusplus
}
#endif
4 changes: 2 additions & 2 deletions internal/core/thirdparty/knowhere/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# Update KNOWHERE_VERSION for the first occurrence
milvus_add_pkg_config("knowhere")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( KNOWHERE_VERSION d0d7eefb )
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
set( KNOWHERE_VERSION add_vector_index_mgr_bak )
set( GIT_REPOSITORY "https://github.com/foxspy/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")
message(STATUS "Knowhere version: ${KNOWHERE_VERSION}")

Expand Down
17 changes: 17 additions & 0 deletions internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "exec/expression/Expr.h"
#include "segcore/load_index_c.h"
#include "test_utils/c_api_test_utils.h"
#include "segcore/vector_index_c.h"

namespace chrono = std::chrono;

Expand Down Expand Up @@ -712,6 +713,22 @@ TEST(CApiTest, MultiDeleteGrowingSegment) {
DeleteSegment(segment);
}

TEST(CApiTest, GetIndexListSizeAndFeatures) {
int size = GetIndexListSize();
ASSERT_GT(size, 0);

std::vector<const char*> index_keys(size);
std::vector<uint64_t> index_features(size);

GetIndexFeatures(index_keys.data(), index_features.data());

for (int i = 0; i < size; i++) {
ASSERT_NE(index_keys[i], nullptr);
ASSERT_GT(strlen(index_keys[i]), 0);
ASSERT_GT(index_features[i], 0);
}
}

TEST(CApiTest, MultiDeleteSealedSegment) {
auto collection = NewCollection(get_default_schema_config());
CSegmentInterface segment;
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/import_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package datacoord

import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)

Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -980,7 +981,7 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect
})
vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
return indexparamcheck.IsDiskIndex(indexType)
return vecindexmgr.GetVecIndexMgrInstance().IsDiskVecIndex(indexType)
}
return false
})
Expand Down
5 changes: 3 additions & 2 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
Expand Down Expand Up @@ -231,10 +232,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if GetIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() {
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg)
err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN)
err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
}
indexParams := dependency.meta.indexMeta.GetIndexParams(segIndex.CollectionID, segIndex.IndexID)
indexType := GetIndexType(indexParams)
if isFlatIndex(indexType) || segIndex.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
if isNoTrainIndex(indexType) || segIndex.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
log.Ctx(ctx).Info("segment does not need index really", zap.Int64("taskID", it.taskID),
zap.Int64("segmentID", segIndex.SegmentID), zap.Int64("num rows", segIndex.NumRows))
it.SetStartTime(time.Now())
Expand Down
7 changes: 3 additions & 4 deletions internal/datacoord/task_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -1470,7 +1469,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
Value: "HNSW",
},
},
},
Expand Down Expand Up @@ -1523,7 +1522,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
Value: "HNSW",
},
},
},
Expand Down Expand Up @@ -1624,7 +1623,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
resetMetaFunc := func() {
mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW"
mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
Expand Down
10 changes: 5 additions & 5 deletions internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down Expand Up @@ -203,16 +203,16 @@ func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
return invalidIndex
}

func isFlatIndex(indexType string) bool {
return indexType == indexparamcheck.IndexFaissIDMap || indexType == indexparamcheck.IndexFaissBinIDMap
func isNoTrainIndex(indexType string) bool {
return vecindexmgr.GetVecIndexMgrInstance().IsNoTrainIndex(indexType)
}

func isOptionalScalarFieldSupported(indexType string) bool {
return indexType == indexparamcheck.IndexHNSW
return vecindexmgr.GetVecIndexMgrInstance().IsMvSupported(indexType)
}

func isDiskANNIndex(indexType string) bool {
return indexType == indexparamcheck.IndexDISKANN
return vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType)
}

func parseBuildIDFromFilePath(key string) (UniqueID, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
Expand Down Expand Up @@ -210,7 +210,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))

indexType := it.newIndexParams[common.IndexTypeKey]
if indexType == indexparamcheck.IndexDISKANN {
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType) {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
log.Warn("IndexNode don't support build disk index",
Expand Down
31 changes: 15 additions & 16 deletions internal/proxy/cgo_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

func Test_CheckVecIndexWithDataTypeExist(t *testing.T) {
Expand All @@ -29,25 +28,25 @@ func Test_CheckVecIndexWithDataTypeExist(t *testing.T) {
dataType schemapb.DataType
want bool
}{
{indexparamcheck.IndexHNSW, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexHNSW, schemapb.DataType_BinaryVector, false},
{indexparamcheck.IndexHNSW, schemapb.DataType_Float16Vector, true},
{"HNSW", schemapb.DataType_FloatVector, true},
{"HNSW", schemapb.DataType_BinaryVector, false},
{"HNSW", schemapb.DataType_Float16Vector, true},

{indexparamcheck.IndexSparseWand, schemapb.DataType_SparseFloatVector, true},
{indexparamcheck.IndexSparseWand, schemapb.DataType_FloatVector, false},
{indexparamcheck.IndexSparseWand, schemapb.DataType_Float16Vector, false},
{"SPARSE_WAND", schemapb.DataType_SparseFloatVector, true},
{"SPARSE_WAND", schemapb.DataType_FloatVector, false},
{"SPARSE_WAND", schemapb.DataType_Float16Vector, false},

{indexparamcheck.IndexGpuBF, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexGpuBF, schemapb.DataType_Float16Vector, false},
{indexparamcheck.IndexGpuBF, schemapb.DataType_BinaryVector, false},
{"GPU_BRUTE_FORCE", schemapb.DataType_FloatVector, true},
{"GPU_BRUTE_FORCE", schemapb.DataType_Float16Vector, false},
{"GPU_BRUTE_FORCE", schemapb.DataType_BinaryVector, false},

{indexparamcheck.IndexFaissBinIvfFlat, schemapb.DataType_BinaryVector, true},
{indexparamcheck.IndexFaissBinIvfFlat, schemapb.DataType_FloatVector, false},
{"BIN_IVF_FLAT", schemapb.DataType_BinaryVector, true},
{"BIN_IVF_FLAT", schemapb.DataType_FloatVector, false},

{indexparamcheck.IndexDISKANN, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_Float16Vector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_BFloat16Vector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_BinaryVector, false},
{"DISKANN", schemapb.DataType_FloatVector, true},
{"DISKANN", schemapb.DataType_Float16Vector, true},
{"DISKANN", schemapb.DataType_BFloat16Vector, true},
{"DISKANN", schemapb.DataType_BinaryVector, false},
}

for _, test := range cases {
Expand Down
Loading

0 comments on commit ef763a9

Please sign in to comment.