Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow LAVA jobs to pass artifacts back to the runner #17

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ homepage = "https://gitlab.collabora.com/lava/lava-gitlab-runner"
repository = "https://gitlab.collabora.com/lava/lava-gitlab-runner.git"

[dependencies]
axum = "0.6"
bytes = "1.2.0"
chrono = { version = "0.4", features = ["serde"] }
colored = "2"
gitlab-runner = "0.0.8"
lava-api = "0.1.1"
lazy_static = "1.4"
local-ip-address = "0.5"
structopt = "0.3.23"
url = "2.2.2"
tokio = "1.12.0"
Expand All @@ -27,6 +29,7 @@ serde = { version = "^1.0.97", features = ["derive"] }
serde_json = "1.0.68"
serde_yaml = "0.9"
rand = "0.8.4"
rand_chacha = "0.3"
tempfile = "3.3.0"
tokio-util = { version = "0.7", features = [ "io" ] }
tracing-subscriber = { version = "0.3.9", features = [ "env-filter"] }
Expand Down
169 changes: 164 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::borrow::Cow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashSet};
use std::env;
use std::io::Read;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use axum::extract::Path;
use axum::routing::post;
use axum::Router;
use bytes::{Buf, Bytes};
use colored::{Color, Colorize};
use futures::stream::{Stream, TryStreamExt};
Expand All @@ -25,13 +30,16 @@ use structopt::StructOpt;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use tracing::{debug, info};
use tracing::{debug, info, warn};
use tracing_subscriber::filter;
use tracing_subscriber::prelude::*;
use url::Url;

mod throttled;
mod upload;

use throttled::{ThrottledLava, Throttler};
use upload::{JobArtifacts, UploadServer};

const MASK_PATTERN: &str = "[MASKED]";

Expand Down Expand Up @@ -107,6 +115,7 @@ struct MonitorJobs {
/// Variable sets exposed to the Handlebars job-definition template
/// (rendered in `Run::transform`).
#[derive(Clone, Debug, Serialize)]
struct TransformVariables<'a> {
// GitLab CI job variables, keyed by variable name.
pub job: BTreeMap<&'a str, &'a str>,
// Runner-provided values; currently carries ARTIFACT_UPLOAD_URL.
pub runner: BTreeMap<&'a str, &'a str>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -243,11 +252,18 @@ enum JobCancelBehaviour {
/// Caches per-job artifacts (logs, JUnit results, and files uploaded
/// back by LAVA jobs) so they can be handed to the GitLab runner.
struct AvailableArtifactStore {
// Shared, rate-limited LAVA API client.
lava: Arc<ThrottledLava>,
// Masks secret values before data is exposed.
masker: Arc<Masker>,
// Upload-URL -> artifact cache, filled by create_upload_url.
artifact_caches: Mutex<BTreeMap<String, Arc<Mutex<JobArtifacts>>>>,
// LAVA job id -> artifact cache, filled by add_job_for_upload_url.
job_map: Mutex<BTreeMap<i64, Arc<Mutex<JobArtifacts>>>>,
}

impl AvailableArtifactStore {
pub fn new(lava: Arc<ThrottledLava>, masker: Arc<Masker>) -> Self {
Self { lava, masker }
Self {
lava,
masker,
artifact_caches: Default::default(),
job_map: Default::default(),
}
}

pub fn get_log(
Expand Down Expand Up @@ -282,12 +298,53 @@ impl AvailableArtifactStore {
.flatten_stream(),
)
}

/// Register a fresh upload slot with the global upload server and
/// remember it under its upload URL so job ids can be attached to it
/// later via `add_job_for_upload_url`.
///
/// Returns the URL a LAVA job should POST its artifacts to.
pub fn create_upload_url(&self) -> String {
    let job_artifacts = UPLOAD_SERVER.lock().unwrap().add_new_job();
    let upload_url = {
        let guard = job_artifacts.lock().unwrap();
        guard.get_upload_url().to_string()
    };
    let mut caches = self.artifact_caches.lock().unwrap();
    caches.insert(upload_url.clone(), job_artifacts);
    upload_url
}

/// Associate LAVA job `id` with a previously created upload URL, so
/// artifacts posted to that URL become retrievable by job id.
///
/// # Panics
///
/// Panics if `upload_url` was not obtained from `create_upload_url`;
/// that would be a caller bug, so the invariant is stated in the panic
/// message instead of a bare unwrap.
pub fn add_job_for_upload_url(&self, id: i64, upload_url: &str) {
    let artifacts = self
        .artifact_caches
        .lock()
        .unwrap()
        .get(upload_url)
        .expect("upload URL was not created via create_upload_url")
        .clone();
    self.job_map.lock().unwrap().insert(id, artifacts);
}

/// Look up the data a LAVA job uploaded under `path`, if job `id` is
/// known and such an artifact exists.
pub fn get_uploaded_artifact(&self, id: i64, path: &str) -> Option<Bytes> {
    let jobs = self.job_map.lock().unwrap();
    let cache = jobs.get(&id)?;
    let data = cache.lock().unwrap().get_artifact_data(path);
    data
}

/// List the paths of all artifacts uploaded for job `id`, or `None`
/// when the job id has never been registered.
pub fn get_uploaded_artifact_paths(&self, id: i64) -> Option<Vec<String>> {
    let jobs = self.job_map.lock().unwrap();
    let cache = jobs.get(&id)?;
    let guard = cache.lock().unwrap();
    let paths: Vec<String> = guard.get_artifact_paths().map(str::to_string).collect();
    Some(paths)
}
}

/// Kinds of files the runner can expose back to GitLab for a LAVA job.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum LavaUploadableFileType {
// Raw LAVA log of job `id`, exposed as `<id>_log.yaml`.
Log { id: i64 },
// JUnit results of job `id`, exposed as `<id>_junit.xml`.
Junit { id: i64 },
// A file the LAVA job uploaded back to the runner under `path`.
Artifact { id: i64, path: String },
}

#[derive(Clone)]
Expand Down Expand Up @@ -336,15 +393,25 @@ impl LavaUploadableFile {
store,
}
}

