From 7b9761398ecd016066d7cfa05b4692999fb707fa Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 22 Jan 2025 14:16:41 +0200 Subject: [PATCH] chore: introduce background deletions of DenseSet objects We currently implement them only for sets but the same approach can work for zset, hashes, lists as well. Signed-off-by: Roman Gershman --- src/server/generic_family.cc | 71 ++++++++++++++++++++++++++++--- src/server/generic_family.h | 2 +- src/server/generic_family_test.cc | 14 ++++++ src/server/string_family.cc | 2 +- 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 0f89fd8db508..4b9c4baae6ab 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -33,6 +33,7 @@ extern "C" { #include "server/search/doc_index.h" #include "server/set_family.h" #include "server/transaction.h" +#include "util/fibers/proactor_base.h" #include "util/varz.h" ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases"); @@ -41,6 +42,7 @@ ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by k namespace dfly { using namespace std; using namespace facade; +using util::fb2::ProactorBase; namespace { @@ -994,14 +996,46 @@ std::optional ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl return flags; } -void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) { +constexpr uint32_t kClearStepSize = 1024; +struct ClearNode { + DenseSet* ds; + uint32_t cursor; + ClearNode* next; + + ClearNode(DenseSet* d, uint32_t c, ClearNode* n) : ds(d), cursor(c), next(n) { + } +}; + +// We add async deletion requests to a linked list and process them asynchronously +// in each thread. +__thread ClearNode* clear_head = nullptr; + +int32_t ClearQueuedDenseSetEntries() { + if (clear_head == nullptr) + return -1; // unregister itself. + + auto* current = clear_head; + + DVLOG(2) << "ClearQueuedDenseSetEntries " << current->cursor; + uint32_t next = current->ds->ClearStep(current->cursor, kClearStepSize); + if (next == current->ds->BucketCount()) { // reached the end. + CompactObj::DeleteMR(current->ds); + clear_head = current->next; + delete current; + } else { + current->cursor = next; + } + return ProactorBase::kOnIdleMaxLevel; +}; + +void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx, bool async) { atomic_uint32_t result{0}; auto* builder = cmd_cntx.rb; bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); - auto cb = [&result](const Transaction* t, EngineShard* shard) { + auto cb = [&](const Transaction* t, EngineShard* shard) { ShardArgs args = t->GetShardArgs(shard->shard_id()); - auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args); + auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args, async); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -1029,7 +1063,7 @@ void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) { } // namespace -OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { +OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async) { DVLOG(1) << "Del: " << keys.Front(); auto& db_slice = op_args.GetDbSlice(); @@ -1040,6 +1074,31 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& if (!IsValid(it)) continue; + PrimeValue& pv = it->second; + if (pv.ObjType() == OBJ_SET && pv.Encoding() == kEncodingStrMap2) { + DenseSet* ds = (DenseSet*)pv.RObjPtr(); + pv.SetRObjPtr(nullptr); + + uint32_t next = ds->ClearStep(0, kClearStepSize); + if (next < ds->BucketCount() && async) { + ProactorBase* pb = ProactorBase::me(); + DCHECK(pb); + + bool launch_task = (clear_head == nullptr); + + // register ds + clear_head = new ClearNode{ds, next, clear_head}; + ds = nullptr; + if (launch_task) { + pb->AddOnIdleTask(&ClearQueuedDenseSetEntries); + } + } + + if (ds) { + CompactObj::DeleteMR(ds); + } + } + db_slice.Del(op_args.db_cntx, it); ++res; } @@ -1050,11 +1109,11 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { VLOG(1) << "Del " << ArgS(args, 0); - DeleteGeneric(args, cmd_cntx); + DeleteGeneric(args, cmd_cntx, false); } void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) { - DeleteGeneric(args, cmd_cntx); + DeleteGeneric(args, cmd_cntx, true); } void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) { diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 0fb231ec80eb..f3d636ce1fe4 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -29,7 +29,7 @@ class GenericFamily { // Accessed by Service::Exec and Service::Watch as an utility. static OpResult OpExists(const OpArgs& op_args, const ShardArgs& keys); - static OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys); + static OpResult OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async); private: using SinkReplyBuilder = facade::SinkReplyBuilder; diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 68d54ea6f82c..77237f6ea3d1 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -872,4 +872,18 @@ TEST_F(GenericFamilyTest, RestoreOOM) { EXPECT_THAT(resp, ErrArg("Out of memory")); } +TEST_F(GenericFamilyTest, Unlink) { + for (unsigned i = 0; i < 1000; ++i) { + unsigned start = i * 10; + vector cmd = {"SADD", "s1"}; + for (unsigned j = 0; j < 10; ++j) { + cmd.push_back(absl::StrCat("f", start + j)); + } + auto resp = Run(absl::MakeSpan(cmd)); + ASSERT_THAT(resp, IntArg(10)); + } + auto resp = Run({"unlink", "s1"}); + EXPECT_THAT(resp, IntArg(1)); +} + } // namespace dfly diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 706126ad517b..0904f7e113ec 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -1014,7 +1014,7 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { if (rel_ms < 0) { cmnd_cntx.tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) { ShardArgs args = tx->GetShardArgs(es->shard_id()); - GenericFamily::OpDel(tx->GetOpArgs(es), args); + GenericFamily::OpDel(tx->GetOpArgs(es), args, false); return OpStatus::OK; }); return builder->SendStored();