Skip to content

Commit

Permalink
[control-plane] [process] Improvements to downscaling
Browse files Browse the repository at this point in the history
  • Loading branch information
mcopik committed Oct 16, 2024
1 parent aae47fe commit 8720d8f
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 48 deletions.
13 changes: 0 additions & 13 deletions common/tests/unit/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,19 +279,6 @@ TEST(Messages, DataPlaneMetricsMsg)
}
}

TEST(Messages, DataPlaneMetricsMsgIncorrect)
{
{
DataPlaneMetricsData req;
EXPECT_THROW(req.invocations(-5), praas::common::InvalidArgument);
}

{
DataPlaneMetricsData req;
EXPECT_THROW(req.computation_time(-5), praas::common::InvalidArgument);
}
}

TEST(Messages, DataPlaneMetricsMsgParse)
{
int32_t invocations{0};
Expand Down
7 changes: 7 additions & 0 deletions control-plane/include/praas/control-plane/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ namespace praas::control_plane {
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
);

// FIXME: this overload is provided to support workers that only accept pass-by-copy (no refs)
void swapin_process(
std::string process_name, backend::Backend* backend, tcpserver::TCPServer* poller,
std::function<void(process::ProcessPtr, const std::optional<std::string>&)>&& callback
);

void swapin_process(
std::string process_name, backend::Backend& backend, tcpserver::TCPServer& poller,
std::function<void(process::ProcessPtr, const std::optional<std::string>&)>&& callback
Expand Down Expand Up @@ -252,6 +258,7 @@ namespace praas::control_plane {

lock_t _controlplane_mutex;
std::unordered_map<std::string, process::ProcessPtr> _controlplane_processes;
//std::vector<process::ProcessPtr> _controlplane_processes;
int _controlplane_counter = 0;
};

Expand Down
11 changes: 11 additions & 0 deletions control-plane/include/praas/control-plane/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ namespace praas::control_plane::process {
return *this;
}

void set_controlplane_process()
{
_is_controlplane_proc = true;
}

bool is_controlplane_process() const
{
return _is_controlplane_proc;
}

const std::string& name() const;

Status status() const;
Expand Down Expand Up @@ -210,6 +220,7 @@ namespace praas::control_plane::process {
void _send_invocation(Invocation&);

std::string _name;
bool _is_controlplane_proc = false;

common::UUID _uuid_generator;

Expand Down
8 changes: 2 additions & 6 deletions control-plane/include/praas/control-plane/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ namespace praas::control_plane {
_instance.reset(new Server{cfg});
}

static Server& instance()
static Server* instance()
{
if(!_instance) {
abort();
}

return *_instance;
return _instance.get();
}

private:
Expand Down
104 changes: 85 additions & 19 deletions control-plane/src/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ namespace praas::control_plane {
if (instance != nullptr) {
process->set_handle(std::move(instance));

Server::instance().downscaler().register_process(process);
// FIXME: temporary fix to make tests running - dependency on static func
auto *ptr = Server::instance();
if(ptr != nullptr) {
ptr->downscaler().register_process(process);
}

// Avoid a race condition.
// Created callback can be called by process connection, or by
Expand Down Expand Up @@ -128,6 +132,10 @@ namespace praas::control_plane {
{
read_lock_t lock(_controlplane_mutex);

//auto iter = _active_processes.find(process_name);
//if(iter != _active_processes.end()) {
// return std::make_tuple(iter->second->write_lock(), iter->second.get());
//}
auto iter = _controlplane_processes.find(process_name);
if(iter != _controlplane_processes.end()) {
return std::make_tuple(iter->second->write_lock(), iter->second.get());
Expand Down Expand Up @@ -179,7 +187,11 @@ namespace praas::control_plane {
spdlog::info("Allocated process {}", name);
process->set_handle(std::move(instance));

Server::instance().downscaler().register_process(process);
// FIXME: temporary fix to make tests running - dependency on static func
auto *ptr = Server::instance();
if(ptr != nullptr) {
ptr->downscaler().register_process(process);
}

// Avoid a race condition.
// Created callback can be called by process connection, or by
Expand All @@ -191,6 +203,10 @@ namespace praas::control_plane {
write_lock_t lock(_controlplane_mutex);
_controlplane_processes[name] = process;
}
{
write_lock_t lock(_active_mutex);
_active_processes[name] = process;
}
} else {

spdlog::error("Failed to allocate process!");
Expand Down Expand Up @@ -228,6 +244,7 @@ namespace praas::control_plane {
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
)
{
// FIXME: replace exceptions with collbacks.
if (process_name.length() == 0) {
throw praas::common::InvalidConfigurationError("Application name cannot be empty");
}
Expand Down Expand Up @@ -259,7 +276,11 @@ namespace praas::control_plane {
proc.set_status(process::Status::SWAPPING_OUT);

// Clear from down scaler early, don't wait for confirmation. Avoids double swap calls.
Server::instance().downscaler().remove_process((*iter).second);
// FIXME: temporary fix to make tests running - dependency on static func
auto *ptr = Server::instance();
if(ptr != nullptr) {
ptr->downscaler().remove_process((*iter).second);
}

// Swap the process
proc.state().swap = deployment.get_location(_name);
Expand All @@ -268,6 +289,14 @@ namespace praas::control_plane {

}

void Application::swapin_process(
std::string process_name, backend::Backend* backend, tcpserver::TCPServer* poller,
std::function<void(process::ProcessPtr, const std::optional<std::string>&)>&& callback
)
{
swapin_process(process_name, *backend, *poller, std::move(callback));
}

void Application::swapin_process(
std::string process_name, backend::Backend& backend, tcpserver::TCPServer& poller,
std::function<void(process::ProcessPtr, const std::optional<std::string>&)>&& callback
Expand Down Expand Up @@ -310,7 +339,11 @@ namespace praas::control_plane {

proc_ptr->set_handle(std::move(instance));

Server::instance().downscaler().register_process(proc_ptr);
// FIXME: temporary fix to make tests running - dependency on static func
auto *ptr = Server::instance();
if(ptr != nullptr) {
ptr->downscaler().register_process(proc_ptr);
}

// As in createrd process, we avoid a race condition.
// Created callback can be called by process connection, or by
Expand Down Expand Up @@ -372,33 +405,58 @@ namespace praas::control_plane {
spdlog::error("Failure! Closing not-swapped process {}, state {}", _name, ptr->status());

ptr->set_status(process::Status::FAILURE);
Server::instance().downscaler().remove_process(ptr);
// FIXME: temporary fix to make tests running - dependency on static func
auto *server_ptr = Server::instance();
if(server_ptr != nullptr) {
server_ptr->downscaler().remove_process(ptr);
}

// Modify internal collections
write_lock_t application_lock(_active_mutex);
bool found = false;
{
write_lock_t application_lock(_active_mutex);

auto iter = _active_processes.find(ptr->name());
if (iter != _active_processes.end()) {
auto iter = _active_processes.find(ptr->name());
if (iter != _active_processes.end()) {

// If process failed during swapping, then notify client.
(*iter).second->swapped_callback(0, 0, "Process closed unexpectedly during swapping");
// If process failed during swapping, then notify client.
(*iter).second->swapped_callback(0, 0, "Process closed unexpectedly during swapping");

_active_processes.erase(iter);
if((*iter).second->is_controlplane_process()) {

} else {
application_lock.unlock();
_active_processes.erase(iter);

write_lock_t control_plane_lock(_controlplane_mutex);
auto iter = _controlplane_processes.find(ptr->name());
if (iter != _controlplane_processes.end()) {
_controlplane_processes.erase(iter);
} else {
spdlog::error("Control plane process {} not present in the collection!", ptr->name());
}

} else {
_active_processes.erase(iter);
}

found = true;
}
}

if(!found) {
auto iter = _swapped_processes.find(ptr->name());
if (iter != _swapped_processes.end()) {
_swapped_processes.erase(iter);
} else {

auto iter = _controlplane_processes.find(ptr->name());
if (iter != _controlplane_processes.end()) {
_controlplane_processes.erase(iter);
} else {
spdlog::error("Unknown process {}", ptr->name());
}
//auto iter = _controlplane_processes.find(ptr->name());
//if (iter != _controlplane_processes.end()) {
// _controlplane_processes.erase(iter);
//} else {
// spdlog::error("Unknown process {}", ptr->name());
//}

spdlog::error("Unknown process {}", ptr->name());
}
}

Expand Down Expand Up @@ -436,7 +494,15 @@ namespace praas::control_plane {
_active_processes.erase(iter);
}

Server::instance().downscaler().remove_process((*iter).second);
if((*iter).second->is_controlplane_process()) {
_controlplane_processes.erase((*iter).second->name());
}

// FIXME: temporary fix to make tests running - dependency on static func
auto *server_ptr = Server::instance();
if(server_ptr != nullptr) {
server_ptr->downscaler().remove_process((*iter).second);
}
}

callback(msg);
Expand Down
6 changes: 3 additions & 3 deletions control-plane/src/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

void signal_handler(int /*unused*/)
{
praas::control_plane::Server::instance().shutdown();
praas::control_plane::Server::instance()->shutdown();
}

int main(int argc, char** argv)
Expand All @@ -40,9 +40,9 @@ int main(int argc, char** argv)
sigaction(SIGINT, &sigIntHandler, nullptr);

praas::control_plane::Server::configure(cfg);
praas::control_plane::Server::instance().run();
praas::control_plane::Server::instance()->run();

praas::control_plane::Server::instance().wait();
praas::control_plane::Server::instance()->wait();

//server.shutdown();

Expand Down
1 change: 1 addition & 0 deletions control-plane/src/downscaler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ namespace praas::control_plane::downscaler {
deployment::Deployment* deployment,
std::function<void(size_t, double, const std::optional<std::string>&)>&& callback
);
spdlog::info("[Downscaler] Schedule swap of process {}", proc_name);
_workers.add_other_task(
(ptr_t)&Application::swap_process,
&stats.proc->application(),
Expand Down
48 changes: 44 additions & 4 deletions control-plane/src/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,49 @@ namespace praas::control_plane::worker {
auto val = acc.get()->get_controlplane_process(process_name.value());

if(val.has_value()) {
spdlog::debug("[Workers] Schedule new invocation on proc {}", process_name.value());
std::get<1>(val.value())->add_invocation(
std::move(request), std::move(callback), function_name, start
);


auto proc_ptr = std::get<1>(val.value());

spdlog::debug("Requesitng new invocation on process, status: {}", proc_ptr->status());

if(proc_ptr->status() == process::Status::ALLOCATED) {

spdlog::debug("[Workers] Schedule new invocation on proc {}", process_name.value());
std::get<1>(val.value())->add_invocation(
std::move(request), std::move(callback), function_name, start
);

} else if(proc_ptr->status() == process::Status::SWAPPED_OUT) {

std::get<0>(val.value()).unlock();

// Schedul swap in!
using ptr_t = void (Application::*)(
std::string process_name,
backend::Backend*, tcpserver::TCPServer*,
std::function<void(process::ProcessPtr, const std::optional<std::string>&)>&&
);
add_other_task(
(ptr_t) &Application::swapin_process,
acc.get(),
process_name.value(), &_backend, _server,
[
request=std::move(request),
callback=std::move(callback),
function_name,
start
](process::ProcessPtr ptr, const std::optional<std::string> &) mutable {
ptr->add_invocation(
std::move(request), std::move(callback), function_name, start
);
}
);

} else {
callback(HttpServer::failed_response("Invalid process state!", drogon::k500InternalServerError));
}

} else {
callback(HttpServer::failed_response("Process unknown!", drogon::k404NotFound));
}
Expand All @@ -58,6 +97,7 @@ namespace praas::control_plane::worker {
) mutable {
if (proc_ptr) {
proc_ptr->write_lock();
proc_ptr->set_controlplane_process();
proc_ptr->add_invocation(
std::move(request), std::move(callback), function_name, start
);
Expand Down
1 change: 1 addition & 0 deletions control-plane/tests/integration/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,5 @@ TEST_F(HttpTCPIntegration, Invoke)
_loop.wait();

http_server->shutdown();
http_server->wait();
}
1 change: 1 addition & 0 deletions control-plane/tests/unit/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ TEST(Config, DownScalerConfig)
"ip-address": "127.0.0.1",
"http-client-io-threads": 1,
"downscaler": {
"enabled": false,
"polling_interval": 30,
"swapping_threshold": 60
}
Expand Down
Loading

0 comments on commit 8720d8f

Please sign in to comment.