From 6f54439b1588b852e4b7845c969d9ca8928796c8 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Thu, 22 Aug 2024 15:24:09 -0400 Subject: [PATCH 1/2] factors: Port redis trigger Signed-off-by: Lann Martin --- Cargo.lock | 88 +++++++++-- crates/expressions/src/lib.rs | 2 +- crates/factor-variables/src/lib.rs | 12 +- crates/factors-executor/src/lib.rs | 4 + crates/trigger-http2/src/lib.rs | 4 +- crates/trigger-redis/Cargo.toml | 24 +++ crates/trigger-redis/src/lib.rs | 226 +++++++++++++++++++++++++++++ crates/trigger2/src/cli.rs | 4 +- crates/trigger2/src/lib.rs | 11 +- 9 files changed, 347 insertions(+), 28 deletions(-) create mode 100644 crates/trigger-redis/Cargo.toml create mode 100644 crates/trigger-redis/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index d6a8e17f3..6b4b3bfcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.5.2" @@ -479,9 +485,9 @@ checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" [[package]] name = "async-trait" -version = "0.1.79" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -1737,7 +1743,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio", + "mio 0.8.11", "parking_lot", "signal-hook", "signal-hook-mio", @@ -4493,6 +4499,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.52.0", +] + [[package]] name = "mirai-annotations" version = "1.12.0" @@ -4541,7 +4559,7 @@ dependencies = [ "keyed_priority_queue", "lazy_static 1.4.0", "lru 0.12.3", - "mio", + "mio 0.8.11", "mysql_common", "native-tls", "once_cell", @@ -4712,7 +4730,7 @@ dependencies = [ "inotify", "kqueue", "libc", - "mio", + "mio 0.8.11", "walkdir", "windows-sys 0.45.0", ] @@ -4743,11 +4761,10 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ - "autocfg", "num-integer", "num-traits 0.2.18", ] @@ -6035,6 +6052,29 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.6", + "tokio", + "tokio-util 0.7.10", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -6960,7 +7000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio", + "mio 0.8.11", "signal-hook", ] @@ -8042,6 +8082,23 @@ dependencies = [ "webpki-roots 0.26.1", ] +[[package]] +name = "spin-trigger-redis" +version = "2.8.0-pre0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "redis 0.26.1", + "serde 1.0.197", + "spin-factor-variables", + "spin-telemetry", + "spin-trigger2", + "spin-world", + "tokio", + "tracing", +] + [[package]] name = "spin-trigger2" version = "2.8.0-pre0" @@ -8588,21 +8645,20 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.6", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -8617,9 +8673,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", diff --git a/crates/expressions/src/lib.rs b/crates/expressions/src/lib.rs index 811c0e864..350914b55 100644 --- a/crates/expressions/src/lib.rs +++ b/crates/expressions/src/lib.rs @@ -267,7 +267,7 @@ impl<'a> AsRef for Key<'a> { } } -type Result = std::result::Result; +pub type Result = std::result::Result; /// A variable resolution error. #[derive(Debug, thiserror::Error)] diff --git a/crates/factor-variables/src/lib.rs b/crates/factor-variables/src/lib.rs index 9ea892a2b..3c9b6603a 100644 --- a/crates/factor-variables/src/lib.rs +++ b/crates/factor-variables/src/lib.rs @@ -5,7 +5,7 @@ pub mod spin_cli; use std::sync::Arc; use runtime_config::RuntimeConfig; -use spin_expressions::ProviderResolver as ExpressionResolver; +use spin_expressions::{ProviderResolver as ExpressionResolver, Template}; use spin_factors::{ anyhow, ConfigureAppContext, Factor, InitContext, InstanceBuilders, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -70,6 +70,16 @@ pub struct AppState { expression_resolver: Arc, } +impl AppState { + pub async fn resolve_expression( + &self, + expr: impl Into>, + ) -> spin_expressions::Result { + let template = Template::new(expr)?; + self.expression_resolver.resolve_template(&template).await + } +} + pub struct InstanceState { component_id: String, expression_resolver: Arc, diff --git a/crates/factors-executor/src/lib.rs b/crates/factors-executor/src/lib.rs index 2d1c00330..f204dd1a7 100644 --- a/crates/factors-executor/src/lib.rs +++ b/crates/factors-executor/src/lib.rs @@ -115,6 +115,10 @@ impl FactorsExecutorApp { &self.executor.core_engine } + pub fn configured_app(&self) -> &ConfiguredApp { + &self.configured_app + } + pub fn app(&self) -> &App { self.configured_app.app() } diff --git a/crates/trigger-http2/src/lib.rs b/crates/trigger-http2/src/lib.rs index 994911099..705c522f2 100644 --- a/crates/trigger-http2/src/lib.rs +++ b/crates/trigger-http2/src/lib.rs @@ -60,8 +60,6 @@ impl CliArgs { } } -pub(crate) type InstanceState = (); - /// The Spin HTTP trigger. pub struct HttpTrigger { /// The address the server should listen on. @@ -76,7 +74,7 @@ impl Trigger for HttpTrigger { const TYPE: &'static str = "http"; type CliArgs = CliArgs; - type InstanceState = InstanceState; + type InstanceState = (); fn new(cli_args: Self::CliArgs, app: &spin_app::App) -> anyhow::Result { Self::new(app, cli_args.address, cli_args.into_tls_config()) diff --git a/crates/trigger-redis/Cargo.toml b/crates/trigger-redis/Cargo.toml new file mode 100644 index 000000000..c2cba3a57 --- /dev/null +++ b/crates/trigger-redis/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "spin-trigger-redis" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } + +[lib] +doctest = false + +[dependencies] +anyhow = "1.0" +async-trait = "0.1" +futures = "0.3" +serde = "1.0.188" +spin-factor-variables = { path = "../factor-variables" } +spin-telemetry = { path = "../telemetry" } +spin-trigger2 = { path = "../trigger2" } +spin-world = { path = "../world" } +redis = { version = "0.26.1", features = ["tokio-comp"] } +tracing = { workspace = true } +tokio = { version = "1.39.3", features = ["macros", "rt"] } + +[lints] +workspace = true diff --git a/crates/trigger-redis/src/lib.rs b/crates/trigger-redis/src/lib.rs new file mode 100644 index 000000000..d0d3fbbda --- /dev/null +++ b/crates/trigger-redis/src/lib.rs @@ -0,0 +1,226 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context; +use futures::{StreamExt, TryFutureExt}; +use redis::{Client, Msg}; +use serde::Deserialize; +use spin_factor_variables::VariablesFactor; +use spin_trigger2::{cli::NoCliArgs, App, Trigger, TriggerApp}; +use spin_world::exports::fermyon::spin::inbound_redis; +use tracing::{instrument, Level}; + +pub struct RedisTrigger; + +/// Redis trigger metadata. +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct TriggerMetadata { + address: String, +} + +/// Redis trigger configuration. +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct TriggerConfig { + /// Component ID to invoke + component: String, + /// Channel to subscribe to + channel: String, + /// Optionally override address for trigger + address: Option, +} + +impl Trigger for RedisTrigger { + const TYPE: &'static str = "redis"; + + type CliArgs = NoCliArgs; + + type InstanceState = (); + + fn new(_cli_args: Self::CliArgs, _app: &App) -> anyhow::Result { + Ok(Self) + } + + async fn run(self, trigger_app: spin_trigger2::TriggerApp) -> anyhow::Result<()> { + let app_variables = trigger_app + .configured_app() + .app_state::() + .context("RedisTrigger depends on VariablesFactor")?; + + let app = trigger_app.app(); + let metadata = app + .get_trigger_metadata::(Self::TYPE)? + .unwrap_or_default(); + let default_address_expr = &metadata.address; + let default_address = app_variables + .resolve_expression(default_address_expr.clone()) + .await + .with_context(|| { + format!("failed to resolve redis trigger default address {default_address_expr:?}") + })?; + + // Maps -> -> + let mut server_channel_components: HashMap = HashMap::new(); + + // Resolve trigger configs before starting any subscribers + for (_, config) in app + .trigger_configs::(Self::TYPE)? + .into_iter() + .collect::>() + { + let component_id = config.component; + + let address_expr = config.address.as_ref().unwrap_or(&default_address); + let address = app_variables + .resolve_expression(address_expr.clone()) + .await + .with_context(|| { + format!( + "failed to resolve redis trigger address {address_expr:?} for component {component_id}" + ) + })?; + + let channel_expr = &config.channel; + let channel = app_variables + .resolve_expression(channel_expr.clone()) + .await + .with_context(|| { + format!( + "failed to resolve redis trigger channel {channel_expr:?} for component {component_id}" + ) + })?; + + server_channel_components + .entry(address) + .or_default() + .entry(channel) + .or_default() + .push(component_id); + } + + // Start subscriber(s) + let trigger_app = Arc::new(trigger_app); + let mut subscriber_tasks = Vec::new(); + for (address, channel_components) in server_channel_components { + let subscriber = Subscriber::new(address, trigger_app.clone(), channel_components)?; + let task = tokio::spawn(subscriber.run_listener()); + subscriber_tasks.push(task); + } + + // Wait for any task to complete + let (res, _, _) = futures::future::select_all(subscriber_tasks).await; + res? + } +} + +/// Maps -> +type ChannelComponents = HashMap>; + +/// Subscribes to channels from a single Redis server. +struct Subscriber { + client: Client, + trigger_app: Arc>, + channel_components: ChannelComponents, +} + +impl Subscriber { + fn new( + address: String, + trigger_app: Arc>, + channel_components: ChannelComponents, + ) -> anyhow::Result { + let client = Client::open(address)?; + Ok(Self { + client, + trigger_app, + channel_components, + }) + } + + async fn run_listener(self) -> anyhow::Result<()> { + let server_addr = &self.client.get_connection_info().addr; + + tracing::info!("Connecting to Redis server at {server_addr}"); + let mut pubsub = self + .client + .get_async_pubsub() + .await + .with_context(|| format!("Redis trigger failed to connect to {server_addr}"))?; + + println!("Active Channels on {server_addr}:"); + + // Subscribe to channels + for (channel, components) in &self.channel_components { + tracing::info!("Subscribing to {channel:?} on {server_addr}"); + pubsub.subscribe(channel).await.with_context(|| { + format!("Redis trigger failed to subscribe to channel {channel:?} on {server_addr}") + })?; + println!("\t{server_addr}/{channel}: [{}]", components.join(",")); + } + + let mut message_stream = pubsub.on_message(); + while let Some(msg) = message_stream.next().await { + if let Err(err) = self.handle_message(msg).await { + tracing::error!("Error handling message from {server_addr}: {err}"); + } + } + Err(anyhow::anyhow!("disconnected from {server_addr}")) + } + + #[instrument(name = "spin_trigger_redis.handle_message", skip_all, err(level = Level::INFO), fields( + otel.name = format!("{} receive", msg.get_channel_name()), + otel.kind = "consumer", + messaging.operation = "receive", + messaging.system = "redis" + ))] + async fn handle_message(&self, msg: Msg) -> anyhow::Result<()> { + let server_addr = &self.client.get_connection_info().addr; + let channel = msg.get_channel_name(); + tracing::trace!(%server_addr, %channel, "Received message"); + + let Some(component_ids) = self.channel_components.get(channel) else { + anyhow::bail!("message from unexpected channel {channel:?}"); + }; + + let dispatch_futures = component_ids.iter().map(|component_id| { + tracing::trace!("Executing Redis component {component_id}"); + self.dispatch_handler(&msg, component_id) + .inspect_err(move |err| { + tracing::info!("Component {component_id} handler failed: {err}"); + }) + }); + futures::future::join_all(dispatch_futures).await; + + Ok(()) + } + + async fn dispatch_handler(&self, msg: &Msg, component_id: &str) -> anyhow::Result<()> { + spin_telemetry::metrics::monotonic_counter!( + spin.request_count = 1, + trigger_type = "redis", + app_id = self.trigger_app.app().id(), + component_id = component_id + ); + + let (instance, mut store) = self + .trigger_app + .prepare(component_id)? + .instantiate(()) + .await?; + + let guest = { + let exports = &mut instance.exports(&mut store); + let mut inbound_redis_export = exports + .instance("fermyon:spin/inbound-redis") + .context("no fermyon:spin/inbound-redis instance found")?; + inbound_redis::Guest::new(&mut inbound_redis_export)? + }; + + let payload = msg.get_payload_bytes().to_vec(); + + guest + .call_handle_message(&mut store, &payload) + .await? + .context("Redis handler returned an error") + } +} diff --git a/crates/trigger2/src/cli.rs b/crates/trigger2/src/cli.rs index 9bc787cef..4b6dc422b 100644 --- a/crates/trigger2/src/cli.rs +++ b/crates/trigger2/src/cli.rs @@ -130,7 +130,7 @@ pub struct FactorsTriggerCommand { /// An empty implementation of clap::Args to be used as TriggerExecutor::RunConfig /// for executors that do not need additional CLI args. #[derive(Args)] -pub struct NoArgs; +pub struct NoCliArgs; impl FactorsTriggerCommand { /// Create a new TriggerExecutorBuilder from this TriggerExecutorCommand. @@ -402,7 +402,7 @@ pub mod help { impl Trigger for HelpArgsOnlyTrigger { const TYPE: &'static str = "help-args-only"; - type CliArgs = NoArgs; + type CliArgs = NoCliArgs; type InstanceState = (); fn new(_cli_args: Self::CliArgs, _app: &App) -> anyhow::Result { diff --git a/crates/trigger2/src/lib.rs b/crates/trigger2/src/lib.rs index 095d30e3b..0fde4a021 100644 --- a/crates/trigger2/src/lib.rs +++ b/crates/trigger2/src/lib.rs @@ -1,14 +1,15 @@ +pub mod cli; +mod factors; +mod stdio; + use std::future::Future; use clap::Args; use factors::{TriggerFactors, TriggerFactorsInstanceState}; -use spin_app::App; use spin_core::Linker; use spin_factors_executor::{FactorsExecutorApp, FactorsInstanceBuilder}; -pub mod cli; -mod factors; -mod stdio; +pub use spin_app::App; /// Type alias for a [`FactorsConfiguredApp`] specialized to a [`Trigger`]. pub type TriggerApp = FactorsExecutorApp::InstanceState>; @@ -54,7 +55,7 @@ pub trait Trigger: Sized + Send { /// Run this trigger. fn run( self, - configured_app: TriggerApp, + trigger_app: TriggerApp, ) -> impl Future> + Send; /// Returns a list of host requirements supported by this trigger specifically. From 381bd802c91bc636286828488df5b8979390e7e4 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Thu, 22 Aug 2024 15:29:37 -0400 Subject: [PATCH 2/2] Reintroduce redis trigger to spin up Signed-off-by: Lann Martin --- Cargo.lock | 1 + Cargo.toml | 2 +- build.rs | 1 - src/bin/spin.rs | 6 +++--- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b4b3bfcd..b5da52fe9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7237,6 +7237,7 @@ dependencies = [ "spin-telemetry", "spin-templates", "spin-trigger-http2", + "spin-trigger-redis", "spin-trigger2", "subprocess", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 0f270d3b3..664d92b06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ spin-telemetry = { path = "crates/telemetry", features = [ spin-templates = { path = "crates/templates" } spin-trigger2 = { path = "crates/trigger2" } spin-trigger-http2 = { path = "crates/trigger-http2" } -# TODO: spin-trigger-redis = { path = "crates/trigger-redis" } +spin-trigger-redis = { path = "crates/trigger-redis" } tempfile = "3.8.0" tokio = { version = "1.23", features = ["full"] } diff --git a/build.rs b/build.rs index d4cd7acf0..3246e3ec7 100644 --- a/build.rs +++ b/build.rs @@ -69,7 +69,6 @@ error: the `wasm32-wasi` target is not installed build_wasm_test_program("core-wasi-test.wasm", "crates/core/tests/core-wasi-test"); // build_wasm_test_program("redis-rust.wasm", "crates/trigger-redis/tests/rust"); - // build_wasm_test_program( // "spin-http-benchmark.wasm", // "crates/trigger-http/benches/spin-http-benchmark", diff --git a/src/bin/spin.rs b/src/bin/spin.rs index 42e6539dc..3cce1b3ca 100644 --- a/src/bin/spin.rs +++ b/src/bin/spin.rs @@ -18,7 +18,7 @@ use spin_cli::{build_info::*, subprocess::ExitStatusError}; use spin_trigger2::cli::help::HelpArgsOnlyTrigger; use spin_trigger2::cli::FactorsTriggerCommand; use spin_trigger_http2::HttpTrigger; -// TODO(factors): use spin_trigger_redis::RedisTrigger; +use spin_trigger_redis::RedisTrigger; #[tokio::main] async fn main() { @@ -137,7 +137,7 @@ enum SpinApp { #[derive(Subcommand)] enum TriggerCommands { Http(FactorsTriggerCommand), - // TODO(factors): Redis(TriggerExecutorCommand), + Redis(FactorsTriggerCommand), #[clap(name = spin_cli::HELP_ARGS_ONLY_TRIGGER_TYPE, hide = true)] HelpArgsOnly(FactorsTriggerCommand), } @@ -155,7 +155,7 @@ impl SpinApp { Self::Registry(cmd) => cmd.run().await, Self::Build(cmd) => cmd.run().await, Self::Trigger(TriggerCommands::Http(cmd)) => cmd.run().await, - // TODO(factors): Self::Trigger(TriggerCommands::Redis(cmd)) => cmd.run().await, + Self::Trigger(TriggerCommands::Redis(cmd)) => cmd.run().await, Self::Trigger(TriggerCommands::HelpArgsOnly(cmd)) => cmd.run().await, Self::Plugins(cmd) => cmd.run().await, Self::External(cmd) => execute_external_subcommand(cmd, app).await,