Skip to content

Commit

Permalink
fix: comment
Browse files Browse the repository at this point in the history
  • Loading branch information
longfar-ncy committed Oct 21, 2024
1 parent 5dff21e commit f1a1048
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 63 deletions.
24 changes: 19 additions & 5 deletions ppd/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,37 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "butil/errno.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "pd_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;

PlacementDriverServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

// 启动服务
if (server.Start(8080, nullptr) != 0) {
fprintf(stderr, "Fail to start server!\n");
if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
return 0;
}
17 changes: 13 additions & 4 deletions ppd/pd.proto
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
syntax="proto3";
syntax = "proto3";
package pikiwidb;
option cc_generic_services = true;

import "store.proto";
message Peer {
string group_id = 1;
int32 cluster_idx = 2;
};

message GetClusterInfoRequest {
};
Expand All @@ -22,8 +25,8 @@ message Store {

message Region {
int64 region_id = 1;
optional string start_key = 2;
optional string end_key = 3;
string start_key = 2;
string end_key = 3;
repeated RegionEpoch region_epoch = 4;
repeated Peer peers = 5;
};
Expand All @@ -39,6 +42,12 @@ enum StoreState {
TOMBSTONE = 2;
};

message RegionOptions {
string start_key = 1;
string end_key = 2;
int32 max_data_size = 3;
};

message CreateAllRegionsRequest {
int64 regions_count = 1;
int32 region_peers_count = 2;
Expand Down
2 changes: 2 additions & 0 deletions ppd/pd_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "pd_service.h"

#include "pd_server.h"
#include "spdlog/spdlog.h"

namespace pikiwidb {
void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller,
Expand All @@ -32,6 +33,7 @@ void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* con

response->set_success(true);
response->set_store_id(store_id);
spdlog::info("add store success: {}", store_id);
}

void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller,
Expand Down
24 changes: 18 additions & 6 deletions pproxy/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,35 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "proxy_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;

ProxyServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

// 启动服务
if (server.Start(8080, nullptr) != 0) {
fprintf(stderr, "Fail to start server!\n");
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
return 0;
}
2 changes: 0 additions & 2 deletions pproxy/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";
package pikiwidb.proxy;
option cc_generic_services = true;

// 定义请求和响应
message RunCommandRequest {
string command = 1;
}
Expand All @@ -21,7 +20,6 @@ message GetRouteInfoResponse {
repeated RouteInfo infos = 1;
}

// 定义服务
service ProxyService {
rpc RunCommand(RunCommandRequest) returns (RunCommandResponse);
rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse);
Expand Down
41 changes: 32 additions & 9 deletions pproxy/proxy_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,56 @@

#include "proxy_service.h"

#include <array>
#include <memory>
#include <string>

namespace pikiwidb::proxy {
void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) {
std::string command = request->command();
std::string output = ExecuteCommand(command);
std::string command = request->command(); // 检查命令是否在白名单中

response->set_output(output);
if (!IsCommandAllowed(command)) {
response->set_error("Command not allowed");
done->Run();
return;
}

std::string output = ExecuteCommand(command);
if (output.empty()) {
response->set_error("Command execution failed");
} else {
response->set_output(output);
}
done->Run();
}

void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response,
::google::protobuf::Closure* done) {
}
::google::protobuf::Closure* done) {}

std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
if (!IsCommandAllowed(command)) {
return "Command not allowed";
}

std::array<char, 128> buffer;
std::string result;

// 使用 popen 执行命令
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
return "popen() failed!";
return "Failed to execute command";
}
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {

while (true) {
if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) {
if (feof(pipe.get())) {
break;
} else {
return "Error reading command output";
}
}
result += buffer.data();
}
return result;
Expand Down
3 changes: 2 additions & 1 deletion pproxy/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "router.h"
#include <cstring>
#include <memory>

