Skip to content

Commit

Permalink
fix race for positive control counters usage
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Jan 24, 2025
1 parent f146192 commit 97581d1
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 179 deletions.
39 changes: 39 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/service/allocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,43 @@ TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 scopeId, const
Stage->Add(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated);
}

// Attempts to turn this waiting allocation into an allocated one.
// Asks the stage to reserve AllocatedVolume. If the stage refuses, the allocation is
// notified through OnAllocationImpossible; otherwise it receives a TAllocationGuard
// through OnAllocated. On every path Allocation is reset to nullptr afterwards.
// Returns true only if the stage accepted the volume and OnAllocated returned true.
bool TAllocationInfo::Allocate(const NActors::TActorId& ownerId) {
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
// Allocation is cleared at the end of this method, so this check also rejects a repeated call.
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
"allocation_internal_group_id", AllocationInternalGroupId);
auto allocationResult = Stage->Allocate(AllocatedVolume);
if (allocationResult.IsFail()) {
// Remember the failure: the destructor does not call Stage->Free for a failed allocation.
AllocationFailed = true;
Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
Allocation = nullptr;
return false;
}
// The guard is created as a temporary inside the call expression and handed to the allocation.
const bool result = Allocation->OnAllocated(
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
if (!result) {
// The allocation rejected the memory: return the volume that the stage has just counted as used.
Stage->Free(AllocatedVolume, true);
AllocationFailed = true;
}
Allocation = nullptr;
return result;
}

// Replaces the tracked volume with `value` and re-accounts the change in the stage.
// Must not be called for an allocation whose status is Failed.
void TAllocationInfo::SetAllocatedVolume(const ui64 value) {
    AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed);
    // Tells the stage which bucket (allocated or waiting) the volume currently sits in.
    const bool isAllocated = (GetAllocationStatus() == EAllocationStatus::Allocated);
    Stage->UpdateVolume(AllocatedVolume, value, isAllocated);
    AllocatedVolume = value;
}

// Asks the stage whether AllocatedVolume plus `additional` bytes would still fit.
bool TAllocationInfo::IsAllocatable(const ui64 additional) const {
    const bool fits = Stage->IsAllocatable(AllocatedVolume, additional);
    return fits;
}

// Returns the tracked volume to the stage unless the allocation has failed,
// then traces the destruction.
TAllocationInfo::~TAllocationInfo() {
    const EAllocationStatus status = GetAllocationStatus();
    const bool hasFailed = (status == EAllocationStatus::Failed);
    if (!hasFailed) {
        // The second argument selects the bucket to release from: allocated or waiting.
        Stage->Free(AllocatedVolume, status == EAllocationStatus::Allocated);
    }

    AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName());
}

} // namespace NKikimr::NOlap::NGroupedMemoryManager
39 changes: 4 additions & 35 deletions ydb/core/tx/limiter/grouped_memory/service/allocation.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,17 @@ class TAllocationInfo: public NColumnShard::TMonitoringObjectsCounter<TAllocatio
bool AllocationFailed = false;

public:
// Returns the tracked volume to the stage unless the allocation has failed,
// then traces the destruction.
~TAllocationInfo() {
    const EAllocationStatus status = GetAllocationStatus();
    const bool hasFailed = (status == EAllocationStatus::Failed);
    if (!hasFailed) {
        // The second argument selects the bucket to release from: allocated or waiting.
        Stage->Free(AllocatedVolume, status == EAllocationStatus::Allocated);
    }

    AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName());
}
~TAllocationInfo();

// Asks the stage whether AllocatedVolume plus `additional` bytes would still fit.
bool IsAllocatable(const ui64 additional) const {
    const bool fits = Stage->IsAllocatable(AllocatedVolume, additional);
    return fits;
}
bool IsAllocatable(const ui64 additional) const;

