diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/README.md b/eBPF_Supermarket/Auto_Cluster_Deployer/README.md
new file mode 100644
index 000000000..a7ad301be
--- /dev/null
+++ b/eBPF_Supermarket/Auto_Cluster_Deployer/README.md
@@ -0,0 +1,312 @@
+# Linux服务器集群的应用软件自动化部署管理工具

## 一、项目背景

在企业级 IT 环境中,随着业务需求的不断变化和扩展,服务器集群的管理和运维变得越来越复杂和繁重。特别是在 Linux 服务器集群中,面对大量分布式节点和多种应用软件的管理需求,如何高效地进行应用软件的安装、更新、卸载和同步操作,成为了一项重要的挑战。

本项目旨在设计和开发一个基于 Rust 语言的自动化应用软件部署与管理工具,以满足企业在 Linux 服务器集群中自动化运维的需求。该工具致力于提高系统运维效率、降低运维人员的工作强度,并实现以下几个目标:

1. **同步更新与自动部署**:自动检测服务器集群中不同节点的状态,根据预设的部署策略和更新计划,执行应用软件的安装、更新或卸载操作。确保应用版本的一致性和更新的及时性,减少人为干预和操作风险。

2. **支持多种应用场景**:该工具适用于多种业务场景,包括但不限于 Web 服务、数据库服务、数据分析服务、容器管理平台等。根据服务器类别和业务需求,工具能够自动判断和执行相应的软件管理操作。

3. **适配国产操作系统**:为满足国产化、自主可控的需求,本工具特别设计成兼容主流国产操作系统,如麒麟、统信 UOS 等,确保在这些平台上能够正常运行并实现自动化运维目标。

4. **扩展性和可维护性**:工具采用模块化设计,支持功能扩展和定制化开发。通过基于 Rust 语言的安全性、高效性和并发性优势,提供良好的性能表现和较低的运行开销。同时,提供详细的日志和监控功能,便于问题排查和性能优化。

## 二、使用的技术

本项目使用了多种关键技术和工具来实现自动化应用软件部署与管理:

