Skip to content

Commit

Permalink
add index config check
Browse files Browse the repository at this point in the history
Signed-off-by: xianliang.li <[email protected]>
  • Loading branch information
foxspy committed Sep 3, 2024
1 parent ea2ac2a commit bc2b7ec
Show file tree
Hide file tree
Showing 102 changed files with 933 additions and 515 deletions.
2 changes: 1 addition & 1 deletion cmd/tools/migration/mmap/mmap_230_240.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

// In Milvus 2.3.x, querynode.MmapDirPath is used to enable mmap and save mmap files.
Expand Down
20 changes: 20 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,3 +1044,23 @@ streamingNode:
serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte
clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte
clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte

knowhere:
enable: true
HNSW:
build:
efConstruction : 360
M: 30
search:
ef: 30
DISKANN:
build:
max_degree: 56
search_list_size: 100
pq_code_budget_gb_ratio: 0.125
search_cache_budget_gb_ratio: 0.1
beam_width_ratio: 4
load:
cacheRatio: 0.1
search:
beamRatio: 4.0
9 changes: 2 additions & 7 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ class IndexBase {
virtual const bool
HasRawData() const = 0;

bool
IsMmapSupported() const {
return knowhere::IndexFactory::Instance().FeatureCheck(index_type_, knowhere::feature::MMAP) ||
// support mmap for bitmap/hybrid index
index_type_ == milvus::index::BITMAP_INDEX_TYPE ||
index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}
virtual bool
IsMmapSupported() const = 0;

const IndexType&
Type() const {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/ScalarIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class ScalarIndex : public IndexBase {
PanicInfo(Unsupported, "pattern match is not supported");
}

virtual bool
IsMmapSupported() const {
return index_type_ == milvus::index::BITMAP_INDEX_TYPE ||
index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}

virtual int64_t
Size() = 0;

Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class VectorIndex : public IndexBase {
err_msg);
}

virtual bool
IsMmapSupported() const {
return knowhere::IndexFactory::Instance().FeatureCheck(index_type_, knowhere::feature::MMAP);
}

knowhere::Json
PrepareSearchParams(const SearchInfo& search_info) const {
knowhere::Json search_cfg = search_info.search_params_;
Expand Down
54 changes: 54 additions & 0 deletions internal/core/src/segcore/vector_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,63 @@

#include "segcore/vector_index_c.h"

#include "common/Types.h"
#include "common/EasyAssert.h"
#include "knowhere/utils.h"
#include "knowhere/config.h"
#include "knowhere/version.h"
#include "index/Meta.h"
#include "index/IndexFactory.h"
#include "pb/index_cgo_msg.pb.h"

CStatus
ValidateIndexParams(const char* index_type, enum CDataType data_type, const uint8_t* serialized_index_params, const uint64_t length) {
try {
auto index_params =
std::make_unique<milvus::proto::indexcgo::IndexParams>();
auto res = index_params->ParseFromArray(serialized_index_params, length);
AssertInfo(res, "Unmarshall index params failed");

knowhere::Json json;

for (size_t i = 0; i < index_params->params_size(); i++) {
auto& param = index_params->params(i);
json[param.key()] = param.value();
}

milvus::DataType dataType(static_cast<milvus::DataType>(data_type));

knowhere::Status status;
std::string error_msg;
if (dataType == milvus::DataType::VECTOR_BINARY) {
status = knowhere::CheckConfig<knowhere::bin1>(index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, knowhere::PARAM_TYPE::TRAIN, error_msg);
} else if (dataType == milvus::DataType::VECTOR_FLOAT) {
status = knowhere::CheckConfig<knowhere::fp32>(index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, knowhere::PARAM_TYPE::TRAIN, error_msg);
} else if (dataType == milvus::DataType::VECTOR_BFLOAT16) {
status = knowhere::CheckConfig<knowhere::bf16>(index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, knowhere::PARAM_TYPE::TRAIN, error_msg);
} else if (dataType == milvus::DataType::VECTOR_FLOAT16) {
status = knowhere::CheckConfig<knowhere::fp16>(index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, knowhere::PARAM_TYPE::TRAIN, error_msg);
} else if (dataType == milvus::DataType::VECTOR_SPARSE_FLOAT) {
status = knowhere::CheckConfig<knowhere::fp32>(index_type, knowhere::Version::GetCurrentVersion().VersionNumber(), json, knowhere::PARAM_TYPE::TRAIN, error_msg);
} else {
status = knowhere::Status::invalid_args;
}
CStatus cStatus;
if (status == knowhere::Status::success) {
cStatus.error_code = milvus::Success;
cStatus.error_msg = "";
} else {
cStatus.error_code = milvus::ConfigInvalid;
cStatus.error_msg = error_msg.c_str();
}
return cStatus;
} catch (std::exception& e) {
auto cStatus = CStatus();
cStatus.error_code = milvus::UnexpectedError;
cStatus.error_msg = strdup(e.what());
return cStatus;
}
}

int
GetIndexListSize() {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/vector_index_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ extern "C" {
#endif

#include <stdbool.h>
#include "common/type_c.h"

CStatus
ValidateIndexParams(const char* index_type, enum CDataType data_type, const uint8_t* index_params, const uint64_t length);

int
GetIndexListSize();
Expand Down
2 changes: 2 additions & 0 deletions internal/core/thirdparty/knowhere/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ endif()
# get prometheus COMPILE_OPTIONS
get_property( var DIRECTORY "${knowhere_SOURCE_DIR}" PROPERTY COMPILE_OPTIONS )
message( STATUS "knowhere src compile options: ${var}" )

set( KNOWHERE_INCLUDE_DIR ${knowhere_SOURCE_DIR}/include CACHE INTERNAL "Path to knowhere include directory" )
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand Down Expand Up @@ -620,13 +620,13 @@ func TestServer_AlterIndex(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Healthy)

t.Run("mmap_unsupported", func(t *testing.T) {
indexParams[0].Value = indexparamcheck.IndexRaftCagra
indexParams[0].Value = "GPU_CAGRA"

resp, err := s.AlterIndex(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.CheckRPCCall(resp, err), merr.ErrParameterInvalid)

indexParams[0].Value = indexparamcheck.IndexFaissIvfFlat
indexParams[0].Value = "IVF_FLAT"
})

t.Run("param_value_invalied", func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -156,6 +157,17 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule

fieldID := dependency.meta.indexMeta.GetFieldIDByIndexID(segIndex.CollectionID, segIndex.IndexID)
binlogIDs := getBinLogIDs(segment, fieldID)
if Params.IndexEngineConfig.Enable.GetAsBool() {
var ret error
indexParams, ret = Params.IndexEngineConfig.MergeRequestParam(GetIndexType(indexParams), paramtable.BuildStage, indexParams)

if ret != nil {
log.Ctx(ctx).Warn("failed to construct index build params", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return true
}
}

if isDiskANNIndex(GetIndexType(indexParams)) {
var err error
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
Expand Down
7 changes: 3 additions & 4 deletions internal/datacoord/task_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -1432,7 +1431,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
Value: "HNSW",
},
},
},
Expand Down Expand Up @@ -1485,7 +1484,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
},
{
Key: common.IndexTypeKey,
Value: indexparamcheck.IndexHNSW,
Value: "HNSW",
},
},
},
Expand Down Expand Up @@ -1582,7 +1581,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
resetMetaFunc := func() {
mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW
mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = "HNSW"
mt.collections[collID].Schema.Fields[0].DataType = schemapb.DataType_FloatVector
mt.collections[collID].Schema.Fields[1].IsPartitionKey = true
mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down
9 changes: 7 additions & 2 deletions internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
Expand Down Expand Up @@ -210,6 +210,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))

