Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Kubernetes integration (unrebased) #1435

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
826 changes: 813 additions & 13 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,3 @@ lto = "fat"
inherits = "release"
debug = 2
strip = "none"

[profile.dev.package.website_playground]
debug-assertions = false

[profile.release.package.website_playground]
opt-level = "s"
3 changes: 3 additions & 0 deletions hydro_deploy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ futures = "0.3.26"
futures-core = "0.3.26"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.5.1" }
indicatif = "0.17.6"
kube = { version = "0.90.0", features = ["derive", "runtime", "ws"] }
k8s-openapi = { version = "0.21.1", features = ["latest"] }
nanoid = "0.4.0"
nix = "0.26.2"
once_cell = "1.17"
Expand All @@ -32,3 +34,4 @@ shell-escape = "0.1.5"
tempfile = "3.3.0"
tokio = { version = "1.16", features = [ "full" ] }
tokio-util = { version = "0.7.7", features=[ "compat" ] }
tar = "0.4.40"
2 changes: 1 addition & 1 deletion hydro_deploy/core/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl AzureHost {
#[async_trait]
impl Host for AzureHost {
fn target_type(&self) -> HostTargetType {
HostTargetType::Linux
HostTargetType::Linux(crate::LinuxArchitecture::AARCH64)
}

fn request_port(&mut self, bind_type: &ServerStrategy) {
Expand Down
7 changes: 6 additions & 1 deletion hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::RwLock;
use super::gcp::GCPNetwork;
use super::{
progress, CustomService, GCPComputeEngineHost, Host, LocalhostHost, ResourcePool,
ResourceResult, Service,
ResourceResult, Service, PodHost
};
use crate::ServiceBuilder;

Expand All @@ -31,6 +31,11 @@ impl Deployment {
self.add_host(LocalhostHost::new)
}

#[allow(non_snake_case)]
pub fn PodHost(&mut self) -> Arc<RwLock<PodHost>> {
self.add_host(PodHost::new)
}

#[allow(non_snake_case)]
pub fn GCPComputeEngineHost(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion hydro_deploy/core/src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl GCPComputeEngineHost {
#[async_trait]
impl Host for GCPComputeEngineHost {
fn target_type(&self) -> HostTargetType {
HostTargetType::Linux
HostTargetType::Linux(crate::LinuxArchitecture::AARCH64)
}

fn request_port(&mut self, bind_type: &ServerStrategy) {
Expand Down
9 changes: 6 additions & 3 deletions hydro_deploy/core/src/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use once_cell::sync::Lazy;
use tokio::sync::OnceCell;

use crate::progress::ProgressTracker;
use crate::HostTargetType;
use crate::{HostTargetType, LinuxArchitecture};

#[derive(PartialEq, Eq, Hash)]
struct CacheKey {
Expand Down Expand Up @@ -66,7 +66,7 @@ pub async fn build_crate(
tokio::task::spawn_blocking(move || {
let mut command = Command::new("cargo");
command.args([
"build".to_string(),
"zigbuild".to_string(),
"--profile".to_string(),
profile.unwrap_or("release".to_string()),
]);
Expand All @@ -81,9 +81,12 @@ pub async fn build_crate(

match target_type {
HostTargetType::Local => {}
HostTargetType::Linux => {
HostTargetType::Linux(LinuxArchitecture::X86_64) => {
command.args(["--target", "x86_64-unknown-linux-musl"]);
}
HostTargetType::Linux(LinuxArchitecture::AARCH64) => {
command.args(["--target", "aarch64-unknown-linux-musl"]);
}
}

if let Some(features) = features {
Expand Down
5 changes: 5 additions & 0 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,9 @@ impl Service for HydroflowCrateService {
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result).await;

ProgressTracker::println("About to copy binary!");
launched.copy_binary(built.clone()).await?;
ProgressTracker::println("Done copying binary");

self.launched_host = Some(launched);
Ok(())
Expand Down Expand Up @@ -291,6 +293,8 @@ impl Service for HydroflowCrateService {
// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.cli_stdout().await;

ProgressTracker::println(format!("Service ready: {formatted_bind_config}\n").as_str());

binary
.write()
.await
Expand All @@ -308,6 +312,7 @@ impl Service for HydroflowCrateService {
*self.server_defns.try_write().unwrap() =
serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap();
} else {
ProgressTracker::println(format!("Did not find ready. Instead found: {:?}", ready_line).as_str());
bail!("expected ready");
}

Expand Down
Binary file not shown.
102 changes: 102 additions & 0 deletions hydro_deploy/core/src/kubernetes/launched_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#[cfg(unix)]
use std::sync::Arc;

use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use kube::api::AttachedProcess;
use tokio::sync::RwLock;

use tokio::io::AsyncWriteExt;

use crate::util::prioritized_broadcast;
use crate::LaunchedBinary;

pub struct LaunchedPodBinary {
stdin_sender: Sender<String>,
stdout_cli_receivers: Arc<RwLock<Option<tokio::sync::oneshot::Sender<String>>>>,
stdout_receivers: Arc<RwLock<Vec<Sender<String>>>>,
stderr_receivers: Arc<RwLock<Vec<Sender<String>>>>,
}

impl LaunchedPodBinary {
pub fn new(launched_pod_binary: &mut AttachedProcess, id: String) -> Self {
// Create streams for stdout and stdin for the running binary in the pod

let launch_binary_out = tokio_util::io::ReaderStream::new(launched_pod_binary.stdout().unwrap());
let launch_binary_err = tokio_util::io::ReaderStream::new(launched_pod_binary.stderr().unwrap());
let mut stdin = launched_pod_binary.stdin().unwrap();


let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::<String>();
tokio::spawn(async move {
while let Some(line) = stdin_receiver.next().await {
if stdin.write_all(line.as_bytes()).await.is_err() {
break;
}

stdin.flush().await.ok();
}
});

let id_clone = id.clone();
let (stdout_cli_receivers, stdout_receivers) = prioritized_broadcast(
launch_binary_out.map_ok(|bytes| String::from_utf8_lossy(&bytes).to_string()),
move |s| println!("[{id_clone}] {s}"),
);
let (_, stderr_receivers) = prioritized_broadcast(
launch_binary_err.map_ok(|bytes| String::from_utf8_lossy(&bytes).to_string()),
move |s| eprintln!("[{id}] {s}"),
);

Self {
stdin_sender,
stdout_cli_receivers,
stdout_receivers,
stderr_receivers,
}
}
}

#[async_trait]
impl LaunchedBinary for LaunchedPodBinary {
async fn stdin(&self) -> Sender<String> {
self.stdin_sender.clone()
}

async fn cli_stdout(&self) -> tokio::sync::oneshot::Receiver<String> {
let mut receivers = self.stdout_cli_receivers.write().await;

if receivers.is_some() {
panic!("Only one CLI stdout receiver is allowed at a time");
}

let (sender, receiver) = tokio::sync::oneshot::channel::<String>();
*receivers = Some(sender);
receiver
}

async fn stdout(&self) -> Receiver<String> {
let mut receivers = self.stdout_receivers.write().await;
let (sender, receiver) = async_channel::unbounded::<String>();
receivers.push(sender);
receiver
}

async fn stderr(&self) -> Receiver<String> {
let mut receivers = self.stderr_receivers.write().await;
let (sender, receiver) = async_channel::unbounded::<String>();
receivers.push(sender);
receiver
}

// returns exit code when the hydroflow program finishes
async fn exit_code(&self) -> Option<i32> {
None
}

// waits for the hydroflow program to finish
async fn wait(&mut self) -> Option<i32> {
None
}
}
Loading
Loading