Skip to content

Commit

Permalink
add index config check
Browse files Browse the repository at this point in the history
Signed-off-by: xianliang.li <[email protected]>
  • Loading branch information
foxspy committed Aug 30, 2024
1 parent 746cb67 commit d79fcce
Show file tree
Hide file tree
Showing 99 changed files with 833 additions and 411 deletions.
2 changes: 1 addition & 1 deletion cmd/tools/migration/mmap/mmap_230_240.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

// In Milvus 2.3.x, querynode.MmapDirPath is used to enable mmap and save mmap files.
Expand Down
20 changes: 20 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,3 +1044,23 @@ streamingNode:
serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte
clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte
clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte

knowhere:
enable: true
HNSW:
build:
efConstruction : 360
M: 30
search:
ef: 30
DISKANN:
build:
max_degree: 56
search_list_size: 100
pq_code_budget_gb_ratio: 0.125
search_cache_budget_gb_ratio: 0.1
beam_width_ratio: 4
load:
cacheRatio: 0.1
search:
beamRatio: 4.0
9 changes: 2 additions & 7 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ class IndexBase {
virtual const bool
HasRawData() const = 0;

bool
IsMmapSupported() const {
return knowhere::IndexFactory::Instance().FeatureCheck(index_type_, knowhere::feature::MMAP) ||
// support mmap for bitmap/hybrid index
index_type_ == milvus::index::BITMAP_INDEX_TYPE ||
index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}
virtual bool
IsMmapSupported() const = 0;

const IndexType&
Type() const {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/ScalarIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class ScalarIndex : public IndexBase {
PanicInfo(Unsupported, "pattern match is not supported");
}

// Scalar indexes report mmap support only for the bitmap and hybrid
// index types; every other scalar index type yields false.
virtual bool
IsMmapSupported() const {
    if (index_type_ == milvus::index::BITMAP_INDEX_TYPE) {
        return true;
    }
    return index_type_ == milvus::index::HYBRID_INDEX_TYPE;
}

virtual int64_t
Size() = 0;

Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class VectorIndex : public IndexBase {
err_msg);
}

// Vector indexes delegate the mmap capability question to knowhere's
// index factory, keyed by this index's type.
virtual bool
IsMmapSupported() const {
    auto&& factory = knowhere::IndexFactory::Instance();
    return factory.FeatureCheck(index_type_, knowhere::feature::MMAP);
}

knowhere::Json
PrepareSearchParams(const SearchInfo& search_info) const {
knowhere::Json search_cfg = search_info.search_params_;
Expand Down
54 changes: 54 additions & 0 deletions internal/core/src/segcore/vector_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,63 @@

#include "segcore/vector_index_c.h"

#include "common/Types.h"
#include "common/EasyAssert.h"
#include "knowhere/utils.h"
#include "knowhere/config.h"
#include "knowhere/version.h"
#include "index/Meta.h"
#include "index/IndexFactory.h"
#include "pb/index_cgo_msg.pb.h"

/**
 * Validate the train-stage (build) parameters of a vector index.
 *
 * The serialized buffer is parsed as a milvus::proto::indexcgo::IndexParams
 * message, its key/value pairs are copied into a knowhere::Json object, and
 * the result is checked with knowhere::CheckConfig for the element type that
 * corresponds to `data_type`.
 *
 * @param index_type               Null-terminated index type name.
 * @param data_type                Field data type; only vector types are accepted.
 * @param serialized_index_params  Serialized IndexParams protobuf bytes.
 * @param length                   Number of bytes in `serialized_index_params`.
 * @return CStatus with
 *         - milvus::Success and an empty string literal as message (must NOT
 *           be freed) when the parameters are valid;
 *         - milvus::ConfigInvalid when the check fails or the data type is not
 *           a supported vector type;
 *         - milvus::UnexpectedError when parsing fails or an exception is
 *           thrown.
 *         For the two failure codes `error_msg` is allocated with strdup and
 *         must be released by the caller with free().
 */
CStatus
ValidateIndexParams(const char* index_type, enum CDataType data_type, const uint8_t* serialized_index_params, const uint64_t length) {
    try {
        // A null name would be undefined behavior once it is converted to a
        // string for the config check, so reject it up front.
        AssertInfo(index_type != nullptr, "index type is null");

        milvus::proto::indexcgo::IndexParams index_params;
        auto res = index_params.ParseFromArray(serialized_index_params, length);
        AssertInfo(res, "Unmarshall index params failed");

        knowhere::Json json;
        for (const auto& param : index_params.params()) {
            json[param.key()] = param.value();
        }

        const auto dataType = static_cast<milvus::DataType>(data_type);
        const auto version =
            knowhere::Version::GetCurrentVersion().VersionNumber();

        knowhere::Status status;
        std::string error_msg;
        switch (dataType) {
            case milvus::DataType::VECTOR_BINARY:
                status = knowhere::CheckConfig<knowhere::bin1>(index_type, version, json, knowhere::PARAM_TYPE::TRAIN, error_msg);
                break;
            case milvus::DataType::VECTOR_FLOAT:
                status = knowhere::CheckConfig<knowhere::fp32>(index_type, version, json, knowhere::PARAM_TYPE::TRAIN, error_msg);
                break;
            case milvus::DataType::VECTOR_BFLOAT16:
                status = knowhere::CheckConfig<knowhere::bf16>(index_type, version, json, knowhere::PARAM_TYPE::TRAIN, error_msg);
                break;
            case milvus::DataType::VECTOR_FLOAT16:
                status = knowhere::CheckConfig<knowhere::fp16>(index_type, version, json, knowhere::PARAM_TYPE::TRAIN, error_msg);
                break;
            case milvus::DataType::VECTOR_SPARSE_FLOAT:
                // Sparse float vectors are checked with the fp32 element type,
                // same as dense float vectors.
                status = knowhere::CheckConfig<knowhere::fp32>(index_type, version, json, knowhere::PARAM_TYPE::TRAIN, error_msg);
                break;
            default:
                status = knowhere::Status::invalid_args;
                error_msg = "unsupported data type for index params validation";
                break;
        }

        CStatus cStatus;
        if (status == knowhere::Status::success) {
            cStatus.error_code = milvus::Success;
            cStatus.error_msg = "";
        } else {
            cStatus.error_code = milvus::ConfigInvalid;
            // error_msg is a local std::string that dies when this function
            // returns; hand the caller an owned copy instead of c_str().
            cStatus.error_msg = strdup(error_msg.c_str());
        }
        return cStatus;
    } catch (const std::exception& e) {
        auto cStatus = CStatus();
        cStatus.error_code = milvus::UnexpectedError;
        cStatus.error_msg = strdup(e.what());
        return cStatus;
    }
}

int
GetIndexListSize() {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/vector_index_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ extern "C" {
#endif

#include <stdbool.h>
#include "common/type_c.h"

/*
 * Validate index build parameters for a vector field.
 *
 * index_type   - name of the index type to validate against.
 * data_type    - data type of the field; non-vector types are reported as
 *                an invalid configuration.
 * index_params - serialized milvus.proto.indexcgo.IndexParams message.
 * length       - size of index_params in bytes.
 *
 * Returns a CStatus whose error_code is Success when the parameters pass
 * the knowhere train-stage config check, ConfigInvalid when they do not,
 * and UnexpectedError when the buffer cannot be parsed or an exception is
 * raised during validation.
 */
CStatus
ValidateIndexParams(const char* index_type, enum CDataType data_type, const uint8_t* index_params, const uint64_t length);

int
GetIndexListSize();
Expand Down
2 changes: 2 additions & 0 deletions internal/core/thirdparty/knowhere/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ endif()
# get prometheus COMPILE_OPTIONS
get_property( var DIRECTORY "${knowhere_SOURCE_DIR}" PROPERTY COMPILE_OPTIONS )
message( STATUS "knowhere src compile options: ${var}" )

set( KNOWHERE_INCLUDE_DIR ${knowhere_SOURCE_DIR}/include CACHE INTERNAL "Path to knowhere include directory" )
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down
12 changes: 6 additions & 6 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/indexpb"
indexparamcheck2 "github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -222,7 +222,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
if indexparamcheck2.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg)
err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))
Expand Down Expand Up @@ -273,16 +273,16 @@ func ValidateIndexParams(index *model.Index) error {
indexType := GetIndexType(index.IndexParams)
indexParams := funcutil.KeyValuePair2Map(index.IndexParams)
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
if err := indexparamcheck.ValidateMmapIndexParams(indexType, indexParams); err != nil {
if err := indexparamcheck2.ValidateMmapIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap index params", err.Error())
}
if err := indexparamcheck.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
if err := indexparamcheck2.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap user index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
if err := indexparamcheck2.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
if err := indexparamcheck2.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand Down Expand Up @@ -619,13 +619,13 @@ func TestServer_AlterIndex(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Healthy)

t.Run("mmap_unsupported", func(t *testing.T) {
indexParams[0].Value = indexparamcheck.IndexRaftCagra
indexParams[0].Value = "GPU_CAGRA"

resp, err := s.AlterIndex(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.CheckRPCCall(resp, err), merr.ErrParameterInvalid)

indexParams[0].Value = indexparamcheck.IndexFaissIvfFlat
indexParams[0].Value = "IVF_FLAT"
})

t.Run("param_value_invalied", func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -168,6 +169,17 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule

fieldID := dependency.meta.indexMeta.GetFieldIDByIndexID(segIndex.CollectionID, segIndex.IndexID)
binlogIDs := getBinLogIDs(segment, fieldID)
if Params.IndexEngineConfig.Enable.GetAsBool() {
var ret error
indexParams, ret = Params.IndexEngineConfig.MergeRequestParam(GetIndexType(indexParams), paramtable.BuildStage, indexParams)

if ret != nil {
log.Ctx(ctx).Warn("failed to construct index build params", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return true
}
}

if isDiskANNIndex(GetIndexType(indexParams)) {
var err error
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down
9 changes: 7 additions & 2 deletions internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
Expand Down Expand Up @@ -208,6 +208,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))

indexType := it.newIndexParams[common.IndexTypeKey]
var fieldDataSize uint64
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(indexType) {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
Expand All @@ -223,7 +224,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Warn("IndexNode get local used size failed")
return err
Expand All @@ -245,6 +246,10 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}
}

if Params.IndexEngineConfig.Enable.GetAsBool() {
it.newIndexParams, _ = Params.IndexEngineConfig.MergeWithResource(fieldDataSize, it.newIndexParams)
}

storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
Expand Down
16 changes: 10 additions & 6 deletions internal/proxy/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
indexparamcheck "github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
Expand Down Expand Up @@ -297,6 +297,13 @@ func (cit *createIndexTask) parseIndexParams() error {
if !exist {
return fmt.Errorf("IndexType not specified")
}
if Params.IndexEngineConfig.Enable.GetAsBool() {
var err error
indexParamsMap, err = Params.IndexEngineConfig.MergeRequestMapParam(indexType, paramtable.BuildStage, indexParamsMap)
if err != nil {
return err
}
}
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(indexType) {
err := indexparams.FillDiskIndexParams(Params, indexParamsMap)
if err != nil {
Expand Down Expand Up @@ -416,17 +423,14 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro
if err := fillDimension(field, indexParams); err != nil {
return err
}
} else {
// used only for checker, should be deleted after checking
indexParams[IsSparseKey] = "true"
}

if err := checker.CheckValidDataType(field); err != nil {
if err := checker.CheckValidDataType(indexType, field); err != nil {
log.Info("create index with invalid data type", zap.Error(err), zap.String("data_type", field.GetDataType().String()))
return err
}

if err := checker.CheckTrain(indexParams); err != nil {
if err := checker.CheckTrain(field.DataType, indexParams); err != nil {
log.Info("create index with invalid parameters", zap.Error(err))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/task_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down
Loading

0 comments on commit d79fcce

Please sign in to comment.