Skip to content

Commit

Permalink
adds streaming watch list operation
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Porto Carrero <[email protected]>
  • Loading branch information
casualjim committed Jul 23, 2023
1 parent 6643301 commit 145a1cd
Show file tree
Hide file tree
Showing 15 changed files with 352 additions and 69 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Select a version of `kube` along with the generated [k8s-openapi](https://github
```toml
[dependencies]
kube = { version = "0.84.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.18.0", features = ["v1_26"] }
k8s-openapi = { version = "0.18.0", features = ["v1_27"] }
```

[Features are available](https://github.com/kube-rs/kube/blob/main/kube/Cargo.toml#L18).
Expand Down Expand Up @@ -153,7 +153,7 @@ By default `openssl` is used for TLS, but [rustls](https://github.com/ctz/rustls
```toml
[dependencies]
kube = { version = "0.84.0", default-features = false, features = ["client", "rustls-tls"] }
k8s-openapi = { version = "0.18.0", features = ["v1_26"] }
k8s-openapi = { version = "0.18.0", features = ["v1_27"] }
```

This will pull in `rustls` and `hyper-rustls`. If `default-features` is left enabled, you will pull in two TLS stacks, and the default will remain as `openssl`.
Expand Down
4 changes: 2 additions & 2 deletions e2e/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ name = "boot"
path = "boot.rs"

[features]
latest = ["k8s-openapi/v1_26"]
latest = ["k8s-openapi/v1_27"]
mk8sv = ["k8s-openapi/v1_21"]
rustls = ["kube/rustls-tls"]
openssl = ["kube/openssl-tls"]
Expand All @@ -29,6 +29,6 @@ tracing = "0.1.36"
tracing-subscriber = "0.3.3"
futures = "0.3.17"
kube = { path = "../kube", version = "^0.84.0", default-features = false, features = ["client", "runtime", "ws", "admission", "gzip"] }
k8s-openapi = { version = "0.18.0", default-features = false }
k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false }
serde_json = "1.0.68"
tokio = { version = "1.14.0", features = ["full"] }
8 changes: 6 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ rustls-tls = ["kube/client", "kube/rustls-tls"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
refresh = ["kube/oauth", "kube/oidc"]
ws = ["kube/ws"]
latest = ["k8s-openapi/v1_26"]
latest = ["k8s-openapi/v1_27"]

[dev-dependencies]
tokio-util = "0.7.0"
Expand All @@ -32,7 +32,7 @@ futures = "0.3.17"
jsonpath_lib = "0.3.0"
kube = { path = "../kube", version = "^0.84.0", default-features = false, features = ["admission"] }
kube-derive = { path = "../kube-derive", version = "^0.84.0", default-features = false } # only needed to opt out of schema
k8s-openapi = { version = "0.18.0", default-features = false }
k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
serde_yaml = "0.9.19"
Expand Down Expand Up @@ -213,3 +213,7 @@ path = "secret_syncer.rs"
name = "pod_shell_crossterm"
path = "pod_shell_crossterm.rs"
required-features = ["ws"]

[[example]]
name = "namespace_reflector"
path = "namespace_reflector.rs"
49 changes: 49 additions & 0 deletions examples/namespace_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Namespace;
use kube::{
api::Api,
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client, ResourceExt,
};
use tracing::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let api: Api<Namespace> = Api::all(client);
let (reader, writer) = reflector::store::<Namespace>();

tokio::spawn(async move {
// Show state every 5 seconds of watching
loop {
reader.wait_until_ready().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("Current namespace count: {}", reader.state().len());
// full information with debug logs
for p in reader.state() {
let yaml = serde_yaml::to_string(p.as_ref()).unwrap();
debug!("Namespace {}: \n{}", p.name_any(), yaml);
}
}
});

let stream = watcher(api, watcher::Config::default().streaming_lists())
.default_backoff()
.modify(|ns| {
// memory optimization for our store - we don't care about managed fields/annotations/status
ns.managed_fields_mut().clear();
ns.annotations_mut().clear();
ns.status = None;
})
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature

futures::pin_mut!(stream);
while let Some(ns) = stream.try_next().await? {
info!("saw {}", ns.name_any());
}
Ok(())
}
7 changes: 4 additions & 3 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fmt:
rustfmt +nightly --edition 2021 $(find . -type f -iname *.rs)

doc:
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,oidc,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26,unstable-runtime --open
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,oidc,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_27,unstable-runtime --open

deny:
# might require rm Cargo.lock first to match CI
Expand Down Expand Up @@ -83,11 +83,12 @@ e2e-job-musl features:
chmod +x e2e/job

k3d:
k3d cluster create main --servers 1 --registry-create main \
k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \
--no-lb --no-rollback \
--k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*'
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
--k3s-arg '--kube-apiserver-arg=feature-gates=WatchList=true'

## RELEASE RELATED

Expand Down
10 changes: 6 additions & 4 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ config = ["__non_core", "pem", "home"]
__non_core = ["tracing", "serde_yaml", "base64"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/v1_26"]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/v1_27"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down Expand Up @@ -72,7 +72,8 @@ hyper-openssl = { version = "0.9.2", optional = true }
form_urlencoded = { version = "1.2.0", optional = true }

[dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false
features = []

Expand All @@ -85,6 +86,7 @@ tokio-test = "0.4.0"
tower-test = "0.4.0"

[dev-dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false
features = ["v1_26"]
features = ["v1_27"]
10 changes: 6 additions & 4 deletions kube-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repository = "https://github.com/kube-rs/kube"
readme = "../README.md"

[package.metadata.docs.rs]
features = ["ws", "admission", "jsonpatch", "k8s-openapi/v1_26"]
features = ["ws", "admission", "jsonpatch", "k8s-openapi/v1_27"]
rustdoc-args = ["--cfg", "docsrs"]

[features]
Expand All @@ -36,14 +36,16 @@ chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
schemars = { version = "0.8.6", optional = true }

[dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false
features = []

[dev-dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false
features = ["v1_26"]
features = ["v1_27"]

[dev-dependencies]
assert-json-diff = "2.0.1"
Expand Down
69 changes: 69 additions & 0 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,32 @@ pub struct WatchParams {
/// If the feature gate WatchBookmarks is not enabled in apiserver,
/// this field is ignored.
pub bookmarks: bool,

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
///
/// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
/// compatibility reasons) and to false otherwise.
pub send_initial_events: bool,
}

impl WatchParams {
Expand All @@ -315,6 +341,13 @@ impl WatchParams {
return Err(Error::Validation("WatchParams::timeout must be < 295s".into()));
}
}
if self.send_initial_events {
if !self.bookmarks {
return Err(Error::Validation(
"WatchParams::bookmarks must be set when using send_initial_events".into(),
));
}
}
Ok(())
}

Expand All @@ -334,6 +367,10 @@ impl WatchParams {
if self.bookmarks {
qp.append_pair("allowWatchBookmarks", "true");
}
if self.send_initial_events {
qp.append_pair("sendInitialEvents", "true");
qp.append_pair("resourceVersionMatch", "NotOlderThan");
}
}
}

Expand All @@ -347,6 +384,7 @@ impl Default for WatchParams {
label_selector: None,
field_selector: None,
timeout: None,
send_initial_events: false,
}
}
}
Expand Down Expand Up @@ -401,6 +439,37 @@ impl WatchParams {
self.bookmarks = false;
self
}

/// Kubernetes 1.27 Streaming Lists
/// `sendInitialEvents=true` may be set together with `watch=true`.
/// In that case, the watch stream will begin with synthetic events to
/// produce the current state of objects in the collection. Once all such
/// events have been sent, a synthetic "Bookmark" event will be sent.
/// The bookmark will report the ResourceVersion (RV) corresponding to the
/// set of objects, and be marked with `"k8s.io/initial-events-end": "true"` annotation.
/// Afterwards, the watch stream will proceed as usual, sending watch events
/// corresponding to changes (subsequent to the RV) to objects watched.
///
/// When `sendInitialEvents` option is set, we require `resourceVersionMatch`
/// option to also be set. The semantic of the watch request is as following:
/// - `resourceVersionMatch` = NotOlderThan
/// is interpreted as "data at least as new as the provided `resourceVersion`"
/// and the bookmark event is send when the state is synced
/// to a `resourceVersion` at least as fresh as the one provided by the ListOptions.
/// If `resourceVersion` is unset, this is interpreted as "consistent read" and the
/// bookmark event is send when the state is synced at least to the moment
/// when request started being processed.
/// - `resourceVersionMatch` set to any other value or unset
/// Invalid error is returned.
///
/// Defaults to true if `resourceVersion=""` or `resourceVersion="0"` (for backward
/// compatibility reasons) and to false otherwise.
#[must_use]
pub fn initial_events(mut self) -> Self {
self.send_initial_events = true;

self
}
}

/// Common query parameters for put/post calls
Expand Down
12 changes: 12 additions & 0 deletions kube-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,18 @@ mod test {
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&resourceVersion=0"
);
}

#[test]
fn watch_streaming_list() {
let url = corev1::Pod::url_path(&(), Some("ns"));
let wp = WatchParams::default().initial_events();
let req = Request::new(url).watch(&wp, "0").unwrap();
assert_eq!(
req.uri(),
"/api/v1/namespaces/ns/pods?&watch=true&timeoutSeconds=290&allowWatchBookmarks=true&sendInitialEvents=true&resourceVersionMatch=NotOlderThan&resourceVersion=0"
);
}

#[test]
fn watch_metadata_path() {
let url = corev1::Pod::url_path(&(), Some("ns"));
Expand Down
5 changes: 5 additions & 0 deletions kube-core/src/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ pub struct Bookmark {
pub struct BookmarkMeta {
/// The only field we need from a Bookmark event.
pub resource_version: String,

/// Kubernetes 1.27 Streaming Lists
/// The rest of the fields are optional and may be empty.
#[serde(default)]
pub annotations: std::collections::BTreeMap<String, String>,
}
2 changes: 1 addition & 1 deletion kube-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ proc-macro = true
serde = { version = "1.0.130", features = ["derive"] }
serde_yaml = "0.9.19"
kube = { path = "../kube", version = "<1.0.0, >=0.61.0", features = ["derive", "client"] }
k8s-openapi = { version = "0.18.0", default-features = false, features = ["v1_26"] }
k8s-openapi = { git = "https://github.com/Arnavion/k8s-openapi", branch = "master", default-features = false, features = ["v1_27"] }
schemars = { version = "0.8.6", features = ["chrono"] }
chrono = { version = "0.4.19", default-features = false }
trybuild = "1.0.48"
Expand Down
10 changes: 6 additions & 4 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ unstable-runtime-stream-control = []
unstable-runtime-reconcile-on = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
features = ["k8s-openapi/v1_27", "unstable-runtime"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand All @@ -46,7 +46,8 @@ async-trait = "0.1.64"
hashbrown = "0.14.0"

[dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false

[dev-dependencies]
Expand All @@ -58,6 +59,7 @@ schemars = "0.8.6"
tracing-subscriber = "0.3.17"

[dev-dependencies.k8s-openapi]
version = "0.18.0"
git = "https://github.com/Arnavion/k8s-openapi"
branch = "master"
default-features = false
features = ["v1_26"]
features = ["v1_27"]
Loading

0 comments on commit 145a1cd

Please sign in to comment.