// Replaces the tracked volume with `value` and re-accounts the change in the stage.
// Must not be called for an allocation whose status is Failed.
void SetAllocatedVolume(const ui64 value) {
    AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed);
    // Tells the stage which bucket (allocated or waiting) the volume currently sits in.
    const bool isAllocated = (GetAllocationStatus() == EAllocationStatus::Allocated);
    Stage->UpdateVolume(AllocatedVolume, value, isAllocated);
    AllocatedVolume = value;
}
void SetAllocatedVolume(const ui64 value);

// Current volume tracked for this allocation.
ui64 GetAllocatedVolume() const {
    const ui64 volume = AllocatedVolume;
    return volume;
}

// Attempts to turn this waiting allocation into an allocated one.
// Asks the stage to reserve AllocatedVolume. If the stage refuses, the allocation is
// notified through OnAllocationImpossible; otherwise it receives a TAllocationGuard
// through OnAllocated. On every path Allocation is reset to nullptr afterwards.
// Returns true only if the stage accepted the volume and OnAllocated returned true.
[[nodiscard]] bool Allocate(const NActors::TActorId& ownerId) {
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
// Allocation is cleared at the end of this method, so this check also rejects a repeated call.
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
"allocation_internal_group_id", AllocationInternalGroupId);
auto allocationResult = Stage->Allocate(AllocatedVolume);
if (allocationResult.IsFail()) {
// Remember the failure: the destructor does not call Stage->Free for a failed allocation.
AllocationFailed = true;
Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
Allocation = nullptr;
return false;
}
// The guard is created as a temporary inside the call expression and handed to the allocation.
const bool result = Allocation->OnAllocated(
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
if (!result) {
// The allocation rejected the memory: return the volume that the stage has just counted as used.
Stage->Free(AllocatedVolume, true);
AllocationFailed = true;
}
Allocation = nullptr;
return result;
}
[[nodiscard]] bool Allocate(const NActors::TActorId& ownerId);

