Skip to content

Commit

Permalink
fix: dummy plugin refactored to use "Session" tracking so to overcome…
Browse files Browse the repository at this point in the history
… hanging
  • Loading branch information
j-lanson committed Aug 29, 2024
1 parent 4f65cfe commit a65f7d9
Show file tree
Hide file tree
Showing 14 changed files with 1,505 additions and 194 deletions.
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
resolver = "2"

# Members of the workspace.
members = ["hipcheck", "hipcheck-macros", "xtask", "plugins/dummy_rand_data"]
members = ["hipcheck", "hipcheck-macros", "xtask", "plugins/dummy_rand_data", "plugins/dummy_sha256"]

# Make sure Hipcheck is run with `cargo run`.
#
Expand Down
12 changes: 9 additions & 3 deletions hipcheck/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ pub enum FullCommands {
Ready,
Update(UpdateArgs),
Cache(CacheArgs),
Plugin,
Plugin(PluginArgs),
PrintConfig,
PrintData,
PrintCache,
Expand All @@ -415,7 +415,7 @@ impl From<&Commands> for FullCommands {
Commands::Scoring => FullCommands::Scoring,
Commands::Update(args) => FullCommands::Update(args.clone()),
Commands::Cache(args) => FullCommands::Cache(args.clone()),
Commands::Plugin => FullCommands::Plugin,
Commands::Plugin(args) => FullCommands::Plugin(args.clone()),
}
}
}
Expand Down Expand Up @@ -446,7 +446,7 @@ pub enum Commands {
Cache(CacheArgs),
/// Execute temporary code for exercising plugin engine
#[command(hide = true)]
Plugin,
Plugin(PluginArgs),
}

// If no subcommand matched, default to use of '-t <TYPE> <TARGET' syntax. In
Expand Down Expand Up @@ -965,6 +965,12 @@ impl TryFrom<Vec<String>> for RepoCacheDeleteScope {
}
}

#[derive(Debug, Clone, clap::Args)]
pub struct PluginArgs {
#[arg(long = "async")]
pub asynch: bool,
}

