Skip to content

Commit

Permalink
Spawn the worker process through escalated daemon!
Browse files Browse the repository at this point in the history
  • Loading branch information
ifd3f committed May 5, 2024
1 parent b3eed24 commit 903c3cc
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 31 deletions.
15 changes: 15 additions & 0 deletions src/escalated_daemon/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use serde::{Deserialize, Serialize};
use valuable::Valuable;

use crate::writer_process::ipc::WriterProcessConfig;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Valuable)]
pub struct EscalatedDaemonInitConfig {
// Okay, there's nothing here right now, but there might be someday!
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Valuable)]
pub struct SpawnWriter {
pub log_file: String,
pub init_config: WriterProcessConfig,
}
73 changes: 72 additions & 1 deletion src/escalated_daemon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,75 @@
//! This sounds satanic, but it's really just a background process running as root,
//! that spawns other processes running as root. The point is so that we can spawn
//! multiple writers running as root while only executing sudo once.
//!
//! The reason we can't just `sudo caligula` writer processes over and over again
//! is because most desktop sudo installations (rightfully) drop your sudo cookie
//! after a while and it would suck to have to repeatedly enter in your password
//! when you only really need to do it once.
//!
//! Given that this is running in root, we would like to restrict its interface as
//! much as possible. In the future, it may even be worthwhile to harden the IPC
//! even further.
//!
//! IT IS NOT TO BE USED DIRECTLY BY THE USER! ITS API HAS NO STABILITY GUARANTEES!

use anyhow::Context;
use interprocess::local_socket::{tokio::prelude::*, GenericFilePath};
use tokio::io::{AsyncBufRead, BufReader};
use tracing::{error, info, info_span, Instrument};
use tracing_unwrap::ResultExt;
use valuable::Valuable;

use crate::{
childproc_common::child_init,
escalated_daemon::ipc::{EscalatedDaemonInitConfig, SpawnWriter},
ipc_common::read_msg_async,
run_mode::make_writer_spawn_command,
};

pub mod ipc;

#[tokio::main(flavor = "current_thread")]
pub async fn main() {
todo!()
let (sock, _) = child_init::<EscalatedDaemonInitConfig>();

info!("Opening socket {sock}");
let stream = LocalSocketStream::connect(
sock.as_str()
.to_fs_name::<GenericFilePath>()
.unwrap_or_log(),
)
.await
.unwrap_or_log();

event_loop(&sock, BufReader::new(stream))
.await
.unwrap_or_log();
}

#[tracing::instrument(skip_all)]
async fn event_loop(socket: &str, mut stream: impl AsyncBufRead + Unpin) -> anyhow::Result<()> {
loop {
let msg = read_msg_async::<SpawnWriter>(&mut stream).await?;
info!(msg = msg.as_value(), "Received SpawnWriter request");

let command =
make_writer_spawn_command(socket.into(), msg.log_file.into(), &msg.init_config);
let mut cmd = tokio::process::Command::from(command);
cmd.kill_on_drop(true);
let mut child = cmd.spawn().context("Failed to spawn writer process")?;
info!(?child, "Spawned writer process");

// Wait on child processes to reap them when they're done.
let pid = child.id();
tokio::spawn(
async move {
match child.wait().await {
Ok(r) => info!("Exited with exit code {r}"),
Err(e) => error!("Failed to wait on child: {e}"),
}
}
.instrument(info_span!("childwait", child_pid = pid)),
);
}
}
13 changes: 12 additions & 1 deletion src/ipc_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context;
use bincode::Options;
use byteorder::{BigEndian, WriteBytesExt};
use serde::{de::DeserializeOwned, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// Common bincode options to use for inter-process communication.
#[inline]
Expand All @@ -23,6 +23,17 @@ pub fn write_msg<T: Serialize>(mut w: impl Write, msg: &T) -> anyhow::Result<()>
Ok(())
}

pub async fn write_msg_async<T: Serialize>(
mut w: impl AsyncWrite + Unpin,
msg: &T,
) -> anyhow::Result<()> {
let buf = bincode_options().serialize(msg)?;
w.write_u32(buf.len() as u32).await?;
w.write_all(&buf).await?;
w.flush().await?;
Ok(())
}

pub async fn read_msg_async<T: DeserializeOwned>(
mut r: impl AsyncRead + Unpin,
) -> anyhow::Result<T> {
Expand Down
4 changes: 4 additions & 0 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ impl LogPaths {
self.log_dir.join("main.log")
}

pub fn escalated_daemon(&self) -> PathBuf {
self.log_dir.join("escalated.log")
}

pub fn writer(&self, id: u64) -> PathBuf {
self.log_dir.join(format!("writer-{id}.log"))
}
Expand Down
13 changes: 12 additions & 1 deletion src/run_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use process_path::get_executable_path;
use serde::Serialize;
use valuable::Valuable;

use crate::{escalation::Command, writer_process::ipc::WriterProcessConfig};
use crate::{
escalated_daemon::ipc::EscalatedDaemonInitConfig, escalation::Command,
writer_process::ipc::WriterProcessConfig,
};

pub const RUN_MODE_ENV_NAME: &str = "__CALIGULA_RUN_MODE";

Expand Down Expand Up @@ -91,3 +94,11 @@ pub fn make_writer_spawn_command<'a>(
) -> Command<'a> {
make_spawn_command(socket, log_path, RunMode::Writer, init_config)
}

pub fn make_escalated_daemon_spawn_command<'a>(
socket: Cow<'a, str>,
log_path: Cow<'a, str>,
init_config: &EscalatedDaemonInitConfig,
) -> Command<'a> {
make_spawn_command(socket, log_path, RunMode::EscalatedDaemon, init_config)
}
10 changes: 5 additions & 5 deletions src/ui/herder/handle.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{ipc_common::read_msg_async, writer_process::ipc::InitialInfo};
use std::pin::Pin;

