Skip to content

Commit

Permalink
Merge branch 'milvus-io:master' into bugfix/fix-err-2-new-variable
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjun2016 authored Sep 2, 2024
2 parents 72bc3da + 3698c53 commit ee19203
Show file tree
Hide file tree
Showing 173 changed files with 7,701 additions and 1,614 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ ifeq (${ENABLE_AZURE}, false)
AZURE_OPTION := -Z
endif

milvus: build-cpp print-build-info
milvus: build-cpp print-build-info build-go

build-go:
@echo "Building Milvus ..."
@source $(PWD)/scripts/setenv.sh && \
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
Expand Down
177 changes: 173 additions & 4 deletions internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,175 @@ class FieldDataBase {
const bool nullable_;
};

template <typename Type>
class FieldBitsetImpl : public FieldDataBase {
public:
FieldBitsetImpl() = delete;
FieldBitsetImpl(FieldBitsetImpl&&) = delete;
FieldBitsetImpl(const FieldBitsetImpl&) = delete;

FieldBitsetImpl&
operator=(FieldBitsetImpl&&) = delete;
FieldBitsetImpl&
operator=(const FieldBitsetImpl&) = delete;

explicit FieldBitsetImpl(DataType data_type, TargetBitmap&& bitmap)
: FieldDataBase(data_type, false), length_(bitmap.size()) {
data_ = std::move(bitmap).into();
cap_ = data_.size() * sizeof(Type) * 8;
Assert(cap_ >= length_);
}

// FillFieldData used for read and write with storage,
// no need to implement for bitset which used in runtime process.
void
FillFieldData(const void* source, ssize_t element_count) override {
PanicInfo(NotImplemented,
"FillFieldData(const void* source, ssize_t element_count)"
"not implemented for bitset");
}

void
FillFieldData(const void* field_data,
const uint8_t* valid_data,
ssize_t element_count) override {
PanicInfo(NotImplemented,
"FillFieldData(const void* field_data, "
"const uint8_t* valid_data, ssize_t element_count)"
"not implemented for bitset");
}

void
FillFieldData(const std::shared_ptr<arrow::Array> array) override {
PanicInfo(NotImplemented,
"FillFieldData(const std::shared_ptr<arrow::Array>& array) "
"not implemented for bitset");
}

virtual void
FillFieldData(const std::shared_ptr<arrow::StringArray>& array) {
PanicInfo(NotImplemented,
"FillFieldData(const std::shared_ptr<arrow::StringArray>& "
"array) not implemented for bitset");
}

virtual void
FillFieldData(const std::shared_ptr<arrow::BinaryArray>& array) {
PanicInfo(NotImplemented,
"FillFieldData(const std::shared_ptr<arrow::BinaryArray>& "
"array) not implemented for bitset");
}

std::string
GetName() const {
return "FieldBitsetImpl";
}

void*
Data() override {
return data_.data();
}

uint8_t*
ValidData() override {
PanicInfo(NotImplemented, "ValidData() not implemented for bitset");
}

const void*
RawValue(ssize_t offset) const override {
PanicInfo(NotImplemented,
"RawValue(ssize_t offset) not implemented for bitset");
}

int64_t
Size() const override {
return DataSize() + ValidDataSize();
}

int64_t
DataSize() const override {
return sizeof(Type) * get_num_rows();
}

int64_t
DataSize(ssize_t offset) const override {
return sizeof(Type);
}

int64_t
ValidDataSize() const override {
return 0;
}

size_t
Length() const override {
return get_length();
}

bool
IsFull() const override {
auto cap_num_rows = get_num_rows();
auto filled_num_rows = get_length();
return cap_num_rows == filled_num_rows;
}

bool
IsNullable() const override {
return false;
}

void
Reserve(size_t cap) override {
std::lock_guard lck(cap_mutex_);
AssertInfo(cap % (8 * sizeof(Type)) == 0,
"Reverse bitset size must be a multiple of {}",
8 * sizeof(Type));
if (cap > cap_) {
data_.resize(cap / (8 * sizeof(Type)));
cap_ = cap;
}
}

public:
int64_t
get_num_rows() const override {
std::shared_lock lck(cap_mutex_);
return cap_;
}

size_t
get_length() const {
std::shared_lock lck(length_mutex_);
return length_;
}

int64_t
get_dim() const override {
return 1;
}

int64_t
get_null_count() const override {
PanicInfo(NotImplemented,
"get_null_count() not implemented for bitset");
}

bool
is_valid(ssize_t offset) const override {
PanicInfo(NotImplemented,
"is_valid(ssize_t offset) not implemented for bitset");
}

private:
FixedVector<Type> data_{};
// capacity that data_ can store
int64_t cap_;
mutable std::shared_mutex cap_mutex_;
// number of actual elements in data_
size_t length_{};
mutable std::shared_mutex length_mutex_;
};

