Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
casualjim committed Jul 23, 2023
1 parent 69abbef commit e835c70
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 93 deletions.
45 changes: 3 additions & 42 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,6 @@ pub struct WatchParams {
/// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
/// compatibility reasons) and to false otherwise.
pub send_initial_events: bool,

/// Determines how resourceVersion is matched applied to list calls.
pub version_match: Option<VersionMatch>,

/// An explicit resourceVersion using the given `VersionMatch` strategy
///
/// See <https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions> for details.
pub resource_version: Option<String>,
}

impl WatchParams {
Expand All @@ -349,12 +341,6 @@ impl WatchParams {
}
}
if self.send_initial_events {
if self.version_match != Some(VersionMatch::NotOlderThan) {
return Err(Error::Validation(
"WatchParams::version_match must be set to NotOlderThan when using send_initial_events"
.into(),
));
}
if !self.bookmarks {
return Err(Error::Validation(
"WatchParams::bookmarks must be set when using send_initial_events".into(),
Expand All @@ -380,15 +366,9 @@ impl WatchParams {
if self.bookmarks {
qp.append_pair("allowWatchBookmarks", "true");
}
if let Some(resource_version_match) = &self.version_match {
match resource_version_match {
VersionMatch::NotOlderThan => {
qp.append_pair("resourceVersionMatch", "NotOlderThan");
}
VersionMatch::Exact => {
qp.append_pair("resourceVersionMatch", "Exact");
}
}
if self.send_initial_events {
qp.append_pair("sendInitialEvents", "true");
qp.append_pair("resourceVersionMatch", "NotOlderThan");
}
}
}
Expand All @@ -404,8 +384,6 @@ impl Default for WatchParams {
field_selector: None,
timeout: None,
send_initial_events: false,
resource_version: None,
version_match: None,
}
}
}
Expand Down Expand Up @@ -490,23 +468,6 @@ impl WatchParams {

self
}

/// Sets the resource version
#[must_use]
pub fn at(mut self, resource_version: &str) -> Self {
self.resource_version = Some(resource_version.into());
self
}

/// Sets an arbitary resource version match strategy
///
/// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan`
/// requires an explicit `resource_version` set to pass request validation.
#[must_use]
pub fn matching(mut self, version_match: VersionMatch) -> Self {
self.version_match = Some(version_match);
self
}
}

/// Common query parameters for put/post calls
Expand Down
12 changes: 12 additions & 0 deletions kube-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,18 @@ mod test {
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0"
);
}

#[test]
fn watch_streaming_list() {
let url = corev1::Pod::url_path(&(), Some("ns"));
let wp = WatchParams::default().initial_events();
let req = Request::new(url).watch(&wp, "0").unwrap();
assert_eq!(
req.uri(),
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0"
);
}

#[test]
fn watch_metadata_path() {
let url = corev1::Pod::url_path(&(), Some("ns"));
Expand Down
124 changes: 73 additions & 51 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,17 @@ pub struct Config {
pub list_semantic: ListSemantic,

/// Kubernetes 1.27 Streaming Lists
/// Mode for the initial watch call, or when the watcher is restarted.
/// Enables opening a stream to bring data from the API server.
/// Streaming has the primary advantage of using fewer server's resources to fetch data.
/// Determines how the watcher fetches the initial list of objects.
/// Defaults to `ListWatch` for backwards compatibility.
///
/// The old behaviour establishes a LIST request which gets data in chunks.
/// Paginated list is less efficient and depending on the actual size of objects
/// might result in an increased memory consumption of the APIServer.
/// ListWatch: The watcher will fetch the initial list of objects using a list call.
/// StreamingList: The watcher will fetch the initial list of objects using a watch call.
///
/// StreamingList is more efficient than ListWatch, but it requires the server to support
/// streaming list bookmarks. If the server does not support streaming list bookmarks,
/// the watcher will fall back to ListWatch.
///
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
/// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
pub watcher_mode: WatcherMode,

Expand Down Expand Up @@ -377,17 +380,11 @@ impl Config {

/// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
fn to_watch_params(&self) -> WatchParams {
let (resource_version, version_match) = match self.list_semantic {
ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
ListSemantic::MostRecent => (None, None),
};
WatchParams {
label_selector: self.label_selector.clone(),
field_selector: self.field_selector.clone(),
timeout: self.timeout,
bookmarks: self.bookmarks,
resource_version,
version_match,
send_initial_events: self.watcher_mode == WatcherMode::StreamingList,
}
}
Expand Down Expand Up @@ -465,16 +462,20 @@ where
Ok(list) => {
objects.extend(list.items);
if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) {
(None, State::Empty {
continue_token: Some(continue_token),
objects,
})
(
None,
State::Empty {
continue_token: Some(continue_token),
objects,
},
)
} else if let Some(resource_version) =
list.metadata.resource_version.filter(|s| !s.is_empty())
{
(Some(Ok(Event::Restarted(objects))), State::InitListed {
resource_version,
})
(
Some(Ok(Event::Restarted(objects))),
State::InitListed { resource_version },
)
} else {
(Some(Err(Error::NoResourceVersion)), State::default())
}
Expand Down Expand Up @@ -517,15 +518,21 @@ where
Some(Ok(WatchEvent::Bookmark(bm))) => {
let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
if marks_initial_end {
(Some(Ok(Event::Restarted(objects))), State::Watching {
resource_version: bm.metadata.resource_version,
stream,
})
(
Some(Ok(Event::Restarted(objects))),
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
} else {
(None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
})
(
None,
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
}
}
Some(Ok(WatchEvent::Error(err))) => {
Expand All @@ -548,20 +555,23 @@ where
} else {
debug!("watcher error: {err:?}");
}
(Some(Err(err).map_err(Error::WatchFailed)), State::IntialWatch {
objects,
stream,
})
(
Some(Err(err).map_err(Error::WatchFailed)),
State::IntialWatch { objects, stream },
)
}
None => (None, State::default()),
}
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream,
}),
Ok(stream) => (
None,
State::Watching {
resource_version,
stream,
},
),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
Expand All @@ -584,27 +594,36 @@ where
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(Some(Ok(Event::Applied(obj))), State::Watching {
resource_version,
stream,
})
(
Some(Ok(Event::Applied(obj))),
State::Watching {
resource_version,
stream,
},
)
}
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_version().unwrap_or_default();
if resource_version.is_empty() {
(Some(Err(Error::NoResourceVersion)), State::default())
} else {
(Some(Ok(Event::Deleted(obj))), State::Watching {
resource_version,
stream,
})
(
Some(Ok(Event::Deleted(obj))),
State::Watching {
resource_version,
stream,
},
)
}
}
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Bookmark(bm))) => (
None,
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
),
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
Expand All @@ -628,10 +647,13 @@ where
} else {
debug!("watcher error: {err:?}");
}
(Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
resource_version,
stream,
})
(
Some(Err(err).map_err(Error::WatchFailed)),
State::Watching {
resource_version,
stream,
},
)
}
None => (None, State::InitListed { resource_version }),
},
Expand Down

0 comments on commit e835c70

Please sign in to comment.