1. **Rust编程语言**:作为项目的核心开发语言,Rust 提供了高效的系统级编程能力和内存安全性。它的“零成本抽象”特性确保了高性能,并且所有权机制有效防止了数据竞争、空指针引用等常见的编程错误,为开发高并发、低延迟的服务提供了坚实基础。 + +2. **gRPC**:用于实现客户端与服务器之间的远程过程调用(RPC)。gRPC 基于 HTTP/2 协议,支持多路复用、流式传输和双向通信等特性,确保高效、可靠的数据传输,适用于低延迟和高吞吐量的场景。 + + ![grpc](img/grpc.jpg) + +3. **Tonic**:Rust 生态中用于实现 gRPC 服务的库,简化了 gRPC 服务端和客户端的开发。Tonic 提供了符合 Rust 语言特性的异步接口,与 tokio 等异步运行时库紧密集成,为构建高效的异步网络服务提供了便利。 + +4. **tonic-web**:用于启用 gRPC-Web 支持。`tonic_web::enable` 允许 `tonic` 生成的 gRPC 服务可以通过 gRPC-Web 与客户端通信。由于 gRPC-Web 是基于 HTTP/1.x,因此需要显式启用 `accept_http1`。 + +5. **Protocol Buffers (Proto)**:作为 gRPC 的接口描述语言(IDL),Proto 用于定义服务端和客户端之间通信的接口和消息格式。它支持跨语言的序列化和反序列化,具有高效的二进制编码方式,能够显著减少数据传输的大小和开销。 + +6. **tokio**:Rust 的异步编程库,提供了多线程的异步运行时,能够处理大量的并发 IO 操作。tokio 在性能和可扩展性上表现出色,使得应用程序能够在不增加线程开销的情况下处理更多的连接和请求,特别适合高并发的网络服务场景。 + +7. **Rocket**:Rust 的一个 Web 框架,用于构建快速、安全的 web 应用。Rocket 提供了简洁的 API 和强大的功能,支持高级路由、请求处理和响应构建,非常适合需要高性能和高安全性的 Web 应用开发。 + +8. **PostgreSQL**:一个强大的开源关系数据库管理系统,提供丰富的功能和高度的可扩展性。PostgreSQL 支持复杂查询、大数据集、事务处理以及多种数据类型,适用于需要高性能和可靠数据存储的应用场景。 + +9. **Vue**:一个现代化的前端 JavaScript 框架,用于构建用户界面和单页应用。Vue 提供了响应式的数据绑定和组件化的开发方式,使得构建复杂的用户界面变得更加简单和高效。 + +这些技术的结合,使得项目能够在保证高性能和可靠性的前提下,实现复杂的部署和管理功能,满足企业级Linux服务器集群的运维需求。 + +## 三、系统实现 + +本项目使用 Rust 语言编写,采用 Tonic 库提供 gRPC 服务来实现客户端与服务器端的通信和数据交换。系统分为两大部分:客户端 (`client.rs`) 和服务器端 (`server.rs`)。通过 gRPC,客户端和服务器端能够进行高效的远程过程调用,实现软件包的部署和管理功能。 + +### 1. 客户端与服务器端的互通 + +- 在客户端部分,首先创建了一个 HTTP 客户端,它使用 `hyper` 库提供的功能来处理 HTTP 请求。为了与 gRPC-Web 兼容,客户端配置了一个中间件层 (`GrpcWebClientLayer`),这使得 HTTP 客户端能够处理 gRPC-Web 的请求和响应。客户端通过指定服务器的地址(如 ``)创建了一个 `SaControlClient` 实例,并将其封装在一个 `Arc` 中。这种封装方式使得客户端能够安全地在多线程环境中使用。 +- 服务器端部分使用 `tonic` 库创建了一个 gRPC 服务,并且通过 `tonic_web` 启用了 gRPC-Web 支持。服务器的地址和端口被设置为 ``。服务器通过 `Server::builder` 配置了 HTTP/1.x 支持,以便与 gRPC-Web 兼容。服务器端初始化并启动了 `SaControlServer`,并开始监听指定的地址。 + +### 2. 通信协议定义 + +本项目使用 Protocol Buffers(简称 Proto)来定义 gRPC 的通信协议,版本使用 `proto3` 语法。 + +下图1是服务的通信协议图示,展示了客户端与服务器之间的交互流程以及消息定义。 + +![通信协议图](img/通信图.jpg) + +#### (1)服务和消息定义 + +- **服务定义**: + + SAControl服务包括以下 RPC 方法: + + - `DeployPackages(Empty) returns (Ack)`:用于在服务器端执行部署软件包的操作。客户端调用该方法后,服务器端将根据预定义的规则进行软件包的自动部署。 + - `SARegist(SAInfo) returns (Ack)`:用于在服务器端注册服务器信息。客户端发送服务器的 IP 地址和类别信息,服务器端接收后确认注册。 + - `GetPackageInfoByIP(IpRequest) returns (PackageInfoResponse)`:根据 IP 地址请求获取软件包信息。客户端发送 IP 地址请求,服务器端返回该 IP 地址对应的包信息(如 ID、版本、软件名称和描述)。 + - `SendPackageFile(PackageRequest) returns (stream FileChunk)`:客户端请求软件包文件传输,服务器端通过流式数据传输的方式发送文件块。 + +- **消息定义**: + + - `Empty`:一个空消息,用于无参数请求。 + - `IpRequest`:包含 IP 地址的请求消息。 + - `SAInfo`:包含服务器 IP 地址和类别的注册信息。 + - `Ack`:一个确认消息,包含布尔类型的成功标志。 + - `ProgramDirectory`:包含程序目录信息,使用名称和子目录列表。 + - `TasksList`:任务列表,包含任务名称的重复字符串。 + - `TaskRequest`:任务请求,包含任务名称列表。 + - `Data`:包含字符串数据的消息。 + - `VersionInfo`:包含版本信息的消息。 + - `PackageRequest`:包含类别信息的请求消息。 + - `FileChunk`:用于文件传输的消息,包含文件内容(字节)和文件类型(字符串)。 + - `PackageInfoResponse`:包含包的 ID、版本、软件名称和描述的响应消息。 + +#### (2)客户端与服务器交互流程 + +1. **注册阶段**:客户端通过 `SARegist` RPC 方法向服务器注册自身信息,包括服务器 IP 和类别信息。服务器在接收到此请求后,确认并记录该信息。 +2. **部署请求阶段**:客户端调用 `DeployPackages` 方法,向服务器发送部署请求。服务器根据请求指示,执行相应的自动化部署任务。 +3. **获取软件包信息**:客户端通过 `GetPackageInfoByIP` 方法,按需向服务器请求特定 IP 地址的相关软件包信息,服务器将返回相应的版本、软件名称等详细信息。 +4. **文件传输**:当需要更新或安装新的软件包时,客户端使用 `SendPackageFile` 方法请求文件传输。服务器端通过流的形式逐块发送文件内容(`FileChunk`),确保大文件的传输可靠性和稳定性。 + +### 3. 客户端设计 + +1. **异步获取本地 IP 地址** + + 客户端需要获取本机的 IP 地址,以便与服务器进行通信。通过异步命令执行,客户端可以动态获取和确认其 IP 地址,以保证更新任务的精确性。 + +2. **解析配置文件和 DEB 包的版本号** + + 客户端检查本地的配置文件和 DEB 软件包以确定当前版本号。 + + - 如果配置文件不可用,客户端会回退到默认版本号。 + - 使用外部工具来解析 DEB 包中的版本号信息,以确保获取的版本号准确无误。 + 这一过程有助于减少不必要的更新操作,并保持软件版本的一致性。 + +3. **gRPC 服务客户端的初始化** + + 客户端通过 gRPC 接口与服务器通信。在初始化过程中,创建了一个基于 HTTP 的 gRPC 客户端,以支持不同服务之间的高效通信。 + 为确保线程安全,客户端对象被包装在并发控制结构中,使其能够在异步操作和多线程环境下正确工作。 + +4. **循环检查版本并触发更新** + + 客户端采用一个循环结构来不断检查软件包的版本信息。具体流程如下: + + - 向服务器发送部署请求,并等待服务器的响应。 + - 根据本地 IP 地址查询服务器上对应的软件包信息。 + - 如果从服务器获取的软件包信息表明不需要更新,客户端将继续进入下一轮循环。 + +5. **注册代理信息** + + 客户端根据获取到的本地 IP 地址和服务器返回的信息,向服务器进行代理注册。这一步骤确保服务器能够识别每个客户端,并准确分配相应的更新任务。 + +6. **比较本地和服务器的版本号** + + 客户端比较本地版本号与服务器上的最新版本号: + + - 如果本地版本落后,客户端将进一步检查传输的 DEB 包版本,以确认是否需要更新。 + - 如果传输的版本也低于服务器版本,客户端会请求获取新的更新包。 + +7. **请求和保存更新包** + + 如果确认需要更新,客户端向服务器请求更新包,并将接收到的包分块保存到本地文件系统中。 + 根据文件类型(如 DEB 包或安装脚本),客户端将内容保存到相应的文件中,确保更新操作的顺利进行。 + +8. **执行更新操作** + + 在接收到完整的更新包后,客户端准备执行更新操作。由于之前版本的软件可能用户还正在使用中,所以需要让用户自行决定要不要执行脚本来部署新版本的软件包。 + + - 执行脚本 + + 用户在终端输入以下命令来执行脚本: + + ```bash + bash install.sh + ``` + +9. **定时检查机制** + + 客户端设置了一个定时机制,在每轮更新检查完成后,等待一段时间再重新开始版本检查。这种设计不仅减少了对服务器的频繁请求,还提高了客户端的性能和稳定性。 + +### 4. 服务器端设计 + +服务器采用了 Rust 编程语言,使用了 `tonic` 和 `tokio` 等异步编程库,同时集成了 PostgreSQL 数据库进行数据管理。 + +1. **项目结构** + + 服务器端的核心逻辑通过实现 `SaControl` 服务接口来完成,该接口包含了多个方法来处理各种任务,如部署软件包、获取软件包信息、注册传感器代理等。程序模块包括以下内容: + + - **gRPC 服务定义**: `SaControl` 服务接口的定义,包含多个远程过程调用 (RPC) 方法。 + - **配置管理**: 使用结构体和序列化库来管理服务器的配置文件,包括程序目录、任务配置、代理信息等。 + - **数据库连接**: 通过 `tokio_postgres` 库实现与 PostgreSQL 数据库的连接与操作。 + - **文件传输**: 使用异步文件读取和流处理来实现软件包文件的分块传输。 + +2. **部署包打包** + + 为了能够在 Linux 系统上自动化部署软件,需要将应用程序打包成 `.deb` 或 `.rpm` 格式的部署包。这些格式的包通常用于在 Debian/Ubuntu 或 Red Hat/CentOS 系统上进行软件安装和管理。 + + - **打包成 `.deb` 文件** + + - **目录结构**: + + ```lua + package/ + ├── DEBIAN + │ └── control + └── usr + └── local + └── bin + ├── config.toml + └── application + ``` + + - **`DEBIAN/control` 文件**:包含包的元数据,如包名、版本、依赖关系等: + + ```less + Package: package + Version: 1.0.0 + Section: base + Priority: optional + Architecture: all + Maintainer: Your Name + Description: A brief description of your package. + ``` + + - **打包命令**: + + ```bash + dpkg-deb --build package + ``` + + - **打包成 `.rpm` 文件** + + - **目录结构**: + + ```plaintext + package-name/ + ├── RPM/ + │ ├── BUILD/ + │ ├── RPMS/ + │ ├── SOURCES/ + │ ├── SPECS/ + │ └── SRPMS/ + └── usr/ + └── local/ + └── bin/ + ├── config.toml + └── application + ``` + + - **`SPECS/your-package.spec` 文件**:此文件包含包的构建说明和元数据。 + + ```spec + Name: package + Version: 1.0.0 + Release: 1%{?dist} + Summary: A brief description of your package. + + License: GPL + URL: http://your.url + Source0: %{name}-%{version}.tar.gz + + %description + A detailed description of your package. + + %prep + %setup -q + + %build + # 在此处添加编译构建的命令 + + %install + # 安装命令,用于将文件复制到 RPM 构建目录 + install -D -m 0755 application %{buildroot}/usr/local/bin/application + install -D -m 0644 config.toml %{buildroot}/usr/local/bin/config.toml + + %files + /usr/local/bin/application + /usr/local/bin/config.toml + ``` + + - **打包命令**: + + ```bash + rpmbuild -ba SPECS/package.spec + ``` + +3. **服务接口实现** + + - **部署软件包 (`deploy_packages`)** + + 该方法用于根据数据库中的任务列表部署软件包。它从 `deployment_tasks` 表中获取未部署的任务,并根据任务的目标类型执行以下操作: + + - **所有目标**: 如果目标类型为“所有”,则查询所有服务器的 IP 地址并将信息插入到 `package_deployment` 表中。 + - **服务器组**: 对于指定的服务器组,查询组成员的服务器 IP 地址并进行相应的插入操作。 + - **单台服务器**: 针对特定服务器的目标类型,直接查询其 IP 地址并将信息存储在 `package_deployment` 表中。 + + 部署完成后,更新任务状态为已部署。 + + - **获取软件包信息 (`get_package_info_by_ip`)** + + 该方法根据客户端提供的 IP 地址,查询指定 IP 所关联的软件包信息。步骤如下: + + 1. 从 `package_deployment` 表中查找对应的包 ID。 + 2. 根据包 ID 从 `deployment_packages` 表中获取详细信息,如版本、软件名称和描述。 + 3. 删除 `package_deployment` 表中与该 IP 相关的记录。 + 4. 返回查询到的软件包信息。 + + 如果查询超时,将返回一个默认的响应,表示查询失败。 + + - **传感器代理注册 (`sa_regist`)** + + 用于接收来自传感器代理的注册信息,记录并确认接收到的注册信息,包括服务器 IP 和软件包 ID。响应一个成功的确认信息。 + + - **发送软件包文件 (`send_package_file`)** + + 此方法用于向客户端传输软件包文件。步骤如下: + + 1. 根据请求中的包 ID 从数据库中查询软件包的存储路径。 + 2. 准备一个文件列表,包括软件包和安装脚本。 + 3. 异步读取文件内容,并将其分块发送给客户端。 + 4. 返回包含数据流的响应。 + +4. **数据库操作** + + 程序通过 `tokio_postgres` 库与 PostgreSQL 数据库进行交互,包括以下主要操作: + + - **连接和错误处理**: 使用异步任务处理数据库连接,并在发生错误时打印相应的错误信息。 + - **查询操作**: 从 `deployment_tasks`、`servers`、`server_group_members`、`package_deployment` 和 `deployment_packages` 表中查询信息。 + - **插入和更新操作**: 向 `package_deployment` 表中插入新的记录,更新 `deployment_tasks` 表的状态。 + - **删除操作**: 删除与特定 IP 地址相关的包部署记录。 + +5. **异步任务和错误处理** + + 服务器使用 `tokio` 提供的异步功能来处理多任务并发。针对每个数据库操作和文件传输,都进行了详细的错误处理和日志记录,以确保系统的可靠性和可维护性。 + +6. **启动和配置** + + 服务器程序的启动由 `main` 函数负责,其中包括设置服务器的监听地址、初始化日志系统、以及配置 gRPC 服务和 HTTP/1 支持。 + + 服务器将在指定的 IP 地址和端口上监听客户端请求,并根据定义的服务接口处理请求。 + diff --git ["serde"] } +dotenv = "0.15.0" +tokio = { version = "1", features = ["full"] } +time = "0.3" +syn = { version = "1.0", features = ["derive", "parsing"] } +bcrypt = "0.10" +async-std = "1.10" +rocket_okapi = "0.5" + + + + + diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/auth.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/auth.rs new file mode 100644 index 000000000..9c4d39648 --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/auth.rs @@ -0,0 +1,135 @@ +use rocket::{post, serde::json::Json, State}; +use crate::model::{User, NewUser, ApiResponse}; +use sqlx::query_as; +use bcrypt::{hash, verify}; +use chrono::Utc; +use rocket::http::Status; +use sqlx::PgPool; +use crate::model::ChangePasswordRequest; +use sqlx::query; + +#[post("/register", data = "")] +pub async fn register(db: &State, new_user: Json) -> Result>, Status> { + let password_hash = hash(&new_user.password, 4).map_err(|_| Status::InternalServerError)?; + + let mut conn = db.acquire().await.map_err(|_| Status::InternalServerError)?; + + let result = query_as::<_, User>( + r#" + INSERT INTO users (username, password_hash, created_at, updated_at) + VALUES ($1, $2, $3, $4) + RETURNING * + "# + ) + .bind(&new_user.username) + .bind(&password_hash) + .bind(Utc::now()) + .bind(Utc::now()) + .fetch_one(&mut conn) + .await; + + match result { + Ok(user) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "User registered successfully".to_string(), + data: Some(user), + })), + Err(sqlx::Error::Database(db_err)) if db_err.constraint() == Some("unique_username") => { + // 处理唯一约束冲突错误 + Err(Status::Conflict) // 返回 409 冲突状态 + }, + Err(_) => Err(Status::UnprocessableEntity), + } +} + + +#[post("/login", data = "")] +pub async fn login(db: &State, login_data: Json) -> Result>, Status> { + let mut conn = db.acquire().await.map_err(|_| Status::InternalServerError)?; + + let user = query_as::<_, User>( + r#" + SELECT id, username, password_hash, created_at, updated_at + FROM users + WHERE username = $1 + "# + ) + .bind(&login_data.username) + .fetch_optional(&mut conn) // 使用获取的连接 + .await + .map_err(|_| Status::InternalServerError)? + .ok_or_else(|| Status::Unauthorized)?; + + let valid = verify(&login_data.password, &user.password_hash).map_err(|_| Status::InternalServerError)?; + + if valid { + Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Login successful".to_string(), + data: Some(user), + })) + } else { + Err(Status::Unauthorized) + } +} + +// 修改密码接口 +#[post("/change_password", data = "")] +pub async fn change_password( + db: &State, + change_password_request: Json, +) -> Result>, Status> { + let mut conn = db.acquire().await.map_err(|_| Status::InternalServerError)?; + + // 获取用户信息 + let user = query_as::<_, User>( + r#" + SELECT id, username, password_hash, created_at, updated_at + FROM users + WHERE username = $1 + "# + ) + .bind(&change_password_request.username) + .fetch_optional(&mut conn) + .await + .map_err(|_| Status::InternalServerError)? + .ok_or_else(|| Status::Unauthorized)?; + + // 验证旧密码 + let is_valid = verify(&change_password_request.old_password, &user.password_hash) + .map_err(|_| Status::InternalServerError)?; + + if !is_valid { + return Err(Status::Unauthorized); // 如果旧密码不正确,返回未授权状态 + } + + // 哈希新密码 + let new_password_hash = hash(&change_password_request.new_password, 4) + .map_err(|_| Status::InternalServerError)?; + + // 更新数据库中的密码哈希 + let update_result = query( + r#" + UPDATE users + SET password_hash = $1, updated_at = $2 + WHERE id = $3 + "# + ) + .bind(&new_password_hash) + .bind(chrono::Utc::now()) + .bind(user.id) + .execute(&mut conn) + .await; + + match update_result { + Ok(_) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Password updated successfully".to_string(), + data: None, + })), + Err(_) => Err(Status::InternalServerError), + } +} diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_packages.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_packages.rs new file mode 100644 index 000000000..68021f2dd --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_packages.rs @@ -0,0 +1,132 @@ +use rocket::{get, post, put, delete, serde::json::Json, State}; +use crate::model::{DeploymentPackage, NewDeploymentPackage, ApiResponse}; +use sqlx::{PgPool, query_as, query}; +use rocket::http::Status; + +// 增加部署包 +#[post("/deployment_packages", data = "")] +pub async fn add_deployment_package( + db: &State, + new_package: Json +) -> Result>, Status> { + let result = query_as::<_, DeploymentPackage>( + "INSERT INTO deployment_packages (version, software_name, description, path) + VALUES ($1, $2, $3, $4) RETURNING *" + ) + .bind(&new_package.version) + .bind(&new_package.software_name) + .bind(&new_package.description) + .bind(&new_package.path) + .fetch_one(&**db) + .await; + + match result { + Ok(package) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment package added successfully".to_string(), + data: Some(package), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 查询部署包(根据软件名) +#[get("/deployment_packages/")] +pub async fn get_deployment_packages( + db: &State, + software_name: String +) -> Result>>, Status> { + let result = query_as::<_, DeploymentPackage>( + "SELECT * FROM deployment_packages WHERE software_name = $1" + ) + .bind(&software_name) + .fetch_all(&**db) + .await; + + match result { + Ok(packages) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment packages retrieved successfully".to_string(), + data: Some(packages), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 获取全部部署包 +#[get("/deployment_packages")] +pub async fn get_all_deployment_packages( + db: &State +) -> Result>>, Status> { + let result = query_as::<_, DeploymentPackage>( + "SELECT * FROM deployment_packages" + ) + .fetch_all(&**db) + .await; + + match result { + Ok(packages) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "All deployment packages retrieved successfully".to_string(), + data: Some(packages), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 修改部署包(根据软件名) +#[put("/deployment_packages/", data = "")] +pub async fn update_deployment_package( + db: &State, + software_name: String, + updated_package: Json +) -> Result>, Status> { + let result = query_as::<_, DeploymentPackage>( + "UPDATE deployment_packages + SET version = $1, description = $2, path = $3 + WHERE software_name = $4 + RETURNING *" + ) + .bind(&updated_package.version) + .bind(&updated_package.description) + .bind(&updated_package.path) + .bind(&software_name) + .fetch_one(&**db) + .await; + + match result { + Ok(package) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment package updated successfully".to_string(), + data: Some(package), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 删除部署包(根据软件名) +#[delete("/deployment_packages/")] +pub async fn delete_deployment_package( + db: &State, + software_name: String +) -> Result>, Status> { + let result = query("DELETE FROM deployment_packages WHERE software_name = $1") + .bind(&software_name) + .execute(&**db) + .await; + + match result { + Ok(_) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment package deleted successfully".to_string(), + data: None, + })), + Err(_) => Err(Status::InternalServerError), + } +} + diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_tasks.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_tasks.rs new file mode 100644 index 000000000..b27980cf3 --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/deployment_tasks.rs @@ -0,0 +1,170 @@ +use crate::model::{DeploymentTask, NewDeploymentTask, ApiResponse, TargetType}; +use sqlx::{PgPool, query_as, query}; +use rocket::{get, post, put, delete, serde::json::Json, State}; +use rocket::http::Status; + +#[post("/deployment_tasks", data = "")] +pub async fn add_deployment_task( + db: &State, + new_task: Json +) -> Result>, Status> { + let target_type = TargetType::from_str(&new_task.target_type) + .ok_or(Status::BadRequest)?; + + match target_type { + TargetType::SingleServer => { + let server_exists = query("SELECT 1 FROM servers WHERE id = $1") + .bind(new_task.target_id) + .fetch_optional(&**db) + .await + .map_err(|e| { + eprintln!("Database error: {}", e); + Status::InternalServerError + })?; + + if server_exists.is_none() { + return Err(Status::BadRequest); + } + }, + TargetType::ServerGroup => { + let group_exists = query("SELECT 1 FROM server_groups WHERE id = $1") + .bind(new_task.target_id) + .fetch_optional(&**db) + .await + .map_err(|e| { + eprintln!("Database error: {}", e); + Status::InternalServerError + })?; + + if group_exists.is_none() { + return Err(Status::BadRequest); + } + }, + TargetType::All => { + if new_task.target_id.is_some() { + return Err(Status::BadRequest); + } + }, + } + + let result = query_as::<_, DeploymentTask>( + "INSERT INTO deployment_tasks (package_id, target_type, target_id, is_deployed) + VALUES ($1, $2, $3, FALSE) RETURNING *" + ) + .bind(new_task.package_id) + .bind(new_task.target_type.clone()) + .bind(new_task.target_id) + .fetch_one(&**db) + .await; + + match result { + Ok(task) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment task created successfully".to_string(), + data: Some(task), + })), + Err(sqlx::Error::Database(db_err)) if db_err.constraint() == Some("deployment_tasks_package_id_target_type_target_id_key") => { + Err(Status::Conflict) + }, + Err(e) => { + eprintln!("Database error: {}", e); + Err(Status::InternalServerError) + }, + } +} + +// 查询所有部署任务 +#[get("/deployment_tasks")] +pub async fn get_all_deployment_tasks( + db: &State +) -> Result>>, Status> { + let result = query_as::<_, DeploymentTask>("SELECT * FROM deployment_tasks") + .fetch_all(&**db) + .await; + + match result { + Ok(tasks) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment tasks retrieved successfully".to_string(), + data: Some(tasks), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 根据 package_id 查询部署任务 +#[get("/deployment_tasks/package/")] +pub async fn get_deployment_tasks_by_package( + db: &State, + package_id: i32 +) -> Result>>, Status> { + let result = query_as::<_, DeploymentTask>("SELECT * FROM deployment_tasks WHERE package_id = $1") + .bind(package_id) + .fetch_all(&**db) + .await; + + match result { + Ok(tasks) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment tasks retrieved successfully".to_string(), + data: Some(tasks), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 更新部署任务 +#[put("/deployment_tasks/", data = "")] +pub async fn update_deployment_task( + db: &State, + id: i32, + updated_task: Json +) -> Result>, Status> { + let result = query_as::<_, DeploymentTask>( + "UPDATE deployment_tasks + SET package_id = $1, target_type = $2, target_id = $3 + WHERE id = $4 + RETURNING *" + ) + .bind(updated_task.package_id) + .bind(updated_task.target_type.to_string()) + .bind(updated_task.target_id) + .bind(id) + .fetch_one(&**db) + .await; + + match result { + Ok(task) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment task created successfully".to_string(), + data: Some(task), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 删除部署任务 +#[delete("/deployment_tasks/")] +pub async fn delete_deployment_task( + db: &State, + id: i32 +) -> Result>, Status> { + let result = query("DELETE FROM deployment_tasks WHERE id = $1") + .bind(id) + .execute(&**db) + .await; + + match result { + Ok(_) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Deployment task deleted successfully".to_string(), + data: None, + })), + Err(_) => Err(Status::InternalServerError), + } +} diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/main.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/main.rs new file mode 100644 index 000000000..e8ec93c6f --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/main.rs @@ -0,0 +1,122 @@ +mod auth; +mod model; +mod servers; +mod server_groups; +mod server_group_members; +mod deployment_packages; +mod deployment_tasks; + +use rocket::routes; +use sqlx::postgres::PgPoolOptions; +use sqlx::PgPool; +use rocket::{Request, Response}; +use rocket::fairing::{Fairing, Info, Kind}; +use rocket::http::Header; +use rocket::config::Config; + + +type Db = PgPool; + +// 定义自定义 CORS Fairing +pub struct Cors { + allowed_origins: Vec, +} + +#[rocket::async_trait] +impl Fairing for Cors { + fn info(&self) -> Info { + Info { + name: "CORS Fairing", + kind: Kind::Response | Kind::Request, + } + } + + async fn on_request(&self, _request: &mut Request<'_>, _data: &mut rocket::Data<'_>) { + } + + async fn on_response<'r>(&self, request: &'r Request<'_>, response: &mut Response<'r>) { + if request.method() == rocket::http::Method::Options { + response.set_status(rocket::http::Status::Ok); + } + + response.set_header(Header::new("Access-Control-Allow-Origin", self.get_origin(request))); + response.set_header(Header::new("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")); + response.set_header(Header::new("Access-Control-Allow-Headers", "Authorization, Accept, Content-Type")); + } +} + +impl Cors { + fn new(allowed_origins: Vec<&str>) -> Self { + Cors { + allowed_origins: allowed_origins.into_iter().map(String::from).collect(), + } + } + + fn get_origin(&self, request: &Request<'_>) -> String { + if let Some(origin) = request.headers().get_one("Origin") { + if self.allowed_origins.contains(&origin.to_string()) { + return origin.to_string(); + } + } + "*".to_string() // 默认情况下允许所有来源 + } +} + +#[rocket::options("/")] +fn options_route() -> rocket::http::Status { + rocket::http::Status::Ok +} + +// 配置并启动 Rocket +fn rocket(pool: Db) -> rocket::Rocket { + let allowed_origins = vec![ + "http://localhost:8080", + "", + ]; + + rocket::custom( + Config::figment() + .merge(("port", 8080)) + .merge(("address", "")) + ) + .manage(pool) + .attach(Cors::new(allowed_origins)) + .mount("/", routes![ + options_route, + auth::register, + auth::login, + auth::change_password, + servers::add_server, + servers::get_servers, + servers::delete_server, + server_groups::add_server_group, + server_groups::get_server_groups, + server_groups::delete_server_group, + server_group_members::add_servers_to_group, + server_group_members::get_servers_in_group, + server_group_members::remove_servers_from_group, + deployment_packages::add_deployment_package, + deployment_packages::get_deployment_packages, + deployment_packages::get_all_deployment_packages, + deployment_packages::update_deployment_package, + deployment_packages::delete_deployment_package, + deployment_tasks::add_deployment_task, + deployment_tasks::get_all_deployment_tasks, + deployment_tasks::get_deployment_tasks_by_package, + deployment_tasks::update_deployment_task, + deployment_tasks::delete_deployment_task, + ]) +} + +#[rocket::main] +async fn main() -> Result<(), rocket::Error> { + let pool = PgPoolOptions::new() + .max_connections(5) + .connect("postgres://zxy:123456@localhost/sensordb") + .await + .expect("Failed to create pool."); + + rocket(pool).launch().await?; + + Ok(()) +} diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/model.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/model.rs new file mode 100644 index 000000000..180b0f7dd --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/model.rs @@ -0,0 +1,128 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; + + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct User { + pub id: i32, + pub username: String, + pub password_hash: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Deserialize)] +pub struct NewUser { + pub username: String, + pub password: String, +} + + +#[derive(Debug, Deserialize)] +pub struct ChangePasswordRequest { + pub username: String, + pub old_password: String, + pub new_password: String, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct Server { + pub id: i32, + pub ip_address: String, +} + +#[derive(Debug, Deserialize)] +pub struct NewServer { + pub ip_address: String, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct ServerGroup { + pub id: i32, + pub description: String, +} + +#[derive(Debug, Deserialize)] +pub struct NewServerGroup { + pub description: String, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct ServerGroupMember { + pub id: i32, + pub server_id: i32, + pub group_id: i32, +} + +#[derive(Debug, Deserialize)] +pub struct BatchAddServersRequest { + pub ip_addresses: Vec, + pub group_description: String, +} + + +#[derive(Debug, Deserialize)] +pub struct BatchRemoveServersRequest { + pub ip_addresses: Vec, + pub group_description: String, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct DeploymentPackage { + pub id: i32, + pub version: String, + pub software_name: String, + pub description: Option, + pub path: String, +} + +#[derive(Debug, Deserialize)] +pub struct NewDeploymentPackage { + pub version: String, + pub software_name: String, + pub description: Option, + pub path: String, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +pub struct DeploymentTask { + pub id: i32, + pub package_id: i32, + pub target_type: String, + pub target_id: Option, +} + +#[derive(Debug, Deserialize)] +pub struct NewDeploymentTask { + pub package_id: i32, + pub target_type: String, + pub target_id: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TargetType { + SingleServer, + ServerGroup, + All, +} + +impl TargetType { + pub fn from_str(s: &str) -> Option { + match s { + "单台服务器" => Some(TargetType::SingleServer), + "服务器组" => Some(TargetType::ServerGroup), + "所有" => Some(TargetType::All), + _ => None, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ApiResponse { + pub code: u16, + pub status: String, + pub message: String, + pub data: Option, +} diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_group_members.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_group_members.rs new file mode 100644 index 000000000..99697fe13 --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_group_members.rs @@ -0,0 +1,142 @@ +// server_group_members.rs + +use rocket::{get, post, delete, serde::json::Json, State}; +use crate::model::{ServerGroupMember, BatchAddServersRequest,BatchRemoveServersRequest, ApiResponse, Server, ServerGroup}; +use sqlx::{PgPool, query_as, query}; +use rocket::http::Status; + +// 通用接口:添加一个或多个服务器到服务器组 +#[post("/add_servers_to_group", data = "")] +pub async fn add_servers_to_group( + db: &State, + request: Json +) -> Result>>, Status> { + // 查找目标服务器组 + let group = query_as::<_, ServerGroup>("SELECT * FROM server_groups WHERE description = $1") + .bind(&request.group_description) + .fetch_one(&**db) + .await + .map_err(|_| Status::NotFound)?; + + let mut added_members = vec![]; // 存储成功添加的成员记录 + + // 逐个查找服务器并添加到组 + for ip in &request.ip_addresses { + // 查找服务器 + let server = query_as::<_, Server>("SELECT * FROM servers WHERE ip_address = $1") + .bind(ip) + .fetch_one(&**db) + .await + .map_err(|_| Status::NotFound)?; + + // 插入关联关系 + match query_as::<_, ServerGroupMember>( + "INSERT INTO server_group_members (server_id, group_id) VALUES ($1, $2) RETURNING *" + ) + .bind(server.id) + .bind(group.id) + .fetch_one(&**db) + .await { + Ok(member) => added_members.push(member), // 成功时将成员添加到结果集中 + Err(_) => continue, // 如果插入失败,继续处理下一个 + } + } + + // 如果没有成功添加的成员,返回错误 + if added_members.is_empty() { + return Err(Status::UnprocessableEntity); + } + + // 返回成功添加的成员记录 + Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: format!("Successfully added {} servers to group.", added_members.len()), + data: Some(added_members), + })) +} + +// 查看服务器组中的所有服务器 +#[get("/server_group_members/")] +pub async fn get_servers_in_group( + db: &State, + description: String +) -> Result>>, Status> { + // 查询服务器组 + let group = query_as::<_, ServerGroup>("SELECT * FROM server_groups WHERE description = $1") + .bind(&description) + .fetch_one(&**db) + .await + .map_err(|_| Status::NotFound)?; + + // 查询该组下的所有服务器 + let result = query_as::<_, Server>( + "SELECT s.* FROM servers s + JOIN server_group_members m ON s.id = m.server_id + WHERE m.group_id = $1" + ) + .bind(group.id) + .fetch_all(&**db) + .await; + + match result { + Ok(servers) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Servers retrieved successfully".to_string(), + data: Some(servers), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 批量从服务器组中移除服务器 +#[delete("/remove_servers_from_group", data = "")] +pub async fn remove_servers_from_group( + db: &State, + request: Json +) -> Result>, Status> { + // 查找目标服务器组 + let group = query_as::<_, ServerGroup>("SELECT * FROM server_groups WHERE description = $1") + .bind(&request.group_description) + .fetch_one(&**db) + .await + .map_err(|_| Status::NotFound)?; + + let mut removed_count = 0; // 记录成功移除的服务器数量 + + // 逐个查找服务器并移除其与服务器组的关系 + for ip in &request.ip_addresses { + // 查找服务器 + let server = query_as::<_, Server>("SELECT * FROM servers WHERE ip_address = $1") + .bind(ip) + .fetch_one(&**db) + .await + .map_err(|_| Status::NotFound)?; + + // 删除关联关系 + match query( + "DELETE FROM server_group_members WHERE server_id = $1 AND group_id = $2" + ) + .bind(server.id) + .bind(group.id) + .execute(&**db) + .await { + Ok(_) => removed_count += 1, // 成功时增加计数 + Err(_) => continue, // 如果删除失败,继续处理下一个 + } + } + + // 如果没有成功移除的服务器,返回错误 + if removed_count == 0 { + return Err(Status::UnprocessableEntity); + } + + // 返回成功移除的结果 + Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: format!("Successfully removed {} servers from group.", removed_count), + data: None, + })) +} \ No newline at end of file diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_groups.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_groups.rs new file mode 100644 index 000000000..eb143da34 --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/server_groups.rs @@ -0,0 +1,64 @@ + +use rocket::{get, post, delete, serde::json::Json, State}; +use crate::model::{ServerGroup, NewServerGroup, ApiResponse}; +use sqlx::{PgPool, query_as, query}; +use rocket::http::Status; + +// 增加服务器组 +#[post("/server_groups", data = "")] +pub async fn add_server_group(db: &State, new_group: Json) -> Result>, Status> { + let result = query_as::<_, ServerGroup>( + "INSERT INTO server_groups (description) VALUES ($1) RETURNING *" + ) + .bind(&new_group.description) + .fetch_one(&**db) + .await; + + match result { + Ok(group) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Server group added successfully".to_string(), + data: Some(group), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 查询所有服务器组 +#[get("/server_groups")] +pub async fn get_server_groups(db: &State) -> Result>>, Status> { + let result = query_as::<_, ServerGroup>("SELECT * FROM server_groups") + .fetch_all(&**db) + .await; + + match result { + Ok(groups) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Server groups retrieved successfully".to_string(), + data: Some(groups), + })), + Err(_) => Err(Status::InternalServerError), + } +} + + +// 删除服务器组 +#[delete("/server_groups/")] +pub async fn delete_server_group(db: &State, id: i32) -> Result>, Status> { + let result = query("DELETE FROM server_groups WHERE id = $1") + .bind(id) + .execute(&**db) + .await; + + match result { + Ok(_) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Server group deleted successfully".to_string(), + data: None, + })), + Err(_) => Err(Status::InternalServerError), + } +} diff --git a/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/servers.rs b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/servers.rs new file mode 100644 index 000000000..7012ba78c --- /dev/null +++ b/eBPF_Supermarket/Auto_Cluster_Deployer/restful_api/src/servers.rs @@ -0,0 +1,63 @@ +use rocket::{get, post,delete, serde::json::Json, State}; +use crate::model::{Server, NewServer, ApiResponse}; +use sqlx::{PgPool, query_as, query}; +use rocket::http::Status; + +// 增加服务器 +#[post("/servers", data = "")] +pub async fn add_server(db: &State, new_server: Json) -> Result>, Status> { + let result = query_as::<_, Server>( + "INSERT INTO servers (ip_address) VALUES ($1) RETURNING *" + ) + .bind(&new_server.ip_address) + .fetch_one(&**db) + .await; + + match result { + Ok(server) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Server added successfully".to_string(), + data: Some(server), + })), + Err(_) => Err(Status::InternalServerError), + } +} + +// 查询所有服务器 +#[get("/servers")] +pub async fn get_servers(db: &State) -> Result>>, Status> { + let result = query_as::<_, Server>("SELECT * FROM servers") + .fetch_all(&**db) + .await; + + match result { + Ok(servers) => Ok(Json(ApiResponse { + code: 200, + status: "success".to_string(), + message: "Servers retrieved successfully".to_string(), + data: Some(servers), + })), + Err(_) => Err(Status::InternalServerError),
    }
}

diff --git a/eBPF_Supermarket/CPU_Subsystem/cpu_watcher/controller.c b/eBPF_Supermarket/CPU_Subsystem/cpu_watcher/controller.c
index 2c319dfe0..deaaad387 100644
--- a/eBPF_Supermarket/CPU_Subsystem/cpu_watcher/controller.c
+++ b/eBPF_Supermarket/CPU_Subsystem/cpu_watcher/controller.c
@@ -13,8 +13,7 @@
 // limitations under the License.
 //
 // author: albert_xuu@163.com zhangxy1016304@163.com zhangziheng0525@163.com
-//
-// used to control the execution of proc_image tool
+
 #include <stdio.h>
 #include <stdlib.h>
 #include