Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support zpopmin/zpopmax/zunionstore/zinterstore #332

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Run this command, compare with redis use pipeline commands, try it.
- sadd scard srem sismember smembers sdiff sdiffstore sinter sinterstore sunion sunionstore smove spop srandmember sscan

#### sorted set commands
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands
- subscribe unsubscribe publish psubscribe punsubscribe pubsub
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ RocksDB 可以配置为 PikiwiDB 的持久化存储引擎,可以存储更多

#### sorted set commands

- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore
- zadd zcard zrank zrevrank zrem zincrby zscore zrange zrevrange zrangebyscore zrevrangebyscore zremrangebyrank zremrangebyscore zpopmin zpopmax zunionstore zinterstore

#### pubsub commands

Expand Down
4 changes: 4 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ const std::string kCmdNameRPoplpush = "rpoplpush";

// zset cmd
const std::string kCmdNameZAdd = "zadd";
const std::string kCmdNameZPopMin = "zpopmin";
const std::string kCmdNameZPopMax = "zpopmax";
const std::string kCmdNameZInterstore = "zinterstore";
const std::string kCmdNameZUnionstore = "zunionstore";
const std::string kCmdNameZRevrange = "zrevrange";
const std::string kCmdNameZRangebyscore = "zrangebyscore";
const std::string kCmdNameZRemrangebyscore = "zremrangebyscore";
Expand Down
4 changes: 4 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ void CmdTableManager::InitCmdTable() {

// zset
ADD_COMMAND(ZAdd, -4);
ADD_COMMAND(ZPopMin, -2);
ADD_COMMAND(ZPopMax, -2);
ADD_COMMAND(ZInterstore, -4);
ADD_COMMAND(ZUnionstore, -4);
ADD_COMMAND(ZRevrange, -4);
ADD_COMMAND(ZRangebyscore, -4);
ADD_COMMAND(ZRemrangebyscore, 4);
Expand Down
181 changes: 181 additions & 0 deletions src/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,187 @@ void ZAddCmd::DoCmd(PClient* client) {
}
}

ZPopMinCmd::ZPopMinCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMinCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMinCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMin(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZPopMaxCmd::ZPopMaxCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

bool ZPopMaxCmd::DoInitial(PClient* client) {
if (client->argv_.size() > 3) {
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return false;
}

client->SetKey(client->argv_[1]);
return true;
}

void ZPopMaxCmd::DoCmd(PClient* client) {
int32_t count = 1;
if (client->argv_.size() == 3) {
if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return;
}
}

std::vector<storage::ScoreMember> score_members;
storage::Status s =
PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMax(client->Key(), count, &score_members);
if (s.ok()) {
char buf[32];
int64_t len = 0;
client->AppendArrayLen(static_cast<int64_t>(score_members.size()) * 2);
for (auto& score_member : score_members) {
client->AppendStringLenUint64(score_member.member.size());
client->AppendContent(score_member.member);
len = pstd::D2string(buf, sizeof(buf), score_member.score);
client->AppendStringLen(len);
client->AppendContent(buf);
}
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZsetUIstoreParentCmd::ZsetUIstoreParentCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {}

// ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
// ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE <SUM | MIN | MAX>]
bool ZsetUIstoreParentCmd::DoInitial(PClient* client) {
auto argv_ = client->argv_;
dest_key_ = argv_[1];
if (pstd::String2int(argv_[2].data(), argv_[2].size(), &num_keys_) == 0) {
client->SetRes(CmdRes::kInvalidInt);
return false;
}
if (num_keys_ < 1) {
client->SetRes(CmdRes::kErrOther, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return false;
}
auto argc = argv_.size();
if (argc < num_keys_ + 3) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
keys_.assign(argv_.begin() + 3, argv_.begin() + 3 + num_keys_);
weights_.assign(num_keys_, 1);
auto index = num_keys_ + 3;
while (index < argc) {
if (strcasecmp(argv_[index].data(), "weights") == 0) {
index++;
if (argc < index + num_keys_) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
double weight;
auto base = index;
for (; index < base + num_keys_; index++) {
if (pstd::String2d(argv_[index].data(), argv_[index].size(), &weight) == 0) {
client->SetRes(CmdRes::kErrOther, "weight value is not a float");
return false;
}
weights_[index - base] = weight;
}
} else if (strcasecmp(argv_[index].data(), "aggregate") == 0) {
index++;
if (argc < index + 1) {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
if (strcasecmp(argv_[index].data(), "sum") == 0) {
aggregate_ = storage::SUM;
} else if (strcasecmp(argv_[index].data(), "min") == 0) {
aggregate_ = storage::MIN;
} else if (strcasecmp(argv_[index].data(), "max") == 0) {
aggregate_ = storage::MAX;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
index++;
} else {
client->SetRes(CmdRes::kSyntaxErr);
return false;
}
}
return true;
}

ZInterstoreCmd::ZInterstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZInterstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZInterstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::vector<storage::ScoreMember> value_to_dest_;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZInterstore(dest_key_, keys_, weights_, aggregate_, value_to_dest_, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZUnionstoreCmd::ZUnionstoreCmd(const std::string& name, int16_t arity) : ZsetUIstoreParentCmd(name, arity) {}

bool ZUnionstoreCmd::DoInitial(PClient* client) { return ZsetUIstoreParentCmd::DoInitial(client); }

void ZUnionstoreCmd::DoCmd(PClient* client) {
int32_t count = 0;
std::map<std::string, double> value_to_dest;
storage::Status s = PSTORE.GetBackend(client->GetCurrentDB())
->GetStorage()
->ZUnionstore(dest_key_, keys_, weights_, aggregate_, value_to_dest, &count);
if (s.ok()) {
client->AppendInteger(count);
} else {
client->SetRes(CmdRes::kErrOther, s.ToString());
}
}

ZRevrangeCmd::ZRevrangeCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {}

Expand Down
58 changes: 58 additions & 0 deletions src/cmd_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,64 @@ class ZAddCmd : public BaseCmd {
void DoCmd(PClient *client) override;
};

class ZPopMinCmd : public BaseCmd {
public:
ZPopMinCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZPopMaxCmd : public BaseCmd {
public:
ZPopMaxCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZsetUIstoreParentCmd : public BaseCmd {
public:
ZsetUIstoreParentCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

std::string dest_key_;
int64_t num_keys_ = 0;
storage::AGGREGATE aggregate_{storage::SUM};
std::vector<std::string> keys_;
std::vector<double> weights_;
};

class ZInterstoreCmd : public ZsetUIstoreParentCmd {
public:
ZInterstoreCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZUnionstoreCmd : public ZsetUIstoreParentCmd {
public:
ZUnionstoreCmd(const std::string &name, int16_t arity);

protected:
bool DoInitial(PClient *client) override;

private:
void DoCmd(PClient *client) override;
};

class ZRevrangeCmd : public BaseCmd {
public:
ZRevrangeCmd(const std::string &name, int16_t arity);
Expand Down
Loading
Loading