indexType := it.newIndexParams[common.IndexTypeKey]
var fieldDataSize uint64
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(indexType) {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
Expand All @@ -225,7 +226,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Warn("IndexNode get local used size failed")
return err
Expand All @@ -247,6 +248,10 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}
}

if Params.IndexEngineConfig.Enable.GetAsBool() {
it.newIndexParams, _ = Params.IndexEngineConfig.MergeWithResource(fieldDataSize, it.newIndexParams)
}

storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
Expand Down
31 changes: 15 additions & 16 deletions internal/proxy/cgo_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

func Test_CheckVecIndexWithDataTypeExist(t *testing.T) {
Expand All @@ -29,25 +28,25 @@ func Test_CheckVecIndexWithDataTypeExist(t *testing.T) {
dataType schemapb.DataType
want bool
}{
{indexparamcheck.IndexHNSW, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexHNSW, schemapb.DataType_BinaryVector, false},
{indexparamcheck.IndexHNSW, schemapb.DataType_Float16Vector, true},
{"HNSW", schemapb.DataType_FloatVector, true},
{"HNSW", schemapb.DataType_BinaryVector, false},
{"HNSW", schemapb.DataType_Float16Vector, true},

{indexparamcheck.IndexSparseWand, schemapb.DataType_SparseFloatVector, true},
{indexparamcheck.IndexSparseWand, schemapb.DataType_FloatVector, false},
{indexparamcheck.IndexSparseWand, schemapb.DataType_Float16Vector, false},
{"SPARSE_WAND", schemapb.DataType_SparseFloatVector, true},
{"SPARSE_WAND", schemapb.DataType_FloatVector, false},
{"SPARSE_WAND", schemapb.DataType_Float16Vector, false},

{indexparamcheck.IndexGpuBF, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexGpuBF, schemapb.DataType_Float16Vector, false},
{indexparamcheck.IndexGpuBF, schemapb.DataType_BinaryVector, false},
{"GPU_BRUTE_FORCE", schemapb.DataType_FloatVector, true},
{"GPU_BRUTE_FORCE", schemapb.DataType_Float16Vector, false},
{"GPU_BRUTE_FORCE", schemapb.DataType_BinaryVector, false},

{indexparamcheck.IndexFaissBinIvfFlat, schemapb.DataType_BinaryVector, true},
{indexparamcheck.IndexFaissBinIvfFlat, schemapb.DataType_FloatVector, false},
{"BIN_IVF_FLAT", schemapb.DataType_BinaryVector, true},
{"BIN_IVF_FLAT", schemapb.DataType_FloatVector, false},

{indexparamcheck.IndexDISKANN, schemapb.DataType_FloatVector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_Float16Vector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_BFloat16Vector, true},
{indexparamcheck.IndexDISKANN, schemapb.DataType_BinaryVector, false},
{"DISKANN", schemapb.DataType_FloatVector, true},
{"DISKANN", schemapb.DataType_Float16Vector, true},
{"DISKANN", schemapb.DataType_BFloat16Vector, true},
{"DISKANN", schemapb.DataType_BinaryVector, false},
}

for _, test := range cases {
Expand Down
Loading

0 comments on commit bc2b7ec

Please sign in to comment.