use interprocess::local_socket::tokio::{prelude::*, RecvHalf};
use interprocess::local_socket::tokio::{prelude::*, RecvHalf, SendHalf};
use serde::de::DeserializeOwned;
use tokio::{
io::{AsyncWrite, BufReader},
io::{BufReader, BufWriter},
process::Child,
};

Expand All @@ -18,15 +18,15 @@ pub struct ChildHandle {
/// process. So, we own a handle to it.
pub(super) child: Option<Child>,
pub(super) rx: Pin<Box<BufReader<RecvHalf>>>,
pub(super) _tx: Pin<Box<dyn AsyncWrite>>,
pub(super) tx: Pin<Box<BufWriter<SendHalf>>>,
}

impl ChildHandle {
pub fn new(child: Option<Child>, stream: LocalSocketStream) -> ChildHandle {
let (rx, tx) = stream.split();
let rx = Box::pin(BufReader::new(rx));
let tx = Box::pin(tx);
Self { child, rx, _tx: tx }
let tx = Box::pin(BufWriter::new(tx));
Self { child, rx, tx }
}

pub async fn next_message<T: DeserializeOwned>(&mut self) -> anyhow::Result<T> {
Expand Down
83 changes: 60 additions & 23 deletions src/ui/herder/herder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::escalated_daemon::ipc::{EscalatedDaemonInitConfig, SpawnWriter};
use crate::ipc_common::write_msg_async;
use crate::run_mode::make_escalated_daemon_spawn_command;
use crate::ui::herder::handle::ChildHandle;
use crate::{
ipc_common::read_msg_async, logging::get_log_paths, run_mode::make_writer_spawn_command,
ui::herder::socket::HerderSocket, writer_process::ipc::ErrorType,
};
use anyhow::Context;
use interprocess::local_socket::tokio::prelude::*;
use process_path::get_executable_path;
use tracing::{debug, trace};
use valuable::Valuable;

Expand All @@ -17,53 +19,88 @@ use super::handle::WriterHandle;
/// Handles the herding of all child processes. This includes lifecycle management
/// and communication.
///
/// Why "Herder"? It's a horse, and horses are herded. I think. I'm not a farmer.
/// Why "Herder"? Caligula liked his horse, and horses are herded. I think. I'm not
/// a farmer.
pub struct Herder {
socket: HerderSocket,
escalated_daemon: Option<ChildHandle>,
}

impl Herder {
pub fn new(socket: HerderSocket) -> Self {
Self { socket }
Self {
socket,
escalated_daemon: None,
}
}

#[tracing::instrument(skip_all)]
async fn ensure_escalated_daemon(&mut self) -> anyhow::Result<&mut ChildHandle> {
// Can't use if let here because of polonius! so we gotta do this ugly-ass workaround
if self.escalated_daemon.is_none() {
let log_path = get_log_paths().escalated_daemon();
let cmd = make_escalated_daemon_spawn_command(
self.socket.socket_name().to_string_lossy(),
log_path.to_string_lossy(),
&EscalatedDaemonInitConfig {},
);

debug!("Starting child process with command: {:?}", cmd);
fn modify_cmd(cmd: &mut tokio::process::Command) {
cmd.kill_on_drop(true);
}
let child = run_escalate(&cmd, modify_cmd)
.await
.context("Failed to spawn escalated daemon process")?;

debug!(?child, "Process spawned, waiting for pipe to be opened...");
let stream: LocalSocketStream = self.socket.accept().await?;
let handle = ChildHandle::new(Some(child), stream);

self.escalated_daemon = Some(handle);
}

Ok(self.escalated_daemon.as_mut().unwrap())
}

#[tracing::instrument(skip_all, fields(escalate))]
pub async fn start_writer(
&mut self,
args: &WriterProcessConfig,
escalate: bool,
) -> anyhow::Result<WriterHandle> {
// Get path to this process
let proc = get_executable_path().unwrap();

debug!(
proc = proc.to_string_lossy().to_string(),
"Read absolute path to this program"
);

let log_path = get_log_paths().writer(0);
let cmd = make_writer_spawn_command(
self.socket.socket_name().to_string_lossy(),
log_path.to_string_lossy(),
args,
);

debug!("Starting child process with command: {:?}", cmd);
fn modify_cmd(cmd: &mut tokio::process::Command) {
cmd.kill_on_drop(true);
}

let child = if escalate {
run_escalate(&cmd, modify_cmd)
.await
.context("Failed to spawn child process")?
let daemon = self.ensure_escalated_daemon().await?;
write_msg_async(
&mut daemon.tx,
&SpawnWriter {
log_file: log_path.to_string_lossy().to_string(),
init_config: args.clone(),
},
)
.await?;
None
} else {
let cmd = make_writer_spawn_command(
self.socket.socket_name().to_string_lossy(),
log_path.to_string_lossy(),
args,
);
debug!("Directly spawning child process with command: {:?}", cmd);

let mut cmd = tokio::process::Command::from(cmd);
modify_cmd(&mut cmd);
cmd.spawn().context("Failed to spawn child process")?
Some(cmd.spawn().context("Failed to spawn child process")?)
};

debug!("Waiting for pipe to be opened...");
let stream: LocalSocketStream = self.socket.accept().await?;
let mut handle = ChildHandle::new(Some(child), stream);
let mut handle = ChildHandle::new(child, stream);

trace!("Reading results from child");
let first_msg = read_msg_async::<StatusMessage>(&mut handle.rx).await?;
Expand Down

0 comments on commit 903c3cc

Please sign in to comment.