Skip to content

Commit

Permalink
add index config check
Browse files Browse the repository at this point in the history
Signed-off-by: xianliang.li <[email protected]>
  • Loading branch information
foxspy committed Aug 29, 2024
1 parent 746cb67 commit b524413
Show file tree
Hide file tree
Showing 96 changed files with 849 additions and 402 deletions.
2 changes: 1 addition & 1 deletion cmd/tools/migration/mmap/mmap_230_240.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package mmap
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/tso"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)

// In Milvus 2.3.x, querynode.MmapDirPath is used to enable mmap and save mmap files.
Expand Down
20 changes: 20 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,3 +1044,23 @@ streamingNode:
serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte
clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte
clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte

knowhere:
enable: true
HNSW:
build:
efConstruction : 360
M: 30
search:
ef: 30
DISKANN:
build:
max_degree: 56
search_list_size: 100
pq_code_budget_gb_ratio: 0.125
search_cache_budget_gb_ratio: 0.1
beam_width_ratio: 4
load:
cacheRatio: 0.1
search:
beamRatio: 4.0
52 changes: 52 additions & 0 deletions internal/core/src/segcore/vector_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,61 @@

#include "segcore/vector_index_c.h"

#include "common/Types.h"
#include "common/EasyAssert.h"
#include "knowhere/utils.h"
#include "knowhere/config.h"
#include "knowhere/version.h"
#include "index/Meta.h"
#include "index/IndexFactory.h"
#include "pb/index_cgo_msg.pb.h"

// Validates user-supplied index build parameters for `index_type` against the
// knowhere config schema of the current knowhere version.
//
// Parameters:
//   index_type              - name of the index type to validate against.
//   data_type               - vector data type of the field; selects the
//                             knowhere element type used for the check.
//   serialized_index_params - buffer holding a serialized
//                             milvus::proto::indexcgo::IndexParams message.
//   length                  - size of that buffer in bytes.
//
// Returns a CStatus:
//   - milvus::Success with an empty (static, non-owned) message when the
//     parameters are accepted;
//   - milvus::ConfigInvalid with a strdup'ed message when knowhere rejects the
//     parameters or the data type is not a supported vector type;
//   - milvus::UnexpectedError with a strdup'ed message when an exception is
//     raised (e.g. the buffer cannot be parsed).
// On failure the message is heap-allocated with strdup; the caller is
// presumably responsible for freeing it -- confirm against the cgo caller.
CStatus
ValidateIndexParams(const char* index_type,
                    enum CDataType data_type,
                    const uint8_t* serialized_index_params,
                    const uint64_t length) {
    try {
        auto index_params =
            std::make_unique<milvus::proto::indexcgo::IndexParams>();
        auto res =
            index_params->ParseFromArray(serialized_index_params, length);
        AssertInfo(res, "Unmarshall index params failed");

        // Flatten the key/value pairs into the JSON object knowhere expects.
        knowhere::Json json;
        // params_size() returns int; use a matching index type to avoid a
        // signed/unsigned comparison.
        for (int i = 0; i < index_params->params_size(); ++i) {
            const auto& param = index_params->params(i);
            json[param.key()] = param.value();
        }

        const auto dataType = static_cast<milvus::DataType>(data_type);
        // The version is the same for every element type, so look it up once.
        const auto version =
            knowhere::Version::GetCurrentVersion().VersionNumber();

        knowhere::Status status;
        std::string error_msg;
        switch (dataType) {
            case milvus::DataType::VECTOR_BINARY:
                status = knowhere::CheckConfig<knowhere::bin1>(
                    index_type,
                    version,
                    json,
                    knowhere::PARAM_TYPE::TRAIN,
                    error_msg);
                break;
            case milvus::DataType::VECTOR_BFLOAT16:
                status = knowhere::CheckConfig<knowhere::bf16>(
                    index_type,
                    version,
                    json,
                    knowhere::PARAM_TYPE::TRAIN,
                    error_msg);
                break;
            case milvus::DataType::VECTOR_FLOAT16:
                status = knowhere::CheckConfig<knowhere::fp16>(
                    index_type,
                    version,
                    json,
                    knowhere::PARAM_TYPE::TRAIN,
                    error_msg);
                break;
            // Sparse float vectors are validated with the fp32 element type,
            // exactly like dense float vectors.
            case milvus::DataType::VECTOR_FLOAT:
            case milvus::DataType::VECTOR_SPARSE_FLOAT:
                status = knowhere::CheckConfig<knowhere::fp32>(
                    index_type,
                    version,
                    json,
                    knowhere::PARAM_TYPE::TRAIN,
                    error_msg);
                break;
            default:
                status = knowhere::Status::invalid_args;
                error_msg =
                    "unsupported data type for index params validation";
                break;
        }

        CStatus cStatus;
        if (status == knowhere::Status::success) {
            cStatus.error_code = milvus::Success;
            cStatus.error_msg = "";
        } else {
            cStatus.error_code = milvus::ConfigInvalid;
            // error_msg is a local std::string; duplicate it so the pointer
            // handed to the caller outlives this function.
            cStatus.error_msg = strdup(error_msg.c_str());
        }
        return cStatus;
    } catch (std::exception& e) {
        auto cStatus = CStatus();
        cStatus.error_code = milvus::UnexpectedError;
        cStatus.error_msg = strdup(e.what());
        return cStatus;
    }
}

