Skip to content

Commit

Permalink
[control-plane] [serving] [processs] [sdk] Consolidate S3 implementat…
Browse files Browse the repository at this point in the history
…ion for swapping
  • Loading branch information
mcopik committed Oct 15, 2024
1 parent 3974ecd commit 38f2494
Show file tree
Hide file tree
Showing 21 changed files with 576 additions and 80 deletions.
18 changes: 0 additions & 18 deletions control-plane/include/praas/control-plane/aws.hpp

This file was deleted.

26 changes: 18 additions & 8 deletions control-plane/include/praas/control-plane/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PRAAS_CONTROLL_PLANE_BACKEND_HPP

#include <praas/common/http.hpp>
#include <praas/control-plane/deployment.hpp>
#include <praas/control-plane/process.hpp>

#if defined(WITH_FARGATE_BACKEND)
Expand Down Expand Up @@ -74,7 +75,10 @@ namespace praas::control_plane::backend {

struct Backend {

Backend() = default;
Backend(deployment::Deployment& deployment):
_deployment(deployment)
{}

Backend(const Backend&) = default;
Backend(Backend&&) = delete;
Backend& operator=(const Backend&) = default;
Expand All @@ -89,9 +93,9 @@ namespace praas::control_plane::backend {
* @param resources [TODO:description]
*/
virtual void allocate_process(
process::ProcessPtr, const process::Resources& resources,
std::function<void(std::shared_ptr<ProcessInstance>&&, std::optional<std::string>)>&&
callback
process::ProcessPtr, const process::Resources& resources,
std::function<void(std::shared_ptr<ProcessInstance>&&, std::optional<std::string>)>&&
callback
) = 0;

virtual void shutdown(
Expand Down Expand Up @@ -122,13 +126,20 @@ namespace praas::control_plane::backend {
* @param {name} initialized backend instance, where instance type is one of Backend's
* childrens.
*/
static std::unique_ptr<Backend> construct(const config::Config&);
static std::unique_ptr<Backend> construct(const config::Config&, deployment::Deployment&);

void configure_tcpserver(const std::string& ip, int port);

const deployment::Deployment& deployment() const
{
return _deployment;
}

protected:
std::string _tcp_ip;
int _tcp_port;

deployment::Deployment& _deployment;
};

struct DockerBackend : Backend {
Expand All @@ -149,8 +160,7 @@ namespace praas::control_plane::backend {
std::string container_id;
};

DockerBackend(const config::BackendDocker& cfg);

DockerBackend(const config::BackendDocker& cfg, deployment::Deployment& deployment);
~DockerBackend() override;

void allocate_process(
Expand Down Expand Up @@ -217,7 +227,7 @@ namespace praas::control_plane::backend {
std::shared_ptr<Aws::EC2::EC2Client> _ec2_client;
};

FargateBackend(const config::BackendFargate& cfg);
FargateBackend(const config::BackendFargate& cfg, deployment::Deployment& deployment);

~FargateBackend() override;

Expand Down
16 changes: 15 additions & 1 deletion control-plane/include/praas/control-plane/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,20 @@ namespace praas::control_plane::config {
void set_defaults();
};

struct Deployment {};
struct Deployment {
virtual ~Deployment() = default;
};

struct LocalDeployment : Deployment {
void load(cereal::JSONInputArchive& archive) {}
};

struct AWSDeployment : Deployment {
std::string s3_bucket;

void load(cereal::JSONInputArchive& archive);
void set_defaults();
};

struct Config {

Expand All @@ -103,6 +116,7 @@ namespace praas::control_plane::config {
TCPServer tcpserver;

deployment::Type deployment_type;
std::unique_ptr<Deployment> deployment;

backend::Type backend_type;
std::unique_ptr<Backend> backend;
Expand Down
31 changes: 26 additions & 5 deletions control-plane/include/praas/control-plane/deployment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

#include <praas/control-plane/state.hpp>

#if defined(WITH_AWS_DEPLOYMENT)
#include <praas/control-plane/aws.hpp>
#endif

#include <filesystem>

namespace praas::control_plane::config {
Expand All @@ -31,7 +27,17 @@ namespace praas::control_plane::state {
};

#if defined(WITH_AWS_DEPLOYMENT)
class AWSS3SwapLocation : SwapLocation {};
struct AWSS3SwapLocation : SwapLocation {
std::string app_name;

AWSS3SwapLocation(std::string app_name):
app_name(std::move(app_name))
{}

std::string_view root_path() const override;
std::string path(std::string process_name) const override;
};

class RedisSwapLocation : SwapLocation {};
#endif

Expand Down Expand Up @@ -72,6 +78,21 @@ namespace praas::control_plane::deployment {
std::filesystem::path _path;
};

#if defined(WITH_AWS_DEPLOYMENT)
class AWS : public Deployment {
public:
std::string s3_bucket;

AWS(std::string s3_bucket):
s3_bucket(s3_bucket)
{}

std::unique_ptr<state::SwapLocation> get_location(std::string app_name) override;

void delete_swap(const state::SwapLocation& /*unused*/) override;
};
#endif

} // namespace praas::control_plane::deployment

#endif
4 changes: 2 additions & 2 deletions control-plane/include/praas/control-plane/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ namespace praas::control_plane {

Resources _resources;

std::unique_ptr<backend::Backend> _backend;

std::unique_ptr<deployment::Deployment> _deployment;

std::unique_ptr<backend::Backend> _backend;

worker::Workers _workers;

tcpserver::TCPServer _tcp_server;
Expand Down
5 changes: 5 additions & 0 deletions control-plane/src/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,12 @@ namespace praas::control_plane {

auto iter = _active_processes.find(ptr->name());
if (iter != _active_processes.end()) {

// If process failed during swapping, then notify client.
(*iter).second->swapped_callback(0, 0, "Process closed unexpectedly during swapping");

_active_processes.erase(iter);

} else {

// FIXME: check for processes allocated by the control plane
Expand Down
23 changes: 17 additions & 6 deletions control-plane/src/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <praas/common/util.hpp>
#include <praas/control-plane/application.hpp>
#include <praas/control-plane/config.hpp>
#include <praas/control-plane/deployment.hpp>

#if defined(WITH_FARGATE_BACKEND)
#include <aws/core/client/AsyncCallerContext.h>
Expand Down Expand Up @@ -43,23 +44,27 @@ namespace praas::control_plane::backend {
}
}

std::unique_ptr<Backend> Backend::construct(const config::Config& cfg)
std::unique_ptr<Backend> Backend::construct(const config::Config& cfg, deployment::Deployment& deployment)
{
if (cfg.backend_type == Type::DOCKER) {
return std::make_unique<DockerBackend>(*dynamic_cast<config::BackendDocker*>(cfg.backend.get()
));
return std::make_unique<DockerBackend>(
*dynamic_cast<config::BackendDocker*>(cfg.backend.get()),
deployment
);
}
#if defined(WITH_FARGATE_BACKEND)
if (cfg.backend_type == Type::AWS_FARGATE) {
return std::make_unique<FargateBackend>(
*dynamic_cast<config::BackendFargate*>(cfg.backend.get())
*dynamic_cast<config::BackendFargate*>(cfg.backend.get()),
deployment
);
}
#endif
return nullptr;
}

DockerBackend::DockerBackend(const config::BackendDocker& cfg)
DockerBackend::DockerBackend(const config::BackendDocker& cfg, deployment::Deployment& deployment):
Backend(deployment)
{
_logger = common::util::create_logger("LocalBackend");

Expand Down Expand Up @@ -92,6 +97,10 @@ namespace praas::control_plane::backend {
body["swap-location"] = process->state().swap->path(process->name());
}

if(auto * ptr = dynamic_cast<deployment::AWS*>(&_deployment)) {
body["s3-swapping-bucket"] = ptr->s3_bucket;
}

_http_client.post(
"/create",
{
Expand Down Expand Up @@ -185,7 +194,8 @@ namespace praas::control_plane::backend {
}

#if defined(WITH_FARGATE_BACKEND)
FargateBackend::FargateBackend(const config::BackendFargate& cfg)
FargateBackend::FargateBackend(const config::BackendFargate& cfg, deployment::Deployment& deployment):
Backend(deployment)
{
_logger = common::util::create_logger("FargateBackend");

Expand Down Expand Up @@ -308,6 +318,7 @@ namespace praas::control_plane::backend {
std::function<void(std::shared_ptr<ProcessInstance>&&, std::optional<std::string>)>&& callback
)
{
// FIXME: swap bucket
std::string cluster_name = _fargate_config["cluster_name"].asString();

Aws::ECS::Model::RunTaskRequest req;
Expand Down
19 changes: 19 additions & 0 deletions control-plane/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <praas/common/exceptions.hpp>
#include <praas/common/util.hpp>
#include <praas/control-plane/backend.hpp>
#include <praas/control-plane/deployment.hpp>

#include <optional>
#include <stdexcept>
Expand Down Expand Up @@ -101,6 +102,16 @@ namespace praas::control_plane::config {
fargate_config = "";
}

void AWSDeployment::load(cereal::JSONInputArchive& archive)
{
archive(CEREAL_NVP(s3_bucket));
}

void AWSDeployment::set_defaults()
{
s3_bucket = "";
}

Config Config::deserialize(std::istream& in_stream)
{
Config cfg;
Expand Down Expand Up @@ -142,6 +153,9 @@ namespace praas::control_plane::config {
public_ip_address = "127.0.0.1";
http_client_io_threads = 1;

deployment_type = deployment::Type::LOCAL;
deployment = std::make_unique<LocalDeployment>();

http.set_defaults();
workers.set_defaults();
down_scaler.set_defaults();
Expand All @@ -168,6 +182,11 @@ namespace praas::control_plane::config {
std::string deployment_type;
archive(cereal::make_nvp("deployment-type", deployment_type));
this->deployment_type = deployment::deserialize(deployment_type);
if(this->deployment_type == deployment::Type::AWS) {
auto ptr = std::make_unique<AWSDeployment>();
common::util::cereal_load_optional(archive, "deployment", *ptr);
this->deployment = std::move(ptr);
}

archive(cereal::make_nvp("ip-address", public_ip_address));
archive(cereal::make_nvp("http-client-io-threads", http_client_io_threads));
Expand Down
33 changes: 33 additions & 0 deletions control-plane/src/deployment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ namespace praas::control_plane::state {
return fmt::format("local://{}", app_name);
}

#if defined(WITH_AWS_DEPLOYMENT)
std::string_view AWSS3SwapLocation::root_path() const
{
return "/";
}

std::string AWSS3SwapLocation::path(std::string process_name) const
{
return fmt::format("s3://{}", app_name);
}
#endif

} // namespace praas::control_plane::state

namespace praas::control_plane::deployment {
Expand All @@ -36,6 +48,15 @@ namespace praas::control_plane::deployment {
{
if (cfg.deployment_type == Type::LOCAL) {
return std::make_unique<Local>();
} else if (cfg.deployment_type == Type::AWS) {

auto cfg_ptr = dynamic_cast<config::AWSDeployment*>(cfg.deployment.get());
if(!cfg_ptr) {
spdlog::error("Wrong deployment config type!");
return nullptr;
}

return std::make_unique<AWS>(cfg_ptr->s3_bucket);
}
spdlog::error("Unknown deployment type! {}", static_cast<int>(cfg.deployment_type));
return nullptr;
Expand All @@ -51,4 +72,16 @@ namespace praas::control_plane::deployment {
spdlog::error("Deleting swap is not supported for disk operations.");
}

#if defined(WITH_AWS_DEPLOYMENT)
std::unique_ptr<state::SwapLocation> AWS::get_location(std::string app_name)
{
return std::make_unique<state::AWSS3SwapLocation>(app_name);
}

void AWS::delete_swap(const state::SwapLocation&)
{
spdlog::error("Deleting swap is not yet supported!");
}
#endif

} // namespace praas::control_plane::deployment
12 changes: 6 additions & 6 deletions control-plane/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ extern void signal_handler(int);

namespace praas::control_plane {

Server::Server(config::Config& cfg)
: _backend(backend::Backend::construct(cfg)),
_deployment(deployment::Deployment::construct(cfg)),
_workers(worker::Workers(cfg.workers, *_backend, *_deployment, _resources)),
_tcp_server(tcpserver::TCPServer(cfg.tcpserver, _workers)),
_http_server(std::make_shared<HttpServer>(cfg.http, _workers))
Server::Server(config::Config& cfg):
_deployment(deployment::Deployment::construct(cfg)),
_backend(backend::Backend::construct(cfg, *_deployment)),
_workers(worker::Workers(cfg.workers, *_backend, *_deployment, _resources)),
_tcp_server(tcpserver::TCPServer(cfg.tcpserver, _workers)),
_http_server(std::make_shared<HttpServer>(cfg.http, _workers))
{
_logger = common::util::create_logger("Server");

Expand Down
Loading

0 comments on commit 38f2494

Please sign in to comment.