/// Build an uploadable file backed by an artifact that LAVA job `id`
/// uploaded to the runner under `path`.
pub fn artifact(id: i64, path: String, store: Arc<AvailableArtifactStore>) -> Self {
    let which = LavaUploadableFileType::Artifact { id, path };
    Self { which, store }
}
}

impl UploadableFile for LavaUploadableFile {
type Data<'a> = Box<dyn AsyncRead + Send + Unpin + 'a>;

fn get_path(&self) -> Cow<'_, str> {
match self.which {
match &self.which {
LavaUploadableFileType::Log { id } => format!("{}_log.yaml", id).into(),
LavaUploadableFileType::Junit { id } => format!("{}_junit.xml", id).into(),
LavaUploadableFileType::Artifact { id, path } => {
format!("{}_artifacts/{}", id, path).into()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just make that `path`, since it can only come from submitted jobs; the `<job_id>_bla.*` naming mostly came about because those files could also come from monitor jobs.

}
}
}

Expand All @@ -357,6 +424,9 @@ impl UploadableFile for LavaUploadableFile {
LavaUploadableFileType::Junit { id } => {
Box::new(self.store.get_junit(*id).into_async_read())
}
LavaUploadableFileType::Artifact { id, path } => Box::new(futures::io::Cursor::new(
self.store.get_uploaded_artifact(*id, path).unwrap(),
)),
}
}
}
Expand Down Expand Up @@ -724,7 +794,7 @@ impl Run {
}
}

