diff --git a/hydro_lang/src/location/tick.rs b/hydro_lang/src/location/tick.rs index f161fc170b4..2841db80646 100644 --- a/hydro_lang/src/location/tick.rs +++ b/hydro_lang/src/location/tick.rs @@ -123,7 +123,7 @@ impl<'a, L: Location<'a>> Tick { } } - pub fn singleton_first_tick( + pub fn optional_first_tick( &self, e: impl QuotedWithContext<'a, T, Tick>, ) -> Optional diff --git a/hydro_lang/src/optional.rs b/hydro_lang/src/optional.rs index f686c3927b7..dbfa591e0b3 100644 --- a/hydro_lang/src/optional.rs +++ b/hydro_lang/src/optional.rs @@ -197,6 +197,24 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional { } impl<'a, T, L: Location<'a>, B> Optional { + /// Transforms the optional value by applying a function `f` to it, + /// continuously as the input is updated. + /// + /// Whenever the optional is empty, the output optional is also empty. + /// + /// # Example + /// ```rust + /// # use hydro_lang::*; + /// # use dfir_rs::futures::StreamExt; + /// # tokio_test::block_on(test_util::stream_transform_test(|process| { + /// let tick = process.tick(); + /// let optional = tick.optional_first_tick(q!(1)); + /// optional.map(q!(|v| v + 1)).all_ticks().drop_timestamp() + /// # }, |mut stream| async move { + /// // 2 + /// # assert_eq!(stream.next().await.unwrap(), 2); + /// # })); + /// ``` pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional { let f = f.splice_fn1_ctx(&self.location).into(); Optional::new( diff --git a/hydro_test/src/cluster/bench_client.rs b/hydro_test/src/cluster/bench_client.rs index 788a61c8da2..d573a9d2c1f 100644 --- a/hydro_test/src/cluster/bench_client.rs +++ b/hydro_test/src/cluster/bench_client.rs @@ -19,7 +19,7 @@ pub fn bench_client<'a>( // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); // Set up an initial set of payloads on the first tick - let start_this_tick = client_tick.singleton_first_tick(q!(())); + let start_this_tick = client_tick.optional_first_tick(q!(())); let c_new_payloads_on_start = start_this_tick.clone().flat_map_ordered(q!(move |_| (0 ..num_clients_per_node)