Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds watch-list implementation without breaking changes #1255

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,7 @@ path = "secret_syncer.rs"
name = "pod_shell_crossterm"
path = "pod_shell_crossterm.rs"
required-features = ["ws"]

[[example]]
name = "namespace_reflector"
path = "namespace_reflector.rs"
49 changes: 49 additions & 0 deletions examples/namespace_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Namespace;
use kube::{
api::Api,
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let api: Api<Namespace> = Api::all(client);
let (reader, writer) = reflector::store::<Namespace>();

tokio::spawn(async move {
// Show state every 5 seconds of watching
loop {
reader.wait_until_ready().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("Current namespace count: {}", reader.state().len());
// full information with debug logs
for p in reader.state() {
let yaml = serde_yaml::to_string(p.as_ref()).unwrap();
debug!("Namespace {}: \n{}", p.name_any(), yaml);
}
}
});

let stream = watcher(api, watcher::Config::default().streaming_lists())
.default_backoff()
.modify(|ns| {
// memory optimization for our store - we don't care about managed fields/annotations/status
ns.managed_fields_mut().clear();
ns.annotations_mut().clear();
ns.status = None;
})
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature

futures::pin_mut!(stream);
while let Some(ns) = stream.try_next().await? {
info!("saw {}", ns.name_any());
}
Ok(())
}
5 changes: 3 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ e2e-job-musl features:
chmod +x e2e/job

k3d:
k3d cluster create main --servers 1 --registry-create main \
k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \
--no-lb --no-rollback \
--k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*'
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
--k3s-arg '--kube-apiserver-arg=feature-gates=WatchList=true'

## RELEASE RELATED

Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/client/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod rustls_tls {

let mut client_config = if let Some((chain, pkey)) = identity_pem.map(client_auth).transpose()? {
config_builder
.with_single_cert(chain, pkey)
.with_client_auth_cert(chain, pkey)
.map_err(Error::InvalidPrivateKey)?
} else {
config_builder.with_no_client_auth()
Expand Down
75 changes: 75 additions & 0 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,29 @@
/// If the feature gate WatchBookmarks is not enabled in apiserver,
/// this field is ignored.
pub bookmarks: bool,

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
pub send_initial_events: bool,
}

impl WatchParams {
Expand All @@ -315,6 +338,11 @@
return Err(Error::Validation("WatchParams::timeout must be < 295s".into()));
}
}
if self.send_initial_events && !self.bookmarks {
return Err(Error::Validation(
"WatchParams::bookmarks must be set when using send_initial_events".into(),

Check warning on line 343 in kube-core/src/params.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/params.rs#L342-L343

Added lines #L342 - L343 were not covered by tests
));
}
clux marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
}

Expand All @@ -334,6 +362,10 @@
if self.bookmarks {
qp.append_pair("allowWatchBookmarks", "true");
}
if self.send_initial_events {
qp.append_pair("sendInitialEvents", "true");
qp.append_pair("resourceVersionMatch", "NotOlderThan");
}
clux marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -347,6 +379,7 @@
label_selector: None,
field_selector: None,
timeout: None,
send_initial_events: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a sensible default for now. I think we should maybe add a new constructor that sets correct flags rather than mess with the builders/serialization. Something like:

impl WatchParams
    /// Constructor for doing Kubernetes 1.27 Streaming List watches
    ///
    /// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`].
    pub fn streaming_lists() -> Self {
        Self {
            send_initial_events: true,
            version_match: Some(VersionMatch::NotGreaterThan), // required
            bookmarks: true, // required
            ..WatchParams::default()
        }
    }

}
}
}
Expand Down Expand Up @@ -401,6 +434,48 @@
self.bookmarks = false;
self
}

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
///
/// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
/// compatibility reasons) and to false otherwise.
#[must_use]
pub fn initial_events(mut self) -> Self {
self.send_initial_events = true;

self
}

/// Constructor for doing Kubernetes 1.27 Streaming List watches
///
/// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`].
pub fn streaming_lists() -> Self {

Check warning on line 472 in kube-core/src/params.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/params.rs#L472

Added line #L472 was not covered by tests
Self {
send_initial_events: true,
bookmarks: true, // required
..WatchParams::default()
}
}
}

/// Common query parameters for put/post calls
Expand Down
12 changes: 12 additions & 0 deletions kube-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,18 @@ mod test {
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0"
);
}

#[test]
fn watch_streaming_list() {
let url = corev1::Pod::url_path(&(), Some("ns"));
let wp = WatchParams::default().initial_events();
let req = Request::new(url).watch(&wp, "0").unwrap();
assert_eq!(
req.uri(),
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0"
);
}

#[test]
fn watch_metadata_path() {
let url = corev1::Pod::url_path(&(), Some("ns"));
Expand Down
5 changes: 5 additions & 0 deletions kube-core/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ pub struct Bookmark {
pub struct BookmarkMeta {
/// The only field we need from a Bookmark event.
pub resource_version: String,

/// Kubernetes 1.27 Streaming Lists
/// The rest of the fields are optional and may be empty.
#[serde(default)]
pub annotations: std::collections::BTreeMap<String, String>,
}
Loading
Loading