fn transform(&self, definition: String) -> Result<String, ()> {
fn transform(&self, definition: String, upload_url: &str) -> Result<String, ()> {
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);
handlebars
Expand All @@ -739,6 +809,7 @@ impl Run {
.variables()
.map(|var| (var.key(), var.value()))
.collect(),
runner: BTreeMap::from([("ARTIFACT_UPLOAD_URL", upload_url)]),
};
handlebars.render("definition", &mappings).map_err(|e| {
outputln!("Failed to substitute in template: {}", e);
Expand All @@ -754,15 +825,19 @@ impl Run {
"submit" => {
if let Some(filename) = p.next() {
let data = self.find_file(filename).await?;
let upload_url = self.store.create_upload_url();
let definition = match String::from_utf8(data) {
Ok(data) => self.transform(data)?,
Ok(data) => self.transform(data, &upload_url)?,
Err(_) => {
outputln!("Job definition is not utf-8");
return Err(());
}
};
let ids = self.submit_definition(&definition).await?;
self.ids.extend(&ids);
for id in &self.ids {
self.store.add_job_for_upload_url(*id, &upload_url);
}
self.follow_job(ids[0], cancel_token, JobCancelBehaviour::CancelLava)
.await
} else {
Expand Down Expand Up @@ -834,6 +909,14 @@ impl CancellableJobHandler<LavaUploadableFile> for Run {
for id in &self.ids {
available_files.push(LavaUploadableFile::log(*id, self.store.clone()));
available_files.push(LavaUploadableFile::junit(*id, self.store.clone()));
for path in self
.store
.get_uploaded_artifact_paths(*id)
.into_iter()
.flatten()
{
available_files.push(LavaUploadableFile::artifact(*id, path, self.store.clone()));
}
}
Ok(Box::new(available_files.into_iter()))
}
Expand All @@ -844,6 +927,7 @@ type LavaMap = Arc<Mutex<BTreeMap<(String, String), Arc<ThrottledLava>>>>;
lazy_static! {
static ref LAVA_MAP: LavaMap = Arc::new(Mutex::new(BTreeMap::new()));
static ref MAX_CONCURRENT_REQUESTS: Arc<Mutex<usize>> = Arc::new(Mutex::new(20));
static ref UPLOAD_SERVER: Arc<Mutex<UploadServer>> = Default::default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than having a global static variable, why not pass a reference to the Run struct? That avoids the global variable and potentially a lot of the lock().unwrap() dances (though those could also be sorted out with interior mutability as well).

Tbh it would probably be nicer if the Runs each simply had a copy or a registration object from the factory

}

async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFile>, ()> {
Expand Down Expand Up @@ -921,6 +1005,10 @@ async fn new_job(job: Job) -> Result<impl CancellableJobHandler<LavaUploadableFi
Ok(Run::new(lava, url, job, cancel_behaviour))
}

async fn upload_artifact(Path((job, path)): Path<(String, String)>, body: Bytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really needs to be a chunked/streaming transfer, as the LAVA job might produce quite big files — not something we want to hold entirely in memory.

UPLOAD_SERVER.lock().unwrap().upload_file(&job, &path, body);
}

#[tokio::main]
async fn main() {
let opts = Opts::from_args();
Expand Down Expand Up @@ -949,6 +1037,77 @@ async fn main() {
);
}

tokio::spawn(async {
let local_port = match env::var("LAVA_GITLAB_RUNNER_LOCAL_PORT") {
Ok(val) => val
.parse()
.expect("failed to parse LAVA_GITLAB_RUNNER_LOCAL_PORT as a port number"),
Err(_) => {
warn!("No LAVA_GITLAB_RUNNER_LOCAL_PORT set, will listen on ephemeral IP.");
0u16
}
};

let listener = std::net::TcpListener::bind(std::net::SocketAddr::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why bother with a TCP listener rather than just asking axum::Server to bind itself?

IpAddr::V4(Ipv4Addr::UNSPECIFIED),
local_port,
))
.expect("failed to bind listener");

let routable_host = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_HOST") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should really just be one argument/environment variable for the base URL, rather than specifying a separate hostname and port, so one can also use e.g. https://my-upload-host:2456/badger/snakes if this endpoint is not mounted at the root.

Ok(val) => val,
Err(_) => {
let host = local_ip_address::local_ip()
.expect("failed to determine local ip")
.to_string();

warn!(
"No LAVA_GITLAB_RUNNER_ROUTABLE_HOST set, using best guess of local IP {}.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no base URL set, then simply don't enable the listen functionality; local_ip() is of quite questionable value.

host
);
host
}
};

let routable_port = match env::var("LAVA_GITLAB_RUNNER_ROUTABLE_PORT") {
Ok(val) => val
.parse()
.expect("failed to parse LAVA_GITLAB_RUNNER_ROUTABLE_PORT as a port number"),
Err(_) => {
let port = listener
.local_addr()
.expect("failed to get local address")
.port();

info!(
"No LAVA_GITLAB_RUNNER_ROUTABLE_PORT set, using local port {}.",
port
);
port
}
};

let routable_addr = format!("{}:{}", routable_host, routable_port);

info!(
"Artifact upload listening on {} (reporting routable {})",
listener.local_addr().expect("failed to get local address"),
routable_addr
);

UPLOAD_SERVER
.lock()
.unwrap()
.set_base_address(routable_addr);
let app = Router::new().route("/artifacts/:job/*path", post(upload_artifact));

axum::Server::from_tcp(listener)
.expect("failed to create axum server from TCP listener")
.serve(app.into_make_service())
.await
.unwrap();
});

runner
.run(new_job, 64)
.await
Expand Down
Loading