From b96e6daf575a65712fe1be848d042046ed8bea0b Mon Sep 17 00:00:00 2001 From: Shiyuan Ji <919745273@qq.com> Date: Sat, 4 Nov 2023 21:38:14 +0800 Subject: [PATCH 1/4] [WIP]improvement(hashmap): use folly ConcurrentHashMap replace unordered_map --- src/helper.h | 23 +++++++++++++++++++++++ src/store.cc | 47 +++++++++++++++++++++++++++++++---------------- src/store.h | 18 ++++++++++++------ 3 files changed, 66 insertions(+), 22 deletions(-) diff --git a/src/helper.h b/src/helper.h index 91ee194ce..4d5644ad2 100644 --- a/src/helper.h +++ b/src/helper.h @@ -48,6 +48,29 @@ inline typename HASH::const_local_iterator RandomHashMember(const HASH& containe return typename HASH::const_local_iterator(); } +template +inline HASH::const_iterator KeyRandomHashMember(const HASH& container) { + auto it = container.cend(); + if (container.empty()) { + return it; + } + + it = container.cbegin(); + // FIXME: don't iterator, use O(1) algorithm + size_t randomIdx = random() % container.size(); + for (size_t i = 0; i < randomIdx; ++i) { + ++it; + } + return it; +} + +template +inline size_t ScanHashMember(const HASH& container, size_t cursor, size_t count, + std::vector& res) { + // TODO(century): wait to do with folly concurrent hashmap + return 0; +} + // scan template inline size_t ScanHashMember(const HASH& container, size_t cursor, size_t count, diff --git a/src/store.cc b/src/store.cc index df492bdca..1ad745c57 100644 --- a/src/store.cc +++ b/src/store.cc @@ -381,7 +381,7 @@ int PStore::GetDB() const { return dbno_; } const PObject* PStore::GetObject(const PString& key) const { auto db = &dbs_[dbno_]; - PDB::const_iterator it(db->find(key)); + FDB::const_iterator it(db->find(key)); if (it != db->end()) { return &it->second; } @@ -397,12 +397,14 @@ const PObject* PStore::GetObject(const PString& key) const { if (obj.type != PType_invalid) { DEBUG("GetKey from leveldb:{}", key); - (*db)[key] = std::move(obj); - PObject& realobj = (*db)[key]; - realobj.lru = PObject::lruclock; + unsigned int remainTtlSecondsTemp = obj.lru; + // XXX: assign lru here cause everything get from db is immutable + obj.lru = PObject::lruclock; + db->try_emplace(key, std::move(obj)); + const PObject& realobj = db->find(key)->second; // trick: use lru field to store the remain seconds to be expired. - unsigned int remainTtlSeconds = obj.lru; + unsigned int remainTtlSeconds = remainTtlSecondsTemp; if (remainTtlSeconds > 0) { SetExpire(key, ::Now() + remainTtlSeconds * 1000); } @@ -421,7 +423,15 @@ bool PStore::DeleteKey(const PString& key) { waitSyncKeys_[dbno_][key] = nullptr; // null implies delete data } - return db->erase(key) != 0; + size_t ret = 0; + // erase() from folly ConcurrentHashmap will throw an exception if hash function crashes + try { + ret = db->erase(key); + } catch (const std::exception& e) { + return false; + } + + return ret != 0; } bool PStore::ExistsKey(const PString& key) const { @@ -438,10 +448,11 @@ PType PStore::KeyType(const PString& key) const { return PType(obj->type); } -static bool RandomMember(const PDB& hash, PString& res, PObject** val) { - PDB::const_local_iterator it = RandomHashMember(hash); +static bool RandomMember(const FDB& hash, PString& res, PObject** val) { + FDB::const_iterator it = KeyRandomHashMember(hash); - if (it != PDB::const_local_iterator()) { + // FIXME: bugs here + if (it != hash.cend()) { res = it->first; if (val) { *val = const_cast(&it->second); @@ -466,11 +477,11 @@ size_t PStore::ScanKey(size_t cursor, size_t count, std::vector& res) c return 0; } - std::vector iters; + std::vector iters; size_t newCursor = ScanHashMember(dbs_[dbno_], cursor, count, iters); res.reserve(iters.size()); - for (auto it : iters) { + for (const auto& it : iters) { res.push_back(it->first); } @@ -518,16 +529,20 @@ PError PStore::getValueByType(const PString& key, PObject*& value, PType type, b PObject* PStore::SetValue(const PString& key, PObject&& value) { auto db = &dbs_[dbno_]; - (*db)[key] = std::move(value); - PObject& obj = (*db)[key]; - obj.lru = PObject::lruclock; + value.lru = PObject::lruclock; + const auto& [_, status] = db->try_emplace(key, std::move(value)); + // if this is an update cmd, try_emplace will return false + if (!status) { + db->assign(key, std::move(value)); + } + const PObject& obj = db->find(key)->second; // put this key to sync list if (!waitSyncKeys_.empty()) { waitSyncKeys_[dbno_][key] = &obj; } - return &obj; + return const_cast(&obj); } void PStore::SetExpire(const PString& key, uint64_t when) const { expiredDBs_[dbno_].SetExpire(key, when); } @@ -552,7 +567,7 @@ void PStore::InitExpireTimer() { } void PStore::ResetDB() { - std::vector(dbs_.size()).swap(dbs_); + std::vector(dbs_.size()).swap(dbs_); std::vector(expiredDBs_.size()).swap(expiredDBs_); std::vector(blockedClients_.size()).swap(blockedClients_); dbno_ = 0; diff --git a/src/store.h b/src/store.h index 3796ec936..5b57f0f43 100644 --- a/src/store.h +++ b/src/store.h @@ -7,6 +7,8 @@ #pragma once +#define GLOG_NO_ABBREVIATED_SEVERITIES + #include "common.h" #include "dump_interface.h" #include "hash.h" @@ -14,6 +16,7 @@ #include "set.h" #include "sorted_set.h" +#include #include #include #include @@ -79,7 +82,8 @@ struct PObject { class PClient; -using PDB = std::unordered_map >; +//using PDB = std::unordered_map >; +using FDB = folly::ConcurrentHashMap>; const int kMaxDBNum = 65536; @@ -104,10 +108,12 @@ class PStore { size_t ScanKey(size_t cursor, size_t count, std::vector& res) const; // iterator - PDB::const_iterator begin() const { return dbs_[dbno_].begin(); } - PDB::const_iterator end() const { return dbs_[dbno_].end(); } - PDB::iterator begin() { return dbs_[dbno_].begin(); } - PDB::iterator end() { return dbs_[dbno_].end(); } + FDB::const_iterator begin() const { return dbs_[dbno_].begin(); } + FDB::const_iterator end() const { return dbs_[dbno_].end(); } + +// FIXME: folly::ConcurrentHashMap doesn't support non-const iterator +// PDB::iterator begin() { return dbs_[dbno_].begin(); } +// PDB::iterator end() { return dbs_[dbno_].end(); } const PObject* GetObject(const PString& key) const; PError GetValue(const PString& key, PObject*& value, bool touch = true); @@ -195,7 +201,7 @@ class PStore { PError setValue(const PString& key, PObject& value, bool exclusive = false); // Because GetObject() must be const, so mutable them - mutable std::vector dbs_; + mutable std::vector dbs_; mutable std::vector expiredDBs_; std::vector blockedClients_; std::vector > backends_; From 3d42384e85c658687d7e724ca387d7343763cb53 Mon Sep 17 00:00:00 2001 From: Shiyuan Ji <919745273@qq.com> Date: Sat, 4 Nov 2023 21:44:42 +0800 Subject: [PATCH 2/4] fix missing typename --- src/helper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helper.h b/src/helper.h index 4d5644ad2..c496d724c 100644 --- a/src/helper.h +++ b/src/helper.h @@ -49,7 +49,7 @@ inline typename HASH::const_local_iterator RandomHashMember(const HASH& containe } template -inline HASH::const_iterator KeyRandomHashMember(const HASH& container) { +inline typename HASH::const_iterator KeyRandomHashMember(const HASH& container) { auto it = container.cend(); if (container.empty()) { return it; From 20d29c4de23906a224a04432887edd4f59f5d12f Mon Sep 17 00:00:00 2001 From: Shiyuan Ji <919745273@qq.com> Date: Tue, 7 Nov 2023 11:54:52 +0800 Subject: [PATCH 3/4] update expiredDb, blockedClients and syncDB --- .gitignore | 2 +- src/helper.h | 37 +++++++++++++++++++++++++----- src/store.cc | 65 ++++++++++++++++++++++++++++++---------------------- src/store.h | 19 ++++++--------- 4 files changed, 77 insertions(+), 46 deletions(-) diff --git a/.gitignore b/.gitignore index 91d1ebba6..4dbc45a59 100644 --- a/.gitignore +++ b/.gitignore @@ -33,7 +33,7 @@ cmake-build-release # Log path make_config.mk -log/ +logs/ lib/ output/ diff --git a/src/helper.h b/src/helper.h index c496d724c..3bd7e942d 100644 --- a/src/helper.h +++ b/src/helper.h @@ -49,25 +49,50 @@ inline typename HASH::const_local_iterator RandomHashMember(const HASH& containe } template -inline typename HASH::const_iterator KeyRandomHashMember(const HASH& container) { +inline typename HASH::const_iterator FollyRandomHashMember(const HASH& container) { auto it = container.cend(); if (container.empty()) { return it; } it = container.cbegin(); - // FIXME: don't iterator, use O(1) algorithm size_t randomIdx = random() % container.size(); - for (size_t i = 0; i < randomIdx; ++i) { + while (randomIdx > 0) { + randomIdx--; ++it; } + return it; } template -inline size_t ScanHashMember(const HASH& container, size_t cursor, size_t count, - std::vector& res) { - // TODO(century): wait to do with folly concurrent hashmap +inline size_t FollyScanHashMember(const HASH& container, size_t cursor, size_t count, + std::vector& res) { + if (cursor >= container.size()) { + return 0; + } + + // find corresponding iterator + size_t idx = cursor; + auto it = container.cbegin(); + while (idx > 0) { + --idx; + ++it; + } + + size_t new_cursor = cursor; + auto end = container.cend(); + while (res.size() < count && it != end) { + ++new_cursor; + res.push_back(it->first); + ++it; + } + + // not the last element of map + if (it != end) { + return new_cursor; + } + return 0; } diff --git a/src/store.cc b/src/store.cc index 1ad745c57..a1bc7f03a 100644 --- a/src/store.cc +++ b/src/store.cc @@ -112,7 +112,7 @@ void PObject::freeValue() { int PStore::dirty_ = 0; -void PStore::ExpiredDB::SetExpire(const PString& key, uint64_t when) { expireKeys_[key] = when; } +void PStore::ExpiredDB::SetExpire(const PString& key, uint64_t when) { expireKeys_.insert_or_assign(key, when); } int64_t PStore::ExpiredDB::TTL(const PString& key, uint64_t now) { if (!PSTORE.ExistsKey(key)) { @@ -147,6 +147,7 @@ PStore::ExpireResult PStore::ExpiredDB::ExpireIfNeed(const PString& key, uint64_ WARN("Delete timeout key {}", it->first); PSTORE.DeleteKey(it->first); + // XXX: may throw exception if hash function crash expireKeys_.erase(it); return ExpireResult::expired; } @@ -170,7 +171,7 @@ int PStore::ExpiredDB::LoopCheck(uint64_t now) { Propagate(params); PSTORE.DeleteKey(it->first); - expireKeys_.erase(it++); + it = expireKeys_.erase(it); ++nDel; } else { @@ -188,8 +189,17 @@ bool PStore::BlockedClients::BlockClient(const PString& key, PClient* client, ui return false; } - Clients& clients = blockedClients_[key]; - clients.push_back(Clients::value_type(std::static_pointer_cast(client->shared_from_this()), timeout, pos)); + auto it = blockedClients_.find(key); + if (it != blockedClients_.end()) { + // since folly doesn't support modify value directly, we have to make a copy to update atomically + auto clients = it->second; + clients.emplace_back(std::static_pointer_cast(client->shared_from_this()), timeout, pos); + blockedClients_.assign(key, clients); + } else { + std::list, uint64_t, ListPosition>> clients; + clients.emplace_back(std::static_pointer_cast(client->shared_from_this()), timeout, pos); + blockedClients_.insert(key, clients); + } INFO("{} is waited by {}, timeout {}", key, client->GetName(), timeout); return true; @@ -200,7 +210,7 @@ size_t PStore::BlockedClients::UnblockClient(PClient* client) { const auto& keys = client->WaitingKeys(); for (const auto& key : keys) { - Clients& clients = blockedClients_[key]; + Clients clients = blockedClients_[key]; assert(!clients.empty()); for (auto it(clients.begin()); it != clients.end(); ++it) { @@ -213,6 +223,7 @@ size_t PStore::BlockedClients::UnblockClient(PClient* client) { break; } } + blockedClients_.assign(key, clients); } client->ClearWaitingKeys(); @@ -227,7 +238,7 @@ size_t PStore::BlockedClients::ServeClient(const PString& key, const PLIST& list return 0; } - Clients& clients = it->second; + Clients clients = it->second; if (clients.empty()) { return 0; } @@ -300,6 +311,7 @@ size_t PStore::BlockedClients::ServeClient(const PString& key, const PLIST& list ++nServed; } else { clients.pop_front(); + blockedClients_.assign(it->first, clients); } } @@ -310,7 +322,8 @@ int PStore::BlockedClients::LoopCheck(uint64_t now) { int n = 0; for (auto it(blockedClients_.begin()); it != blockedClients_.end() && n < 100;) { - Clients& clients = it->second; + // iterator in folly::ConcurrentHashMap is always const + Clients clients = it->second; for (auto cli(clients.begin()); cli != clients.end();) { if (std::get<1>(*cli) < now) { // timeout ++n; @@ -326,13 +339,15 @@ int PStore::BlockedClients::LoopCheck(uint64_t now) { } clients.erase(cli++); + // clients is a copy, once it's modified, map should update it + blockedClients_.assign(it->first, clients); } else { ++cli; } } if (clients.empty()) { - blockedClients_.erase(it++); + it = blockedClients_.erase(it); } else { ++it; } @@ -381,14 +396,14 @@ int PStore::GetDB() const { return dbno_; } const PObject* PStore::GetObject(const PString& key) const { auto db = &dbs_[dbno_]; - FDB::const_iterator it(db->find(key)); + PDB::const_iterator it(db->find(key)); if (it != db->end()) { return &it->second; } if (!backends_.empty()) { // if it's in dirty list, it must be deleted, wait sync to backend - if (waitSyncKeys_[dbno_].count(key)) { + if (waitSyncKeys_[dbno_].find(key) != waitSyncKeys_[dbno_].end()) { return nullptr; } @@ -420,7 +435,7 @@ bool PStore::DeleteKey(const PString& key) { auto db = &dbs_[dbno_]; // add to dirty queue if (!waitSyncKeys_.empty()) { - waitSyncKeys_[dbno_][key] = nullptr; // null implies delete data + waitSyncKeys_[dbno_].insert_or_assign(key, nullptr); // null implies delete data } size_t ret = 0; @@ -448,10 +463,9 @@ PType PStore::KeyType(const PString& key) const { return PType(obj->type); } -static bool RandomMember(const FDB& hash, PString& res, PObject** val) { - FDB::const_iterator it = KeyRandomHashMember(hash); +static bool RandomMember(const PDB& hash, PString& res, PObject** val) { + PDB::const_iterator it = FollyRandomHashMember(hash); - // FIXME: bugs here if (it != hash.cend()) { res = it->first; if (val) { @@ -477,12 +491,12 @@ size_t PStore::ScanKey(size_t cursor, size_t count, std::vector& res) c return 0; } - std::vector iters; - size_t newCursor = ScanHashMember(dbs_[dbno_], cursor, count, iters); + std::vector iters; + size_t newCursor = FollyScanHashMember(dbs_[dbno_], cursor, count, iters); res.reserve(iters.size()); for (const auto& it : iters) { - res.push_back(it->first); + res.push_back(it); } return newCursor; @@ -530,18 +544,15 @@ PError PStore::getValueByType(const PString& key, PObject*& value, PType type, b PObject* PStore::SetValue(const PString& key, PObject&& value) { auto db = &dbs_[dbno_]; value.lru = PObject::lruclock; - const auto& [_, status] = db->try_emplace(key, std::move(value)); - // if this is an update cmd, try_emplace will return false - if (!status) { - db->assign(key, std::move(value)); - } - const PObject& obj = db->find(key)->second; + auto [realObj, status] = db->insert_or_assign(key, std::move(value)); + const PObject& obj = realObj->second; // put this key to sync list if (!waitSyncKeys_.empty()) { - waitSyncKeys_[dbno_][key] = &obj; + waitSyncKeys_[dbno_].insert_or_assign(key, &obj); } + // XXX: any better solution without const_cast? return const_cast(&obj); } @@ -567,7 +578,7 @@ void PStore::InitExpireTimer() { } void PStore::ResetDB() { - std::vector(dbs_.size()).swap(dbs_); + std::vector(dbs_.size()).swap(dbs_); std::vector(expiredDBs_.size()).swap(expiredDBs_); std::vector(blockedClients_.size()).swap(blockedClients_); dbno_ = 0; @@ -741,14 +752,14 @@ void PStore::AddDirtyKey(const PString& key) { if (!waitSyncKeys_.empty()) { PObject* obj = nullptr; GetValue(key, obj); - waitSyncKeys_[dbno_][key] = obj; + waitSyncKeys_[dbno_].insert_or_assign(key, obj); } } void PStore::AddDirtyKey(const PString& key, const PObject* value) { // put this key to sync list if (!waitSyncKeys_.empty()) { - waitSyncKeys_[dbno_][key] = value; + waitSyncKeys_[dbno_].insert_or_assign(key, value); } } diff --git a/src/store.h b/src/store.h index 5b57f0f43..081d2cdcc 100644 --- a/src/store.h +++ b/src/store.h @@ -82,8 +82,7 @@ struct PObject { class PClient; -//using PDB = std::unordered_map >; -using FDB = folly::ConcurrentHashMap>; +using PDB = folly::ConcurrentHashMap>; const int kMaxDBNum = 65536; @@ -108,12 +107,8 @@ class PStore { size_t ScanKey(size_t cursor, size_t count, std::vector& res) const; // iterator - FDB::const_iterator begin() const { return dbs_[dbno_].begin(); } - FDB::const_iterator end() const { return dbs_[dbno_].end(); } - -// FIXME: folly::ConcurrentHashMap doesn't support non-const iterator -// PDB::iterator begin() { return dbs_[dbno_].begin(); } -// PDB::iterator end() { return dbs_[dbno_].end(); } + PDB::const_iterator begin() const { return dbs_[dbno_].begin(); } + PDB::const_iterator end() const { return dbs_[dbno_].end(); } const PObject* GetObject(const PString& key) const; PError GetValue(const PString& key, PObject*& value, bool touch = true); @@ -177,7 +172,7 @@ class PStore { int LoopCheck(uint64_t now); private: - using P_EXPIRE_DB = std::unordered_map >; + using P_EXPIRE_DB = folly::ConcurrentHashMap>; P_EXPIRE_DB expireKeys_; // all the keys to be expired, unordered. }; @@ -193,7 +188,7 @@ class PStore { private: using Clients = std::list, uint64_t, ListPosition> >; - using WaitingList = std::unordered_map; + using WaitingList = folly::ConcurrentHashMap>; WaitingList blockedClients_; }; @@ -201,12 +196,12 @@ class PStore { PError setValue(const PString& key, PObject& value, bool exclusive = false); // Because GetObject() must be const, so mutable them - mutable std::vector dbs_; + mutable std::vector dbs_; mutable std::vector expiredDBs_; std::vector blockedClients_; std::vector > backends_; - using ToSyncDB = std::unordered_map >; + using ToSyncDB = folly::ConcurrentHashMap >; std::vector waitSyncKeys_; int dbno_ = -1; }; From 7aafd541472d050787171378d489777bc084c9a3 Mon Sep 17 00:00:00 2001 From: Shiyuan Ji <919745273@qq.com> Date: Wed, 8 Nov 2023 10:55:03 +0800 Subject: [PATCH 4/4] update random_device and find --- src/helper.h | 8 ++++++-- src/store.cc | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/helper.h b/src/helper.h index 3bd7e942d..bbb845671 100644 --- a/src/helper.h +++ b/src/helper.h @@ -7,7 +7,7 @@ #pragma once -#include +#include #include #include "pstring.h" @@ -56,7 +56,11 @@ inline typename HASH::const_iterator FollyRandomHashMember(const HASH& container } it = container.cbegin(); - size_t randomIdx = random() % container.size(); + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, container.size() - 1); + + size_t randomIdx = dis(gen); while (randomIdx > 0) { randomIdx--; ++it; diff --git a/src/store.cc b/src/store.cc index a1bc7f03a..43c5a1c59 100644 --- a/src/store.cc +++ b/src/store.cc @@ -210,7 +210,7 @@ size_t PStore::BlockedClients::UnblockClient(PClient* client) { const auto& keys = client->WaitingKeys(); for (const auto& key : keys) { - Clients clients = blockedClients_[key]; + Clients clients = blockedClients_.find(key)->second; assert(!clients.empty()); for (auto it(clients.begin()); it != clients.end(); ++it) {