diff --git a/Cargo.lock b/Cargo.lock index 2d9c8fcd535..d1c90290746 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,6 +251,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "async-task" version = "4.7.1" @@ -1589,6 +1611,7 @@ dependencies = [ "stageleft_tool", "syn 2.0.75", "tokio", + "tokio-test", "toml", "trybuild", "trybuild-internals-api", @@ -3701,6 +3724,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" diff --git a/hydro_lang/Cargo.toml b/hydro_lang/Cargo.toml index 025ca886a34..0ebfe55e1e6 100644 --- a/hydro_lang/Cargo.toml +++ b/hydro_lang/Cargo.toml @@ -49,4 +49,5 @@ async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } ctor = "0.2.8" hydro_deploy = { path = "../hydro_deploy/core", version = "^0.11.0" } insta = "1.39" +tokio-test = "0.4.4" trybuild = "1" diff --git a/hydro_lang/src/stream.rs b/hydro_lang/src/stream.rs index 6a29fec9405..b9f6c013b11 100644 --- a/hydro_lang/src/stream.rs +++ b/hydro_lang/src/stream.rs @@ -211,6 +211,34 @@ impl<'a, T: Clone, L: Location<'a>, B, Order> Clone for Stream { } impl<'a, T, L: Location<'a>, B, Order> Stream { + /// Transforms the stream by applying a function (`f`) to each element, + /// emitting the output elements in the same order as the input. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(async { + /// # let mut deployment = hydro_deploy::Deployment::new(); + /// # let flow = FlowBuilder::new(); + /// # let process = flow.process::<()>(); + /// # let external = flow.external_process::<()>(); + /// let numbers = process.source_iter(q!(0..10)); + /// let mapped = numbers.map(q!(|n| n * 2)); + /// # let out_port = mapped.send_bincode_external(&external); + /// # let nodes = flow + /// # .with_process(&process, deployment.Localhost()) + /// # .with_external(&external, deployment.Localhost()) + /// # .deploy(&mut deployment); + /// # deployment.deploy().await.unwrap(); + /// # let mut external_out = nodes.connect_source_bincode(out_port).await; + /// # deployment.start().await.unwrap(); + /// // 2, 4, 6, 8, ... + /// # for i in 0..10 { + /// # assert_eq!(external_out.next().await.unwrap(), i * 2); + /// # } + /// # }); + /// ``` pub fn map U + 'a>( self, f: impl IntoQuotedMut<'a, F, L>,