template <typename Type, bool is_type_entire_row = false>
class FieldDataImpl : public FieldDataBase {
public:
Expand Down Expand Up @@ -159,8 +328,8 @@ class FieldDataImpl : public FieldDataBase {
: FieldDataBase(type, nullable), dim_(is_type_entire_row ? 1 : dim) {
AssertInfo(!nullable, "need to fill valid_data when nullable is true");
data_ = std::move(data);
Assert(data.size() % dim == 0);
num_rows_ = data.size() / dim;
Assert(data_.size() % dim == 0);
num_rows_ = data_.size() / dim;
}

explicit FieldDataImpl(size_t dim,
Expand All @@ -173,8 +342,8 @@ class FieldDataImpl : public FieldDataBase {
"no need to fill valid_data when nullable is false");
data_ = std::move(data);
valid_data_ = std::move(valid_data);
Assert(data.size() % dim == 0);
num_rows_ = data.size() / dim;
Assert(data_.size() % dim == 0);
num_rows_ = data_.size() / dim;
}

void
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/QueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace milvus {
struct SearchInfo {
int64_t topk_{0};
int64_t group_size_{1};
bool group_strict_size_{false};
int64_t round_decimal_{0};
FieldId field_id_;
MetricType metric_type_;
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/common/Vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class ColumnVector final : public BaseVector {
// the size is the number of bits
ColumnVector(TargetBitmap&& bitmap)
: BaseVector(DataType::INT8, bitmap.size()) {
values_ = std::make_shared<FieldDataImpl<uint8_t, false>>(
bitmap.size(), DataType::INT8, false, std::move(bitmap).into());
values_ = std::make_shared<FieldBitsetImpl<uint8_t>>(DataType::INT8,
std::move(bitmap));
}

virtual ~ColumnVector() override {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/query/PlanProto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
search_info.group_size_ = query_info_proto.group_size() > 0
? query_info_proto.group_size()
: 1;
search_info.group_strict_size_ = query_info_proto.group_strict_size();
}

auto plan_node = [&]() -> std::unique_ptr<VectorPlanNode> {
Expand Down
15 changes: 12 additions & 3 deletions internal/core/src/query/groupby/SearchGroupByOperator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<int8_t>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -58,6 +59,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<int16_t>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -72,6 +74,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<int32_t>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -86,6 +89,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<int64_t>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -99,6 +103,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<bool>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -113,6 +118,7 @@ SearchGroupBy(const std::vector<std::shared_ptr<VectorIterator>>& iterators,
GroupIteratorsByType<std::string>(iterators,
search_info.topk_,
search_info.group_size_,
search_info.group_strict_size_,
*dataGetter,
group_by_values,
seg_offsets,
Expand All @@ -136,6 +142,7 @@ GroupIteratorsByType(
const std::vector<std::shared_ptr<VectorIterator>>& iterators,
int64_t topK,
int64_t group_size,
bool group_strict_size,
const DataGetter<T>& data_getter,
std::vector<GroupByValueType>& group_by_values,
std::vector<int64_t>& seg_offsets,
Expand All @@ -147,6 +154,7 @@ GroupIteratorsByType(
GroupIteratorResult<T>(iterator,
topK,
group_size,
group_strict_size,
data_getter,
group_by_values,
seg_offsets,
Expand All @@ -161,13 +169,14 @@ void
GroupIteratorResult(const std::shared_ptr<VectorIterator>& iterator,
int64_t topK,
int64_t group_size,
bool group_strict_size,
const DataGetter<T>& data_getter,
std::vector<GroupByValueType>& group_by_values,
std::vector<int64_t>& offsets,
std::vector<float>& distances,
const knowhere::MetricType& metrics_type) {
//1.
GroupByMap<T> groupMap(topK, group_size);
GroupByMap<T> groupMap(topK, group_size, group_strict_size);

//2. do iteration until fill the whole map or run out of all data
//note it may enumerate all data inside a segment and can block following
Expand Down Expand Up @@ -195,8 +204,8 @@ GroupIteratorResult(const std::shared_ptr<VectorIterator>& iterator,

//4. save groupBy results
for (auto iter = res.cbegin(); iter != res.cend(); iter++) {
offsets.push_back(std::get<0>(*iter));
distances.push_back(std::get<1>(*iter));
offsets.emplace_back(std::get<0>(*iter));
distances.emplace_back(std::get<1>(*iter));
group_by_values.emplace_back(std::move(std::get<2>(*iter)));
}
}
Expand Down
28 changes: 21 additions & 7 deletions internal/core/src/query/groupby/SearchGroupByOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ GroupIteratorsByType(
const std::vector<std::shared_ptr<VectorIterator>>& iterators,
int64_t topK,
int64_t group_size,
bool group_strict_size,
const DataGetter<T>& data_getter,
std::vector<GroupByValueType>& group_by_values,
std::vector<int64_t>& seg_offsets,
Expand All @@ -195,19 +196,31 @@ struct GroupByMap {
std::unordered_map<T, int> group_map_{};
int group_capacity_{0};
int group_size_{0};
int enough_group_count{0};
int enough_group_count_{0};
bool strict_group_size_{false};

public:
GroupByMap(int group_capacity, int group_size)
: group_capacity_(group_capacity), group_size_(group_size){};
GroupByMap(int group_capacity,
int group_size,
bool strict_group_size = false)
: group_capacity_(group_capacity),
group_size_(group_size),
strict_group_size_(strict_group_size){};
bool
IsGroupResEnough() {
return group_map_.size() == group_capacity_ &&
enough_group_count == group_capacity_;
bool enough = false;
if (strict_group_size_) {
enough = group_map_.size() == group_capacity_ &&
enough_group_count_ == group_capacity_;
} else {
enough = group_map_.size() == group_capacity_;
}
return enough;
}
bool
Push(const T& t) {
if (group_map_.size() >= group_capacity_ && group_map_[t] == 0) {
if (group_map_.size() >= group_capacity_ &&
group_map_.find(t) == group_map_.end()) {
return false;
}
if (group_map_[t] >= group_size_) {
Expand All @@ -218,7 +231,7 @@ struct GroupByMap {
}
group_map_[t] += 1;
if (group_map_[t] >= group_size_) {
enough_group_count += 1;
enough_group_count_ += 1;
}
return true;
}
Expand All @@ -229,6 +242,7 @@ void
GroupIteratorResult(const std::shared_ptr<VectorIterator>& iterator,
int64_t topK,
int64_t group_size,
bool group_strict_size,
const DataGetter<T>& data_getter,
std::vector<GroupByValueType>& group_by_values,
std::vector<int64_t>& offsets,
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@


add_source_at_current_directory_recursively()
add_library(milvus_segcore OBJECT ${SOURCE_FILES})
add_library(milvus_segcore OBJECT ${SOURCE_FILES})
Loading

0 comments on commit ee19203

Please sign in to comment.