EAllocationStatus GetAllocationStatus() const {
if (AllocationFailed) {
Expand Down
124 changes: 2 additions & 122 deletions ydb/core/tx/limiter/grouped_memory/usage/abstract.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#pragma once
#include "stage_features.h"

#include <ydb/core/tx/limiter/grouped_memory/service/counters.h>

#include <ydb/library/accessor/accessor.h>
Expand All @@ -10,8 +12,6 @@

namespace NKikimr::NOlap::NGroupedMemoryManager {

class TStageFeatures;

class TGroupGuard {
private:
const NActors::TActorId ActorId;
Expand Down Expand Up @@ -76,126 +76,6 @@ class TAllocationGuard {
~TAllocationGuard();
};

// Memory accounting node for a single stage.
// Tracks two volumes: Usage (already allocated) and Waiting (requested but not yet
// allocated). Every operation is mirrored into the optional monitoring Counters and,
// where applicable, forwarded to the optional Owner stage.
class TStageFeatures {
private:
    YDB_READONLY_DEF(TString, Name);
    YDB_READONLY(ui64, Limit, 0);
    YDB_READONLY(ui64, HardLimit, 0);
    YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
    YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
    std::shared_ptr<TStageFeatures> Owner;
    std::shared_ptr<TStageCounters> Counters;

public:
    // Human-readable description of this stage followed by its owner chain.
    TString DebugString() const {
        TStringBuilder result;
        result << "name=" << Name << ";limit=" << Limit << ";";
        if (Owner) {
            result << "owner=" << Owner->DebugString() << ";";
        }
        return result;
    }

    // Sum of the allocated and the waiting volume.
    ui64 GetFullMemory() const {
        return Usage.Val() + Waiting.Val();
    }

    // `owner` and `counters` may both be null.
    TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
        const std::shared_ptr<TStageCounters>& counters)
        : Name(name)
        , Limit(limit)
        , HardLimit(hardLimit)
        , Owner(owner)
        , Counters(counters) {
    }

    // Moves `volume` from Waiting to Usage, here and then in the owner chain.
    // Fails when Usage + volume would exceed HardLimit at this stage or at an owner.
    // On failure the volume is no longer counted as waiting at the stages visited so far.
    [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
        Waiting.Sub(volume);
        // Keep the waiting counter in step with Waiting on the failure path as well.
        if (Counters) {
            Counters->Sub(volume, false);
        }
        if (HardLimit < Usage.Val() + volume) {
            // Counters is optional everywhere else in this class, so guard it here too.
            if (Counters) {
                Counters->OnCannotAllocate();
            }
            AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "cannot_allocate")("limit", HardLimit)(
                "usage", Usage.Val())("delta", volume);
            return TConclusionStatus::Fail(
                TStringBuilder() << Name << "::(limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ");");
        }
        Usage.Add(volume);
        AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "allocate")("usage", Usage.Val())("delta", volume);
        if (Counters) {
            Counters->Add(volume, true);
        }
        if (Owner) {
            const auto ownerResult = Owner->Allocate(volume);
            if (ownerResult.IsFail()) {
                // Roll back only this stage; the owner has not counted the volume as used.
                Free(volume, true, false);
                return ownerResult;
            }
        }
        return TConclusionStatus::Success();
    }

    // Removes `volume` from Usage (allocated == true) or from Waiting (allocated == false).
    // The owner chain is updated as well unless `withOwner` is false.
    void Free(const ui64 volume, const bool allocated, const bool withOwner = true) {
        if (Counters) {
            Counters->Sub(volume, allocated);
        }
        if (allocated) {
            Usage.Sub(volume);
        } else {
            Waiting.Sub(volume);
        }
        AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "free")("usage", Usage.Val())("delta", volume);

        if (withOwner && Owner) {
            Owner->Free(volume, allocated);
        }
    }

    // Replaces a previously accounted volume `from` with `to` in the selected bucket,
    // here and in the owner chain.
    void UpdateVolume(const ui64 from, const ui64 to, const bool allocated) {
        if (Counters) {
            Counters->Sub(from, allocated);
            Counters->Add(to, allocated);
        }
        AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())(
            "allocated", allocated)("from", from)("to", to);
        if (allocated) {
            Usage.Sub(from);
            Usage.Add(to);
        } else {
            Waiting.Sub(from);
            Waiting.Add(to);
        }

        if (Owner) {
            Owner->UpdateVolume(from, to, allocated);
        }
    }

    // True when Usage + volume + additional stays within the soft Limit of this
    // stage and of every owner.
    bool IsAllocatable(const ui64 volume, const ui64 additional) const {
        if (Limit < additional + Usage.Val() + volume) {
            return false;
        }
        if (Owner) {
            return Owner->IsAllocatable(volume, additional);
        }
        return true;
    }

    // Adds `volume` to Usage (allocated == true) or to Waiting (allocated == false),
    // here and in the owner chain.
    void Add(const ui64 volume, const bool allocated) {
        if (Counters) {
            Counters->Add(volume, allocated);
        }
        if (allocated) {
            Usage.Add(volume);
        } else {
            Waiting.Add(volume);
        }

        if (Owner) {
            Owner->Add(volume, allocated);
        }
    }
};

