[MOD-7297] Replace Variable Length Array on stack with heap allocation (#505)

* baseline for BM

* fix to baseline

* replace stack with allocation

* use unique ptr

* revert format batch iterator

* fix lifetime

* fix

* fix

* fix

* rearrange

* use ref to allocator instead of pointer

* disable flow temp
meiravgri authored Jul 16, 2024
1 parent da2ae4a commit ab96a8d
Showing 9 changed files with 69 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow-temp.yml
@@ -15,7 +15,7 @@ jobs:
uses: ./.github/workflows/task-unit-test.yml
with:
container: ubuntu:jammy
run-valgrind: false
run-valgrind: true
focal:
uses: ./.github/workflows/task-unit-test.yml
with:
6 changes: 3 additions & 3 deletions src/VecSim/algorithms/hnsw/hnsw.h
@@ -1820,9 +1820,9 @@ AddVectorCtx HNSWIndex<DataType, DistType>::storeNewElement(labelType label,
// Create the new element's graph metadata.
// We must assign manually enough memory on the stack and not just declare an `ElementGraphData`
// variable, since it has a flexible array member.
char tmpData[this->elementGraphDataSize];
memset(tmpData, 0, this->elementGraphDataSize);
ElementGraphData *cur_egd = (ElementGraphData *)tmpData;
auto tmpData = this->allocator->allocate_unique(this->elementGraphDataSize);
memset(tmpData.get(), 0, this->elementGraphDataSize);
ElementGraphData *cur_egd = (ElementGraphData *)(tmpData.get());
// Allocate memory (inside `ElementGraphData` constructor) for the links in higher levels and
// initialize this memory to zeros. The reason for doing it here is that we might mark this
// vector as deleted BEFORE we finish its indexing. In that case, we will collect the incoming
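A minimal sketch of the idiom used above: because the element's graph metadata ends in a flexible array member, its real size is only known at runtime, so the code takes a raw buffer from the allocator, zeroes it, and casts it instead of declaring a local variable. GraphDataLike, MallocAllocator and storeNewElementSketch below are illustrative stand-ins, not the library's real types; only the allocate_unique / memset / cast sequence follows the diff.

#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <memory>

// Illustrative stand-in for ElementGraphData: the real struct ends in a
// flexible array member, so a plain local variable cannot hold it.
struct GraphDataLike {
    size_t toplevel;
    unsigned int links[1]; // stands in for the trailing flexible array
};

// Hypothetical allocator exposing the allocate_unique() shape this commit
// adds to VecSimAllocator, backed here by plain malloc/free.
struct MallocAllocator {
    std::unique_ptr<void, void (*)(void *)> allocate_unique(size_t size) {
        return {std::malloc(size), std::free};
    }
};

template <typename Allocator>
void storeNewElementSketch(Allocator &allocator, size_t elementGraphDataSize) {
    // Runtime-sized heap buffer instead of a VLA; returned to the allocator at scope exit.
    auto tmpData = allocator.allocate_unique(elementGraphDataSize);
    std::memset(tmpData.get(), 0, elementGraphDataSize);
    auto *cur_egd = static_cast<GraphDataLike *>(tmpData.get());
    cur_egd->toplevel = 2; // ... fill the element, then copy the buffer into its data block ...
}

int main() {
    MallocAllocator allocator;
    storeNewElementSketch(allocator, sizeof(GraphDataLike) + 8 * sizeof(unsigned int));
}

Unlike the VLA, the buffer no longer consumes an unbounded amount of stack, and the unique_ptr guarantees the memory goes back through the allocator even if an exception is thrown before the element is copied into its block.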
15 changes: 8 additions & 7 deletions src/VecSim/algorithms/hnsw/hnsw_serializer.h
@@ -160,15 +160,15 @@ void HNSWIndex<DataType, DistType>::restoreGraph(std::ifstream &input) {
unsigned int block_len = 0;
readBinaryPOD(input, block_len);
for (size_t j = 0; j < block_len; j++) {
char cur_vec[this->dataSize];
input.read(cur_vec, this->dataSize);
this->vectorBlocks.back().addElement(cur_vec);
auto cur_vec = this->getAllocator()->allocate_unique(this->dataSize);
input.read(static_cast<char *>(cur_vec.get()), this->dataSize);
this->vectorBlocks.back().addElement(cur_vec.get());
}
}

// Get graph data blocks
ElementGraphData *cur_egt;
char tmpData[this->elementGraphDataSize];
auto tmpData = this->getAllocator()->allocate_unique(this->elementGraphDataSize);
size_t toplevel = 0;
for (size_t i = 0; i < num_blocks; i++) {
this->graphDataBlocks.emplace_back(this->blockSize, this->elementGraphDataSize,
@@ -177,19 +177,20 @@
readBinaryPOD(input, block_len);
for (size_t j = 0; j < block_len; j++) {
// Reset tmpData
memset(tmpData, 0, this->elementGraphDataSize);
memset(tmpData.get(), 0, this->elementGraphDataSize);
// Read the current element top level
readBinaryPOD(input, toplevel);
// Allocate space and structs for the current element
try {
new (tmpData) ElementGraphData(toplevel, this->levelDataSize, this->allocator);
new (tmpData.get())
ElementGraphData(toplevel, this->levelDataSize, this->allocator);
} catch (std::runtime_error &e) {
this->log(VecSimCommonStrings::LOG_WARNING_STRING,
"Error - allocating memory for new element failed due to low memory");
throw e;
}
// Add the current element to the current block, and update cur_egt to point to it.
this->graphDataBlocks.back().addElement(tmpData);
this->graphDataBlocks.back().addElement(tmpData.get());
cur_egt = (ElementGraphData *)this->graphDataBlocks.back().getElement(j);

// Restore the current element's graph data
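The restore loop above keeps a single allocator-owned buffer alive for the whole pass and reuses it per element: reset with memset, placement-construct the element into it, and let addElement copy the bytes into the block. A self-contained toy version of that loop, with ElementLike and the malloc-backed buffer as illustrative stand-ins:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <new>

// Illustrative element; the real code placement-constructs an ElementGraphData
// whose trailing link storage makes its size dynamic.
struct ElementLike {
    size_t toplevel;
    explicit ElementLike(size_t tl) : toplevel(tl) {}
};

int main() {
    const size_t elementSize = 64; // stands in for elementGraphDataSize
    // One heap buffer, hoisted out of the loop and reused for every element,
    // mirroring the single allocate_unique() call in restoreGraph().
    std::unique_ptr<void, void (*)(void *)> tmpData{std::malloc(elementSize), std::free};

    for (size_t j = 0; j < 3; j++) {
        std::memset(tmpData.get(), 0, elementSize); // reset before each element
        new (tmpData.get()) ElementLike(j);         // placement-construct in place
        auto *egt = static_cast<ElementLike *>(tmpData.get());
        std::cout << "restored element with top level " << egt->toplevel << "\n";
        // addElement() in the real code copies the bytes out, so the buffer
        // can simply be reused on the next iteration
    }
} // the buffer is freed exactly once, at scope exit

The per-vector cur_vec buffer, by contrast, is allocated inside its loop in the diff, so restoring the data blocks now performs one small heap allocation per stored vector.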
7 changes: 4 additions & 3 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
@@ -504,11 +504,12 @@ void TieredHNSWIndex<DataType, DistType>::executeInsertJob(HNSWInsertJob *job) {
HNSWIndex<DataType, DistType> *hnsw_index = this->getHNSWIndex();
// Copy the vector blob from the flat buffer, so we can release the flat lock while we are
// indexing the vector into HNSW index.
DataType blob_copy[this->frontendIndex->getDim()];
memcpy(blob_copy, this->frontendIndex->getDataByInternalId(job->id),
auto blob_copy = this->getAllocator()->allocate_unique(this->frontendIndex->getDataSize());

memcpy(blob_copy.get(), this->frontendIndex->getDataByInternalId(job->id),
this->frontendIndex->getDim() * sizeof(DataType));

this->insertVectorToHNSW<true>(hnsw_index, job->label, blob_copy);
this->insertVectorToHNSW<true>(hnsw_index, job->label, blob_copy.get());

// Remove the vector and the insert job from the flat buffer.
this->flatIndexGuard.lock();
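The change above gives the copied blob a heap lifetime that is independent of the executing stack frame: the flat-buffer lock only needs to be held while the bytes are copied out, after which the slower HNSW insertion can run on the copy. A compact sketch of that copy-then-unlock shape, where flatIndexGuard, frontendVector and slowBackendInsert are hypothetical stand-ins for the tiered index pieces:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

std::mutex flatIndexGuard;                                 // guards the frontend buffer
std::vector<float> frontendVector = {1.f, 2.f, 3.f, 4.f};  // blob owned by the flat index

void slowBackendInsert(const float *blob, size_t dim) {
    std::cout << "indexing " << dim << " floats, first = " << blob[0] << "\n";
}

int main() {
    const size_t dim = frontendVector.size();
    std::unique_ptr<void, void (*)(void *)> blob_copy{nullptr, std::free};
    {
        // Hold the lock only long enough to copy the blob into heap memory
        // owned by blob_copy.
        std::lock_guard<std::mutex> lock(flatIndexGuard);
        blob_copy.reset(std::malloc(dim * sizeof(float)));
        std::memcpy(blob_copy.get(), frontendVector.data(), dim * sizeof(float));
    }
    // Lock released; the copy stays valid for the long-running insertion.
    slowBackendInsert(static_cast<const float *>(blob_copy.get()), dim);
}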
11 changes: 11 additions & 0 deletions src/VecSim/memory/vecsim_malloc.cpp
@@ -99,6 +99,17 @@ void *VecSimAllocator::callocate(size_t size) {
return nullptr;
}

std::unique_ptr<void, VecSimAllocator::Deleter>
VecSimAllocator::allocate_aligned_unique(size_t size, size_t alignment) {
void *ptr = this->allocate_aligned(size, alignment);
return {ptr, Deleter(*this)};
}

std::unique_ptr<void, VecSimAllocator::Deleter> VecSimAllocator::allocate_unique(size_t size) {
void *ptr = this->allocate(size);
return {ptr, Deleter(*this)};
}

void *VecSimAllocator::operator new(size_t size) { return vecsim_malloc(size); }

void *VecSimAllocator::operator new[](size_t size) { return vecsim_malloc(size); }
14 changes: 13 additions & 1 deletion src/VecSim/memory/vecsim_malloc.h
@@ -25,6 +25,8 @@ struct VecSimAllocator {
static size_t allocation_header_size;
static VecSimMemoryFunctions memFunctions;

// Forward declaration of the deleter for the unique_ptr.
struct Deleter;
VecSimAllocator() : allocated(std::atomic_uint64_t(sizeof(VecSimAllocator))) {}

public:
@@ -36,6 +38,10 @@
void *reallocate(void *p, size_t size);
void free_allocation(void *p);

// Allocations for scope-life-time memory.
std::unique_ptr<void, Deleter> allocate_aligned_unique(size_t size, size_t alignment);
std::unique_ptr<void, Deleter> allocate_unique(size_t size);

void *operator new(size_t size);
void *operator new[](size_t size);
void operator delete(void *p, size_t size);
@@ -55,8 +61,14 @@
static size_t getAllocationOverheadSize() { return allocation_header_size; }

private:
// Retrive the original requested allocation size. Required for remalloc.
// Retrieve the original requested allocation size. Required for remalloc.
inline size_t getPointerAllocationSize(void *p) { return *(((size_t *)p) - 1); }

struct Deleter {
VecSimAllocator &allocator;
explicit constexpr Deleter(VecSimAllocator &allocator) : allocator(allocator) {}
void operator()(void *ptr) const { allocator.free_allocation(ptr); }
};
};

/**
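Taken together, the two hunks above add a scope-lifetime allocation facility: allocate_unique() and allocate_aligned_unique() return a std::unique_ptr<void, Deleter> whose deleter holds a reference to the owning VecSimAllocator and releases the memory through free_allocation(). A self-contained toy reproduction of that design, where ToyAllocator and its live-byte counter are hypothetical and only the shape of Deleter and allocate_unique mirrors the headers:

#include <cstdlib>
#include <iostream>
#include <memory>

struct ToyAllocator {
    size_t live_bytes = 0;

    void *allocate(size_t size) {
        live_bytes += size;
        // Prepend the requested size so free_allocation can account for it,
        // loosely mimicking the real allocator's allocation header.
        auto *p = static_cast<size_t *>(std::malloc(size + sizeof(size_t)));
        *p = size;
        return p + 1;
    }
    void free_allocation(void *ptr) {
        auto *p = static_cast<size_t *>(ptr) - 1;
        live_bytes -= *p;
        std::free(p);
    }

    struct Deleter {
        ToyAllocator &allocator; // a reference, not a pointer, as in the commit
        explicit Deleter(ToyAllocator &allocator) : allocator(allocator) {}
        void operator()(void *ptr) const { allocator.free_allocation(ptr); }
    };

    // Scope-lifetime allocation: the unique_ptr hands the memory back through
    // the owning allocator automatically.
    std::unique_ptr<void, Deleter> allocate_unique(size_t size) {
        return {allocate(size), Deleter(*this)};
    }
};

int main() {
    ToyAllocator alloc;
    {
        auto buf = alloc.allocate_unique(128);
        std::cout << "live bytes inside scope: " << alloc.live_bytes << "\n"; // 128
    }
    std::cout << "live bytes after scope:  " << alloc.live_bytes << "\n"; // 0
}

Because the deleter stores a reference, the allocator must outlive every unique_ptr it hands out, which is consistent with these buffers being intended as scope-lifetime temporaries inside index methods rather than long-lived storage.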
5 changes: 3 additions & 2 deletions src/VecSim/spaces/normalize/normalize_naive.h
@@ -9,6 +9,7 @@
#include "VecSim/types/bfloat16.h"
#include "VecSim/types/float16.h"
#include <cmath>
#include <vector>

using bfloat16 = vecsim_types::bfloat16;
using float16 = vecsim_types::float16;
@@ -35,7 +36,7 @@ template <bool is_little>
static inline void bfloat16_normalizeVector(void *vec, const size_t dim) {
bfloat16 *input_vector = (bfloat16 *)vec;

float f32_tmp[dim];
std::vector<float> f32_tmp(dim);

float sum = 0;

@@ -55,7 +56,7 @@
static inline void float16_normalizeVector(void *vec, const size_t dim) {
float16 *input_vector = (float16 *)vec;

float f32_tmp[dim];
std::vector<float> f32_tmp(dim);

float sum = 0;

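In the naive normalization fallbacks, the temporary promoted-to-float buffer is runtime-sized (one float per dimension), so the VLA becomes a std::vector. A header-style sketch of that pattern, where storage_t, to_float and from_float are hypothetical stand-ins for the bfloat16/float16 conversions and only the f32_tmp change mirrors the diff:

#include <cmath>
#include <cstddef>
#include <cstdint>
#include <vector>

using storage_t = std::uint16_t; // stands in for bfloat16/float16
inline float to_float(storage_t v) { return static_cast<float>(v); }
inline storage_t from_float(float f) { return static_cast<storage_t>(f); }

inline void normalizeVectorSketch(void *vec, const size_t dim) {
    auto *input_vector = static_cast<storage_t *>(vec);

    std::vector<float> f32_tmp(dim); // heap-backed temporary, sized at runtime

    float sum = 0;
    for (size_t i = 0; i < dim; i++) {
        f32_tmp[i] = to_float(input_vector[i]); // promote to float once
        sum += f32_tmp[i] * f32_tmp[i];
    }
    const float norm = std::sqrt(sum);
    for (size_t i = 0; i < dim; i++) {
        input_vector[i] = from_float(f32_tmp[i] / norm); // write back in storage type
    }
}

The std::vector adds one heap allocation per call, but these are the scalar fallback paths, and the change removes a stack frame whose size grew with the vector dimension.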
20 changes: 12 additions & 8 deletions src/VecSim/vec_sim_index.h
@@ -206,33 +206,37 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {

protected:
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(blob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(blob, aligned_mem.get());

return this->addVector(processed_blob, label, auxiliaryCtx);
}

virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->topKQuery(processed_blob, k, queryParams);
}

virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
VecSimQueryParams *queryParams,
VecSimQueryReply_Order order) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->rangeQuery(processed_blob, radius, queryParams, order);
}

virtual VecSimBatchIterator *
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->dataSize];
const void *processed_blob = processBlob(queryBlob, aligned_mem);
auto aligned_mem =
this->getAllocator()->allocate_aligned_unique(this->dataSize, this->alignment);
const void *processed_blob = processBlob(queryBlob, aligned_mem.get());

return this->newBatchIterator(processed_blob, queryParams);
}
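The wrapper methods above all share one pattern: a buffer of dataSize bytes with the index's required alignment is taken from the allocator, processBlob() copies (and possibly preprocesses) the caller's blob into it, and the aligned copy is what reaches the query or insert path. A hypothetical, self-contained sketch of that shape, where alignedAllocUnique and processBlobSketch are illustrative (the real call is allocator->allocate_aligned_unique(dataSize, alignment)):

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>

using AlignedPtr = std::unique_ptr<void, void (*)(void *)>;

AlignedPtr alignedAllocUnique(size_t size, size_t alignment) {
    // std::aligned_alloc requires the size to be a multiple of the alignment.
    const size_t rounded = ((size + alignment - 1) / alignment) * alignment;
    return {std::aligned_alloc(alignment, rounded), std::free};
}

// Stand-in for processBlob(): copy the raw blob into the aligned buffer and
// hand back a pointer the vectorized distance kernels can assume is aligned.
const void *processBlobSketch(const void *blob, void *aligned_mem, size_t dataSize) {
    std::memcpy(aligned_mem, blob, dataSize);
    return aligned_mem;
}

int main() {
    const size_t dim = 5, dataSize = dim * sizeof(float), alignment = 32;
    float query[5] = {1, 2, 3, 4, 5};

    auto aligned_mem = alignedAllocUnique(dataSize, alignment);
    const void *processed_blob = processBlobSketch(query, aligned_mem.get(), dataSize);

    std::cout << "aligned: "
              << (reinterpret_cast<std::uintptr_t>(processed_blob) % alignment == 0) << "\n";
    // ... processed_blob would now be passed on to topKQuery()/addVector() ...
}

Going through the allocator rather than a plain aligned new presumably keeps these temporaries inside the allocator's memory accounting; the same pattern repeats in the tiered-index wrappers below, using the backend index's data size and alignment.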
22 changes: 14 additions & 8 deletions src/VecSim/vec_sim_tiered_index.h
@@ -109,31 +109,37 @@ class VecSimTieredIndex : public VecSimIndexInterface {

private:
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(blob, aligned_mem.get());

return this->addVector(processed_blob, label, auxiliaryCtx);
}

virtual VecSimQueryReply *topKQueryWrapper(const void *queryBlob, size_t k,
VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->topKQuery(processed_blob, k, queryParams);
}

virtual VecSimQueryReply *rangeQueryWrapper(const void *queryBlob, double radius,
VecSimQueryParams *queryParams,
VecSimQueryReply_Order order) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->rangeQuery(processed_blob, radius, queryParams, order);
}

virtual VecSimBatchIterator *
newBatchIteratorWrapper(const void *queryBlob, VecSimQueryParams *queryParams) const override {
char PORTABLE_ALIGN aligned_mem[this->backendIndex->getDataSize()];
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem);
auto aligned_mem = this->getAllocator()->allocate_aligned_unique(
this->backendIndex->getDataSize(), this->backendIndex->getAlignment());
const void *processed_blob = this->backendIndex->processBlob(queryBlob, aligned_mem.get());

return this->newBatchIterator(processed_blob, queryParams);
}
