diff --git a/README.md b/README.md index 305bbd23f..80c17b378 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Select a version of `kube` along with the generated [k8s-openapi](https://github ```toml [dependencies] kube = { version = "0.84.0", features = ["runtime", "derive"] } -k8s-openapi = { version = "0.18.0", features = ["v1_26"] } +k8s-openapi = { version = "0.18.0", features = ["v1_27"] } ``` [Features are available](https://github.com/kube-rs/kube/blob/main/kube/Cargo.toml#L18). @@ -153,7 +153,7 @@ By default `openssl` is used for TLS, but [rustls](https://github.com/ctz/rustls ```toml [dependencies] kube = { version = "0.84.0", default-features = false, features = ["client", "rustls-tls"] } -k8s-openapi = { version = "0.18.0", features = ["v1_26"] } +k8s-openapi = { version = "0.18.0", features = ["v1_27"] } ``` This will pull in `rustls` and `hyper-rustls`. If `default-features` is left enabled, you will pull in two TLS stacks, and the default will remain as `openssl`. diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index b7cfb8e4e..19b679cdd 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -18,7 +18,7 @@ name = "boot" path = "boot.rs" [features] -latest = ["k8s-openapi/v1_26"] +latest = ["k8s-openapi/v1_27"] mk8sv = ["k8s-openapi/v1_21"] rustls = ["kube/rustls-tls"] openssl = ["kube/openssl-tls"] @@ -29,6 +29,6 @@ tracing = "0.1.36" tracing-subscriber = "0.3.3" futures = "0.3.17" kube = { path = "../kube", version = "^0.84.0", default-features = false, features = ["client", "runtime", "ws", "admission", "gzip"] } -k8s-openapi = { version = "0.18.0", default-features = false } +k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false } serde_json = "1.0.68" tokio = { version = "1.14.0", features = ["full"] } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 74a698928..4d8589e65 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -21,7 +21,7 @@ rustls-tls = ["kube/client", "kube/rustls-tls"] runtime = ["kube/runtime", "kube/unstable-runtime"] refresh = ["kube/oauth", "kube/oidc"] ws = ["kube/ws"] -latest = ["k8s-openapi/v1_26"] +latest = ["k8s-openapi/v1_27"] [dev-dependencies] tokio-util = "0.7.0" @@ -32,7 +32,7 @@ futures = "0.3.17" jsonpath_lib = "0.3.0" kube = { path = "../kube", version = "^0.84.0", default-features = false, features = ["admission"] } kube-derive = { path = "../kube-derive", version = "^0.84.0", default-features = false } # only needed to opt out of schema -k8s-openapi = { version = "0.18.0", default-features = false } +k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false } serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" serde_yaml = "0.9.19" @@ -213,3 +213,7 @@ path = "secret_syncer.rs" name = "pod_shell_crossterm" path = "pod_shell_crossterm.rs" required-features = ["ws"] + +[[example]] +name = "namespace_reflector" +path = "namespace_reflector.rs" diff --git a/examples/namespace_reflector.rs b/examples/namespace_reflector.rs new file mode 100644 index 000000000..6d7af6385 --- /dev/null +++ b/examples/namespace_reflector.rs @@ -0,0 +1,49 @@ +use futures::TryStreamExt; +use k8s_openapi::api::core::v1::Namespace; +use kube::{ + api::Api, + runtime::{predicates, reflector, watcher, WatchStreamExt}, + Client, ResourceExt, +}; +use tracing::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let api: Api = Api::all(client); + let (reader, writer) = reflector::store::(); + + tokio::spawn(async move { + // Show state every 5 seconds of watching + loop { + reader.wait_until_ready().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + info!("Current namespace count: {}", reader.state().len()); + // full information with debug logs + for p in reader.state() { + let yaml = serde_yaml::to_string(p.as_ref()).unwrap(); + debug!("Namespace {}: \n{}", p.name_any(), yaml); + } + } + }); + + let stream = watcher(api, watcher::Config::default().streaming_lists()) + .default_backoff() + .modify(|ns| { + // memory optimization for our store - we don't care about managed fields/annotations/status + ns.managed_fields_mut().clear(); + ns.annotations_mut().clear(); + ns.status = None; + }) + .reflect(writer) + .applied_objects() + .predicate_filter(predicates::resource_version); // NB: requires an unstable feature + + futures::pin_mut!(stream); + while let Some(ns) = stream.try_next().await? { + info!("saw {}", ns.name_any()); + } + Ok(()) +} diff --git a/justfile b/justfile index 9a8d6f556..31139ee54 100644 --- a/justfile +++ b/justfile @@ -15,7 +15,7 @@ fmt: rustfmt +nightly --edition 2021 $(find . -type f -iname *.rs) doc: - RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,oidc,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26,unstable-runtime --open + RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,oidc,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_27,unstable-runtime --open deny: # might require rm Cargo.lock first to match CI @@ -83,11 +83,12 @@ e2e-job-musl features: chmod +x e2e/job k3d: - k3d cluster create main --servers 1 --registry-create main \ + k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \ --no-lb --no-rollback \ --k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \ --k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \ - --k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' + --k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \ + --k3s-arg '--kube-apiserver-arg=feature-gates=WatchList=true' ## RELEASE RELATED diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 19a8bd3f2..4e13eebed 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -32,7 +32,7 @@ config = ["__non_core", "pem", "home"] __non_core = ["tracing", "serde_yaml", "base64"] [package.metadata.docs.rs] -features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/v1_26"] +features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/v1_27"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"] @@ -72,7 +72,8 @@ hyper-openssl = { version = "0.9.2", optional = true } form_urlencoded = { version = "1.2.0", optional = true } [dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false features = [] @@ -85,6 +86,7 @@ tokio-test = "0.4.0" tower-test = "0.4.0" [dev-dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false -features = ["v1_26"] +features = ["v1_27"] diff --git a/kube-core/Cargo.toml b/kube-core/Cargo.toml index 71dc8202e..33e7dd9d5 100644 --- a/kube-core/Cargo.toml +++ b/kube-core/Cargo.toml @@ -15,7 +15,7 @@ repository = "https://github.com/kube-rs/kube" readme = "../README.md" [package.metadata.docs.rs] -features = ["ws", "admission", "jsonpatch", "k8s-openapi/v1_26"] +features = ["ws", "admission", "jsonpatch", "k8s-openapi/v1_27"] rustdoc-args = ["--cfg", "docsrs"] [features] @@ -36,14 +36,16 @@ chrono = { version = "0.4.19", default-features = false, features = ["clock"] } schemars = { version = "0.8.6", optional = true } [dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false features = [] [dev-dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false -features = ["v1_26"] +features = ["v1_27"] [dev-dependencies] assert-json-diff = "2.0.1" diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 93d333f19..dca11816d 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -305,6 +305,32 @@ pub struct WatchParams { /// If the feature gate WatchBookmarks is not enabled in apiserver, /// this field is ignored. pub bookmarks: bool, + + /// Kubernetes 1.27 Streaming Lists + /// `sendInitialEvents=true` may be set together with `watch=true`. + /// In that case, the watch stream will begin with synthetic events to + /// produce the current state of objects in the collection. Once all such + /// events have been sent, a synthetic "Bookmark" event will be sent. + /// The bookmark will report the ResourceVersion (RV) corresponding to the + /// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + /// Afterwards, the watch stream will proceed as usual, sending watch events + /// corresponding to changes (subsequent to the RV) to objects watched. + /// + /// When `sendInitialEvents` option is set, we require `resourceVersionMatch` + /// option to also be set. The semantic of the watch request is as following: + /// - `resourceVersionMatch` = NotOlderThan + /// is interpreted as "data at least as new as the provided `resourceVersion`" + /// and the bookmark event is send when the state is synced + /// to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + /// If `resourceVersion` is unset, this is interpreted as "consistent read" and the + /// bookmark event is send when the state is synced at least to the moment + /// when request started being processed. + /// - `resourceVersionMatch` set to any other value or unset + /// Invalid error is returned. + /// + /// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward + /// compatibility reasons) and to false otherwise. + pub send_initial_events: bool, } impl WatchParams { @@ -315,6 +341,13 @@ impl WatchParams { return Err(Error::Validation("WatchParams::timeout must be < 295s".into())); } } + if self.send_initial_events { + if !self.bookmarks { + return Err(Error::Validation( + "WatchParams::bookmarks must be set when using send_initial_events".into(), + )); + } + } Ok(()) } @@ -334,6 +367,10 @@ impl WatchParams { if self.bookmarks { qp.append_pair("allowWatchBookmarks", "true"); } + if self.send_initial_events { + qp.append_pair("sendInitialEvents", "true"); + qp.append_pair("resourceVersionMatch", "NotOlderThan"); + } } } @@ -347,6 +384,7 @@ impl Default for WatchParams { label_selector: None, field_selector: None, timeout: None, + send_initial_events: false, } } } @@ -401,6 +439,37 @@ impl WatchParams { self.bookmarks = false; self } + + /// Kubernetes 1.27 Streaming Lists + /// `sendInitialEvents=true` may be set together with `watch=true`. + /// In that case, the watch stream will begin with synthetic events to + /// produce the current state of objects in the collection. Once all such + /// events have been sent, a synthetic "Bookmark" event will be sent. + /// The bookmark will report the ResourceVersion (RV) corresponding to the + /// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation. + /// Afterwards, the watch stream will proceed as usual, sending watch events + /// corresponding to changes (subsequent to the RV) to objects watched. + /// + /// When `sendInitialEvents` option is set, we require `resourceVersionMatch` + /// option to also be set. The semantic of the watch request is as following: + /// - `resourceVersionMatch` = NotOlderThan + /// is interpreted as "data at least as new as the provided `resourceVersion`" + /// and the bookmark event is send when the state is synced + /// to a `resourceVersion` at least as fresh as the one provided by the ListOptions. + /// If `resourceVersion` is unset, this is interpreted as "consistent read" and the + /// bookmark event is send when the state is synced at least to the moment + /// when request started being processed. + /// - `resourceVersionMatch` set to any other value or unset + /// Invalid error is returned. + /// + /// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward + /// compatibility reasons) and to false otherwise. + #[must_use] + pub fn initial_events(mut self) -> Self { + self.send_initial_events = true; + + self + } } /// Common query parameters for put/post calls diff --git a/kube-core/src/request.rs b/kube-core/src/request.rs index 2490c925e..93dd5d8af 100644 --- a/kube-core/src/request.rs +++ b/kube-core/src/request.rs @@ -522,6 +522,18 @@ mod test { "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0" ); } + + #[test] + fn watch_streaming_list() { + let url = corev1::Pod::url_path(&(), Some("ns")); + let wp = WatchParams::default().initial_events(); + let req = Request::new(url).watch(&wp, "0").unwrap(); + assert_eq!( + req.uri(), + "/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0" + ); + } + #[test] fn watch_metadata_path() { let url = corev1::Pod::url_path(&(), Some("ns")); diff --git a/kube-core/src/watch.rs b/kube-core/src/watch.rs index 4155d490e..f2423af8d 100644 --- a/kube-core/src/watch.rs +++ b/kube-core/src/watch.rs @@ -59,4 +59,9 @@ pub struct Bookmark { pub struct BookmarkMeta { /// The only field we need from a Bookmark event. pub resource_version: String, + + /// Kubernetes 1.27 Streaming Lists + /// The rest of the fields are optional and may be empty. + #[serde(default)] + pub annotations: std::collections::BTreeMap, } diff --git a/kube-derive/Cargo.toml b/kube-derive/Cargo.toml index a6eae1092..d90698c27 100644 --- a/kube-derive/Cargo.toml +++ b/kube-derive/Cargo.toml @@ -29,7 +29,7 @@ proc-macro = true serde = { version = "1.0.130", features = ["derive"] } serde_yaml = "0.9.19" kube = { path = "../kube", version = "<1.0.0, >=0.61.0", features = ["derive", "client"] } -k8s-openapi = { version = "0.18.0", default-features = false, features = ["v1_26"] } +k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false, features = ["v1_27"] } schemars = { version = "0.8.6", features = ["chrono"] } chrono = { version = "0.4.19", default-features = false } trybuild = "1.0.48" diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index e25f639fb..ee77505b1 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -22,7 +22,7 @@ unstable-runtime-stream-control = [] unstable-runtime-reconcile-on = [] [package.metadata.docs.rs] -features = ["k8s-openapi/v1_26", "unstable-runtime"] +features = ["k8s-openapi/v1_27", "unstable-runtime"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"] @@ -46,7 +46,8 @@ async-trait = "0.1.64" hashbrown = "0.14.0" [dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false [dev-dependencies] @@ -58,6 +59,7 @@ schemars = "0.8.6" tracing-subscriber = "0.3.17" [dev-dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false -features = ["v1_26"] +features = ["v1_27"] diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 0b3815ccc..3c946bcf7 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -122,6 +122,13 @@ enum State { continue_token: Option, objects: Vec, }, + /// Kubernetes 1.27 Streaming Lists + /// The initial watch is in progress + IntialWatch { + objects: Vec, + #[derivative(Debug = "ignore")] + stream: BoxStream<'static, kube_client::Result>>, + }, /// The initial LIST was successful, so we should move on to starting the actual watch. InitListed { resource_version: String }, /// The watch is in progress, from this point we just return events from the server. @@ -192,6 +199,16 @@ pub enum ListSemantic { Any, } +/// Configurable watcher listwatch semantics +#[derive(Clone, Default, Debug, PartialEq)] +pub enum WatcherMode { + #[default] + ListWatch, + /// Kubernetes 1.27 Streaming Lists + /// https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + StreamingList, +} + /// Accumulates all options that can be used on the watcher invocation. #[derive(Clone, Debug, PartialEq)] pub struct Config { @@ -217,6 +234,21 @@ pub struct Config { /// Configures re-list for performance vs. consistency. pub list_semantic: ListSemantic, + /// Kubernetes 1.27 Streaming Lists + /// Determines how the watcher fetches the initial list of objects. + /// Defaults to `ListWatch` for backwards compatibility. + /// + /// ListWatch: The watcher will fetch the initial list of objects using a list call. + /// StreamingList: The watcher will fetch the initial list of objects using a watch call. + /// + /// StreamingList is more efficient than ListWatch, but it requires the server to support + /// streaming list bookmarks. If the server does not support streaming list bookmarks, + /// the watcher will fall back to ListWatch. + /// + /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + /// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details + pub watcher_mode: WatcherMode, + /// Maximum number of objects retrieved per list operation resyncs. /// /// This can reduce the memory consumption during resyncs, at the cost of requiring more @@ -243,6 +275,7 @@ impl Default for Config { // same default page size limit as client-go // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31 page_size: Some(500), + watcher_mode: WatcherMode::ListWatch, } } } @@ -321,6 +354,14 @@ impl Config { self } + /// Kubernetes 1.27 Streaming Lists + /// Sets list semantic to `Stream` to make use of watch bookmarks + #[must_use] + pub fn streaming_lists(mut self) -> Self { + self.watcher_mode = WatcherMode::StreamingList; + self + } + /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests. fn to_list_params(&self) -> ListParams { let (resource_version, version_match) = match self.list_semantic { @@ -346,6 +387,7 @@ impl Config { field_selector: self.field_selector.clone(), timeout: self.timeout, bookmarks: self.bookmarks, + send_initial_events: self.watcher_mode == WatcherMode::StreamingList, } } } @@ -414,43 +456,124 @@ where State::Empty { continue_token, mut objects, + } => match wc.watcher_mode { + WatcherMode::ListWatch => { + let mut lp = wc.to_list_params(); + lp.continue_token = continue_token; + match api.list(&lp).await { + Ok(list) => { + objects.extend(list.items); + if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { + ( + None, + State::Empty { + continue_token: Some(continue_token), + objects, + }, + ) + } else if let Some(resource_version) = + list.metadata.resource_version.filter(|s| !s.is_empty()) + { + ( + Some(Ok(Event::Restarted(objects))), + State::InitListed { resource_version }, + ) + } else { + (Some(Err(Error::NoResourceVersion)), State::default()) + } + } + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch list error with 403: {err:?}"); + } else { + debug!("watch list error: {err:?}"); + } + (Some(Err(err).map_err(Error::InitialListFailed)), State::default()) + } + } + } + WatcherMode::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { + Ok(stream) => (None, State::IntialWatch { stream, objects }), + Err(err) => { + if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { + warn!("watch initlist error with 403: {err:?}"); + } else { + debug!("watch initlist error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchStartFailed)), State::default()) + } + }, + }, + State::IntialWatch { + mut objects, + mut stream, } => { - let mut lp = wc.to_list_params(); - lp.continue_token = continue_token; - match api.list(&lp).await { - Ok(list) => { - objects.extend(list.items); - if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) { - (None, State::Empty { - continue_token: Some(continue_token), - objects, - }) - } else if let Some(resource_version) = - list.metadata.resource_version.filter(|s| !s.is_empty()) - { - (Some(Ok(Event::Restarted(objects))), State::InitListed { - resource_version, - }) + match stream.next().await { + Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { + objects.push(obj); + (None, State::IntialWatch { objects, stream }) + } + Some(Ok(WatchEvent::Deleted(obj))) => { + objects.retain(|o| o.name_any() != obj.name_any() && o.namespace() != obj.namespace()); + (None, State::IntialWatch { objects, stream }) + } + Some(Ok(WatchEvent::Bookmark(bm))) => { + let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); + if marks_initial_end { + ( + Some(Ok(Event::Restarted(objects))), + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } else { - (Some(Err(Error::NoResourceVersion)), State::default()) + ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } } - Err(err) => { + Some(Ok(WatchEvent::Error(err))) => { + // HTTP GONE, means we have desynced and need to start over and re-list :( + let new_state = if err.code == 410 { + State::default() + } else { + State::IntialWatch { objects, stream } + }; + if err.code == 403 { + warn!("watcher watchevent error 403: {err:?}"); + } else { + debug!("error watchevent error: {err:?}"); + } + (Some(Err(err).map_err(Error::WatchError)), new_state) + } + Some(Err(err)) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { - warn!("watch list error with 403: {err:?}"); + warn!("watcher error 403: {err:?}"); } else { - debug!("watch list error: {err:?}"); + debug!("watcher error: {err:?}"); } - (Some(Err(err).map_err(Error::InitialListFailed)), State::default()) + ( + Some(Err(err).map_err(Error::WatchFailed)), + State::IntialWatch { objects, stream }, + ) } + None => (None, State::default()), } } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream, - }), + Ok(stream) => ( + None, + State::Watching { + resource_version, + stream, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); @@ -473,10 +596,13 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Applied(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Applied(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -484,16 +610,22 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Deleted(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Deleted(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), + Some(Ok(WatchEvent::Bookmark(bm))) => ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -517,10 +649,13 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(err).map_err(Error::WatchFailed)), State::Watching { - resource_version, - stream, - }) + ( + Some(Err(err).map_err(Error::WatchFailed)), + State::Watching { + resource_version, + stream, + }, + ) } None => (None, State::InitListed { resource_version }), }, diff --git a/kube/Cargo.toml b/kube/Cargo.toml index d8bc2e146..36d019f89 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -32,7 +32,7 @@ runtime = ["kube-runtime"] unstable-runtime = ["kube-runtime/unstable-runtime"] [package.metadata.docs.rs] -features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26", "unstable-runtime"] +features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_27", "unstable-runtime"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"] @@ -45,7 +45,8 @@ kube-runtime = { path = "../kube-runtime", version = "=0.84.0", optional = true} # Not used directly, but required by resolver 2.0 to ensure that the k8s-openapi dependency # is considered part of the "deps" graph rather than just the "dev-deps" graph [dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false [dev-dependencies] @@ -60,6 +61,7 @@ tower-test = "0.4.0" anyhow = "1.0.71" [dev-dependencies.k8s-openapi] -version = "0.18.0" +git = "https://github.com/Arnavion/k8s-openapi" +branch = "master" default-features = false -features = ["v1_26"] +features = ["v1_27"] diff --git a/release.toml b/release.toml index 21c7eef10..52e7108c7 100644 --- a/release.toml +++ b/release.toml @@ -4,7 +4,7 @@ # # 0. (optional) cargo release minor ; verify readme + changelog bumped; then git reset --hard # 1. PUBLISH_GRACE_SLEEP=20 cargo release minor --execute -# 1X. - on failure: follow plan manually, cd into next dirs and publish insequence with cargo publish --features=k8s-openapi/v1_26 +# 1X. - on failure: follow plan manually, cd into next dirs and publish insequence with cargo publish --features=k8s-openapi/v1_27 # 2. check consolidated commit # 2X. - on failure: git commit --amend and insert version # 3. ./scripts/release-post.sh @@ -21,4 +21,4 @@ push = false tag = false # A Kubernetes version is normally supplied by the application consuming the library in the end. # Since we don't have that when verifying, supply one ourselves. -enable-features = ["k8s-openapi/v1_26"] +enable-features = ["k8s-openapi/v1_27"]