From eaaf1f059f7e00e1edb7c618b50b4aea4116a858 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Thu, 22 Aug 2024 15:24:09 -0400 Subject: [PATCH] factors: Port redis trigger Signed-off-by: Lann Martin --- Cargo.lock | 81 ++-- crates/expressions/src/lib.rs | 2 +- crates/factor-variables/src/lib.rs | 12 +- crates/factors-executor/src/lib.rs | 4 + crates/trigger-http2/src/lib.rs | 4 +- crates/trigger-redis/Cargo.toml | 16 +- crates/trigger-redis/src/lib.rs | 363 ++++++++++-------- crates/trigger-redis/src/spin.rs | 65 ---- crates/trigger-redis/src/tests.rs | 26 -- .../trigger-redis/tests/rust/.cargo/.config | 2 - crates/trigger-redis/tests/rust/Cargo.lock | 303 --------------- crates/trigger-redis/tests/rust/Cargo.toml | 13 - crates/trigger-redis/tests/rust/src/lib.rs | 25 -- crates/trigger2/src/cli.rs | 4 +- crates/trigger2/src/lib.rs | 11 +- 15 files changed, 281 insertions(+), 650 deletions(-) delete mode 100644 crates/trigger-redis/src/spin.rs delete mode 100644 crates/trigger-redis/src/tests.rs delete mode 100644 crates/trigger-redis/tests/rust/.cargo/.config delete mode 100644 crates/trigger-redis/tests/rust/Cargo.lock delete mode 100644 crates/trigger-redis/tests/rust/Cargo.toml delete mode 100644 crates/trigger-redis/tests/rust/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 3295d890c9..fb92269ea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayvec" version = "0.5.2" @@ -479,9 +485,9 @@ checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" [[package]] name = "async-trait" -version = "0.1.79" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -1798,7 +1804,7 @@ dependencies = [ "bitflags 1.3.2", "crossterm_winapi", "libc", - "mio", + "mio 0.8.11", "parking_lot", "signal-hook", "signal-hook-mio", @@ -4627,6 +4633,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.52.0", +] + [[package]] name = "mirai-annotations" version = "1.12.0" @@ -4675,7 +4693,7 @@ dependencies = [ "keyed_priority_queue", "lazy_static 1.4.0", "lru 0.12.3", - "mio", + "mio 0.8.11", "mysql_common", "native-tls", "once_cell", @@ -4846,7 +4864,7 @@ dependencies = [ "inotify", "kqueue", "libc", - "mio", + "mio 0.8.11", "walkdir", "windows-sys 0.45.0", ] @@ -4877,11 +4895,10 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ - "autocfg", "num-integer", "num-traits 0.2.18", ] @@ -6291,6 +6308,29 @@ dependencies = [ "url", ] +[[package]] +name = "redis" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.6", + "tokio", + "tokio-util 0.7.10", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -7236,7 +7276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" dependencies = [ "libc", - "mio", + "mio 0.8.11", "signal-hook", ] @@ -8456,15 +8496,11 @@ dependencies = [ "anyhow", "async-trait", "futures", - "redis 0.21.7", + "redis 0.26.1", "serde 1.0.197", - "spin-app", - "spin-common", - "spin-core", - "spin-expressions", + "spin-factor-variables", "spin-telemetry", - "spin-testing", - "spin-trigger", + "spin-trigger2", "spin-world", "tokio", "tracing", @@ -9058,21 +9094,20 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", "libc", - "mio", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.5.6", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -9087,9 +9122,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", diff --git a/crates/expressions/src/lib.rs b/crates/expressions/src/lib.rs index 811c0e864d..350914b558 100644 --- a/crates/expressions/src/lib.rs +++ b/crates/expressions/src/lib.rs @@ -267,7 +267,7 @@ impl<'a> AsRef for Key<'a> { } } -type Result = std::result::Result; +pub type Result = std::result::Result; /// A variable resolution error. #[derive(Debug, thiserror::Error)] diff --git a/crates/factor-variables/src/lib.rs b/crates/factor-variables/src/lib.rs index 9ea892a2b7..3c9b6603aa 100644 --- a/crates/factor-variables/src/lib.rs +++ b/crates/factor-variables/src/lib.rs @@ -5,7 +5,7 @@ pub mod spin_cli; use std::sync::Arc; use runtime_config::RuntimeConfig; -use spin_expressions::ProviderResolver as ExpressionResolver; +use spin_expressions::{ProviderResolver as ExpressionResolver, Template}; use spin_factors::{ anyhow, ConfigureAppContext, Factor, InitContext, InstanceBuilders, PrepareContext, RuntimeFactors, SelfInstanceBuilder, @@ -70,6 +70,16 @@ pub struct AppState { expression_resolver: Arc, } +impl AppState { + pub async fn resolve_expression( + &self, + expr: impl Into>, + ) -> spin_expressions::Result { + let template = Template::new(expr)?; + self.expression_resolver.resolve_template(&template).await + } +} + pub struct InstanceState { component_id: String, expression_resolver: Arc, diff --git a/crates/factors-executor/src/lib.rs b/crates/factors-executor/src/lib.rs index 2d1c00330a..f204dd1a77 100644 --- a/crates/factors-executor/src/lib.rs +++ b/crates/factors-executor/src/lib.rs @@ -115,6 +115,10 @@ impl FactorsExecutorApp { &self.executor.core_engine } + pub fn configured_app(&self) -> &ConfiguredApp { + &self.configured_app + } + pub fn app(&self) -> &App { self.configured_app.app() } diff --git a/crates/trigger-http2/src/lib.rs b/crates/trigger-http2/src/lib.rs index 9949110999..705c522f28 100644 --- a/crates/trigger-http2/src/lib.rs +++ b/crates/trigger-http2/src/lib.rs @@ -60,8 +60,6 @@ impl CliArgs { } } -pub(crate) type InstanceState = (); - /// The Spin HTTP trigger. pub struct HttpTrigger { /// The address the server should listen on. @@ -76,7 +74,7 @@ impl Trigger for HttpTrigger { const TYPE: &'static str = "http"; type CliArgs = CliArgs; - type InstanceState = InstanceState; + type InstanceState = (); fn new(cli_args: Self::CliArgs, app: &spin_app::App) -> anyhow::Result { Self::new(app, cli_args.address, cli_args.into_tls_config()) diff --git a/crates/trigger-redis/Cargo.toml b/crates/trigger-redis/Cargo.toml index 70a90a61d5..c2cba3a575 100644 --- a/crates/trigger-redis/Cargo.toml +++ b/crates/trigger-redis/Cargo.toml @@ -12,19 +12,13 @@ anyhow = "1.0" async-trait = "0.1" futures = "0.3" serde = "1.0.188" -spin-app = { path = "../app" } -spin-common = { path = "../common" } -spin-core = { path = "../core" } -spin-expressions = { path = "../expressions" } -spin-trigger = { path = "../trigger" } +spin-factor-variables = { path = "../factor-variables" } +spin-telemetry = { path = "../telemetry" } +spin-trigger2 = { path = "../trigger2" } spin-world = { path = "../world" } -redis = { version = "0.21", features = ["tokio-comp"] } +redis = { version = "0.26.1", features = ["tokio-comp"] } tracing = { workspace = true } -tokio = { version = "1.23", features = ["full"] } -spin-telemetry = { path = "../telemetry" } - -[dev-dependencies] -spin-testing = { path = "../testing" } +tokio = { version = "1.39.3", features = ["macros", "rt"] } [lints] workspace = true diff --git a/crates/trigger-redis/src/lib.rs b/crates/trigger-redis/src/lib.rs index 440fc00b22..d0d3fbbda4 100644 --- a/crates/trigger-redis/src/lib.rs +++ b/crates/trigger-redis/src/lib.rs @@ -1,203 +1,226 @@ -//! Implementation for the Spin Redis engine. - -mod spin; - -use anyhow::{anyhow, Context, Result}; -use futures::{future::join_all, StreamExt}; -use redis::{Client, ConnectionLike}; -use serde::{de::IgnoredAny, Deserialize, Serialize}; -use spin_common::url::remove_credentials; -use spin_core::{async_trait, InstancePre}; -use spin_trigger::{cli::NoArgs, TriggerAppEngine, TriggerExecutor}; -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; + +use anyhow::Context; +use futures::{StreamExt, TryFutureExt}; +use redis::{Client, Msg}; +use serde::Deserialize; +use spin_factor_variables::VariablesFactor; +use spin_trigger2::{cli::NoCliArgs, App, Trigger, TriggerApp}; +use spin_world::exports::fermyon::spin::inbound_redis; use tracing::{instrument, Level}; -use crate::spin::SpinRedisExecutor; +pub struct RedisTrigger; -pub(crate) type RuntimeData = (); -pub(crate) type Store = spin_core::Store; - -type ChannelComponents = HashMap>; -/// The Spin Redis trigger. -#[derive(Clone)] -pub struct RedisTrigger { - engine: Arc>, - // Mapping of server url with subscription channel and associated component IDs - server_channels: HashMap, +/// Redis trigger metadata. +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct TriggerMetadata { + address: String, } /// Redis trigger configuration. -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize)] #[serde(deny_unknown_fields)] -pub struct RedisTriggerConfig { +struct TriggerConfig { /// Component ID to invoke - pub component: String, + component: String, /// Channel to subscribe to - pub channel: String, - /// optional overide address for trigger - pub address: Option, - /// Trigger executor (currently unused) - #[serde(default, skip_serializing)] - pub executor: IgnoredAny, + channel: String, + /// Optionally override address for trigger + address: Option, } -#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -struct TriggerMetadata { - address: String, -} +impl Trigger for RedisTrigger { + const TYPE: &'static str = "redis"; -#[async_trait] -impl TriggerExecutor for RedisTrigger { - const TRIGGER_TYPE: &'static str = "redis"; - type RuntimeData = RuntimeData; - type TriggerConfig = RedisTriggerConfig; - type RunConfig = NoArgs; - type InstancePre = InstancePre; - - async fn new(engine: TriggerAppEngine) -> Result { - let default_address: String = engine - .trigger_metadata::()? - .unwrap_or_default() - .address; - let default_address_expr = spin_expressions::Template::new(default_address)?; - let default_address = engine.resolve_template(&default_address_expr)?; - - let mut server_channels: HashMap = HashMap::new(); - - for (_, config) in engine.trigger_configs() { - let address = config.address.clone().unwrap_or(default_address.clone()); - let address_expr = spin_expressions::Template::new(address)?; - let address = engine.resolve_template(&address_expr)?; - let server = server_channels.entry(address).or_default(); - let channel_expr = spin_expressions::Template::new(config.channel.as_str())?; - let channel = engine.resolve_template(&channel_expr)?; - server - .entry(channel) - .or_default() - .push(config.component.clone()); - } - Ok(Self { - engine: Arc::new(engine), - server_channels, - }) + type CliArgs = NoCliArgs; + + type InstanceState = (); + + fn new(_cli_args: Self::CliArgs, _app: &App) -> anyhow::Result { + Ok(Self) } - /// Run the Redis trigger indefinitely. - async fn run(self, _config: Self::RunConfig) -> Result<()> { - let tasks: Vec<_> = self - .server_channels - .clone() - .into_iter() - .map(|(server_address, channel_components)| { - let trigger = self.clone(); - tokio::spawn(async move { - trigger - .run_listener(server_address.clone(), channel_components.clone()) - .await - }) - }) - .collect(); + async fn run(self, trigger_app: spin_trigger2::TriggerApp) -> anyhow::Result<()> { + let app_variables = trigger_app + .configured_app() + .app_state::() + .context("RedisTrigger depends on VariablesFactor")?; + + let app = trigger_app.app(); + let metadata = app + .get_trigger_metadata::(Self::TYPE)? + .unwrap_or_default(); + let default_address_expr = &metadata.address; + let default_address = app_variables + .resolve_expression(default_address_expr.clone()) + .await + .with_context(|| { + format!("failed to resolve redis trigger default address {default_address_expr:?}") + })?; - // wait for the first handle to be returned and drop the rest - let (result, _, rest) = futures::future::select_all(tasks).await; + // Maps -> -> + let mut server_channel_components: HashMap = HashMap::new(); - drop(rest); + // Resolve trigger configs before starting any subscribers + for (_, config) in app + .trigger_configs::(Self::TYPE)? + .into_iter() + .collect::>() + { + let component_id = config.component; + + let address_expr = config.address.as_ref().unwrap_or(&default_address); + let address = app_variables + .resolve_expression(address_expr.clone()) + .await + .with_context(|| { + format!( + "failed to resolve redis trigger address {address_expr:?} for component {component_id}" + ) + })?; + + let channel_expr = &config.channel; + let channel = app_variables + .resolve_expression(channel_expr.clone()) + .await + .with_context(|| { + format!( + "failed to resolve redis trigger channel {channel_expr:?} for component {component_id}" + ) + })?; + + server_channel_components + .entry(address) + .or_default() + .entry(channel) + .or_default() + .push(component_id); + } + + // Start subscriber(s) + let trigger_app = Arc::new(trigger_app); + let mut subscriber_tasks = Vec::new(); + for (address, channel_components) in server_channel_components { + let subscriber = Subscriber::new(address, trigger_app.clone(), channel_components)?; + let task = tokio::spawn(subscriber.run_listener()); + subscriber_tasks.push(task); + } - result? + // Wait for any task to complete + let (res, _, _) = futures::future::select_all(subscriber_tasks).await; + res? } } -impl RedisTrigger { - // Handle the message. - #[instrument(name = "spin_trigger_redis.handle_message", skip(self, channel_components, msg), - err(level = Level::INFO), fields(otel.name = format!("{} receive", msg.get_channel_name()), - otel.kind = "consumer", messaging.operation = "receive", messaging.system = "redis"))] - async fn handle( - &self, - address: &str, - channel_components: &ChannelComponents, - msg: redis::Msg, - ) -> Result<()> { - let channel = msg.get_channel_name(); - tracing::info!("Received message on channel {address}:{:?}", channel); - - if let Some(component_ids) = channel_components.get(channel) { - let futures = component_ids.iter().map(|id| { - tracing::trace!("Executing Redis component {id:?}"); - SpinRedisExecutor.execute(&self.engine, id, channel, msg.get_payload_bytes()) - }); - let results: Vec<_> = join_all(futures).await.into_iter().collect(); - let errors = results - .into_iter() - .filter_map(|r| r.err()) - .collect::>(); - if !errors.is_empty() { - return Err(anyhow!("{errors:#?}")); - } - } else { - tracing::debug!("No subscription found for {:?}", channel); - } - Ok(()) - } +/// Maps -> +type ChannelComponents = HashMap>; - async fn run_listener( - &self, +/// Subscribes to channels from a single Redis server. +struct Subscriber { + client: Client, + trigger_app: Arc>, + channel_components: ChannelComponents, +} + +impl Subscriber { + fn new( address: String, + trigger_app: Arc>, channel_components: ChannelComponents, - ) -> Result<()> { - tracing::info!("Connecting to Redis server at {}", address); - let mut client = Client::open(address.to_string())?; - let mut pubsub = client - .get_async_connection() + ) -> anyhow::Result { + let client = Client::open(address)?; + Ok(Self { + client, + trigger_app, + channel_components, + }) + } + + async fn run_listener(self) -> anyhow::Result<()> { + let server_addr = &self.client.get_connection_info().addr; + + tracing::info!("Connecting to Redis server at {server_addr}"); + let mut pubsub = self + .client + .get_async_pubsub() .await - .with_context(|| anyhow!("Redis trigger failed to connect to {}", address))? - .into_pubsub(); + .with_context(|| format!("Redis trigger failed to connect to {server_addr}"))?; + + println!("Active Channels on {server_addr}:"); - let sanitised_addr = remove_credentials(&address)?; - println!("Active Channels on {sanitised_addr}:"); // Subscribe to channels - for (channel, component) in channel_components.iter() { - tracing::info!("Subscribing component {component:?} to channel {channel:?}"); - pubsub.subscribe(channel).await?; - println!("\t{sanitised_addr}:{channel}: [{}]", component.join(",")); + for (channel, components) in &self.channel_components { + tracing::info!("Subscribing to {channel:?} on {server_addr}"); + pubsub.subscribe(channel).await.with_context(|| { + format!("Redis trigger failed to subscribe to channel {channel:?} on {server_addr}") + })?; + println!("\t{server_addr}/{channel}: [{}]", components.join(",")); } - let mut stream = pubsub.on_message(); - loop { - match stream.next().await { - Some(msg) => { - if let Err(err) = self.handle(&address, &channel_components, msg).await { - tracing::warn!("Error handling message: {err}"); - } - } - None => { - tracing::trace!("Empty message"); - if !client.check_connection() { - tracing::info!("No Redis connection available"); - println!("Disconnected from {address}"); - break; - } - } - }; + let mut message_stream = pubsub.on_message(); + while let Some(msg) = message_stream.next().await { + if let Err(err) = self.handle_message(msg).await { + tracing::error!("Error handling message from {server_addr}: {err}"); + } } + Err(anyhow::anyhow!("disconnected from {server_addr}")) + } + + #[instrument(name = "spin_trigger_redis.handle_message", skip_all, err(level = Level::INFO), fields( + otel.name = format!("{} receive", msg.get_channel_name()), + otel.kind = "consumer", + messaging.operation = "receive", + messaging.system = "redis" + ))] + async fn handle_message(&self, msg: Msg) -> anyhow::Result<()> { + let server_addr = &self.client.get_connection_info().addr; + let channel = msg.get_channel_name(); + tracing::trace!(%server_addr, %channel, "Received message"); + + let Some(component_ids) = self.channel_components.get(channel) else { + anyhow::bail!("message from unexpected channel {channel:?}"); + }; + + let dispatch_futures = component_ids.iter().map(|component_id| { + tracing::trace!("Executing Redis component {component_id}"); + self.dispatch_handler(&msg, component_id) + .inspect_err(move |err| { + tracing::info!("Component {component_id} handler failed: {err}"); + }) + }); + futures::future::join_all(dispatch_futures).await; + Ok(()) } -} -/// The Redis executor trait. -/// All Redis executors must implement this trait. -#[async_trait] -pub(crate) trait RedisExecutor: Clone + Send + Sync + 'static { - async fn execute( - &self, - engine: &TriggerAppEngine, - component_id: &str, - channel: &str, - payload: &[u8], - ) -> Result<()>; + async fn dispatch_handler(&self, msg: &Msg, component_id: &str) -> anyhow::Result<()> { + spin_telemetry::metrics::monotonic_counter!( + spin.request_count = 1, + trigger_type = "redis", + app_id = self.trigger_app.app().id(), + component_id = component_id + ); + + let (instance, mut store) = self + .trigger_app + .prepare(component_id)? + .instantiate(()) + .await?; + + let guest = { + let exports = &mut instance.exports(&mut store); + let mut inbound_redis_export = exports + .instance("fermyon:spin/inbound-redis") + .context("no fermyon:spin/inbound-redis instance found")?; + inbound_redis::Guest::new(&mut inbound_redis_export)? + }; + + let payload = msg.get_payload_bytes().to_vec(); + + guest + .call_handle_message(&mut store, &payload) + .await? + .context("Redis handler returned an error") + } } - -#[cfg(test)] -mod tests; diff --git a/crates/trigger-redis/src/spin.rs b/crates/trigger-redis/src/spin.rs deleted file mode 100644 index 290265a210..0000000000 --- a/crates/trigger-redis/src/spin.rs +++ /dev/null @@ -1,65 +0,0 @@ -use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use spin_core::Instance; -use spin_trigger::TriggerAppEngine; -use spin_world::v1::redis_types::{Error, Payload}; -use tracing::{instrument, Level}; - -use crate::{RedisExecutor, RedisTrigger, Store}; - -#[derive(Clone)] -pub struct SpinRedisExecutor; - -#[async_trait] -impl RedisExecutor for SpinRedisExecutor { - #[instrument(name = "spin_trigger_redis.execute_wasm", skip(self, engine, payload), err(level = Level::INFO), fields(otel.name = format!("execute_wasm_component {}", component_id)))] - async fn execute( - &self, - engine: &TriggerAppEngine, - component_id: &str, - channel: &str, - payload: &[u8], - ) -> Result<()> { - tracing::trace!("Executing request using the Spin executor for component {component_id}"); - - spin_telemetry::metrics::monotonic_counter!( - spin.request_count = 1, - trigger_type = "redis", - app_id = engine.app_name, - component_id = component_id - ); - - let (instance, store) = engine.prepare_instance(component_id).await?; - - match Self::execute_impl(store, instance, channel, payload.to_vec()).await { - Ok(()) => { - tracing::trace!("Request finished OK"); - Ok(()) - } - Err(e) => { - tracing::trace!("Request finished with error from {component_id}: {e}"); - Err(anyhow!("Error from {component_id}: {e}")) - } - } - } -} - -impl SpinRedisExecutor { - pub async fn execute_impl( - mut store: Store, - instance: Instance, - _channel: &str, - payload: Vec, - ) -> Result<()> { - let func = instance - .exports(&mut store) - .instance("fermyon:spin/inbound-redis") - .ok_or_else(|| anyhow!("no fermyon:spin/inbound-redis instance found"))? - .typed_func::<(Payload,), (Result<(), Error>,)>("handle-message")?; - - match func.call_async(store, (payload,)).await? { - (Ok(()) | Err(Error::Success),) => Ok(()), - _ => Err(anyhow!("`handle-message` returned an error")), - } - } -} diff --git a/crates/trigger-redis/src/tests.rs b/crates/trigger-redis/src/tests.rs deleted file mode 100644 index 6e0bb91d8b..0000000000 --- a/crates/trigger-redis/src/tests.rs +++ /dev/null @@ -1,26 +0,0 @@ -use super::*; -use anyhow::Result; -use redis::{Msg, Value}; -use spin_testing::{tokio, RedisTestConfig}; - -fn create_trigger_event(channel: &str, payload: &str) -> redis::Msg { - Msg::from_value(&redis::Value::Bulk(vec![ - Value::Data("message".into()), - Value::Data(channel.into()), - Value::Data(payload.into()), - ])) - .unwrap() -} - -#[tokio::test] -async fn test_pubsub() -> Result<()> { - let trigger: RedisTrigger = RedisTestConfig::default() - .test_program("redis-rust.wasm") - .build_trigger("messages") - .await; - let test = HashMap::new(); - let msg = create_trigger_event("messages", "hello"); - trigger.handle("", &test, msg).await?; - - Ok(()) -} diff --git a/crates/trigger-redis/tests/rust/.cargo/.config b/crates/trigger-redis/tests/rust/.cargo/.config deleted file mode 100644 index 30c83a7906..0000000000 --- a/crates/trigger-redis/tests/rust/.cargo/.config +++ /dev/null @@ -1,2 +0,0 @@ -[build] - target = "wasm32-wasi" diff --git a/crates/trigger-redis/tests/rust/Cargo.lock b/crates/trigger-redis/tests/rust/Cargo.lock deleted file mode 100644 index dd62f57f13..0000000000 --- a/crates/trigger-redis/tests/rust/Cargo.lock +++ /dev/null @@ -1,303 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. -version = 3 - -[[package]] -name = "anyhow" -version = "1.0.75" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" - -[[package]] -name = "bitflags" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" - -[[package]] -name = "equivalent" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" - -[[package]] -name = "hashbrown" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" - -[[package]] -name = "heck" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] - -[[package]] -name = "id-arena" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a2bc672d1148e28034f176e01fffebb08b35768468cc954630da77a1449005" - -[[package]] -name = "indexmap" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" -dependencies = [ - "equivalent", - "hashbrown", - "serde", -] - -[[package]] -name = "itoa" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" - -[[package]] -name = "leb128" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" - -[[package]] -name = "log" -version = "0.4.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" - -[[package]] -name = "proc-macro2" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "quote" -version = "1.0.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rust" -version = "0.1.0" -dependencies = [ - "wit-bindgen", -] - -[[package]] -name = "ryu" -version = "1.0.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" - -[[package]] -name = "semver" -version = "1.0.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" - -[[package]] -name = "serde" -version = "1.0.189" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" -dependencies = [ - "serde_derive", -] - -[[package]] -name = "serde_derive" -version = "1.0.189" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "serde_json" -version = "1.0.107" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" -dependencies = [ - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "smallvec" -version = "1.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" - -[[package]] -name = "spdx" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b19b32ed6d899ab23174302ff105c1577e45a06b08d4fe0a9dd13ce804bbbf71" -dependencies = [ - "smallvec", -] - -[[package]] -name = "syn" -version = "2.0.38" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] - -[[package]] -name = "unicode-ident" -version = "1.0.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" - -[[package]] -name = "unicode-segmentation" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" - -[[package]] -name = "unicode-xid" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" - -[[package]] -name = "wasm-encoder" -version = "0.35.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca90ba1b5b0a70d3d49473c5579951f3bddc78d47b59256d2f9d4922b150aca" -dependencies = [ - "leb128", -] - -[[package]] -name = "wasm-metadata" -version = "0.10.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14abc161bfda5b519aa229758b68f2a52b45a12b993808665c857d1a9a00223c" -dependencies = [ - "anyhow", - "indexmap", - "serde", - "serde_derive", - "serde_json", - "spdx", - "wasm-encoder", - "wasmparser", -] - -[[package]] -name = "wasmparser" -version = "0.115.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e06c0641a4add879ba71ccb3a1e4278fd546f76f1eafb21d8f7b07733b547cd5" -dependencies = [ - "indexmap", - "semver", -] - -[[package]] -name = "wit-bindgen" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7d92ce0ca6b6074059413a9581a637550c3a740581c854f9847ec293c8aed71" -dependencies = [ - "bitflags", - "wit-bindgen-rust-macro", -] - -[[package]] -name = "wit-bindgen-core" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "565b945ae074886071eccf9cdaf8ccd7b959c2b0d624095bea5fe62003e8b3e0" -dependencies = [ - "anyhow", - "wit-component", - "wit-parser", -] - -[[package]] -name = "wit-bindgen-rust" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5695ff4e41873ed9ce56d2787e6b5772bdad9e70e2c1d2d160621d1762257f4f" -dependencies = [ - "anyhow", - "heck", - "wasm-metadata", - "wit-bindgen-core", - "wit-component", -] - -[[package]] -name = "wit-bindgen-rust-macro" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91835ea4231da1fe7971679d505ba14be7826e192b6357f08465866ef482e08" -dependencies = [ - "anyhow", - "proc-macro2", - "quote", - "syn", - "wit-bindgen-core", - "wit-bindgen-rust", - "wit-component", -] - -[[package]] -name = "wit-component" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87488b57a08e2cbbd076b325acbe7f8666965af174d69d5929cd373bd54547f" -dependencies = [ - "anyhow", - "bitflags", - "indexmap", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-encoder", - "wasm-metadata", - "wasmparser", - "wit-parser", -] - -[[package]] -name = "wit-parser" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ace9943d89bbf3dbbc71b966da0e7302057b311f36a4ac3d65ddfef17b52cf" -dependencies = [ - "anyhow", - "id-arena", - "indexmap", - "log", - "semver", - "serde", - "serde_derive", - "serde_json", - "unicode-xid", -] diff --git a/crates/trigger-redis/tests/rust/Cargo.toml b/crates/trigger-redis/tests/rust/Cargo.toml deleted file mode 100644 index ad23592c0b..0000000000 --- a/crates/trigger-redis/tests/rust/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "rust" -version = "0.1.0" -edition = "2021" -authors = ["Radu Matei "] - -[lib] -crate-type = ["cdylib"] - -[dependencies] -wit-bindgen = "0.13.0" - -[workspace] diff --git a/crates/trigger-redis/tests/rust/src/lib.rs b/crates/trigger-redis/tests/rust/src/lib.rs deleted file mode 100644 index 8371614b9e..0000000000 --- a/crates/trigger-redis/tests/rust/src/lib.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::str::{from_utf8, Utf8Error}; - -wit_bindgen::generate!({ - world: "redis-trigger", - path: "../../../../wit/deps/spin@unversioned", - exports: { - "fermyon:spin/inbound-redis": SpinRedis, - } -}); -use exports::fermyon::spin::inbound_redis::{self, Error, Payload}; - -struct SpinRedis; - -impl inbound_redis::Guest for SpinRedis { - fn handle_message(message: Payload) -> Result<(), Error> { - println!("Message: {:?}", from_utf8(&message)); - Ok(()) - } -} - -impl From for Error { - fn from(_: Utf8Error) -> Self { - Self::Error - } -} diff --git a/crates/trigger2/src/cli.rs b/crates/trigger2/src/cli.rs index 9bc787cef5..4b6dc422b9 100644 --- a/crates/trigger2/src/cli.rs +++ b/crates/trigger2/src/cli.rs @@ -130,7 +130,7 @@ pub struct FactorsTriggerCommand { /// An empty implementation of clap::Args to be used as TriggerExecutor::RunConfig /// for executors that do not need additional CLI args. #[derive(Args)] -pub struct NoArgs; +pub struct NoCliArgs; impl FactorsTriggerCommand { /// Create a new TriggerExecutorBuilder from this TriggerExecutorCommand. @@ -402,7 +402,7 @@ pub mod help { impl Trigger for HelpArgsOnlyTrigger { const TYPE: &'static str = "help-args-only"; - type CliArgs = NoArgs; + type CliArgs = NoCliArgs; type InstanceState = (); fn new(_cli_args: Self::CliArgs, _app: &App) -> anyhow::Result { diff --git a/crates/trigger2/src/lib.rs b/crates/trigger2/src/lib.rs index 095d30e3b3..0fde4a0219 100644 --- a/crates/trigger2/src/lib.rs +++ b/crates/trigger2/src/lib.rs @@ -1,14 +1,15 @@ +pub mod cli; +mod factors; +mod stdio; + use std::future::Future; use clap::Args; use factors::{TriggerFactors, TriggerFactorsInstanceState}; -use spin_app::App; use spin_core::Linker; use spin_factors_executor::{FactorsExecutorApp, FactorsInstanceBuilder}; -pub mod cli; -mod factors; -mod stdio; +pub use spin_app::App; /// Type alias for a [`FactorsConfiguredApp`] specialized to a [`Trigger`]. pub type TriggerApp = FactorsExecutorApp::InstanceState>; @@ -54,7 +55,7 @@ pub trait Trigger: Sized + Send { /// Run this trigger. fn run( self, - configured_app: TriggerApp, + trigger_app: TriggerApp, ) -> impl Future> + Send; /// Returns a list of host requirements supported by this trigger specifically.