From f1a1048fd973f63f969e861023ba9b7ba24bb811 Mon Sep 17 00:00:00 2001 From: longfar Date: Mon, 21 Oct 2024 17:05:14 +0800 Subject: [PATCH] fix: comment --- ppd/main.cc | 24 +++++++++++++++++++----- ppd/pd.proto | 17 +++++++++++++---- ppd/pd_service.cc | 2 ++ pproxy/main.cc | 24 ++++++++++++++++++------ pproxy/proxy.proto | 2 -- pproxy/proxy_service.cc | 41 ++++++++++++++++++++++++++++++++--------- pproxy/router.cc | 3 ++- pproxy/router.h | 5 +++-- pproxy/task_manager.cc | 30 ++++++++++++++++-------------- pproxy/task_manager.h | 16 ++++++++-------- src/base_cmd.cc | 2 +- src/cmd_admin.cc | 11 +++-------- src/cmd_raft.cc | 6 +++--- 13 files changed, 120 insertions(+), 63 deletions(-) diff --git a/ppd/main.cc b/ppd/main.cc index 2d6937b0..e3c79645 100644 --- a/ppd/main.cc +++ b/ppd/main.cc @@ -5,23 +5,37 @@ * of patent rights can be found in the PATENTS file in the same directory. */ +#include "brpc/server.h" +#include "butil/errno.h" +#include "gflags/gflags.h" +#include "spdlog/spdlog.h" + #include "pd_service.h" +DEFINE_int32(port, 8080, "Port of rpc server"); +DEFINE_int32(idle_timeout_s, 60, + "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s`"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); + int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); brpc::Server server; - PlacementDriverServiceImpl service; if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { - fprintf(stderr, "Fail to add service!\n"); + spdlog::error("Failed to add service for: {}", berror()); return -1; } + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + // 启动服务 - if (server.Start(8080, nullptr) != 0) { - fprintf(stderr, "Fail to start server!\n"); + if (server.Start(FLAGS_port, &options) != 0) { + spdlog::error("Failed to start server for: {}", berror()); return -1; } server.RunUntilAskedToQuit(); - return 0; } \ No newline at end of file diff --git a/ppd/pd.proto b/ppd/pd.proto index 7d3ab454..84f4f871 100644 --- a/ppd/pd.proto +++ b/ppd/pd.proto @@ -1,8 +1,11 @@ -syntax="proto3"; +syntax = "proto3"; package pikiwidb; option cc_generic_services = true; -import "store.proto"; +message Peer { + string group_id = 1; + int32 cluster_idx = 2; +}; message GetClusterInfoRequest { }; @@ -22,8 +25,8 @@ message Store { message Region { int64 region_id = 1; - optional string start_key = 2; - optional string end_key = 3; + string start_key = 2; + string end_key = 3; repeated RegionEpoch region_epoch = 4; repeated Peer peers = 5; }; @@ -39,6 +42,12 @@ enum StoreState { TOMBSTONE = 2; }; +message RegionOptions { + string start_key = 1; + string end_key = 2; + int32 max_data_size = 3; +}; + message CreateAllRegionsRequest { int64 regions_count = 1; int32 region_peers_count = 2; diff --git a/ppd/pd_service.cc b/ppd/pd_service.cc index dcffc058..6463eb3f 100644 --- a/ppd/pd_service.cc +++ b/ppd/pd_service.cc @@ -8,6 +8,7 @@ #include "pd_service.h" #include "pd_server.h" +#include "spdlog/spdlog.h" namespace pikiwidb { void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller, @@ -32,6 +33,7 @@ void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* con response->set_success(true); response->set_store_id(store_id); + spdlog::info("add store success: {}", store_id); } void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller, diff --git a/pproxy/main.cc b/pproxy/main.cc index d325e483..ff0fa518 100644 --- a/pproxy/main.cc +++ b/pproxy/main.cc @@ -5,23 +5,35 @@ * of patent rights can be found in the PATENTS file in the same directory. */ +#include "brpc/server.h" +#include "gflags/gflags.h" +#include "spdlog/spdlog.h" + #include "proxy_service.h" +DEFINE_int32(port, 8080, "Port of rpc server"); +DEFINE_int32(idle_timeout_s, 60, + "Connection will be closed if there is no " + "read/write operations during the last `idle_timeout_s`"); +DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); + int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); brpc::Server server; - ProxyServiceImpl service; if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) { - fprintf(stderr, "Fail to add service!\n"); + spdlog::error("Failed to add service for: {}", berror()); return -1; } - // 启动服务 - if (server.Start(8080, nullptr) != 0) { - fprintf(stderr, "Fail to start server!\n"); + brpc::ServerOptions options; + options.idle_timeout_sec = FLAGS_idle_timeout_s; + options.max_concurrency = FLAGS_max_concurrency; + + if (server.Start(FLAGS_port, &options) != 0) { + spdlog::error("Failed to start server for: {}", berror()); return -1; } server.RunUntilAskedToQuit(); - return 0; } \ No newline at end of file diff --git a/pproxy/proxy.proto b/pproxy/proxy.proto index c899cfd8..2340d83d 100644 --- a/pproxy/proxy.proto +++ b/pproxy/proxy.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package pikiwidb.proxy; option cc_generic_services = true; -// 定义请求和响应 message RunCommandRequest { string command = 1; } @@ -21,7 +20,6 @@ message GetRouteInfoResponse { repeated RouteInfo infos = 1; } -// 定义服务 service ProxyService { rpc RunCommand(RunCommandRequest) returns (RunCommandResponse); rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse); diff --git a/pproxy/proxy_service.cc b/pproxy/proxy_service.cc index 24be1b01..a6917661 100644 --- a/pproxy/proxy_service.cc +++ b/pproxy/proxy_service.cc @@ -7,33 +7,56 @@ #include "proxy_service.h" +#include +#include +#include + namespace pikiwidb::proxy { void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::RunCommandRequest* request, pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) { - std::string command = request->command(); - std::string output = ExecuteCommand(command); + std::string command = request->command(); // 检查命令是否在白名单中 - response->set_output(output); + if (!IsCommandAllowed(command)) { + response->set_error("Command not allowed"); + done->Run(); + return; + } + std::string output = ExecuteCommand(command); + if (output.empty()) { + response->set_error("Command execution failed"); + } else { + response->set_output(output); + } done->Run(); } + void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::GetRouteInfoRequest* request, pikiwidb::proxy::GetRouteInfoResponse* response, - ::google::protobuf::Closure* done) { -} + ::google::protobuf::Closure* done) {} std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) { + if (!IsCommandAllowed(command)) { + return "Command not allowed"; + } + std::array buffer; std::string result; - - // 使用 popen 执行命令 std::unique_ptr pipe(popen(command.c_str(), "r"), pclose); if (!pipe) { - return "popen() failed!"; + return "Failed to execute command"; } - while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { + + while (true) { + if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) { + if (feof(pipe.get())) { + break; + } else { + return "Error reading command output"; + } + } result += buffer.data(); } return result; diff --git a/pproxy/router.cc b/pproxy/router.cc index c8490a71..c089d033 100644 --- a/pproxy/router.cc +++ b/pproxy/router.cc @@ -7,6 +7,7 @@ #include "router.h" #include +#include Router::Router() { // maximum 100 parameters @@ -59,7 +60,7 @@ void Router::add(const std::vector &route, short handler) { Node *parent = tree; for (const std::string &node : route) { if (parent->children.find(node) == parent->children.end()) { - parent->children[node] = new Node({node, {}, handler}); + parent->children[node] = std::shared_ptr(new Node({node, {}, handler})); } parent = parent->children[node]; } diff --git a/pproxy/router.h b/pproxy/router.h index 644773b4..2788a93f 100644 --- a/pproxy/router.h +++ b/pproxy/router.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -29,11 +30,11 @@ class Router { struct Node { std::string name; - std::map children; + std::map> children; short handler; }; - Node *tree = new Node({"GET", {}, -1}); + std::shared_ptr tree = std::shared_ptr(new Node({"GET", {}, -1})); std::string compiled_tree; void add(const std::vector &route, short handler); diff --git a/pproxy/task_manager.cc b/pproxy/task_manager.cc index a21053b8..35346dd5 100644 --- a/pproxy/task_manager.cc +++ b/pproxy/task_manager.cc @@ -10,6 +10,8 @@ #include #include +#include "threadpool.h" + Task::Status Task::TextToStatus(const std::string& input) { if (input == "pending") { return Task::pending; @@ -45,47 +47,47 @@ TaskManager::TaskManager(std::shared_ptr threadpool, size_t maxWorke std::future TaskManager::stop() { auto task = std::make_shared>([this] { - std::unique_lock guard(_mutex); - bool isLast = _workerCount == 1; + std::unique_lock guard(mutex_); + bool isLast = workerCount_ == 1; // Guarantee that the task finishes last. while (!isLast) { guard.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(1)); guard.lock(); - isLast = _workerCount == 1; + isLast = workerCount_ == 1; } }); auto future = task->get_future(); // Adding a new task and expecting the future guarantees that the last batch of tasks is being executed. auto functor = [task = std::move(task)]() mutable { (*task)(); }; - std::lock_guard guard(_mutex); + std::lock_guard guard(mutex_); - _stopped = true; - _tasks.emplace(std::move(functor)); + stopped_ = true; + tasks_.emplace(std::move(functor)); this->processTasks(); return future; } void TaskManager::addTask(std::function functor) { - std::lock_guard guard(_mutex); + std::lock_guard guard(mutex_); - if (_stopped) { + if (stopped_) { return; } - _tasks.emplace(std::move(functor), Clock::now()); + tasks_.emplace(std::move(functor), Clock::now()); this->processTasks(); } void TaskManager::processTasks() { - if (_tasks.empty() || _workerCount == _maxWorkers) { + if (tasks_.empty() || workerCount_ == maxWorkers_) { return; } - auto task = std::move(_tasks.front()); - _tasks.pop(); + auto task = std::move(tasks_.front()); + tasks_.pop(); - ++_workerCount; - _threadpool->execute(std::move(task)); + ++workerCount_; + threadpool_->execute(std::move(task)); } \ No newline at end of file diff --git a/pproxy/task_manager.h b/pproxy/task_manager.h index 8d70cf5e..06208105 100644 --- a/pproxy/task_manager.h +++ b/pproxy/task_manager.h @@ -56,9 +56,9 @@ class TaskManager { auto functor = [this, task = std::move(task)]() mutable { (*task)(); { - std::lock_guard guard(_mutex); + std::lock_guard guard(mutex_); - --_workerCount; + --workerCount_; this->processTasks(); } }; @@ -72,10 +72,10 @@ class TaskManager { void processTasks(); private: - std::shared_ptr<::Threadpool> _threadpool; - std::queue _tasks; - std::mutex _mutex; - size_t _maxWorkers; - size_t _workerCount{0}; - bool _stopped{false}; + std::shared_ptr<::Threadpool> threadpool_; + std::queue tasks_; + std::mutex mutex_; + size_t maxWorkers_; + size_t workerCount_{0}; + bool stopped_{false}; }; \ No newline at end of file diff --git a/src/base_cmd.cc b/src/base_cmd.cc index e247ad08..24613c85 100644 --- a/src/base_cmd.cc +++ b/src/base_cmd.cc @@ -49,7 +49,7 @@ void BaseCmd::Execute(PClient* client) { // 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write // commands. if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) { - return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", praft->GetLeaderAddress())); + return client->SetRes(CmdRes::kMoved, fmt::format("MOVED {}", praft->GetLeaderAddress())); } } diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index a63d4280..53590323 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -171,11 +171,6 @@ const std::string InfoCmd::kRaftSection = "raft"; InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {} -// bool InfoCmd::DoInitial(PClient* client) { -// praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft(); -// return true; -// } - bool InfoCmd::DoInitial(PClient* client) { size_t argc = client->argv_.size(); if (argc == 1) { @@ -183,11 +178,11 @@ bool InfoCmd::DoInitial(PClient* client) { return true; } - std::string argv_ = client->argv_[1]; + auto argv = client->argv_[1]; // convert section to lowercase - std::transform(argv_.begin(), argv_.end(), argv_.begin(), [](unsigned char c) { return std::tolower(c); }); + std::transform(argv.begin(), argv.end(), argv.begin(), [](unsigned char c) { return std::tolower(c); }); if (argc == 2) { - auto it = sectionMap.find(argv_); + auto it = sectionMap.find(argv); if (it != sectionMap.end()) { info_section_ = it->second; } else { diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index ca0c4a78..48bf2f20 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -67,9 +67,9 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { DEBUG("Received RAFT.NODE ADD cmd from {}", client->PeerIP()); auto db = PSTORE.GetDBByGroupID(group_id_); assert(db); - auto praft = db->GetPRaft(); + auto leader = db->GetPRaft(); // Check whether it is a leader. If it is not a leader, return the leader information - if (!praft->IsLeader()) { + if (!leader->IsLeader()) { client->SetRes(CmdRes::kWrongLeader, praft_->GetLeaderID()); return; } @@ -81,7 +81,7 @@ void RaftNodeCmd::DoCmdAdd(PClient* client) { // RedisRaft has nodeid, but in Braft, NodeId is IP:Port. // So we do not need to parse and use nodeid like redis; - auto s = praft->AddPeer(client->argv_[3]); + auto s = leader->AddPeer(client->argv_[3]); if (s.ok()) { client->SetRes(CmdRes::kOK); } else {