class IAllocation {
private:
static inline TAtomicCounter Counter = 0;
Expand Down
131 changes: 131 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#include "stage_features.h"

#include <ydb/library/actors/core/log.h>

#include <util/string/builder.h>

namespace NKikimr::NOlap::NGroupedMemoryManager {

// Human-readable description of this stage followed, recursively, by its owner chain.
TString TStageFeatures::DebugString() const {
    TStringBuilder description;
    description << "name=" << Name;
    description << ";limit=" << Limit << ";";
    if (Owner) {
        const TString ownerDescription = Owner->DebugString();
        description << "owner=" << ownerDescription << ";";
    }
    return description;
}

// Stores the stage name, the soft limit (checked in IsAllocatable), the hard limit
// (checked in Allocate), the optional owner stage and the optional monitoring counters.
// Both `owner` and `counters` are null-checked before use in the methods of this file.
TStageFeatures::TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
const std::shared_ptr<TStageCounters>& counters)
: Name(name)
, Limit(limit)
, HardLimit(hardLimit)
, Owner(owner)
, Counters(counters) {
}

// Moves `volume` from Waiting to Usage for this stage and every owner above it.
// Pass 1 removes the volume from Waiting along the whole chain and checks each hard
// limit; the first violation found becomes the returned failure.
// Pass 2 runs only when no limit was violated and adds the volume to Usage along the chain.
TConclusionStatus TStageFeatures::Allocate(const ui64 volume) {
    std::optional<TConclusionStatus> firstFailure;
    for (TStageFeatures* stage = this; stage != nullptr; stage = stage->Owner.get()) {
        stage->Waiting.Sub(volume);
        if (stage->Counters) {
            stage->Counters->Sub(volume, false);
        }
        const bool exceedsHardLimit = stage->HardLimit < stage->Usage.Val() + volume;
        if (!exceedsHardLimit) {
            continue;
        }
        if (!firstFailure) {
            firstFailure = TConclusionStatus::Fail(TStringBuilder() << stage->Name << "::(limit:" << stage->HardLimit
                                                                    << ";val:" << stage->Usage.Val() << ";delta=" << volume << ");");
        }
        if (stage->Counters) {
            stage->Counters->OnCannotAllocate();
        }
        AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", stage->Name)("event", "cannot_allocate")(
            "limit", stage->HardLimit)("usage", stage->Usage.Val())("delta", volume);
    }
    if (firstFailure) {
        return *firstFailure;
    }

    for (TStageFeatures* stage = this; stage != nullptr; stage = stage->Owner.get()) {
        stage->Usage.Add(volume);
        AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", stage->Name)("event", "allocate")("usage", stage->Usage.Val())(
            "delta", volume);
        if (stage->Counters) {
            stage->Counters->Add(volume, true);
        }
    }
    return TConclusionStatus::Success();
}

void TStageFeatures::Free(const ui64 volume, const bool allocated) {
auto* current = this;
while (current) {
if (current->Counters) {
current->Counters->Sub(volume, allocated);
}
if (allocated) {
current->Usage.Sub(volume);
} else {
current->Waiting.Sub(volume);
}
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "free")("usage", current->Usage.Val())(
"delta", volume);
current = current->Owner.get();
}
}

// Replaces a previously accounted volume `from` with `to` in the Usage bucket
// (allocated == true) or the Waiting bucket (allocated == false), then forwards
// the same update to the owner stage.
// NOTE(review): the body starts with an unconditional AFL_VERIFY(false), so as written
// any call is expected to fail the verification before the accounting below runs.
// TAllocationInfo::SetAllocatedVolume calls this method. Confirm whether the check
// is intentional (method deliberately disabled) or leftover debugging.
void TStageFeatures::UpdateVolume(const ui64 from, const ui64 to, const bool allocated) {
AFL_VERIFY(false);
if (Counters) {
Counters->Sub(from, allocated);
Counters->Add(to, allocated);
}
AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())(
"allocated", allocated)("from", from)("to", to);
if (allocated) {
Usage.Sub(from);
Usage.Add(to);
} else {
Waiting.Sub(from);
Waiting.Add(to);
}

// Unlike Allocate and Free in this file, the owner chain is walked recursively here.
if (Owner) {
Owner->UpdateVolume(from, to, allocated);
}
}

bool TStageFeatures::IsAllocatable(const ui64 volume, const ui64 additional) const {
if (Limit < additional + Usage.Val() + volume) {
return false;
}
if (Owner) {
return Owner->IsAllocatable(volume, additional);
}
return true;
}

void TStageFeatures::Add(const ui64 volume, const bool allocated) {
if (Counters) {
Counters->Add(volume, allocated);
}
if (allocated) {
Usage.Add(volume);
} else {
Waiting.Add(volume);
}

if (Owner) {
Owner->Add(volume, allocated);
}
}

} // namespace NKikimr::NOlap::NGroupedMemoryManager
Loading

0 comments on commit 97581d1

Please sign in to comment.