Skip to content

Commit

Permalink
Merge pull request intel#13 from xiangquanliu/pr/interface_kill
Browse files Browse the repository at this point in the history
acond: Add signal/kill interface
  • Loading branch information
binxing authored Sep 14, 2023
2 parents 0aa0a4a + 75c35c4 commit c19b4d2
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 70 deletions.
18 changes: 14 additions & 4 deletions acond/proto/acon.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ message RestartRequest {
uint64 timeout = 2;
}

message RestartResponse {
uint32 container_id = 1;
}

message ExecRequest {
uint32 container_id = 1;
string command = 2;
Expand All @@ -58,6 +54,11 @@ message ExecResponse {
bytes stderr = 2;
}

message KillRequest {
uint32 container_id = 1;
int32 signal_num = 2;
}

message InspectRequest {
uint32 container_id = 1;
}
Expand Down Expand Up @@ -170,6 +171,15 @@ service AconService {
// On failure, returns the specified error.
rpc Exec(ExecRequest) returns (ExecResponse);

// Sends signal which must exist in the corresponding manifest to an existing container.
//
// container_id specifies the Container to which the specified sinal will be sent. The signal must
// exist in the corresponding manifest. Otherwise, it fails with errors.
//
// On success, returns OK.
// On failure, returns the specified error.
rpc Kill(KillRequest) returns (google.protobuf.Empty);

// Retrieves information of the specified Containers.
//
// container_id specifies the Container whose status to be retrieved. If container_id is 0 (zero),
Expand Down
14 changes: 10 additions & 4 deletions acond/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl Container {

#[cfg(not(feature = "interactive"))]
if _timeout == 0 {
Err(anyhow!(utils::ERR_RPC_NOT_SUPPORT_IA_MODE))
return Err(anyhow!(utils::ERR_RPC_INVALID_TIMEOUT));
} else {
let (crdstdin, pwrstdin) = unistd::pipe()?;
fcntl::fcntl(pwrstdin, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC))?;
Expand Down Expand Up @@ -390,7 +390,11 @@ fn create_child(fork_args: &ForkArgs) -> Result<Pid> {
unistd::write(cwrfd, &i32::from(pid).to_be_bytes())?;
process::exit(0);
}
Err(_) => Err(anyhow!(utils::ERR_RPC_FAIL_FORK)),
Err(errno) => {
return Err(anyhow!(
utils::ERR_RPC_SYSTEM_ERROR.replace("{}", format!("{}", errno).as_str())
));
}
}
}

Expand Down Expand Up @@ -463,8 +467,10 @@ fn run_child(fork_args: &ForkArgs, slave: Option<i32>, cwrfd: i32, crdfd: i32) -
return Ok(child);
}
Ok(ForkResult::Child) => (),
Err(_) => {
return Err(anyhow!(utils::ERR_RPC_FAIL_FORK));
Err(errno) => {
return Err(anyhow!(
utils::ERR_RPC_SYSTEM_ERROR.replace("{}", format!("{}", errno).as_str())
));
}
}

Expand Down
6 changes: 3 additions & 3 deletions acond/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async fn handle_request(stream: UnixStream, tx: mpsc::Sender<Request>) -> Result
match stream.try_read(&mut msg_hdr_bytes) {
Ok(n) => {
if n != msg_hdr_bytes.len() {
resp_bytes = Some(utils::ERR_IPC_INVALID_REQ_FORMAT.as_bytes().to_vec());
resp_bytes = Some(utils::ERR_IPC_INVALID_REQUEST.as_bytes().to_vec());
} else {
msg_hdr = bincode::deserialize(&msg_hdr_bytes)?;
}
Expand All @@ -184,7 +184,7 @@ async fn handle_request(stream: UnixStream, tx: mpsc::Sender<Request>) -> Result
match stream.try_read(&mut data) {
Ok(n) => {
if n != data.len() {
resp_bytes = Some(utils::ERR_IPC_INVALID_REQ_FORMAT.as_bytes().to_vec());
resp_bytes = Some(utils::ERR_IPC_INVALID_REQUEST.as_bytes().to_vec());
} else {
msg_hdr_bytes.append(&mut data);
}
Expand Down Expand Up @@ -314,7 +314,7 @@ async fn dispatch_request(request: &Request, service: &AconService) -> Result<Ve
}
}

_ => Err(anyhow!(utils::ERR_IPC_NOT_SUPPORT_REQ)),
_ => Err(anyhow!(utils::ERR_IPC_NOT_SUPPORTED)),
}
}