/// Test CLI commands
#[cfg(test)]
mod tests {
Expand Down
14 changes: 14 additions & 0 deletions hipcheck/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ fn query(
};
// Initiate the query. If remote closed or we got our response immediately,
// return
println!("Querying {plugin}::{query} with key {key:?}");
let mut ar = match runtime.block_on(p_handle.query(query, key))? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
Expand All @@ -48,12 +49,14 @@ fn query(
// (with salsa memo-ization) to get the needed data, and resume our
// current query by providing the plugin the answer.
loop {
println!("Query needs more info, recursing...");
let answer = db.query(
ar.publisher.clone(),
ar.plugin.clone(),
ar.query.clone(),
ar.key.clone(),
)?;
println!("Got answer {answer:?}, resuming");
ar = match runtime.block_on(p_handle.resume_query(ar, answer))? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
Expand All @@ -79,6 +82,7 @@ pub fn async_query(
};
// Initiate the query. If remote closed or we got our response immediately,
// return
println!("Querying: {query}, key: {key:?}");
let mut ar = match p_handle.query(query, key).await? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
Expand All @@ -92,6 +96,7 @@ pub fn async_query(
// (with salsa memo-ization) to get the needed data, and resume our
// current query by providing the plugin the answer.
loop {
println!("Awaiting result, now recursing");
let answer = async_query(
Arc::clone(&core),
ar.publisher.clone(),
Expand All @@ -100,6 +105,7 @@ pub fn async_query(
ar.key.clone(),
)
.await?;
println!("Resuming query with answer {answer:?}");
ar = match p_handle.resume_query(ar, answer).await? {
PluginResponse::RemoteClosed => {
return Err(hc_error!("Plugin channel closed unexpected"));
Expand All @@ -120,6 +126,14 @@ pub struct HcEngineImpl {

impl salsa::Database for HcEngineImpl {}

impl salsa::ParallelDatabase for HcEngineImpl {
fn snapshot(&self) -> salsa::Snapshot<Self> {
salsa::Snapshot::new(HcEngineImpl {
storage: self.storage.snapshot(),
})
}
}

impl HcEngineImpl {
// Really HcEngineImpl and HcPluginCore do the same thing right now, except HcPluginCore
// has an async constructor. If we can manipulate salsa to accept async functions, we
Expand Down
120 changes: 71 additions & 49 deletions hipcheck/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use cli::CacheOp;
use cli::CheckArgs;
use cli::CliConfig;
use cli::FullCommands;
use cli::PluginArgs;
use cli::SchemaArgs;
use cli::SchemaCommand;
use cli::SetupArgs;
Expand Down Expand Up @@ -111,7 +112,7 @@ fn main() -> ExitCode {
Some(FullCommands::Ready) => cmd_ready(&config),
Some(FullCommands::Update(args)) => cmd_update(&args),
Some(FullCommands::Cache(args)) => return cmd_cache(args, &config),
Some(FullCommands::Plugin) => cmd_plugin(),
Some(FullCommands::Plugin(args)) => cmd_plugin(args),
Some(FullCommands::PrintConfig) => cmd_print_config(config.config()),
Some(FullCommands::PrintData) => cmd_print_data(config.data()),
Some(FullCommands::PrintCache) => cmd_print_home(config.cache()),
Expand Down Expand Up @@ -603,16 +604,21 @@ fn check_github_token() -> StdResult<(), EnvVarCheckError> {
})
}

fn cmd_plugin() {
fn cmd_plugin(args: PluginArgs) {
use crate::engine::{async_query, HcEngine, HcEngineImpl};
use std::sync::Arc;
use tokio::task::JoinSet;

let tgt_dir = "./target/debug";
let entrypoint = pathbuf![tgt_dir, "dummy_rand_data"];
let plugin = Plugin {
let entrypoint1 = pathbuf![tgt_dir, "dummy_rand_data"];
let entrypoint2 = pathbuf![tgt_dir, "dummy_sha256"];
let plugin1 = Plugin {
name: "rand_data".to_owned(),
entrypoint: entrypoint.display().to_string(),
entrypoint: entrypoint1.display().to_string(),
};
let plugin2 = Plugin {
name: "sha256".to_owned(),
entrypoint: entrypoint2.display().to_string(),
};
let plugin_executor = PluginExecutor::new(
/* max_spawn_attempts */ 3,
Expand All @@ -624,57 +630,73 @@ fn cmd_plugin() {
.unwrap();
let engine = match HcEngineImpl::new(
plugin_executor,
vec![PluginWithConfig(plugin, serde_json::json!(null))],
vec![
PluginWithConfig(plugin1, serde_json::json!(null)),
PluginWithConfig(plugin2, serde_json::json!(null)),
],
) {
Ok(e) => e,
Err(e) => {
println!("Failed to create engine: {e}");
return;
}
};
let core = engine.core();
let handle = HcEngineImpl::runtime();
// @Note - how to initiate multiple queries with async calls
handle.block_on(async move {
let mut futs = JoinSet::new();
for i in 1..10 {
let arc_core = Arc::clone(&core);
println!("Spawning");
futs.spawn(async_query(
arc_core,
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(i),
));
}
while let Some(res) = futs.join_next().await {
println!("res: {res:?}");
}
});
// @Note - how to initiate multiple queries with sync calls
// let conc: Vec<thread::JoinHandle<()>> = vec![];
// for i in 0..10 {
// let fut = thread::spawn(|| {
// let res = match engine.query(
// "MITRE".to_owned(),
// "rand_data".to_owned(),
// "rand_data".to_owned(),
// serde_json::json!(i),
// ) {
// Ok(r) => r,
// Err(e) => {
// println!("{i}: Query failed: {e}");
// return;
// }
// };
// println!("{i}: Result: {res}");
// });
// conc.push(fut);
// }
// while let Some(x) = conc.pop() {
// x.join().unwrap();
// }
if args.asynch {
// @Note - how to initiate multiple queries with async calls
let core = engine.core();
let handle = HcEngineImpl::runtime();
handle.block_on(async move {
let mut futs = JoinSet::new();
for i in 1..10 {
let arc_core = Arc::clone(&core);
println!("Spawning");
futs.spawn(async_query(
arc_core,
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(i),
));
}
while let Some(res) = futs.join_next().await {
println!("res: {res:?}");
}
});
} else {
let res = engine.query(
"MITRE".to_owned(),
"rand_data".to_owned(),
"rand_data".to_owned(),
serde_json::json!(1),
);
println!("res: {res:?}");
// @Note - how to initiate multiple queries with sync calls
// Currently does not work, compiler complains need Sync impl
// use std::thread;
// let conc: Vec<thread::JoinHandle<()>> = vec![];
// for i in 0..10 {
// let snapshot = engine.snapshot();
// let fut = thread::spawn(|| {
// let res = match snapshot.query(
// "MITRE".to_owned(),
// "rand_data".to_owned(),
// "rand_data".to_owned(),
// serde_json::json!(i),
// ) {
// Ok(r) => r,
// Err(e) => {
// println!("{i}: Query failed: {e}");
// return;
// }
// };
// println!("{i}: Result: {res}");
// });
// conc.push(fut);
// }
// while let Some(x) = conc.pop() {
// x.join().unwrap();
// }
}
}

fn cmd_ready(config: &CliConfig) {
Expand Down
13 changes: 11 additions & 2 deletions hipcheck/src/plugin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,15 @@ impl PluginExecutor {
}
fn get_available_port(&self) -> Result<u16> {
for i in self.port_range.start..self.port_range.end {
if std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() {
return Ok(i);
// @Todo - either TcpListener::bind returns Ok even if port is bound
// or we have a race condition. For now just have OS assign a port
// if std::net::TcpListener::bind(format!("127.0.0.1:{i}")).is_ok() {
// return Ok(i);
// }
if let Ok(addr) = std::net::TcpListener::bind("127.0.0.1:0") {
if let Ok(local_addr) = addr.local_addr() {
return Ok(local_addr.port());
}
}
}
Err(hc_error!("Failed to find available port"))
Expand All @@ -60,13 +67,15 @@ impl PluginExecutor {
// on the cmdline is not already in use, but it is still possible for that
// port to become unavailable between our check and the plugin's bind attempt.
// Hence the need for subsequent attempts if we get unlucky
eprintln!("Starting plugin '{}'", plugin.name);
let mut spawn_attempts: usize = 0;
while spawn_attempts < self.max_spawn_attempts {
// Find free port for process. Don't retry if we fail since this means all
// ports in the desired range are already bound
let port = self.get_available_port()?;
let port_str = port.to_string();
// Spawn plugin process
eprintln!("Spawning '{}' on port {}", &plugin.entrypoint, port_str);
let Ok(mut proc) = Command::new(&plugin.entrypoint)
.args(["--port", port_str.as_str()])
// @Temporary - directly forward stdout/stderr from plugin to shell
Expand Down
1 change: 1 addition & 0 deletions hipcheck/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl ActivePlugin {
key: serde_json::json!(null),
output,
};
eprintln!("Resuming query with answer {query:?}");
Ok(self.channel.query(query).await?.into())
}
}
Expand Down
Loading

0 comments on commit a65f7d9

Please sign in to comment.