From 2c1ca16cfc29d8a80e8f2d9d295bf5171e79b1fa Mon Sep 17 00:00:00 2001 From: HappyUncle Date: Sun, 26 May 2024 12:19:53 +0800 Subject: [PATCH] feat: support zpopmin/zpopmax/zunionstore/zinterstore issue: #30 issue: #280 Signed-off-by: HappyUncle --- src/base_cmd.h | 4 + src/cmd_table_manager.cc | 4 + src/cmd_zset.cc | 185 +++++++++++++++++++++++++++++++++ src/cmd_zset.h | 58 +++++++++++ src/storage/src/redis_zsets.cc | 44 ++++---- tests/consistency_test.go | 68 ++++++++++++ tests/zset_test.go | 178 +++++++++++++++++++++++++++++++ 7 files changed, 519 insertions(+), 22 deletions(-) diff --git a/src/base_cmd.h b/src/base_cmd.h index 0af27012b..b3cb92d02 100644 --- a/src/base_cmd.h +++ b/src/base_cmd.h @@ -133,6 +133,10 @@ const std::string kCmdNameLLen = "llen"; // zset cmd const std::string kCmdNameZAdd = "zadd"; +const std::string kCmdNameZPopMin = "zpopmin"; +const std::string kCmdNameZPopMax = "zpopmax"; +const std::string kCmdNameZInterstore = "zinterstore"; +const std::string kCmdNameZUnionstore = "zunionstore"; const std::string kCmdNameZRevrange = "zrevrange"; const std::string kCmdNameZRangebyscore = "zrangebyscore"; const std::string kCmdNameZRemrangebyscore = "zremrangebyscore"; diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index 26f910854..75044fb63 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -143,6 +143,10 @@ void CmdTableManager::InitCmdTable() { // zset ADD_COMMAND(ZAdd, -4); + ADD_COMMAND(ZPopMin, -2); + ADD_COMMAND(ZPopMax, -2); + ADD_COMMAND(ZInterstore, -4); + ADD_COMMAND(ZUnionstore, -4); ADD_COMMAND(ZRevrange, -4); ADD_COMMAND(ZRangebyscore, -4); ADD_COMMAND(ZRemrangebyscore, 4); diff --git a/src/cmd_zset.cc b/src/cmd_zset.cc index 8fc8d02fb..d21bef662 100644 --- a/src/cmd_zset.cc +++ b/src/cmd_zset.cc @@ -117,6 +117,191 @@ void ZAddCmd::DoCmd(PClient* client) { } } +ZPopMinCmd::ZPopMinCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +bool ZPopMinCmd::DoInitial(PClient* client) { + if (client->argv_.size() > 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return false; + } + + client->SetKey(client->argv_[1]); + return true; +} + +void ZPopMinCmd::DoCmd(PClient* client) { + int32_t count = 1; + if (client->argv_.size() == 3) { + if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return; + } + } + + std::vector score_members; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMin(client->Key(), count, &score_members); + if (s.ok()) { + char buf[32]; + int64_t len = 0; + client->AppendArrayLen(static_cast(score_members.size()) * 2); + for (auto & score_member : score_members) { + client->AppendStringLenUint64(score_member.member.size()); + client->AppendContent(score_member.member); + len = pstd::D2string(buf, sizeof(buf), score_member.score); + client->AppendStringLen(len); + client->AppendContent(buf); + } + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZPopMaxCmd::ZPopMaxCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +bool ZPopMaxCmd::DoInitial(PClient* client) { + if (client->argv_.size() > 3) { + client->SetRes(CmdRes::kWrongNum, client->CmdName()); + return false; + } + + client->SetKey(client->argv_[1]); + return true; +} + +void ZPopMaxCmd::DoCmd(PClient* client) { + int32_t count = 1; + if (client->argv_.size() == 3) { + if (pstd::String2int(client->argv_[2].data(), client->argv_[2].size(), &count) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return; + } + } + + std::vector score_members; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZPopMax(client->Key(), count, &score_members); + if (s.ok()) { + char buf[32]; + int64_t len = 0; + client->AppendArrayLen(static_cast(score_members.size()) * 2); + for (auto & score_member : score_members) { + client->AppendStringLenUint64(score_member.member.size()); + client->AppendContent(score_member.member); + len = pstd::D2string(buf, sizeof(buf), score_member.score); + client->AppendStringLen(len); + client->AppendContent(buf); + } + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZsetUIstoreParentCmd::ZsetUIstoreParentCmd(const std::string& name, int16_t arity) + : BaseCmd(name, arity, kCmdFlagsWrite, kAclCategoryWrite | kAclCategorySortedSet) {} + +// ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] +// ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE ] +bool ZsetUIstoreParentCmd::DoInitial(PClient* client) { + auto argv_ = client->argv_; + dest_key_ = argv_[1]; + if (pstd::String2int(argv_[2].data(), argv_[2].size(), &num_keys_) == 0) { + client->SetRes(CmdRes::kInvalidInt); + return false; + } + if (num_keys_ < 1) { + client->SetRes(CmdRes::kErrOther, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); + return false; + } + auto argc = argv_.size(); + if (argc < num_keys_ + 3) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + keys_.assign(argv_.begin() + 3, argv_.begin() + 3 + num_keys_); + weights_.assign(num_keys_, 1); + auto index = num_keys_ + 3; + while (index < argc) { + if (strcasecmp(argv_[index].data(), "weights") == 0) { + index++; + if (argc < index + num_keys_) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + double weight; + auto base = index; + for (; index < base + num_keys_; index++) { + if (pstd::String2d(argv_[index].data(), argv_[index].size(), &weight) == 0) { + client->SetRes(CmdRes::kErrOther, "weight value is not a float"); + return false; + } + weights_[index - base] = weight; + } + } else if (strcasecmp(argv_[index].data(), "aggregate") == 0) { + index++; + if (argc < index + 1) { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + if (strcasecmp(argv_[index].data(), "sum") == 0) { + aggregate_ = storage::SUM; + } else if (strcasecmp(argv_[index].data(), "min") == 0) { + aggregate_ = storage::MIN; + } else if (strcasecmp(argv_[index].data(), "max") == 0) { + aggregate_ = storage::MAX; + } else { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + index++; + } else { + client->SetRes(CmdRes::kSyntaxErr); + return false; + } + } + return true; +} + +ZInterstoreCmd::ZInterstoreCmd(const std::string& name, int16_t arity) + : ZsetUIstoreParentCmd(name, arity) {} + +bool ZInterstoreCmd::DoInitial(PClient* client) { + return ZsetUIstoreParentCmd::DoInitial(client); +} + +void ZInterstoreCmd::DoCmd(PClient* client) { + int32_t count = 0; + std::vector value_to_dest_; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZInterstore(dest_key_, keys_, weights_, aggregate_, value_to_dest_, &count); + if (s.ok()) { + client->AppendInteger(count); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + +ZUnionstoreCmd::ZUnionstoreCmd(const std::string& name, int16_t arity) + : ZsetUIstoreParentCmd(name, arity) {} + +bool ZUnionstoreCmd::DoInitial(PClient* client) { + return ZsetUIstoreParentCmd::DoInitial(client); +} + +void ZUnionstoreCmd::DoCmd(PClient* client) { + int32_t count = 0; + std::map value_to_dest; + storage::Status s = + PSTORE.GetBackend(client->GetCurrentDB())->GetStorage()->ZUnionstore(dest_key_, keys_, weights_, aggregate_, value_to_dest, &count); + if (s.ok()) { + client->AppendInteger(count); + } else { + client->SetRes(CmdRes::kErrOther, s.ToString()); + } +} + ZRevrangeCmd::ZRevrangeCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsReadonly, kAclCategoryRead | kAclCategorySortedSet) {} diff --git a/src/cmd_zset.h b/src/cmd_zset.h index 13049eaa9..0cdfb1016 100644 --- a/src/cmd_zset.h +++ b/src/cmd_zset.h @@ -23,6 +23,64 @@ class ZAddCmd : public BaseCmd { void DoCmd(PClient *client) override; }; +class ZPopMinCmd : public BaseCmd { + public: + ZPopMinCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZPopMaxCmd : public BaseCmd { + public: + ZPopMaxCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZsetUIstoreParentCmd : public BaseCmd { + public: + ZsetUIstoreParentCmd(const std::string& name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + std::string dest_key_; + int64_t num_keys_ = 0; + storage::AGGREGATE aggregate_{storage::SUM}; + std::vector keys_; + std::vector weights_; +}; + +class ZInterstoreCmd : public ZsetUIstoreParentCmd { + public: + ZInterstoreCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + +class ZUnionstoreCmd : public ZsetUIstoreParentCmd { + public: + ZUnionstoreCmd(const std::string &name, int16_t arity); + + protected: + bool DoInitial(PClient *client) override; + + private: + void DoCmd(PClient *client) override; +}; + class ZRevrangeCmd : public BaseCmd { public: ZRevrangeCmd(const std::string &name, int16_t arity); diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index 3532b4cca..45f10ac80 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -109,7 +109,7 @@ Status Redis::ZsetsPKPatternMatchDel(const std::string& pattern, int32_t* ret) { Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vector* score_members) { uint32_t statistic = 0; score_members->clear(); - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); ScopeRecordLock l(lock_mgr_, key); std::string meta_value; @@ -136,16 +136,16 @@ Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vectorkey()); + batch->Delete(kZsetsDataCF, zsets_member_key.Encode()); + batch->Delete(kZsetsScoreCF, iter->key()); } delete iter; if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); - batch.Put(handles_[kZsetsMetaCF], base_meta_key.Encode(), meta_value); - s = db_->Write(default_write_options_, &batch); + batch->Put(kZsetsMetaCF, base_meta_key.Encode(), meta_value); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, key.ToString(), statistic); return s; } @@ -157,7 +157,7 @@ Status Redis::ZPopMax(const Slice& key, const int64_t count, std::vector* score_members) { uint32_t statistic = 0; score_members->clear(); - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); ScopeRecordLock l(lock_mgr_, key); std::string meta_value; @@ -184,16 +184,16 @@ Status Redis::ZPopMin(const Slice& key, const int64_t count, std::vectorkey()); + batch->Delete(kZsetsDataCF, zsets_member_key.Encode()); + batch->Delete(kZsetsScoreCF, iter->key()); } delete iter; if (!parsed_zsets_meta_value.CheckModifyCount(-del_cnt)) { return Status::InvalidArgument("zset size overflow"); } parsed_zsets_meta_value.ModifyCount(-del_cnt); - batch.Put(handles_[kZsetsMetaCF], base_meta_key.Encode(), meta_value); - s = db_->Write(default_write_options_, &batch); + batch->Put(kZsetsMetaCF, base_meta_key.Encode(), meta_value); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, key.ToString(), statistic); return s; } @@ -1090,7 +1090,7 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector& value_to_dest, int32_t* ret) { *ret = 0; uint32_t statistic = 0; - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this); rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot = nullptr; @@ -1158,13 +1158,13 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector(member_score_map.size())); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kZsetsMetaCF, base_destination.Encode(), meta_value); } else { char buf[4]; EncodeFixed32(buf, member_score_map.size()); ZSetsMetaValue zsets_meta_value(Slice(buf, sizeof(int32_t))); version = zsets_meta_value.UpdateVersion(); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), zsets_meta_value.Encode()); + batch->Put(kZsetsMetaCF, base_destination.Encode(), zsets_meta_value.Encode()); } char score_buf[8]; @@ -1174,14 +1174,14 @@ Status Redis::ZUnionstore(const Slice& destination, const std::vector(&sm.second); EncodeFixed64(score_buf, *reinterpret_cast(ptr_score)); BaseDataValue member_i_val(Slice(score_buf, sizeof(uint64_t))); - batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), member_i_val.Encode()); + batch->Put(kZsetsDataCF, zsets_member_key.Encode(), member_i_val.Encode()); ZSetsScoreKey zsets_score_key(destination, version, sm.second, sm.first); BaseDataValue score_i_val(Slice{}); - batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), score_i_val.Encode()); + batch->Put(kZsetsScoreCF, zsets_score_key.Encode(), score_i_val.Encode()); } *ret = static_cast(member_score_map.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, destination.ToString(), statistic); value_to_dest = std::move(member_score_map); return s; @@ -1196,7 +1196,7 @@ Status Redis::ZInterstore(const Slice& destination, const std::vector(final_score_members.size())); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), meta_value); + batch->Put(kZsetsMetaCF, base_destination.Encode(), meta_value); } else { char buf[4]; EncodeFixed32(buf, final_score_members.size()); ZSetsMetaValue zsets_meta_value(Slice(buf, sizeof(int32_t))); version = zsets_meta_value.UpdateVersion(); - batch.Put(handles_[kZsetsMetaCF], base_destination.Encode(), zsets_meta_value.Encode()); + batch->Put(kZsetsMetaCF, base_destination.Encode(), zsets_meta_value.Encode()); } char score_buf[8]; for (const auto& sm : final_score_members) { @@ -1311,14 +1311,14 @@ Status Redis::ZInterstore(const Slice& destination, const std::vector(&sm.score); EncodeFixed64(score_buf, *reinterpret_cast(ptr_score)); BaseDataValue member_i_val(Slice(score_buf, sizeof(uint64_t))); - batch.Put(handles_[kZsetsDataCF], zsets_member_key.Encode(), member_i_val.Encode()); + batch->Put(kZsetsDataCF, zsets_member_key.Encode(), member_i_val.Encode()); ZSetsScoreKey zsets_score_key(destination, version, sm.score, sm.member); BaseDataValue zsets_score_i_val(Slice{}); - batch.Put(handles_[kZsetsScoreCF], zsets_score_key.Encode(), zsets_score_i_val.Encode()); + batch->Put(kZsetsScoreCF, zsets_score_key.Encode(), zsets_score_i_val.Encode()); } *ret = static_cast(final_score_members.size()); - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kZSets, destination.ToString(), statistic); value_to_dest = std::move(final_score_members); return s; diff --git a/tests/consistency_test.go b/tests/consistency_test.go index 7d83e033b..6ab3a32fa 100644 --- a/tests/consistency_test.go +++ b/tests/consistency_test.go @@ -541,6 +541,74 @@ var _ = Describe("Consistency", Ordered, func() { } }) + It("ZPopMin & ZPopMax Consistency Test", func() { + const testKey = "ZSetsConsistencyTestKey" + i4 := redis.Z{Score: 4, Member: "z4"} + i5 := redis.Z{Score: 5, Member: "z5"} + i8 := redis.Z{Score: 8, Member: "z8"} + { + zadd, err := leader.ZAdd(ctx, testKey, i4, i5, i8).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(3))) + + vals, err := leader.ZPopMin(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{i4})) + + vals, err = leader.ZPopMax(ctx, testKey).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{i8})) + + // read check + readChecker(func(c *redis.Client) { + zrange, err := c.ZRevRangeWithScores(ctx, testKey, 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i5})) + }) + } + }) + + It("ZUnionstore & ZInterStore Consistency Test", func() { + i4 := redis.Z{Score: 4, Member: "z4"} + i5 := redis.Z{Score: 5, Member: "z5"} + i8 := redis.Z{Score: 8, Member: "z8"} + { + zadd, err := leader.ZAdd(ctx, "in1", i4, i5).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(2))) + + zadd, err = leader.ZAdd(ctx, "in2", i4, i8).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zadd).To(Equal(int64(2))) + + vals, err := leader.ZUnionStore(ctx, "out1", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal(int64(3))) + + vals, err = leader.ZInterStore(ctx, "out2", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal(int64(1))) + + readChecker(func(c *redis.Client) { + zrange, err := c.ZRevRangeWithScores(ctx, "out1", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i8, i5, i4})) + + zrange, err = c.ZRevRangeWithScores(ctx, "out2", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(zrange).To(Equal([]redis.Z{i4})) + }) + } + }) + It("SetBit Consistency Test", func() { const testKey = "StringsConsistencyTestKey" { diff --git a/tests/zset_test.go b/tests/zset_test.go index eab3e7838..4a1b7a5b9 100644 --- a/tests/zset_test.go +++ b/tests/zset_test.go @@ -477,4 +477,182 @@ var _ = Describe("Zset", Ordered, func() { Member: "three", }})) }) + + item1 := redis.Z{ + Score: 1, + Member: "one", + } + + item2 := redis.Z{ + Score: 2, + Member: "two", + } + + It("should ZPopMin", func() { + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1})) + } + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1, item2})) + } + { + err := client.ZAdd(ctx, "ZPopMin", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMin(ctx, "ZPopMin", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item1, item2})) + } + + }) + + It("should ZPopMax", func() { + + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2})) + } + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax", 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2, item1})) + } + { + err := client.ZAdd(ctx, "ZPopMax", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + vals, err := client.ZPopMax(ctx, "ZPopMax", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{item2, item1})) + } + }) + + item20 := redis.Z{ + Score: 20, + Member: "two", + } + item30 := redis.Z{ + Score: 30, + Member: "three", + } + It("should ZInterstore", func() { + err := client.ZAdd(ctx, "in1", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "in2", item20, item30).Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{2, 3}, + Aggregate: "SUM", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err := client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 64, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 2, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{20, 1}, + Aggregate: "MAX", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 40, + Member: "two", + }})) + }) + + It("should ZUnionstore && ZInterStore", func() { + err := client.ZAdd(ctx, "in1", item1, item2).Err() + Expect(err).NotTo(HaveOccurred()) + err = client.ZAdd(ctx, "in2", item20, item30).Err() + Expect(err).NotTo(HaveOccurred()) + + res, err := client.ZUnionStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "SUM", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(3))) + + vals, err := client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 1, + Member: "one", + }, { + Score: 22, + Member: "two", + }, { + Score: 30, + Member: "three", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{1, 1}, + Aggregate: "MIN", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 2, + Member: "two", + }})) + + res, err = client.ZInterStore(ctx, "out", &redis.ZStore{ + Keys: []string{"in1", "in2"}, + Weights: []float64{2, 3}, + Aggregate: "MAX", + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal(int64(1))) + + vals, err = client.ZRangeWithScores(ctx, "out", 0, -1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(vals).To(Equal([]redis.Z{{ + Score: 60, + Member: "two", + }})) + }) })