Expand Down
124 changes: 89 additions & 35 deletions acond/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use grpc::acon_service_server::{AconService, AconServiceServer};
use grpc::{
AddBlobRequest, AddManifestRequest, AddManifestResponse, ContainerInfo, ExecRequest,
ExecResponse, GetManifestRequest, GetManifestResponse, InspectRequest, InspectResponse, MrLog,
ReportRequest, ReportResponse, RestartRequest, StartRequest, StartResponse,
ExecResponse, GetManifestRequest, GetManifestResponse, InspectRequest, InspectResponse,
KillRequest, MrLog, ReportRequest, ReportResponse, RestartRequest, StartRequest, StartResponse,
};
use nix::errno::Errno;
use std::{collections::HashMap, sync::Arc};
use tokio::{
sync::RwLock,
Expand Down Expand Up @@ -46,14 +47,14 @@ impl AconService for TDAconService {
let mut pod = ref_pod.write().await;

if pod.finalized {
return Err(Status::unknown(utils::ERR_RPC_REJECT_MANIFEST));
return Err(Status::permission_denied(utils::ERR_RPC_MANIFEST_FINALIZED));
}

let verified = utils::verify_signature(manifest_bytes, signature_bytes, signer_bytes)
.map_err(|e| Status::unknown(e.to_string()))?;

if !verified {
return Err(Status::unknown(utils::ERR_RPC_INVALID_SIGNATURE));
return Err(Status::invalid_argument(utils::ERR_RPC_INVALID_SIGNATURE));
}

// verify contents of manifest.
Expand Down Expand Up @@ -93,7 +94,9 @@ impl AconService for TDAconService {
.is_manifest_accepted(&image)
.map_err(|e| Status::unknown(e.to_string()))?;
if !is_accepted {
return Err(Status::unknown(utils::ERR_RPC_REJECT_MANIFEST));
return Err(Status::permission_denied(
utils::ERR_RPC_INCOMPATIBLE_POLICY,
));
}

utils::create_alias_link(&image).map_err(|e| Status::unknown(e.to_string()))?;
Expand All @@ -120,7 +123,7 @@ impl AconService for TDAconService {
let mut pod = ref_pod.write().await;

if pod.finalized {
return Err(Status::unknown(utils::ERR_RPC_REJECT_MANIFEST));
return Err(Status::permission_denied(utils::ERR_RPC_MANIFEST_FINALIZED));
}

utils::measure_image(None).map_err(|e| Status::unknown(e.to_string()))?;
Expand All @@ -144,7 +147,7 @@ impl AconService for TDAconService {
let ref_pod = self.pod.clone();
let pod = ref_pod.read().await;
if !pod.is_blob_accepted(&layers) {
return Err(Status::unknown(utils::ERR_RPC_REJECT_BLOB));
return Err(Status::permission_denied(utils::ERR_RPC_REJECT_BLOB));
}

utils::save_blob(&layers, data).map_err(|e| Status::unknown(e.to_string()))?;
Expand All @@ -164,7 +167,7 @@ impl AconService for TDAconService {
let mut pod = ref_pod.write().await;
let image = pod
.get_image(&request.get_ref().image_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_IMAGE_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_IMAGE_ID))?;

let container = Container::start(image, &request.get_ref().envs)
.await
Expand Down Expand Up @@ -192,19 +195,21 @@ impl AconService for TDAconService {
let pod = ref_pod.read().await;
let container = pod
.get_container(&container_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
let image = pod
.get_image(&container.image_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_IMAGE_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_IMAGE_ID))?;

if image.manifest.no_restart {
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_NOT_ALLOW_RESTART));
return Err(Status::permission_denied(
utils::ERR_RPC_CONTAINER_NOT_ALLOW_RESTART,
));
}

if container.is_running() {
if timeout == 0 {
return Err(Status::unknown(
utils::ERR_RPC_CONTAINER_KILL_TIMEOUT.replace("{}", "0"),
return Err(Status::deadline_exceeded(
utils::ERR_RPC_CONTAINER_RESTART_TIMEOUT,
));
}

Expand All @@ -213,19 +218,28 @@ impl AconService for TDAconService {
if s.abs() == libc::SIGTERM || s.abs() == libc::SIGKILL {
s
} else {
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_NOT_ALLOW_KILL));
return Err(Status::permission_denied(
utils::ERR_RPC_CONTAINER_NOT_ALLOW_RESTART,
));
}
} else {
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_NOT_ALLOW_KILL));
return Err(Status::permission_denied(
utils::ERR_RPC_CONTAINER_NOT_ALLOW_RESTART,
));
};

unsafe {
let pid = container.pid.into();
if sig > 0 {
libc::kill(pid, sig.abs());
} else {
libc::kill(-pid, sig.abs());
let mut pid: i32 = container.pid.into();
if sig < 0 {
pid = -pid.abs();
}

Errno::result(libc::kill(pid, sig.abs())).map_err(|errno| {
Status::unknown(
utils::ERR_RPC_SYSTEM_ERROR
.replace("{}", format!("{}", errno).as_str()),
)
})?;
}

Some(container.exit_notifier.as_ref().unwrap().clone())
Expand All @@ -238,8 +252,8 @@ impl AconService for TDAconService {
loop {
tokio::select! {
_ = time::sleep(Duration::from_secs(timeout)) => {
return Err(Status::unknown(
utils::ERR_RPC_CONTAINER_KILL_TIMEOUT.replace("{}", format!("{}", timeout).as_str()),
return Err(Status::deadline_exceeded(
utils::ERR_RPC_CONTAINER_RESTART_TIMEOUT,
));
}
_ = notifier.notified() => break,
Expand All @@ -252,17 +266,17 @@ impl AconService for TDAconService {
let pod = ref_pod.read().await;
let container = pod
.get_container(&container_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
pod.get_image(&container.image_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_IMAGE_ID))?
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_IMAGE_ID))?
.clone()
};

let ref_pod = self.pod.clone();
let mut pod = ref_pod.write().await;
let container = pod
.get_container_mut(&container_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
container
.restart(&image)
.await
Expand All @@ -289,25 +303,21 @@ impl AconService for TDAconService {
}

if stdin.len() > capture_size {
return Err(Status::invalid_argument(
utils::ERR_RPC_BUFFER_EXCEED.replace("{}", format!("{}", capture_size).as_str()),
));
return Err(Status::invalid_argument(utils::ERR_RPC_BUFFER_EXCEED));
}

if !utils::start_with_uppercase(command) {
return Err(Status::invalid_argument(
utils::ERR_RPC_INVALID_COMMAND.replace("{}", command),
));
return Err(Status::invalid_argument(utils::ERR_RPC_PRIVATE_ENTRYPOINT));
}

let ref_pod = self.pod.clone();
let pod = ref_pod.read().await;
let container = pod
.get_container(&container_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;

if !container.is_running() {
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_EXITED));
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_TERMINATED));
}

let (stdout, stderr) = container
Expand All @@ -322,6 +332,50 @@ impl AconService for TDAconService {
Ok(Response::new(ExecResponse { stdout, stderr }))
}

async fn kill(&self, request: Request<KillRequest>) -> Result<Response<()>, Status> {
let container_id = request.get_ref().container_id;
let signal_num = request.get_ref().signal_num;

let ref_pod = self.pod.clone();
let pod = ref_pod.read().await;
let container = pod
.get_container(&container_id)
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;

if !container.is_running() {
return Err(Status::unknown(utils::ERR_RPC_CONTAINER_TERMINATED));
}

let image = pod
.get_image(&container.image_id)
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_IMAGE_ID))?;

if let None = image.manifest.signals.iter().find(|&&s| s == signal_num) {
return Err(Status::permission_denied(
utils::ERR_RPC_CONTAINER_NOT_ALLOW_KILL,
));
}

unsafe {
let mut pid: i32 = container.pid.into();
if signal_num < 0 {
pid = -pid.abs();
}

Errno::result(libc::kill(pid, signal_num.abs())).map_err(|errno| {
Status::unknown(
utils::ERR_RPC_SYSTEM_ERROR.replace("{}", format!("{}", errno).as_str()),
)
})?;
}

if let Some(tx) = &pod.timeout_tx {
let _ = tx.send(false).await;
}

Ok(Response::new(()))
}

async fn inspect(
&self,
request: Request<InspectRequest>,
Expand Down Expand Up @@ -355,7 +409,7 @@ impl AconService for TDAconService {
} else {
let container = pod
.get_container_mut(&container_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_CONTAINER_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_CONTAINER_ID))?;

container
.update_status()
Expand Down Expand Up @@ -439,7 +493,7 @@ impl AconService for TDAconService {
let pod = ref_pod.read().await;
let image = pod
.get_image(image_id)
.ok_or_else(|| Status::unknown(utils::ERR_RPC_INVALID_IMAGE_ID))?;
.ok_or_else(|| Status::invalid_argument(utils::ERR_RPC_INVALID_IMAGE_ID))?;

let manifest = utils::get_manifest(image_id).map_err(|e| Status::unknown(e.to_string()))?;
let certificate = image.signer_bytes.clone();
Expand Down
Loading

0 comments on commit c19b4d2

Please sign in to comment.