int
GetIndexListSize() {
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/segcore/vector_index_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ extern "C" {
#endif

#include <stdbool.h>
#include "common/type_c.h"

// Validates index build parameters for the given index type and vector data
// type.
//
//   index_type   - name of the index type to validate against.
//   data_type    - data type of the vector field being indexed.
//   index_params - buffer containing a serialized
//                  milvus::proto::indexcgo::IndexParams message (the
//                  definition names this parameter serialized_index_params).
//   length       - size of the index_params buffer in bytes.
//
// Returns a CStatus describing the outcome of the check.
// NOTE(review): on failure error_msg may be heap-allocated (strdup) by the
// implementation; the caller presumably must free it -- confirm.
CStatus
ValidateIndexParams(const char* index_type, enum CDataType data_type, const uint8_t* index_params, const uint64_t length);

int
GetIndexListSize();
Expand Down
2 changes: 2 additions & 0 deletions internal/core/thirdparty/knowhere/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ endif()
# get prometheus COMPILE_OPTIONS
get_property( var DIRECTORY "${knowhere_SOURCE_DIR}" PROPERTY COMPILE_OPTIONS )
message( STATUS "knowhere src compile options: ${var}" )

set( KNOWHERE_INCLUDE_DIR ${knowhere_SOURCE_DIR}/include CACHE INTERNAL "Path to knowhere include directory" )
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package datacoord

import (
"context"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"sync"
"time"

Expand All @@ -29,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package datacoord
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"strconv"
"sync"

Expand All @@ -36,7 +37,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down
12 changes: 6 additions & 6 deletions internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
indexparamcheck2 "github.com/milvus-io/milvus/internal/util/indexparamcheck"
"time"

"github.com/samber/lo"
Expand All @@ -30,7 +31,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -222,7 +222,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(err), nil
}
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
if indexparamcheck2.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
errMsg := "all IndexNodes do not support disk indexes, please verify"
log.Warn(errMsg)
err = merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))
Expand Down Expand Up @@ -273,16 +273,16 @@ func ValidateIndexParams(index *model.Index) error {
indexType := GetIndexType(index.IndexParams)
indexParams := funcutil.KeyValuePair2Map(index.IndexParams)
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
if err := indexparamcheck.ValidateMmapIndexParams(indexType, indexParams); err != nil {
if err := indexparamcheck2.ValidateMmapIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap index params", err.Error())
}
if err := indexparamcheck.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
if err := indexparamcheck2.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap user index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
if err := indexparamcheck2.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
if err := indexparamcheck2.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
return nil
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
indexparamcheck2 "github.com/milvus-io/milvus/internal/util/indexparamcheck"
"testing"
"time"

Expand Down Expand Up @@ -2427,7 +2428,7 @@ func TestValidateIndexParams(t *testing.T) {
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: indexparamcheck.AutoIndex,
Value: indexparamcheck2.AutoIndex,
},
{
Key: common.MmapEnabledKey,
Expand All @@ -2444,7 +2445,7 @@ func TestValidateIndexParams(t *testing.T) {
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: indexparamcheck.AutoIndex,
Value: indexparamcheck2.AutoIndex,
},
{
Key: common.MmapEnabledKey,
Expand All @@ -2461,7 +2462,7 @@ func TestValidateIndexParams(t *testing.T) {
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: indexparamcheck.AutoIndex,
Value: indexparamcheck2.AutoIndex,
},
},
UserIndexParams: []*commonpb.KeyValuePair{
Expand Down
12 changes: 12 additions & 0 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package datacoord

import (
"context"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"path"
"time"

Expand Down Expand Up @@ -168,6 +169,17 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule

fieldID := dependency.meta.indexMeta.GetFieldIDByIndexID(segIndex.CollectionID, segIndex.IndexID)
binlogIDs := getBinLogIDs(segment, fieldID)
if Params.IndexEngineConfig.Enable.GetAsBool() {
var ret error
indexParams, ret = Params.IndexEngineConfig.MergeRequestParam(GetIndexType(indexParams), paramtable.BuildStage, indexParams)

if ret != nil {
log.Ctx(ctx).Warn("failed to construct index build params", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return true
}
}

if isDiskANNIndex(GetIndexType(indexParams)) {
var err error
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"strconv"
"strings"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
Expand Down
9 changes: 7 additions & 2 deletions internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package indexnode
import (
"context"
"fmt"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"strconv"
"strings"
"time"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
Expand Down Expand Up @@ -208,6 +208,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))

indexType := it.newIndexParams[common.IndexTypeKey]
var fieldDataSize uint64
if indexparamcheck.GetVecIndexMgrInstance().IsDiskANN(indexType) {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
Expand All @@ -223,7 +224,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Warn("IndexNode get local used size failed")
return err
Expand All @@ -245,6 +246,10 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}
}

if Params.IndexEngineConfig.Enable.GetAsBool() {
it.newIndexParams, _ = Params.IndexEngineConfig.MergeWithResource(fieldDataSize, it.newIndexParams)
}

storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
Expand Down
Loading

0 comments on commit b524413

Please sign in to comment.