diff --git a/examples/shared_stream_controllers.rs b/examples/shared_stream_controllers.rs index 0d2b11b9b..f8ff276d6 100644 --- a/examples/shared_stream_controllers.rs +++ b/examples/shared_stream_controllers.rs @@ -1,9 +1,14 @@ -use std::{sync::Arc, time::Duration}; +use std::{ops::Deref, sync::Arc, time::Duration}; -use futures::StreamExt; +use futures::{future, StreamExt}; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod}; use kube::{ - runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt}, + runtime::{ + controller::Action, + predicates, + reflector::{self, ReflectHandle}, + watcher, Config, Controller, WatchStreamExt, + }, Api, Client, ResourceExt, }; use tracing::{debug, error, info, warn}; @@ -49,10 +54,17 @@ async fn main() -> anyhow::Result<()> { let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE); // Before threading an object watch through the store, create a subscriber. // Any number of subscribers can be created from one writer. - let subscriber = writer + let subscriber: ReflectHandle = writer .subscribe() .expect("subscribers can only be created from shared stores"); + // Subscriber events can be filtered in advance with predicates + let filtered = subscriber + .clone() + .map(|r| Ok(r.deref().clone())) + .predicate_filter(predicates::resource_version) + .filter_map(|r| future::ready(r.ok().map(Arc::new))); + // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to // be able to consume updates, the reflector must be shared. let pod_watch = watcher(pods.clone(), Default::default()) @@ -68,7 +80,7 @@ async fn main() -> anyhow::Result<()> { // Create the first controller; the controller will log whenever it // reconciles a pod. The reconcile is a no-op. // Controllers accept subscribers through a dedicated interface. - let pod_controller = Controller::for_shared_stream(subscriber.clone(), reader) + let pod_controller = Controller::for_shared_stream(filtered, reader) .with_config(config.clone()) .shutdown_on_signal() .run(reconcile, error_policy, Arc::new(()))