Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Zygote process #775

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ futures = { version = "0.3.30" }
wasmparser = { version = "0.220.0" }
tokio-stream = { version = "0.1" }
sha256 = { workspace = true }
serde_bytes = "0.11"

# tracing
# note: it's important to keep the version of tracing in sync with tracing-subscriber
Expand Down Expand Up @@ -63,6 +64,7 @@ tracing-opentelemetry = { version = "0.24", optional = true }


[target.'cfg(unix)'.dependencies]
zygote = { version = "0.1.2" }
caps = "0.5"
# this must match the version pulled by libcontainer
dbus = { version = "0", features = ["vendored"] }
Expand Down Expand Up @@ -108,3 +110,7 @@ opentelemetry = [
"dep:tracing-opentelemetry",
]
tracing = ["dep:tracing", "dep:tracing-subscriber"]

[package.metadata.cargo-machete]
# used as part of a derive macro
ignored = ["serde_bytes"]
3 changes: 3 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub fn shim_main<'a, I>(
I: 'static + Instance + Sync + Send,
I::Engine: Default,
{
#[cfg(unix)]
zygote::Zygote::init();

#[cfg(feature = "opentelemetry")]
if otel_traces_enabled() {
// opentelemetry uses tokio, so we need to initialize a runtime
Expand Down
3 changes: 2 additions & 1 deletion crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use super::error::Error;

/// Generic options builder for creating a wasm instance.
/// This is passed to the `Instance::new` method.
#[derive(Clone)]
#[derive(Clone, Serialize, Deserialize)]
pub struct InstanceConfig {
/// Optional stdin named pipe path.
stdin: PathBuf,
Expand Down
4 changes: 3 additions & 1 deletion crates/containerd-shim-wasm/src/sandbox/oci.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use std::process;

use anyhow::Context;
use oci_spec::image::Descriptor;
use serde::{Deserialize, Serialize};

use super::error::Result;

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WasmLayer {
pub config: Descriptor,
#[serde(with = "serde_bytes")]
pub layer: Vec<u8>,
}

Expand Down
65 changes: 41 additions & 24 deletions crates/containerd-shim-wasm/src/sys/unix/container/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use nix::errno::Errno;
use nix::sys::wait::{waitid, Id as WaitID, WaitPidFlag, WaitStatus};
use nix::unistd::Pid;
use oci_spec::image::Platform;
use zygote::{WireError, Zygote};

use crate::container::Engine;
use crate::sandbox::async_utils::AmbientRuntime as _;
Expand All @@ -25,7 +26,7 @@ use crate::sandbox::{
use crate::sys::container::executor::Executor;
use crate::sys::stdio::open;

static DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd";
const DEFAULT_CONTAINER_ROOT_DIR: &str = "/run/containerd";

pub struct Instance<E: Engine> {
exit_code: WaitableCell<(u32, DateTime<Utc>)>,
Expand All @@ -39,36 +40,52 @@ impl<E: Engine + Default> SandboxInstance for Instance<E> {

#[cfg_attr(feature = "tracing", tracing::instrument(parent = tracing::Span::current(), skip_all, level = "Info"))]
fn new(id: String, cfg: &InstanceConfig) -> Result<Self, SandboxError> {
let engine = Self::Engine::default();
let bundle = cfg.get_bundle().to_path_buf();
let namespace = cfg.get_namespace();
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name());
let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?;

// check if container is OCI image with wasm layers and attempt to read the module
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address().as_str(), &namespace).block_on()?
.load_modules(&id, &engine)
let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address(), &cfg.get_namespace()).block_on()?
.load_modules(&id, &E::default())
.block_on()
.unwrap_or_else(|e| {
log::warn!("Error obtaining wasm layers for container {id}. Will attempt to use files inside container image. Error: {e}");
(vec![], Platform::default())
});

let mut builder = ContainerBuilder::new(id.clone(), SyscallType::Linux)
.with_executor(Executor::new(engine, modules, platform))
.with_root_path(rootdir.clone())?;

if let Ok(f) = open(cfg.get_stdin()) {
builder = builder.with_stdin(f);
}
if let Ok(f) = open(cfg.get_stdout()) {
builder = builder.with_stdout(f);
}
if let Ok(f) = open(cfg.get_stderr()) {
builder = builder.with_stderr(f);
}

let container = builder.as_init(&bundle).with_systemd(false).build()?;
let (root, state) = Zygote::global()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, nice interface to use

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see the memory usage of this zygote process vs. the cloned shim process running containers

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory usage as reported by VmPeak in /proc/xxx/status

Without zygote

Shim peak memory usage was: 3341772 kB

With zygote:

Shim peak memory usage was: 3342252 kB
Zygote peak memory usage was: 31056 kB

I am a bit surprised by the memory usage of the shim, I expected it to consume less memory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revisiting the numbers, for wasmtime the shim peaks at ~19000 kB of resident memory with or without the zygote.
The zygote itself peaks at 5500kB of resident memory.

.run(
|(id, cfg, modules, platform)| -> Result<_, WireError> {
let namespace = cfg.get_namespace();

let bundle = cfg.get_bundle().to_path_buf();
let rootdir = Path::new(DEFAULT_CONTAINER_ROOT_DIR).join(E::name());
let rootdir = determine_rootdir(&bundle, &namespace, rootdir)?;
let engine = E::default();

let mut builder = ContainerBuilder::new(id.clone(), SyscallType::Linux)
.with_executor(Executor::new(engine, modules, platform))
.with_root_path(rootdir.clone())?;

if let Ok(f) = open(cfg.get_stdin()) {
builder = builder.with_stdin(f);
}
if let Ok(f) = open(cfg.get_stdout()) {
builder = builder.with_stdout(f);
}
if let Ok(f) = open(cfg.get_stderr()) {
builder = builder.with_stderr(f);
}

let Container { root, state } = builder
.as_init(&bundle)
.as_sibling(true)
.with_systemd(false)
.build()?;

// Container is not serializable, but its parts are
Ok((root, state))
},
(id.clone(), cfg.clone(), modules, platform),
)
.map_err(|e| SandboxError::Others(e.to_string()))?;
let container = Container { root, state };

Ok(Self {
id,
Expand Down
49 changes: 20 additions & 29 deletions crates/containerd-shim-wasm/src/test/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,23 @@
//! Once #755 is fixed we can remove the libc based implementation and
//! remove the ignore attribute from the test.

use std::fs::canonicalize;
use std::future::pending;
use std::io::{stderr, Write as _};
use std::sync::mpsc::channel;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;

use anyhow::{bail, Result};
use containerd_shim_wasm_test_modules::HELLO_WORLD;
use tokio::sync::Notify;

use crate::container::{Engine, Instance, RuntimeContext};
use crate::testing::WasiTest;

#[derive(Clone, Default)]
pub struct SomeEngine;

async fn ctrl_c(use_libc: bool) {
static CANCELLATION: LazyLock<Notify> = LazyLock::new(|| Notify::new());

fn on_ctr_c(_: libc::c_int) {
CANCELLATION.notify_waiters();
}

if use_libc {
unsafe { libc::signal(libc::SIGINT, on_ctr_c as _) };
CANCELLATION.notified().await;
} else {
let _ = tokio::signal::ctrl_c().await;
}
}

impl Engine for SomeEngine {
fn name() -> &'static str {
"some-engine"
Expand All @@ -61,16 +47,16 @@ impl Engine for SomeEngine {
.build()?
.block_on(async move {
use tokio::time::sleep;
let use_libc = std::env::var("USE_LIBC").unwrap_or_default();
let use_libc = !use_libc.is_empty() && use_libc != "0";
let signal = async {
println!("{name}> waiting for signal!");
ctrl_c(use_libc).await;
let _ = tokio::signal::ctrl_c().await;
println!("{name}> received signal, bye!");
};
let task = async {
sleep(Duration::from_millis(10)).await;
eprintln!("{name}> ready");
// use writeln to avoid output capturing from the
// testing framework
let _ = writeln!(stderr(), "{name}> ready");
pending().await
};
tokio::select! {
Expand All @@ -92,23 +78,30 @@ impl Drop for KillGuard {
}

#[test]
#[ignore = "this currently fails due to tokio's global state"]
fn test_handling_signals() -> Result<()> {
zygote::Zygote::global();

// use a thread scope to ensure we join all threads at the end
std::thread::scope(|s| -> Result<()> {
let mut containers = vec![];

for i in 0..20 {
let container = WasiTest::<SomeInstance>::builder()?
let builder = WasiTest::<SomeInstance>::builder()?
.with_name(format!("test-{i}"))
.with_start_fn(format!("test-{i}"))
.with_stdout("/proc/self/fd/1")?
.with_wasm(HELLO_WORLD)?
.build()?;
.with_wasm(HELLO_WORLD)?;

// In CI /proc/self/fd/1 doesn't seem to be available
let builder = match canonicalize("/proc/self/fd/1") {
Ok(stdout) => builder.with_stdout(stdout)?,
_ => builder,
};

let container = builder.build()?;
containers.push(Arc::new(container));
}

let guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect();
let _guard: Vec<_> = containers.iter().cloned().map(KillGuard).collect();

for container in containers.iter() {
container.start()?;
Expand Down Expand Up @@ -148,8 +141,6 @@ fn test_handling_signals() -> Result<()> {
assert_eq!(id, i);
}

drop(guard);

Ok(())
})
}
2 changes: 2 additions & 0 deletions crates/containerd-shim-wasm/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ where
WasiInstance::Engine: Default + Send + Sync + Clone,
{
pub fn new() -> Result<Self> {
zygote::Zygote::init();

// start logging
// to enable logging run `export RUST_LOG=trace` and append cargo command with
// --show-output before running test
Expand Down
Loading