From 92a012f6b9cff15a1bbd9d05713416a406b55018 Mon Sep 17 00:00:00 2001 From: Fabrizio Sestito Date: Thu, 23 May 2024 11:52:27 +0200 Subject: [PATCH] Reduce buffering between watcher and Store (#1494) * wip Signed-off-by: Fabrizio Sestito * wip tests Signed-off-by: Fabrizio Sestito * rename events Signed-off-by: Fabrizio Sestito * avoid locking on the buffer Signed-off-by: Fabrizio Sestito * fix EventFlatten RestartDelete match Signed-off-by: Fabrizio Sestito * improve docs on event variants Signed-off-by: Fabrizio Sestito * filter out Restart and RestartInit from watch_object Signed-off-by: Fabrizio Sestito * add comment to clarify why we are not using .clear() Signed-off-by: Fabrizio Sestito * cleanup store / apply suggestions Signed-off-by: Fabrizio Sestito * broadcast when receiving a Restart event directly from the store Signed-off-by: Fabrizio Sestito * do not dispatch RestartApply(obj) event Signed-off-by: Fabrizio Sestito * fix runtime tests after paging Signed-off-by: clux Signed-off-by: Fabrizio Sestito --------- Signed-off-by: Fabrizio Sestito Signed-off-by: clux Co-authored-by: clux --- kube-runtime/Cargo.toml | 2 +- kube-runtime/src/controller/mod.rs | 5 +- kube-runtime/src/reflector/dispatcher.rs | 104 ++++++++---- kube-runtime/src/reflector/mod.rs | 29 ++-- kube-runtime/src/reflector/store.rs | 67 ++++++-- kube-runtime/src/utils/event_flatten.rs | 22 +-- kube-runtime/src/utils/event_modify.rs | 8 +- kube-runtime/src/utils/reflect.rs | 22 ++- kube-runtime/src/watcher.rs | 207 ++++++++++++----------- 9 files changed, 287 insertions(+), 179 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 6b191836f..6c1710ad7 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -30,7 +30,7 @@ rust.unsafe_code = "forbid" #rust.missing_docs = "warn" [dependencies] -futures.workspace = true +futures = { workspace = true, features = ["async-await"] } kube-client = { path = "../kube-client", version = "=0.91.0", 
default-features = false, features = ["jsonpatch", "client"] } derivative.workspace = true serde.workspace = true diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 5c36b38e7..986cc5a37 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1753,7 +1753,7 @@ mod tests { |obj, _| { Box::pin(async move { // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately - println!("reconciling {:?}", obj.metadata.name); + //println!("reconciling {:?}", obj.metadata.name); Ok(Action::requeue(Duration::ZERO)) }) }, @@ -1763,6 +1763,7 @@ mod tests { queue_rx.map(Result::<_, Infallible>::Ok), Config::default(), )); + store_tx.apply_watcher_event(&watcher::Event::Restart); for i in 0..items { let obj = ConfigMap { metadata: ObjectMeta { @@ -1772,7 +1773,7 @@ mod tests { }, ..Default::default() }; - store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone())); + store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone())); queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap(); } diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 7fb1a2cf0..7173f4184 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -165,9 +165,11 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Applied(foo.clone())), + Ok(Event::Apply(foo.clone())), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![foo, bar])), + Ok(Event::RestartInit), + Ok(Event::RestartPage(vec![foo, bar])), + Ok(Event::Restart), ]); let (reader, writer) = reflector::store_shared(10); @@ -178,7 +180,7 @@ pub(crate) mod test { assert_eq!(reader.len(), 0); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) + Poll::Ready(Some(Ok(Event::Apply(_)))) )); // Make progress and assert all events are seen @@ -190,7 
+192,15 @@ pub(crate) mod test { assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartInit))))); + assert_eq!(reader.len(), 1); + + let restarted = poll!(reflect.next()); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_)))))); + assert_eq!(reader.len(), 1); + + let restarted = poll!(reflect.next()); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restart))))); assert_eq!(reader.len(), 2); assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); @@ -206,10 +216,12 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Deleted(foo.clone())), - Ok(Event::Applied(foo.clone())), + Ok(Event::Delete(foo.clone())), + Ok(Event::Apply(foo.clone())), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + Ok(Event::RestartInit), + Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])), + Ok(Event::Restart), ]); let foo = Arc::new(foo); @@ -224,13 +236,13 @@ pub(crate) mod test { // Deleted events should be skipped by subscriber. 
assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Deleted(_)))) + Poll::Ready(Some(Ok(Event::Delete(_)))) )); assert_eq!(poll!(subscriber.next()), Poll::Pending); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) + Poll::Ready(Some(Ok(Event::Apply(_)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); @@ -242,12 +254,25 @@ pub(crate) mod test { assert!(matches!(poll!(subscriber.next()), Poll::Pending)); // Restart event will yield all objects in the list + assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Restarted(_)))) + Poll::Ready(Some(Ok(Event::RestartInit))) )); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::RestartPage(_)))) + )); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restart))) + )); + + // these don't come back in order atm: + assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_)))); + assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_)))); // When main channel is closed, it is propagated to subscribers assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); @@ -261,12 +286,14 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Applied(foo.clone())), - Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + Ok(Event::Apply(foo.clone())), + Ok(Event::RestartInit), + Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])), + Ok(Event::Restart), ]); let foo = Arc::new(foo); - let bar = Arc::new(bar); + let _bar = Arc::new(bar); let (_, writer) = reflector::store_shared(10); let subscriber = writer.subscribe().unwrap(); @@ -275,7 +302,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) + 
Poll::Ready(Some(Ok(Event::Apply(_)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); @@ -284,14 +311,28 @@ pub(crate) mod test { // // First, subscribers should be pending. assert_eq!(poll!(subscriber.next()), Poll::Pending); + assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Restarted(_)))) + Poll::Ready(Some(Ok(Event::RestartInit))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Pending); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::RestartPage(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Pending); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restart))) )); drop(reflect); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); + // we will get foo and bar here, but we don't have a guaranteed ordering on page events + assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_)))); + assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_)))); assert_eq!(poll!(subscriber.next()), Poll::Ready(None)); } @@ -305,8 +346,9 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Applied(foo.clone())), - Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + Ok(Event::Apply(foo.clone())), + Ok(Event::Apply(bar.clone())), + Ok(Event::Apply(foo.clone())), ]); let foo = Arc::new(foo); @@ -325,13 +367,14 @@ pub(crate) mod test { // Poll first subscriber, but not the second. // - // The buffer can hold one value, so even if we have a slow subscriber, + // The buffer can hold one object value, so even if we have a slow subscriber, // we will still get an event from the root. 
assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) + Poll::Ready(Some(Ok(Event::Apply(_)))) )); assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + // One subscriber is not reading, so we need to apply backpressure until // channel has capacity. // @@ -348,18 +391,21 @@ pub(crate) mod test { // We now have room for only one more item. In total, the previous event // had two. We repeat the same pattern. + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Apply(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); assert!(matches!(poll!(reflect.next()), Poll::Pending)); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); - assert!(matches!(poll!(reflect.next()), Poll::Pending)); - assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone()))); assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Restarted(_)))) + Poll::Ready(Some(Ok(Event::Apply(_)))) )); // Poll again to drain the queue. 
assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); - assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); - assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone()))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone()))); assert_eq!(poll!(subscriber.next()), Poll::Ready(None)); assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None)); diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 85726175d..3be1e06ee 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -157,13 +157,10 @@ mod tests { }, ..ConfigMap::default() }; - reflector( - store_w, - stream::iter(vec![Ok(watcher::Event::Applied(cm.clone()))]), - ) - .map(|_| ()) - .collect::<()>() - .await; + reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))])) + .map(|_| ()) + .collect::<()>() + .await; assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } @@ -189,8 +186,8 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Applied(cm.clone())), - Ok(watcher::Event::Applied(updated_cm.clone())), + Ok(watcher::Event::Apply(cm.clone())), + Ok(watcher::Event::Apply(updated_cm.clone())), ]), ) .map(|_| ()) @@ -213,8 +210,8 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Applied(cm.clone())), - Ok(watcher::Event::Deleted(cm.clone())), + Ok(watcher::Event::Apply(cm.clone())), + Ok(watcher::Event::Delete(cm.clone())), ]), ) .map(|_| ()) @@ -244,8 +241,10 @@ mod tests { reflector( store_w, stream::iter(vec![ - Ok(watcher::Event::Applied(cm_a.clone())), - Ok(watcher::Event::Restarted(vec![cm_b.clone()])), + Ok(watcher::Event::Apply(cm_a.clone())), + Ok(watcher::Event::RestartInit), + Ok(watcher::Event::RestartPage(vec![cm_b.clone()])), + Ok(watcher::Event::Restart), ]), ) .map(|_| ()) @@ -276,9 +275,9 @@ mod tests { 
..ConfigMap::default() }; Ok(if deleted { - watcher::Event::Deleted(obj) + watcher::Event::Delete(obj) } else { - watcher::Event::Applied(obj) + watcher::Event::Apply(obj) }) })), ) diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 15db23d34..a0cc10db3 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -21,6 +21,7 @@ where K::DynamicType: Eq + Hash + Clone, { store: Cache, + buffer: AHashMap, Arc>, dyntype: K::DynamicType, ready_tx: Option>, ready_rx: Arc>, @@ -39,6 +40,7 @@ where let (ready_tx, ready_rx) = DelayedInit::new(); Writer { store: Default::default(), + buffer: Default::default(), dyntype, ready_tx: Some(ready_tx), ready_rx: Arc::new(ready_rx), @@ -60,6 +62,7 @@ where let (ready_tx, ready_rx) = DelayedInit::new(); Writer { store: Default::default(), + buffer: Default::default(), dyntype, ready_tx: Some(ready_tx), ready_rx: Arc::new(ready_rx), @@ -96,27 +99,50 @@ where /// Applies a single watcher event to the store pub fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { - watcher::Event::Applied(obj) => { + watcher::Event::Apply(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); let obj = Arc::new(obj.clone()); self.store.write().insert(key, obj); } - watcher::Event::Deleted(obj) => { + watcher::Event::Delete(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); self.store.write().remove(&key); } - watcher::Event::Restarted(new_objs) => { + watcher::Event::RestartInit => { + self.buffer = AHashMap::new(); + } + watcher::Event::RestartPage(new_objs) => { let new_objs = new_objs .iter() .map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone()))) .collect::>(); - *self.store.write() = new_objs; + self.buffer.extend(new_objs); } - } + watcher::Event::Restart => { + let mut store = self.store.write(); + + // Swap the buffer into the store + std::mem::swap(&mut *store, &mut self.buffer); + + // Clear the buffer + // This 
is preferred over self.buffer.clear(), as clear() will keep the allocated memory for reuse. + // This way, the old buffer is dropped. + self.buffer = AHashMap::new(); - // Mark as ready after the first event, "releasing" any calls to Store::wait_until_ready() - if let Some(ready_tx) = self.ready_tx.take() { - ready_tx.init(()) + // Mark as ready after the Restart, "releasing" any calls to Store::wait_until_ready() + if let Some(ready_tx) = self.ready_tx.take() { + ready_tx.init(()) + } + } + watcher::Event::RestartApply(obj) => { + let key = obj.to_object_ref(self.dyntype.clone()); + let obj = Arc::new(obj.clone()); + self.buffer.insert(key, obj); + } + watcher::Event::RestartDelete(obj) => { + let key = obj.to_object_ref(self.dyntype.clone()); + self.buffer.remove(&key); + } } } @@ -124,20 +150,25 @@ where pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event) { if let Some(ref mut dispatcher) = self.dispatcher { match event { - watcher::Event::Applied(obj) => { + watcher::Event::Apply(obj) => { let obj_ref = obj.to_object_ref(self.dyntype.clone()); // TODO (matei): should this take a timeout to log when backpressure has // been applied for too long, e.g. 
10s dispatcher.broadcast(obj_ref).await; } - watcher::Event::Restarted(new_objs) => { - let obj_refs = new_objs.iter().map(|obj| obj.to_object_ref(self.dyntype.clone())); + watcher::Event::Restart => { + let obj_refs: Vec<_> = { + let store = self.store.read(); + store.keys().cloned().collect() + }; + for obj_ref in obj_refs { dispatcher.broadcast(obj_ref).await; } } - watcher::Event::Deleted(_) => {} + + _ => {} } } } @@ -302,7 +333,7 @@ mod tests { ..ConfigMap::default() }; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone())); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } @@ -320,7 +351,7 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm)); + store_w.apply_watcher_event(&watcher::Event::Apply(cm)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } @@ -336,7 +367,7 @@ mod tests { ..ConfigMap::default() }; let (store, mut writer) = store(); - writer.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + writer.apply_watcher_event(&watcher::Event::Apply(cm.clone())); assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } @@ -354,7 +385,7 @@ mod tests { let mut nsed_cm = cm.clone(); nsed_cm.metadata.namespace = Some("ns".to_string()); let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Apply(cm.clone())); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm)); } @@ -373,14 +404,14 @@ mod tests { let (reader, mut writer) = store::(); assert!(reader.is_empty()); - writer.apply_watcher_event(&watcher::Event::Applied(cm)); + 
writer.apply_watcher_event(&watcher::Event::Apply(cm)); assert_eq!(reader.len(), 1); assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none()); target_cm.metadata.name = Some("obj1".to_string()); target_cm.metadata.generation = Some(1234); - writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone())); + writer.apply_watcher_event(&watcher::Event::Apply(target_cm.clone())); assert!(!reader.is_empty()); assert_eq!(reader.len(), 2); let found = reader.find(|k| k.metadata.generation == Some(1234)); diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_flatten.rs index b73a80b44..da81af4c7 100644 --- a/kube-runtime/src/utils/event_flatten.rs +++ b/kube-runtime/src/utils/event_flatten.rs @@ -36,22 +36,24 @@ where if let Some(item) = me.queue.next() { break Some(Ok(item)); } - break match ready!(me.stream.as_mut().poll_next(cx)) { - Some(Ok(Event::Applied(obj))) => Some(Ok(obj)), - Some(Ok(Event::Deleted(obj))) => { + let var_name = match ready!(me.stream.as_mut().poll_next(cx)) { + Some(Ok(Event::Apply(obj) | Event::RestartApply(obj))) => Some(Ok(obj)), + Some(Ok(Event::Delete(obj) | Event::RestartDelete(obj))) => { if *me.emit_deleted { Some(Ok(obj)) } else { continue; } } - Some(Ok(Event::Restarted(objs))) => { + Some(Ok(Event::RestartPage(objs))) => { *me.queue = objs.into_iter(); continue; } + Some(Ok(Event::RestartInit | Event::Restart)) => continue, Some(Err(err)) => Some(Err(err)), None => return Poll::Ready(None), }; + break var_name; }) } } @@ -66,13 +68,13 @@ pub(crate) mod tests { #[tokio::test] async fn watches_applies_uses_correct_eventflattened_stream() { let data = stream::iter([ - Ok(Event::Applied(0)), - Ok(Event::Applied(1)), - Ok(Event::Deleted(0)), - Ok(Event::Applied(2)), - Ok(Event::Restarted(vec![1, 2])), + Ok(Event::Apply(0)), + Ok(Event::Apply(1)), + Ok(Event::Delete(0)), + Ok(Event::Apply(2)), + Ok(Event::RestartPage(vec![1, 2])), Err(Error::TooManyObjects), - Ok(Event::Applied(2)), + 
Ok(Event::Apply(2)), ]); let mut rx = pin!(EventFlatten::new(data, false)); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); diff --git a/kube-runtime/src/utils/event_modify.rs b/kube-runtime/src/utils/event_modify.rs index 09fb4685e..17753eb9e 100644 --- a/kube-runtime/src/utils/event_modify.rs +++ b/kube-runtime/src/utils/event_modify.rs @@ -54,9 +54,9 @@ pub(crate) mod test { #[tokio::test] async fn eventmodify_modifies_innner_value_of_event() { let st = stream::iter([ - Ok(Event::Applied(0)), + Ok(Event::Apply(0)), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![10])), + Ok(Event::RestartPage(vec![10])), ]); let mut ev_modify = pin!(EventModify::new(st, |x| { *x += 1; @@ -64,7 +64,7 @@ pub(crate) mod test { assert!(matches!( poll!(ev_modify.next()), - Poll::Ready(Some(Ok(Event::Applied(1)))) + Poll::Ready(Some(Ok(Event::Apply(1)))) )); assert!(matches!( @@ -75,7 +75,7 @@ pub(crate) mod test { let restarted = poll!(ev_modify.next()); assert!(matches!( restarted, - Poll::Ready(Some(Ok(Event::Restarted(vec)))) if vec == [11] + Poll::Ready(Some(Ok(Event::RestartPage(vec)))) if vec == [11] )); assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None))); diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index 00c216218..4e2e19229 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -72,9 +72,11 @@ pub(crate) mod test { let foo = testpod("foo"); let bar = testpod("bar"); let st = stream::iter([ - Ok(Event::Applied(foo.clone())), + Ok(Event::Apply(foo.clone())), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![foo, bar])), + Ok(Event::RestartInit), + Ok(Event::RestartPage(vec![foo, bar])), + Ok(Event::Restart), ]); let (reader, writer) = reflector::store(); @@ -83,7 +85,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) + Poll::Ready(Some(Ok(Event::Apply(_)))) )); assert_eq!(reader.len(), 1); @@ -93,8 +95,20 @@ 
pub(crate) mod test { )); assert_eq!(reader.len(), 1); + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::RestartInit))) + )); + assert_eq!(reader.len(), 1); + let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_)))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_)))))); + assert_eq!(reader.len(), 1); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restart))) + )); assert_eq!(reader.len(), 2); assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index f1ee886ab..9be78af98 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -40,19 +40,36 @@ pub type Result = std::result::Result; /// Watch events returned from the [`watcher`] pub enum Event { /// An object was added or modified - Applied(K), + Apply(K), /// An object was deleted /// /// NOTE: This should not be used for managing persistent state elsewhere, since /// events may be lost if the watcher is unavailable. Use Finalizers instead. - Deleted(K), - /// The watch stream was restarted, so `Deleted` events may have been missed + Delete(K), + /// The watch stream was restarted. /// - /// Should be used as a signal to replace the store contents atomically. + /// If using the `ListWatch` strategy, `RestartPage` events will follow this event. + /// If using the `StreamingList` strategy, this event will be followed by `RestartApply` or `RestartDelete` events. + RestartInit, + /// A page of objects was received during the restart. + /// + /// This event is only returned when using the `ListWatch` strategy. /// - /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in this event + /// Any objects that were previously [applied](Event::Apply) but are not listed in any of the pages /// should be assumed to have been [`Deleted`](Event::Deleted). 
- Restarted(Vec), + RestartPage(Vec), + /// An object was added or modified during the initial watch. + /// + /// This event is only returned when using the `StreamingList` strategy. + RestartApply(K), + /// An object was deleted during the initial watch. + /// + /// This event is only returned when using the `StreamingList` strategy. + RestartDelete(K), + /// The restart is complete. + /// + /// Should be used as a signal to replace the store contents atomically. + Restart, } impl Event { @@ -62,9 +79,13 @@ impl Event { /// emitted individually. pub fn into_iter_applied(self) -> impl Iterator { match self { - Event::Applied(obj) => SmallVec::from_buf([obj]), - Event::Deleted(_) => SmallVec::new(), - Event::Restarted(objs) => SmallVec::from_vec(objs), + Event::Apply(obj) => SmallVec::from_buf([obj]), + Event::Delete(_) + | Event::RestartInit + | Event::Restart + | Self::RestartApply(_) + | Self::RestartDelete(_) => SmallVec::new(), + Event::RestartPage(objs) => SmallVec::from_vec(objs), } .into_iter() } @@ -76,8 +97,11 @@ impl Event { /// deleted objects. 
pub fn into_iter_touched(self) -> impl Iterator { match self { - Event::Applied(obj) | Event::Deleted(obj) => SmallVec::from_buf([obj]), - Event::Restarted(objs) => SmallVec::from_vec(objs), + Event::Apply(obj) | Event::Delete(obj) | Event::RestartApply(obj) | Event::RestartDelete(obj) => { + SmallVec::from_buf([obj]) + } + Event::RestartInit | Event::Restart => SmallVec::new(), + Event::RestartPage(objs) => SmallVec::from_vec(objs), } .into_iter() } @@ -102,8 +126,11 @@ impl Event { #[must_use] pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self { match &mut self { - Event::Applied(obj) | Event::Deleted(obj) => (f)(obj), - Event::Restarted(objs) => { + Event::Apply(obj) | Event::Delete(obj) | Event::RestartApply(obj) | Event::RestartDelete(obj) => { + (f)(obj) + } + Event::RestartInit | Event::Restart => {} + Event::RestartPage(objs) => { for k in objs { (f)(k) } @@ -113,19 +140,20 @@ impl Event { } } -#[derive(Derivative)] +#[derive(Derivative, Default)] #[derivative(Debug)] /// The internal finite state machine driving the [`watcher`] enum State { /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects - Empty { - continue_token: Option, - objects: Vec, - }, + #[default] + Empty, + /// The Watcher is in the process of paginating through the initial LIST + InitPage { continue_token: Option }, + /// The Watcher has completed the initial LIST and is ready to start the watch + InitPageDone { resource_version: String }, /// Kubernetes 1.27 Streaming Lists /// The initial watch is in progress - IntialWatch { - objects: Vec, + InitialWatch { #[derivative(Debug = "ignore")] stream: BoxStream<'static, kube_client::Result>>, }, @@ -144,15 +172,6 @@ enum State { }, } -impl Default for State { - fn default() -> Self { - Self::Empty { - continue_token: None, - objects: vec![], - } - } -} - /// Used to control whether the watcher receives the full object, or only the /// metadata #[async_trait] @@ 
-467,43 +486,12 @@ where A::Value: Resource + 'static, { match state { - State::Empty { - continue_token, - mut objects, - } => match wc.initial_list_strategy { - InitialListStrategy::ListWatch => { - let mut lp = wc.to_list_params(); - lp.continue_token = continue_token; - match api.list(&lp).await { - Ok(list) => { - objects.extend(list.items); - if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { - (None, State::Empty { - continue_token: Some(continue_token), - objects, - }) - } else if let Some(resource_version) = - list.metadata.resource_version.filter(|s| !s.is_empty()) - { - (Some(Ok(Event::Restarted(objects))), State::InitListed { - resource_version, - }) - } else { - (Some(Err(Error::NoResourceVersion)), State::default()) - } - } - Err(err) => { - if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch list error with 403: {err:?}"); - } else { - debug!("watch list error: {err:?}"); - } - (Some(Err(Error::InitialListFailed(err))), State::default()) - } - } - } + State::Empty => match wc.initial_list_strategy { + InitialListStrategy::ListWatch => (Some(Ok(Event::RestartInit)), State::InitPage { + continue_token: None, + }), InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { - Ok(stream) => (None, State::IntialWatch { stream, objects }), + Ok(stream) => (None, State::InitialWatch { stream }), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. 
})) { warn!("watch initlist error with 403: {err:?}"); @@ -514,31 +502,57 @@ where } }, }, - State::IntialWatch { - mut objects, - mut stream, - } => { + State::InitPage { continue_token } => { + let mut lp = wc.to_list_params(); + lp.continue_token = continue_token; + match api.list(&lp).await { + Ok(list) => { + if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { + (Some(Ok(Event::RestartPage(list.items))), State::InitPage { + continue_token: Some(continue_token), + }) + } else if let Some(resource_version) = + list.metadata.resource_version.filter(|s| !s.is_empty()) + { + (Some(Ok(Event::RestartPage(list.items))), State::InitPageDone { + resource_version, + }) + } else { + (Some(Err(Error::NoResourceVersion)), State::Empty) + } + } + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(Error::InitialListFailed(err))), State::Empty) + } + } + } + State::InitPageDone { resource_version } => { + (Some(Ok(Event::Restart)), State::InitListed { resource_version }) + } + State::InitialWatch { mut stream } => { match stream.next().await { Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { - objects.push(obj); - (None, State::IntialWatch { objects, stream }) + (Some(Ok(Event::RestartApply(obj))), State::InitialWatch { stream }) } Some(Ok(WatchEvent::Deleted(obj))) => { - objects.retain(|o| o.name_any() != obj.name_any() && o.namespace() != obj.namespace()); - (None, State::IntialWatch { objects, stream }) + (Some(Ok(Event::RestartDelete(obj))), State::InitialWatch { + stream, + }) } Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - (Some(Ok(Event::Restarted(objects))), State::Watching { + (Some(Ok(Event::Restart)), State::Watching { resource_version: 
bm.metadata.resource_version, stream, }) } else { - (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + (None, State::InitialWatch { stream }) } } Some(Ok(WatchEvent::Error(err))) => { @@ -546,7 +560,7 @@ where let new_state = if err.code == 410 { State::default() } else { - State::IntialWatch { objects, stream } + State::InitialWatch { stream } }; if err.code == 403 { warn!("watcher watchevent error 403: {err:?}"); @@ -561,10 +575,7 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(Error::WatchFailed(err))), State::IntialWatch { - objects, - stream, - }) + (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream }) } None => (None, State::default()), } @@ -596,7 +607,7 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Applied(obj))), State::Watching { + (Some(Ok(Event::Apply(obj))), State::Watching { resource_version, stream, }) @@ -607,7 +618,7 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Deleted(obj))), State::Watching { + (Some(Ok(Event::Delete(obj))), State::Watching { resource_version, stream, }) @@ -799,21 +810,25 @@ pub fn metadata_watcher( api: Api, name: &str, ) -> impl Stream>> + Send { - watcher(api, Config::default().fields(&format!("metadata.name={name}"))).map(|event| match event? { - Event::Deleted(_) => Ok(None), - // We're filtering by object name, so getting more than one object means that either: - // 1. The apiserver is accepting multiple objects with the same name, or - // 2. The apiserver is ignoring our query - // In either case, the K8s apiserver is broken and our API will return invalid data, so - // we had better bail out ASAP. 
- Event::Restarted(objs) if objs.len() > 1 => Err(Error::TooManyObjects), - Event::Restarted(mut objs) => Ok(objs.pop()), - Event::Applied(obj) => Ok(Some(obj)), + watcher(api, Config::default().fields(&format!("metadata.name={name}"))).filter_map(|event| async { + match event { + Ok(Event::Delete(_) | Event::RestartDelete(_)) => Some(Ok(None)), + // We're filtering by object name, so getting more than one object means that either: + // 1. The apiserver is accepting multiple objects with the same name, or + // 2. The apiserver is ignoring our query + // In either case, the K8s apiserver is broken and our API will return invalid data, so + // we had better bail out ASAP. + Ok(Event::RestartPage(objs)) if objs.len() > 1 => Some(Err(Error::TooManyObjects)), + Ok(Event::RestartPage(mut objs)) => Some(Ok(objs.pop())), + Ok(Event::Apply(obj) | Event::RestartApply(obj)) => Some(Ok(Some(obj))), + Ok(Event::Restart | Event::RestartInit) => None, + Err(err) => Some(Err(err)), + } }) }