Router::Router() {
// maximum 100 parameters
Expand Down Expand Up @@ -59,7 +60,7 @@ void Router::add(const std::vector<std::string> &route, short handler) {
Node *parent = tree;
for (const std::string &node : route) {
if (parent->children.find(node) == parent->children.end()) {
parent->children[node] = new Node({node, {}, handler});
parent->children[node] = std::shared_ptr<Node>(new Node({node, {}, handler}));
}
parent = parent->children[node];
}
Expand Down
5 changes: 3 additions & 2 deletions pproxy/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <vector>
Expand All @@ -29,11 +30,11 @@ class Router {

struct Node {
std::string name;
std::map<std::string, Node *> children;
std::map<std::string, std::shared_ptr<Node>> children;
short handler;
};

Node *tree = new Node({"GET", {}, -1});
std::shared_ptr<Node> tree = std::shared_ptr<Node>(new Node({"GET", {}, -1}));
std::string compiled_tree;

void add(const std::vector<std::string> &route, short handler);
Expand Down
30 changes: 16 additions & 14 deletions pproxy/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <algorithm>
#include <sstream>

#include "threadpool.h"

Task::Status Task::TextToStatus(const std::string& input) {
if (input == "pending") {
return Task::pending;
Expand Down Expand Up @@ -45,47 +47,47 @@ TaskManager::TaskManager(std::shared_ptr<Threadpool> threadpool, size_t maxWorke

std::future<void> TaskManager::stop() {
auto task = std::make_shared<std::packaged_task<void()>>([this] {
std::unique_lock<std::mutex> guard(_mutex);
bool isLast = _workerCount == 1;
std::unique_lock<std::mutex> guard(mutex_);
bool isLast = workerCount_ == 1;

// Guarantee that the task finishes last.
while (!isLast) {
guard.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
guard.lock();
isLast = _workerCount == 1;
isLast = workerCount_ == 1;
}
});
auto future = task->get_future();

// Adding a new task and expecting the future guarantees that the last batch of tasks is being executed.
auto functor = [task = std::move(task)]() mutable { (*task)(); };
std::lock_guard<std::mutex> guard(_mutex);
std::lock_guard<std::mutex> guard(mutex_);

_stopped = true;
_tasks.emplace(std::move(functor));
stopped_ = true;
tasks_.emplace(std::move(functor));
this->processTasks();

return future;
}

void TaskManager::addTask(std::function<void()> functor) {
std::lock_guard<std::mutex> guard(_mutex);
std::lock_guard<std::mutex> guard(mutex_);

if (_stopped) {
if (stopped_) {
return;
}
_tasks.emplace(std::move(functor), Clock::now());
tasks_.emplace(std::move(functor), Clock::now());
this->processTasks();
}

void TaskManager::processTasks() {
if (_tasks.empty() || _workerCount == _maxWorkers) {
if (tasks_.empty() || workerCount_ == maxWorkers_) {
return;
}
auto task = std::move(_tasks.front());
_tasks.pop();
auto task = std::move(tasks_.front());
tasks_.pop();

++_workerCount;
_threadpool->execute(std::move(task));
++workerCount_;
threadpool_->execute(std::move(task));
}
16 changes: 8 additions & 8 deletions pproxy/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class TaskManager {
auto functor = [this, task = std::move(task)]() mutable {
(*task)();
{
std::lock_guard<std::mutex> guard(_mutex);
std::lock_guard<std::mutex> guard(mutex_);

--_workerCount;
--workerCount_;
this->processTasks();
}
};
Expand All @@ -72,10 +72,10 @@ class TaskManager {
void processTasks();

private:
std::shared_ptr<::Threadpool> _threadpool;
std::queue<Task> _tasks;
std::mutex _mutex;
size_t _maxWorkers;
size_t _workerCount{0};
bool _stopped{false};
std::shared_ptr<::Threadpool> threadpool_;
std::queue<Task> tasks_;
std::mutex mutex_;
size_t maxWorkers_;
size_t workerCount_{0};
bool stopped_{false};
};
2 changes: 1 addition & 1 deletion src/base_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void BaseCmd::Execute(PClient* client) {
// 2. If PRAFT is initialized and the current node is not the leader, return a redirection message for write
// commands.
if (HasFlag(kCmdFlagsWrite) && !praft->IsLeader()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("MOVED {}", praft->GetLeaderAddress()));
return client->SetRes(CmdRes::kMoved, fmt::format("MOVED {}", praft->GetLeaderAddress()));
}
}

Expand Down
11 changes: 3 additions & 8 deletions src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,18 @@ const std::string InfoCmd::kRaftSection = "raft";

InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin, kAclCategoryAdmin) {}

// bool InfoCmd::DoInitial(PClient* client) {
// praft_ = PSTORE.GetBackend(client->GetCurrentDB())->GetPRaft();
// return true;
// }

bool InfoCmd::DoInitial(PClient* client) {
size_t argc = client->argv_.size();
if (argc == 1) {
info_section_ = kInfo;
return true;
}

std::string argv_ = client->argv_[1];
auto argv = client->argv_[1];
// convert section to lowercase
std::transform(argv_.begin(), argv_.end(), argv_.begin(), [](unsigned char c) { return std::tolower(c); });
std::transform(argv.begin(), argv.end(), argv.begin(), [](unsigned char c) { return std::tolower(c); });
if (argc == 2) {
auto it = sectionMap.find(argv_);
auto it = sectionMap.find(argv);
if (it != sectionMap.end()) {
info_section_ = it->second;
} else {
Expand Down
Loading

0 comments on commit f1a1048

Please sign in to comment.