diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 30aa932d5..63b1503ac 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -14,7 +14,7 @@ use kube_client::{ Api, Error as ClientErr, }; use serde::de::DeserializeOwned; -use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration}; +use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration}; use thiserror::Error; use tracing::{debug, error, warn}; @@ -844,18 +844,36 @@ pub fn watch_object Some(Ok(Some(obj))), - // Pass up `None` for Deleted - Ok(Event::Delete(_)) => Some(Ok(None)), - // Ignore marker events - Ok(Event::Init | Event::InitDone) => None, - // Bubble up errors - Err(err) => Some(Err(err)), - } - }) + watcher(api, Config::default().fields(&fields)) + // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone + // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and + // `None` is emitted when the InitDone event is received. + // + // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent + // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which + // could happen given K8S events aren't guaranteed delivery. + .scan(false, |obj_seen, event| { + if matches!(event, Ok(Event::Init)) { + *obj_seen = false; + } else if matches!(event, Ok(Event::InitApply(_))) { + *obj_seen = true; + } + future::ready(Some((*obj_seen, event))) + }) + .filter_map(|(obj_seen, event)| async move { + match event { + // Pass up `Some` for Found / Updated + Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))), + // Pass up `None` for Deleted + Ok(Event::Delete(_)) => Some(Ok(None)), + // Pass up `None` if the object wasn't seen in the initial list + Ok(Event::InitDone) if !obj_seen => Some(Ok(None)), + // Ignore marker events + Ok(Event::Init | Event::InitDone) => None, + // Bubble up errors + Err(err) => Some(Err(err)), + } + }) } /// Default watcher backoff inspired by Kubernetes' client-go.