Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improvement(hashmap): use folly ConcurrentHashMap replace unordered_map #29

Merged
merged 4 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ cmake-build-release

# Log path
make_config.mk
log/
logs/
lib/
output/

Expand Down
37 changes: 31 additions & 6 deletions src/helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,50 @@ inline typename HASH::const_local_iterator RandomHashMember(const HASH& containe
}

template <typename HASH>
inline typename HASH::const_iterator KeyRandomHashMember(const HASH& container) {
inline typename HASH::const_iterator FollyRandomHashMember(const HASH& container) {
auto it = container.cend();
if (container.empty()) {
return it;
}

it = container.cbegin();
// FIXME: don't iterate; use an O(1) algorithm
size_t randomIdx = random() % container.size();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里随机函数推荐使用 C++的随机函数, 像这里的

std::random_device rd;

这里我的锅, 在移植 pstd的时候, 没有把 随机函数 单独 包装好

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (size_t i = 0; i < randomIdx; ++i) {
while (randomIdx > 0) {
randomIdx--;
++it;
}

return it;
}

template <typename HASH>
inline size_t ScanHashMember(const HASH& container, size_t cursor, size_t count,
std::vector<typename HASH::const_iterator>& res) {
// TODO(century): wait to do with folly concurrent hashmap
// Scans `container` in iteration order, starting at the `cursor`-th element,
// and appends the visited keys to `res`.
//
// Parameters:
//   container - map-like container providing size(), cbegin() and cend(),
//               whose elements expose their key as `first`.
//   cursor    - zero-based position at which the scan resumes.
//   count     - the scan stops once res.size() reaches this value; entries
//               already present in `res` therefore count towards the limit.
//   res       - output vector the scanned keys are appended to.
//
// Returns the cursor to pass to the next call, or 0 when the scan reached the
// end of the container or `cursor` is out of range.
inline size_t FollyScanHashMember(const HASH& container, size_t cursor, size_t count,
                                  std::vector<PString>& res) {
  if (cursor >= container.size()) {
    return 0;
  }

  const auto end = container.cend();
  auto it = container.cbegin();

  // Walk to the element the cursor refers to. The walk is bounded by `end` as
  // well as by `cursor`, so the iterator is never advanced past the end even
  // if the container yields fewer elements than size() reported above.
  // NOTE(review): presumably this can happen when another thread erases
  // entries between the size() check and the walk - confirm against callers.
  for (size_t remaining = cursor; remaining > 0 && it != end; --remaining) {
    ++it;
  }

  size_t new_cursor = cursor;
  while (res.size() < count && it != end) {
    ++new_cursor;
    res.push_back(it->first);
    ++it;
  }

  // Elements remain after the last one collected: report where to resume.
  // Otherwise the scan is complete and 0 is returned.
  return it != end ? new_cursor : 0;
}

Expand Down
65 changes: 38 additions & 27 deletions src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void PObject::freeValue() {

int PStore::dirty_ = 0;

void PStore::ExpiredDB::SetExpire(const PString& key, uint64_t when) { expireKeys_[key] = when; }
void PStore::ExpiredDB::SetExpire(const PString& key, uint64_t when) { expireKeys_.insert_or_assign(key, when); }

int64_t PStore::ExpiredDB::TTL(const PString& key, uint64_t now) {
if (!PSTORE.ExistsKey(key)) {
Expand Down Expand Up @@ -147,6 +147,7 @@ PStore::ExpireResult PStore::ExpiredDB::ExpireIfNeed(const PString& key, uint64_

WARN("Delete timeout key {}", it->first);
PSTORE.DeleteKey(it->first);
// XXX: may throw an exception if the hash function crashes
expireKeys_.erase(it);
return ExpireResult::expired;
}
Expand All @@ -170,7 +171,7 @@ int PStore::ExpiredDB::LoopCheck(uint64_t now) {
Propagate(params);

PSTORE.DeleteKey(it->first);
expireKeys_.erase(it++);
it = expireKeys_.erase(it);

++nDel;
} else {
Expand All @@ -188,8 +189,17 @@ bool PStore::BlockedClients::BlockClient(const PString& key, PClient* client, ui
return false;
}

Clients& clients = blockedClients_[key];
clients.push_back(Clients::value_type(std::static_pointer_cast<PClient>(client->shared_from_this()), timeout, pos));
auto it = blockedClients_.find(key);
if (it != blockedClients_.end()) {
// since folly doesn't support modifying a value in place, we have to make a copy, update it, and assign it back atomically
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里有没有并发安全问题?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

迭代器是一个不可变量;此外在官方的示例中,如果当前线程获取了某个迭代器,其余线程的更改对于当前线程是感知不到的,因此我认为是线程安全的

auto clients = it->second;
clients.emplace_back(std::static_pointer_cast<PClient>(client->shared_from_this()), timeout, pos);
blockedClients_.assign(key, clients);
} else {
std::list<std::tuple<std::weak_ptr<PClient>, uint64_t, ListPosition>> clients;
clients.emplace_back(std::static_pointer_cast<PClient>(client->shared_from_this()), timeout, pos);
blockedClients_.insert(key, clients);
}

INFO("{} is waited by {}, timeout {}", key, client->GetName(), timeout);
return true;
Expand All @@ -200,7 +210,7 @@ size_t PStore::BlockedClients::UnblockClient(PClient* client) {
const auto& keys = client->WaitingKeys();

for (const auto& key : keys) {
Clients& clients = blockedClients_[key];
Clients clients = blockedClients_[key];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里用find会不会更好一点

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯对,这里应该用find的

assert(!clients.empty());

for (auto it(clients.begin()); it != clients.end(); ++it) {
Expand All @@ -213,6 +223,7 @@ size_t PStore::BlockedClients::UnblockClient(PClient* client) {
break;
}
}
blockedClients_.assign(key, clients);
}

client->ClearWaitingKeys();
Expand All @@ -227,7 +238,7 @@ size_t PStore::BlockedClients::ServeClient(const PString& key, const PLIST& list
return 0;
}

Clients& clients = it->second;
Clients clients = it->second;
if (clients.empty()) {
return 0;
}
Expand Down Expand Up @@ -300,6 +311,7 @@ size_t PStore::BlockedClients::ServeClient(const PString& key, const PLIST& list
++nServed;
} else {
clients.pop_front();
blockedClients_.assign(it->first, clients);
}
}

Expand All @@ -310,7 +322,8 @@ int PStore::BlockedClients::LoopCheck(uint64_t now) {
int n = 0;

for (auto it(blockedClients_.begin()); it != blockedClients_.end() && n < 100;) {
Clients& clients = it->second;
// iterator in folly::ConcurrentHashMap is always const
Clients clients = it->second;
for (auto cli(clients.begin()); cli != clients.end();) {
if (std::get<1>(*cli) < now) { // timeout
++n;
Expand All @@ -326,13 +339,15 @@ int PStore::BlockedClients::LoopCheck(uint64_t now) {
}

clients.erase(cli++);
// clients is a copy; once it's modified, the map must be updated with it
blockedClients_.assign(it->first, clients);
} else {
++cli;
}
}

if (clients.empty()) {
blockedClients_.erase(it++);
it = blockedClients_.erase(it);
} else {
++it;
}
Expand Down Expand Up @@ -381,14 +396,14 @@ int PStore::GetDB() const { return dbno_; }

const PObject* PStore::GetObject(const PString& key) const {
auto db = &dbs_[dbno_];
FDB::const_iterator it(db->find(key));
PDB::const_iterator it(db->find(key));
if (it != db->end()) {
return &it->second;
}

if (!backends_.empty()) {
// if it's in dirty list, it must be deleted, wait sync to backend
if (waitSyncKeys_[dbno_].count(key)) {
if (waitSyncKeys_[dbno_].find(key) != waitSyncKeys_[dbno_].end()) {
return nullptr;
}

Expand Down Expand Up @@ -420,7 +435,7 @@ bool PStore::DeleteKey(const PString& key) {
auto db = &dbs_[dbno_];
// add to dirty queue
if (!waitSyncKeys_.empty()) {
waitSyncKeys_[dbno_][key] = nullptr; // null implies delete data
waitSyncKeys_[dbno_].insert_or_assign(key, nullptr); // null implies delete data
}

size_t ret = 0;
Expand Down Expand Up @@ -448,10 +463,9 @@ PType PStore::KeyType(const PString& key) const {
return PType(obj->type);
}

static bool RandomMember(const FDB& hash, PString& res, PObject** val) {
FDB::const_iterator it = KeyRandomHashMember(hash);
static bool RandomMember(const PDB& hash, PString& res, PObject** val) {
PDB::const_iterator it = FollyRandomHashMember(hash);

// FIXME: bugs here
if (it != hash.cend()) {
res = it->first;
if (val) {
Expand All @@ -477,12 +491,12 @@ size_t PStore::ScanKey(size_t cursor, size_t count, std::vector<PString>& res) c
return 0;
}

std::vector<FDB::const_iterator> iters;
size_t newCursor = ScanHashMember(dbs_[dbno_], cursor, count, iters);
std::vector<PString> iters;
size_t newCursor = FollyScanHashMember(dbs_[dbno_], cursor, count, iters);

res.reserve(iters.size());
for (const auto& it : iters) {
res.push_back(it->first);
res.push_back(it);
}

return newCursor;
Expand Down Expand Up @@ -530,18 +544,15 @@ PError PStore::getValueByType(const PString& key, PObject*& value, PType type, b
PObject* PStore::SetValue(const PString& key, PObject&& value) {
auto db = &dbs_[dbno_];
value.lru = PObject::lruclock;
const auto& [_, status] = db->try_emplace(key, std::move(value));
// if this is an update cmd, try_emplace will return false
if (!status) {
db->assign(key, std::move(value));
}
const PObject& obj = db->find(key)->second;
auto [realObj, status] = db->insert_or_assign(key, std::move(value));
const PObject& obj = realObj->second;

// put this key to sync list
if (!waitSyncKeys_.empty()) {
waitSyncKeys_[dbno_][key] = &obj;
waitSyncKeys_[dbno_].insert_or_assign(key, &obj);
}

// XXX: any better solution without const_cast?
return const_cast<PObject*>(&obj);
}

Expand All @@ -567,7 +578,7 @@ void PStore::InitExpireTimer() {
}

void PStore::ResetDB() {
std::vector<FDB>(dbs_.size()).swap(dbs_);
std::vector<PDB>(dbs_.size()).swap(dbs_);
std::vector<ExpiredDB>(expiredDBs_.size()).swap(expiredDBs_);
std::vector<BlockedClients>(blockedClients_.size()).swap(blockedClients_);
dbno_ = 0;
Expand Down Expand Up @@ -741,14 +752,14 @@ void PStore::AddDirtyKey(const PString& key) {
if (!waitSyncKeys_.empty()) {
PObject* obj = nullptr;
GetValue(key, obj);
waitSyncKeys_[dbno_][key] = obj;
waitSyncKeys_[dbno_].insert_or_assign(key, obj);
}
}

void PStore::AddDirtyKey(const PString& key, const PObject* value) {
// put this key to sync list
if (!waitSyncKeys_.empty()) {
waitSyncKeys_[dbno_][key] = value;
waitSyncKeys_[dbno_].insert_or_assign(key, value);
}
}

Expand Down
19 changes: 7 additions & 12 deletions src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ struct PObject {

class PClient;

//using PDB = std::unordered_map<PString, PObject, my_hash, std::equal_to<PString> >;
using FDB = folly::ConcurrentHashMap<PString, PObject, my_hash, std::equal_to<PString>>;
using PDB = folly::ConcurrentHashMap<PString, PObject, my_hash, std::equal_to<PString>>;

const int kMaxDBNum = 65536;

Expand All @@ -108,12 +107,8 @@ class PStore {
size_t ScanKey(size_t cursor, size_t count, std::vector<PString>& res) const;

// iterator
FDB::const_iterator begin() const { return dbs_[dbno_].begin(); }
FDB::const_iterator end() const { return dbs_[dbno_].end(); }

// FIXME: folly::ConcurrentHashMap doesn't support non-const iterator
// PDB::iterator begin() { return dbs_[dbno_].begin(); }
// PDB::iterator end() { return dbs_[dbno_].end(); }
PDB::const_iterator begin() const { return dbs_[dbno_].begin(); }
PDB::const_iterator end() const { return dbs_[dbno_].end(); }

const PObject* GetObject(const PString& key) const;
PError GetValue(const PString& key, PObject*& value, bool touch = true);
Expand Down Expand Up @@ -177,7 +172,7 @@ class PStore {
int LoopCheck(uint64_t now);

private:
using P_EXPIRE_DB = std::unordered_map<PString, uint64_t, my_hash, std::equal_to<PString> >;
using P_EXPIRE_DB = folly::ConcurrentHashMap<PString, uint64_t, my_hash, std::equal_to<PString>>;
P_EXPIRE_DB expireKeys_; // all the keys to be expired, unordered.
};

Expand All @@ -193,20 +188,20 @@ class PStore {

private:
using Clients = std::list<std::tuple<std::weak_ptr<PClient>, uint64_t, ListPosition> >;
using WaitingList = std::unordered_map<PString, Clients>;
using WaitingList = folly::ConcurrentHashMap<PString, Clients, my_hash, std::equal_to<PString>>;

WaitingList blockedClients_;
};

PError setValue(const PString& key, PObject& value, bool exclusive = false);

// GetObject() must be const, so these members are declared mutable
mutable std::vector<FDB> dbs_;
mutable std::vector<PDB> dbs_;
mutable std::vector<ExpiredDB> expiredDBs_;
std::vector<BlockedClients> blockedClients_;
std::vector<std::unique_ptr<PDumpInterface> > backends_;

using ToSyncDB = std::unordered_map<PString, const PObject*, my_hash, std::equal_to<PString> >;
using ToSyncDB = folly::ConcurrentHashMap<PString, const PObject*, my_hash, std::equal_to<PString> >;
std::vector<ToSyncDB> waitSyncKeys_;
int dbno_ = -1;
};
Expand Down