Skip to content

Commit

Permalink
[control-plane] [sdk] Finalize features of sticky sessions
Browse files Browse the repository at this point in the history
  • Loading branch information
mcopik committed Oct 15, 2024
1 parent ddbbef2 commit 4164db3
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 43 deletions.
2 changes: 2 additions & 0 deletions common/include/praas/common/http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace praas::common::http {

static response_ptr_t failed_response(const std::string& reason);

std::shared_ptr<drogon::HttpClient>& handle();

private:
void request(request_ptr_t& req, parameters_t&& params, callback_t&& callback);

Expand Down
5 changes: 5 additions & 0 deletions common/src/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ namespace praas::common::http {
return resp;
}

std::shared_ptr<drogon::HttpClient>& HTTPClient::handle()
{
return _http_client;
}

void HTTPClientFactory::initialize(int thread_num)
{
HTTPClientFactory::_pool = std::make_unique<trantor::EventLoopThreadPool>(thread_num);
Expand Down
6 changes: 5 additions & 1 deletion control-plane/include/praas/control-plane/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ namespace praas::control_plane {
backend::Backend& backend, tcpserver::TCPServer& poller, process::Resources&& resources
);

std::optional<std::tuple<process::Process::write_lock_t, process::Process*>>
get_controlplane_process(const std::string& process_name);

////////////////////////////////////////////////////////////////////////////////
/// @brief Obtain a process that can be used for FaaS invocations.
/// Finds the first process with a spare capacity. Otherwise, it allocates one.
Expand Down Expand Up @@ -236,7 +239,8 @@ namespace praas::control_plane {
std::unordered_map<std::string, process::ProcessPtr> _swapped_processes;

lock_t _controlplane_mutex;
std::vector<process::ProcessPtr> _controlplane_processes;
std::unordered_map<std::string, process::ProcessPtr> _controlplane_processes;
int _controlplane_counter = 0;
};

} // namespace praas::control_plane
Expand Down
3 changes: 2 additions & 1 deletion control-plane/include/praas/control-plane/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ namespace praas::control_plane::worker {

void handle_invocation(
HttpServer::request_t request, HttpServer::callback_t&& callback, const std::string& app_id,
std::string function_name, std::chrono::high_resolution_clock::time_point start
std::string function_name, std::chrono::high_resolution_clock::time_point start,
std::optional<std::string> process_name
);

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 16 additions & 3 deletions control-plane/src/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ namespace praas::control_plane {
throw praas::common::ObjectDoesNotExist{name};
}
}

std::optional<std::tuple<process::Process::write_lock_t, process::Process*>>
Application::get_controlplane_process(const std::string& process_name)
{
read_lock_t lock(_controlplane_mutex);

auto iter = _controlplane_processes.find(process_name);
if(iter != _controlplane_processes.end()) {
return std::make_tuple(iter->second->write_lock(), iter->second.get());
}

return std::nullopt;
}

void Application::get_controlplane_process(
backend::Backend& backend, tcpserver::TCPServer& poller, process::Resources&& resources,
Expand All @@ -130,7 +143,7 @@ namespace praas::control_plane {
// FIXME: this needs to be a parameter - store it in the process
int max_funcs_per_process = 1;

for (auto& proc : _controlplane_processes) {
for (auto& [name, proc] : _controlplane_processes) {

int active_funcs = proc->active_invocations();
if (active_funcs < max_funcs_per_process) {
Expand All @@ -142,7 +155,7 @@ namespace praas::control_plane {
}

// No process? create
std::string name = fmt::format("controlplane-{}", _controlplane_processes.size());
std::string name = fmt::format("controlplane-{}", _controlplane_counter++);
process::ProcessPtr process =
std::make_shared<process::Process>(name, this, std::move(resources));
process->set_creation_callback(std::move(callback), true);
Expand All @@ -168,7 +181,7 @@ namespace praas::control_plane {

{
write_lock_t lock(_controlplane_mutex);
_controlplane_processes.emplace_back(process);
_controlplane_processes[name] = process;
}
} else {

Expand Down
4 changes: 3 additions & 1 deletion control-plane/src/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ namespace praas::control_plane {
const std::string& function_name
)
{
std::string pid = request->getParameter("process_name");

_logger->info("Push new invocation request of {}", function_name);
auto start = std::chrono::high_resolution_clock::now();
_workers.add_task(
&worker::Workers::handle_invocation, request, std::move(callback), app_name, function_name,
start
start, !pid.empty() ? std::make_optional<std::string>(pid) : std::nullopt
);
}

Expand Down
1 change: 1 addition & 0 deletions control-plane/src/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ namespace praas::control_plane::process {
spdlog::error("Responding to client of invocation {}", invocation_id);
Json::Value json;
json["function"] = (*iter).function_name;
json["process_name"] = _name;
json["invocation_id"] = invocation_id;
json["return_code"] = return_code;
json["result"] = std::string{buf, len};
Expand Down
18 changes: 16 additions & 2 deletions control-plane/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace praas::control_plane::worker {

void Workers::handle_invocation(
HttpServer::request_t request, HttpServer::callback_t&& callback, const std::string& app_id,
std::string function_name, std::chrono::high_resolution_clock::time_point start
std::string function_name, std::chrono::high_resolution_clock::time_point start,
std::optional<std::string> process_name
)
{
Resources::RWAccessor acc;
Expand All @@ -34,7 +35,20 @@ namespace praas::control_plane::worker {

common::util::assert_true(_server != nullptr);

{
// Choose sticky process
if(process_name.has_value()) {
auto val = acc.get()->get_controlplane_process(process_name.value());

if(val.has_value()) {
spdlog::debug("[Workers] Schedule new invocation on proc {}", process_name.value());
std::get<1>(val.value())->add_invocation(
std::move(request), std::move(callback), function_name, start
);
} else {
callback(HttpServer::failed_response("Process unknown!", drogon::k404NotFound));
}
} else {

// Get a process or allocate one.
// FIXME: make resources configurable
acc.get()->get_controlplane_process(
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/praas/sdk/invocation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace praas::sdk {

struct ControlPlaneInvocationResult {

std::string process_name;

std::string invocation_id;

int return_code;
Expand Down
20 changes: 16 additions & 4 deletions sdk/include/praas/sdk/praas.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
#include <praas/sdk/invocation.hpp>
#include <praas/sdk/process.hpp>

#include <condition_variable>
#include <drogon/HttpClient.h>
#include <trantor/net/EventLoopThread.h>
#include "praas/common/http.hpp"

namespace praas::sdk {

struct PraaS {

PraaS(const std::string& control_plane_addr);
PraaS(const std::string& control_plane_addr, int thread_num = 8);

void disconnect();

Expand All @@ -28,7 +30,14 @@ namespace praas::sdk {

ControlPlaneInvocationResult invoke(
const std::string& app_name, const std::string& function_name,
const std::string& invocation_data
const std::string& invocation_data,
std::optional<std::string> process_name = std::nullopt
);

std::future<ControlPlaneInvocationResult> invoke_async(
const std::string& app_name, const std::string& function_name,
const std::string& invocation_data,
std::optional<std::string> process_name = std::nullopt
);

std::tuple<bool, std::string> swap_process(const Process& process);
Expand All @@ -46,9 +55,12 @@ namespace praas::sdk {
private:
std::string _last_error;

trantor::EventLoopThread _loop;
std::mutex _clients_mutex;
std::condition_variable _cv;
std::queue<common::http::HTTPClient> _clients;

drogon::HttpClientPtr _http_client;
praas::common::http::HTTPClient _get_client();
void _return_client(praas::common::http::HTTPClient& client);
};

} // namespace praas::sdk
Expand Down
Loading

0 comments on commit 4164db3

Please sign in to comment.