From cb41dc3646909cf89d039f3a40215fcde6bde694 Mon Sep 17 00:00:00 2001 From: Ivan Porto Carrero Date: Sat, 22 Jul 2023 21:04:15 -0700 Subject: [PATCH] address review comments --- kube-core/src/params.rs | 45 +------------ kube-core/src/request.rs | 12 ++++ kube-runtime/src/watcher.rs | 124 +++++++++++++++++++++--------------- 3 files changed, 88 insertions(+), 93 deletions(-) diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index fefa7f518..a4a5219e7 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -330,14 +330,6 @@ pub struct WatchParams { /// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward /// compatibility reasons) and to false otherwise. pub send_initial_events: bool, - - /// Determines how resourceVersion is matched applied to list calls. - pub version_match: Option, - - /// An explicit resourceVersion using the given `VersionMatch` strategy - /// - /// See for details. - pub resource_version: Option, } impl WatchParams { @@ -349,12 +341,6 @@ impl WatchParams { } } if self.send_initial_events { - if self.version_match != Some(VersionMatch::NotOlderThan) { - return Err(Error::Validation( - "WatchParams::version_match must be set to NotOlderThan when using send_initial_events" - .into(), - )); - } if !self.bookmarks { return Err(Error::Validation( "WatchParams::bookmarks must be set when using send_initial_events".into(), @@ -380,15 +366,9 @@ impl WatchParams { if self.bookmarks { qp.append_pair("allowWatchBookmarks", "true"); } - if let Some(resource_version_match) = &self.version_match { - match resource_version_match { - VersionMatch::NotOlderThan => { - qp.append_pair("resourceVersionMatch", "NotOlderThan"); - } - VersionMatch::Exact => { - qp.append_pair("resourceVersionMatch", "Exact"); - } - } + if self.send_initial_events { + qp.append_pair("sendInitialEvents", "true"); + qp.append_pair("resourceVersionMatch", "NotOlderThan"); } } } @@ -404,8 +384,6 @@ impl Default for WatchParams { field_selector: None, timeout: None, send_initial_events: false, - resource_version: None, - version_match: None, } } } @@ -490,23 +468,6 @@ impl WatchParams { self } - - /// Sets the resource version - #[must_use] - pub fn at(mut self, resource_version: &str) -> Self { - self.resource_version = Some(resource_version.into()); - self - } - - /// Sets an arbitary resource version match strategy - /// - /// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan` - /// requires an explicit `resource_version` set to pass request validation. - #[must_use] - pub fn matching(mut self, version_match: VersionMatch) -> Self { - self.version_match = Some(version_match); - self - } } /// Common query parameters for put/post calls diff --git a/kube-core/src/request.rs b/kube-core/src/request.rs index 2490c925e..93dd5d8af 100644 --- a/kube-core/src/request.rs +++ b/kube-core/src/request.rs @@ -522,6 +522,18 @@ mod test { "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0" ); } + + #[test] + fn watch_streaming_list() { + let url = corev1::Pod::url_path(&(), Some("ns")); + let wp = WatchParams::default().initial_events(); + let req = Request::new(url).watch(&wp, "0").unwrap(); + assert_eq!( + req.uri(), + "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0" + ); + } + #[test] fn watch_metadata_path() { let url = corev1::Pod::url_path(&(), Some("ns")); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 6d528fc37..d1ae8f19f 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -234,14 +234,17 @@ pub struct Config { pub list_semantic: ListSemantic, /// Kubernetes 1.27 Streaming Lists - /// Mode for the initial watch call, or when the watcher is restarted. - /// Enables opening a stream to bring data from the API server. - /// Streaming has the primary advantage of using fewer server's resources to fetch data. + /// Determines how the watcher fetches the initial list of objects. + /// Defaults to `ListWatch` for backwards compatibility. /// - /// The old behaviour establishes a LIST request which gets data in chunks. - /// Paginated list is less efficient and depending on the actual size of objects - /// might result in an increased memory consumption of the APIServer. + /// ListWatch: The watcher will fetch the initial list of objects using a list call. + /// StreamingList: The watcher will fetch the initial list of objects using a watch call. /// + /// StreamingList is more efficient than ListWatch, but it requires the server to support + /// streaming list bookmarks. If the server does not support streaming list bookmarks, + /// the watcher will fall back to ListWatch. + /// + /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists /// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details pub watcher_mode: WatcherMode, @@ -377,17 +380,11 @@ impl Config { /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests. fn to_watch_params(&self) -> WatchParams { - let (resource_version, version_match) = match self.list_semantic { - ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)), - ListSemantic::MostRecent => (None, None), - }; WatchParams { label_selector: self.label_selector.clone(), field_selector: self.field_selector.clone(), timeout: self.timeout, bookmarks: self.bookmarks, - resource_version, - version_match, send_initial_events: self.watcher_mode == WatcherMode::StreamingList, } } @@ -465,16 +462,20 @@ where Ok(list) => { objects.extend(list.items); if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { - (None, State::Empty { - continue_token: Some(continue_token), - objects, - }) + ( + None, + State::Empty { + continue_token: Some(continue_token), + objects, + }, + ) } else if let Some(resource_version) = list.metadata.resource_version.filter(|s| !s.is_empty()) { - (Some(Ok(Event::Restarted(objects))), State::InitListed { - resource_version, - }) + ( + Some(Ok(Event::Restarted(objects))), + State::InitListed { resource_version }, + ) } else { (Some(Err(Error::NoResourceVersion)), State::default()) } @@ -517,15 +518,21 @@ where Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - (Some(Ok(Event::Restarted(objects))), State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + ( + Some(Ok(Event::Restarted(objects))), + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } else { - (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Error(err))) => { @@ -548,20 +555,23 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(err).map_err(Error::WatchFailed)), State::IntialWatch { - objects, - stream, - }) + ( + Some(Err(err).map_err(Error::WatchFailed)), + State::IntialWatch { objects, stream }, + ) } None => (None, State::default()), } } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream, - }), + Ok(stream) => ( + None, + State::Watching { + resource_version, + stream, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); @@ -584,10 +594,13 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Applied(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Applied(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -595,16 +608,22 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Deleted(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Deleted(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), + Some(Ok(WatchEvent::Bookmark(bm))) => ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -628,10 +647,13 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { - resource_version, - stream, - }) + ( + Some(Err(err).map_err(Error::WatchFailed)), + State::Watching { + resource_version, + stream, + }, + ) } None => (None, State::InitListed { resource_version }), },