From eba80e03e8c09839ce636be9f9ce4a9ba05c0674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 21 Jul 2021 03:13:03 +0200 Subject: [PATCH 1/7] Import Neokubism from https://gitlab.com/teozkr/neokubism/-/tree/1b1a240a242ebefd00a745fea7110cbf2f615a13 --- kube/Cargo.toml | 1 + kube/src/client/decoder/mod.rs | 5 ++ kube/src/client/decoder/single.rs | 85 +++++++++++++++++++++++++++++++ kube/src/client/decoder/stream.rs | 82 +++++++++++++++++++++++++++++ kube/src/client/mod.rs | 43 ++++++++++++++-- kube/src/client/scope.rs | 38 ++++++++++++++ kube/src/client/verb.rs | 81 +++++++++++++++++++++++++++++ 7 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 kube/src/client/decoder/mod.rs create mode 100644 kube/src/client/decoder/single.rs create mode 100644 kube/src/client/decoder/stream.rs create mode 100644 kube/src/client/scope.rs create mode 100644 kube/src/client/verb.rs diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 063e15ace..534dd4475 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -71,6 +71,7 @@ tame-oauth = { version = "0.4.7", features = ["gcp"], optional = true } pin-project = { version = "1.0.4", optional = true } rand = { version = "0.8.3", optional = true } tracing = { version = "0.1.25", features = ["log"], optional = true } +snafu = "0.6.10" [dependencies.k8s-openapi] version = "0.12.0" diff --git a/kube/src/client/decoder/mod.rs b/kube/src/client/decoder/mod.rs new file mode 100644 index 000000000..842f0cbc8 --- /dev/null +++ b/kube/src/client/decoder/mod.rs @@ -0,0 +1,5 @@ +pub mod single; +pub mod stream; + +pub use single::DecodeSingle; +pub use stream::DecodeStream; diff --git a/kube/src/client/decoder/single.rs b/kube/src/client/decoder/single.rs new file mode 100644 index 000000000..fdd210b5c --- /dev/null +++ b/kube/src/client/decoder/single.rs @@ -0,0 +1,85 @@ +use bytes::{Buf, Bytes}; +use futures::{ready, Future, StreamExt}; +use http::Response; +use hyper::Body; +use serde::de::DeserializeOwned; +use snafu::{ResultExt, Snafu}; +use std::{io::Read, marker::PhantomData, task::Poll}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("read failed: {}", source))] + ReadFailed { source: hyper::Error }, + #[snafu(display("deserialize failed: {}", source))] + DeserializeFailed { source: serde_json::Error }, +} + +pub struct DecodeSingle { + tpe: PhantomData<*const K>, + chunks: Vec, + body: Body, +} + +impl From> for DecodeSingle { + fn from(res: Response) -> Self { + Self { + tpe: PhantomData, + chunks: Vec::new(), + body: res.into_body(), + } + } +} + +impl Future for DecodeSingle { + type Output = Result; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll { + loop { + break match ready!(self.body.poll_next_unpin(cx)) { + Some(Ok(chunk)) => { + self.chunks.push(chunk); + continue; + } + Some(Err(err)) => Poll::Ready(Err(err).context(ReadFailed)), + None => Poll::Ready( + serde_json::from_reader(BytesVecCursor::from(std::mem::take(&mut self.chunks))) + .context(DeserializeFailed), + ), + }; + } + } +} + +struct BytesVecCursor { + cur_chunk: bytes::buf::Reader, + chunks: std::vec::IntoIter, +} + +impl Read for BytesVecCursor { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + loop { + break Ok(match self.cur_chunk.read(buf)? { + 0 => match self.chunks.next() { + Some(chunk) => { + self.cur_chunk = chunk.reader(); + continue; + } + None => 0, + }, + n => n, + }); + } + } +} + +impl From> for BytesVecCursor { + fn from(vec: Vec) -> Self { + BytesVecCursor { + cur_chunk: Bytes::new().reader(), + chunks: vec.into_iter(), + } + } +} diff --git a/kube/src/client/decoder/stream.rs b/kube/src/client/decoder/stream.rs new file mode 100644 index 000000000..6bcb18558 --- /dev/null +++ b/kube/src/client/decoder/stream.rs @@ -0,0 +1,82 @@ +use bytes::Bytes; +use futures::{ready, stream::MapErr, Future, Stream, StreamExt, TryStreamExt}; +use http::Response; +use hyper::Body; +use serde::de::DeserializeOwned; +use snafu::{ResultExt, Snafu}; +use std::{convert::Infallible, marker::PhantomData, task::Poll}; +use tokio_util::{ + codec::{FramedRead, LinesCodec, LinesCodecError}, + io::StreamReader, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("read failed: {}", source))] + ReadFailed { source: LinesCodecError }, + #[snafu(display("deserialize failed: {}", source))] + DeserializeFailed { source: serde_json::Error }, +} + +pub struct DecodeStream { + tpe: PhantomData<*const K>, + body: Option, +} + +impl Future for DecodeStream { + type Output = Result, Infallible>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> Poll { + Poll::Ready(Ok(DecodeStreamStream { + tpe: self.tpe, + body: FramedRead::new( + StreamReader::new( + self.body + .take() + .expect("DecodeStream may not be polled again after resolving") + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ), + LinesCodec::new(), + ), + })) + } +} + +impl From> for DecodeStream { + fn from(res: Response) -> Self { + Self { + tpe: PhantomData, + body: Some(res.into_body()), + } + } +} + +pub struct DecodeStreamStream { + tpe: PhantomData<*const K>, + #[allow(clippy::type_complexity)] + body: FramedRead< + StreamReader std::io::Error>, Bytes>, + LinesCodec, + >, +} + +impl Stream for DecodeStreamStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + match ready!(self.body.poll_next_unpin(cx)) { + Some(frame) => Poll::Ready(Some(read_frame(frame))), + None => Poll::Ready(None), + } + } +} + +fn read_frame(frame: Result) -> Result { + serde_json::from_str(&frame.context(ReadFailed)?).context(DeserializeFailed) +} diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index f655529b4..3c6e7b232 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -10,9 +10,10 @@ use std::convert::TryFrom; +use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; use bytes::Bytes; use either::{Either, Left, Right}; -use futures::{self, Stream, StreamExt, TryStream, TryStreamExt}; +use futures::{self, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt}; use http::{self, Request, Response, StatusCode}; use hyper::{client::HttpConnector, Body}; use hyper_timeout::TimeoutConnector; @@ -20,6 +21,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1; pub use kube_core::response::Status; use serde::de::DeserializeOwned; use serde_json::{self, Value}; +use snafu::{ResultExt, Snafu}; #[cfg(feature = "ws")] use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; use tokio_util::{ @@ -31,14 +33,17 @@ use tower_http::{ classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer, }; -use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; - mod auth; mod body; +mod decoder; +mod scope; +mod verb; // Add `into_stream()` to `http::Body` use body::BodyStreamExt; mod config_ext; pub use config_ext::ConfigExt; + +use self::verb::Verb; pub mod middleware; #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] mod tls; @@ -46,6 +51,21 @@ pub mod middleware; #[cfg(feature = "ws")] const WS_PROTOCOL: &str = "v4.channel.k8s.io"; +#[derive(Snafu, Debug)] +#[allow(missing_docs)] +/// Failed to perform an API call +pub enum CallError { + /// Failed to build the API request + #[snafu(display("failed to build api request: {}", source))] + BuildRequestFailed { source: verb::Error }, + /// API request failed + #[snafu(display("kube api request failed: {}", source))] + RequestFailed { source: Error }, + /// Failed to decode API response + #[snafu(display("failed to decode api response: {}", source))] + DecodeFailed { source: DecodeErr }, +} + /// Client for connecting with a Kubernetes cluster. /// /// The easiest way to instantiate the client is either by @@ -96,7 +116,7 @@ impl Client { S::Error: Into, B: http_body::Body + Send + 'static, B::Error: std::error::Error + Send + Sync + 'static, - T: Into + T: Into, { // Transform response body to `hyper::Body` and use type erased error to avoid type parameters. let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream())) @@ -126,6 +146,21 @@ impl Client { &self.default_ns } + /// Perform a request described by a [`Verb`] + pub async fn call( + &self, + verb: V, + ) -> Result<::Ok, CallError<::Error>> + where + ::Error: std::error::Error + 'static, + { + let req = verb.to_http_request().context(BuildRequestFailed)?; + V::ResponseDecoder::from(self.send(req).await.context(RequestFailed)?) + .into_future() + .await + .context(DecodeFailed) + } + async fn send(&self, request: Request) -> Result> { let mut svc = self.inner.clone(); let res = svc diff --git a/kube/src/client/scope.rs b/kube/src/client/scope.rs new file mode 100644 index 000000000..2b7d2356c --- /dev/null +++ b/kube/src/client/scope.rs @@ -0,0 +1,38 @@ +use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; + +pub trait Scope { + fn path_segment(&self) -> String; + fn namespace(&self) -> Option<&str>; +} + +pub struct ClusterScope; +pub struct NamespaceScope { + pub namespace: String, +} + +impl Scope for ClusterScope { + fn path_segment(&self) -> String { + String::new() + } + + fn namespace(&self) -> Option<&str> { + None + } +} +impl Scope for NamespaceScope { + fn path_segment(&self) -> String { + format!("namespaces/{}/", self.namespace) + } + + fn namespace(&self) -> Option<&str> { + Some(&self.namespace) + } +} + +pub trait NativeScope: Scope {} +impl> NativeScope for NamespaceScope {} +impl> NativeScope for ClusterScope {} + +pub trait ListScope: Scope {} +impl> ListScope for NamespaceScope {} +impl ListScope for ClusterScope {} diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs new file mode 100644 index 000000000..8420f4c32 --- /dev/null +++ b/kube/src/client/verb.rs @@ -0,0 +1,81 @@ +use std::ops::Deref; + +use futures::TryFuture; +use http::{Request, Response}; +use hyper::Body; +use kube_core::{Resource, WatchEvent}; +use serde::{de::DeserializeOwned, Deserialize}; +use snafu::{ResultExt, Snafu}; + +use crate::client::{ + decoder::{DecodeSingle, DecodeStream}, + scope::{self, NativeScope}, + Config, +}; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display("verb created invalid http request: {}", source))] + BuildRequestFailed { source: http::Error }, +} +type Result = std::result::Result; + +pub trait Verb { + type ResponseDecoder: TryFuture + From>; + + fn to_http_request(&self) -> Result>; +} + +pub struct Get { + pub name: String, + pub scope: Scope, + pub dyn_type: Kind::DynamicType, +} +impl> Verb for Get { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::get(format!( + "{}/{}", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + self.name + )) + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +pub struct List { + pub scope: Scope, + pub dyn_type: Kind::DynamicType, +} +impl Verb for List { + type ResponseDecoder = DecodeSingle>; + + fn to_http_request(&self) -> Result> { + Request::get(Kind::url_path(&self.dyn_type, self.scope.namespace())) + .body(Body::empty()) + .context(BuildRequestFailed) + } +} +#[derive(Deserialize)] +pub struct ObjectList { + pub items: Vec, +} + +pub struct Watch { + pub scope: Scope, + pub dyn_type: Kind::DynamicType, +} +impl Verb for Watch { + type ResponseDecoder = DecodeStream>; + + fn to_http_request(&self) -> Result> { + Request::get(format!( + "{}?watch=1", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + )) + .body(Body::empty()) + .context(BuildRequestFailed) + } +} From 62914df07a9d197e7ece595e6e3f5760cab9fe16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 21 Jul 2021 04:42:34 +0200 Subject: [PATCH 2/7] Migrate discovery and Api::get/list/create to use Client::call Note that this is still very broken and MVP, since no extra params are implemented (yet) --- kube-core/src/object.rs | 15 ++-- kube-core/src/resource.rs | 2 +- kube/src/api/core_methods.rs | 31 +++++-- kube/src/api/mod.rs | 60 ++++++------- kube/src/api/subresource.rs | 14 +-- kube/src/client/decoder/mod.rs | 4 + kube/src/client/decoder/single.rs | 14 +-- kube/src/client/decoder/stream.rs | 22 ++--- kube/src/client/mod.rs | 65 +++++++------- kube/src/client/scope.rs | 45 ++++++++-- kube/src/client/verb.rs | 144 +++++++++++++++++++++++++----- kube/src/error.rs | 4 + 12 files changed, 281 insertions(+), 139 deletions(-) diff --git a/kube-core/src/object.rs b/kube-core/src/object.rs index ec5208e3b..97e94fd0d 100644 --- a/kube-core/src/object.rs +++ b/kube-core/src/object.rs @@ -16,11 +16,8 @@ use std::borrow::Cow; /// and is generally produced from list/watch/delete collection queries on an [`Resource`](super::Resource). /// /// This is almost equivalent to [`k8s_openapi::List`](k8s_openapi::List), but iterable. -#[derive(Deserialize, Debug)] -pub struct ObjectList -where - T: Clone, -{ +#[derive(Clone, Deserialize, Debug)] +pub struct ObjectList { // NB: kind and apiVersion can be set here, but no need for it atm /// ListMeta - only really used for its `resourceVersion` /// @@ -32,7 +29,7 @@ where pub items: Vec, } -impl ObjectList { +impl ObjectList { /// `iter` returns an Iterator over the elements of this ObjectList /// /// # Example @@ -75,7 +72,7 @@ impl ObjectList { } } -impl IntoIterator for ObjectList { +impl IntoIterator for ObjectList { type IntoIter = ::std::vec::IntoIter; type Item = T; @@ -84,7 +81,7 @@ impl IntoIterator for ObjectList { } } -impl<'a, T: Clone> IntoIterator for &'a ObjectList { +impl<'a, T> IntoIterator for &'a ObjectList { type IntoIter = ::std::slice::Iter<'a, T>; type Item = &'a T; @@ -93,7 +90,7 @@ impl<'a, T: Clone> IntoIterator for &'a ObjectList { } } -impl<'a, T: Clone> IntoIterator for &'a mut ObjectList { +impl<'a, T> IntoIterator for &'a mut ObjectList { type IntoIter = ::std::slice::IterMut<'a, T>; type Item = &'a mut T; diff --git a/kube-core/src/resource.rs b/kube-core/src/resource.rs index 48da547a0..5e7eec749 100644 --- a/kube-core/src/resource.rs +++ b/kube-core/src/resource.rs @@ -22,7 +22,7 @@ pub trait Resource { /// as type of this information. /// /// See [`DynamicObject`](crate::dynamic::DynamicObject) for a valid implementation of non-k8s-openapi resources. - type DynamicType: Send + Sync + 'static; + type DynamicType: Clone + Send + Sync + 'static; /// Returns kind of this object fn kind(dt: &Self::DynamicType) -> Cow<'_, str>; diff --git a/kube/src/api/core_methods.rs b/kube/src/api/core_methods.rs index da3cd9f2f..1f6fd13d5 100644 --- a/kube/src/api/core_methods.rs +++ b/kube/src/api/core_methods.rs @@ -3,13 +3,17 @@ use futures::Stream; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; -use crate::{api::Api, Result}; -use kube_core::{object::ObjectList, params::*, response::Status, WatchEvent}; +use crate::{ + api::Api, + client::verb::{Get, List}, + Result, +}; +use kube_core::{object::ObjectList, params::*, response::Status, Resource, WatchEvent}; /// PUSH/PUT/POST/GET abstractions impl Api where - K: Clone + DeserializeOwned + Debug, + K: Clone + DeserializeOwned + Debug + Resource, { /// Get a named resource /// @@ -25,9 +29,14 @@ where /// } /// ``` pub async fn get(&self, name: &str) -> Result { - let mut req = self.request.get(name)?; - req.extensions_mut().insert("get"); - self.client.request::(req).await + Ok(self + .client + .call(Get { + name, + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await?) } /// Get a list of resources @@ -49,9 +58,13 @@ where /// } /// ``` pub async fn list(&self, lp: &ListParams) -> Result> { - let mut req = self.request.list(lp)?; - req.extensions_mut().insert("list"); - self.client.request::>(req).await + Ok(self + .client + .call(List { + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await?) } /// Create a resource diff --git a/kube/src/api/mod.rs b/kube/src/api/mod.rs index 029cf1de4..72810120d 100644 --- a/kube/src/api/mod.rs +++ b/kube/src/api/mod.rs @@ -8,8 +8,8 @@ mod core_methods; mod subresource; #[cfg(feature = "ws")] #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] -pub use subresource::{AttachParams, Attach, Execute}; -pub use subresource::{EvictParams, Evict, LogParams, Log, ScaleSpec, ScaleStatus}; +pub use subresource::{Attach, AttachParams, Execute}; +pub use subresource::{Evict, EvictParams, Log, LogParams, ScaleSpec, ScaleStatus}; // Re-exports from kube-core #[cfg(feature = "admission")] @@ -29,7 +29,10 @@ pub use params::{ DeleteParams, ListParams, Patch, PatchParams, PostParams, Preconditions, PropagationPolicy, }; -use crate::Client; +use crate::{ + client::scope::{ClusterScope, DynamicScope, NamespaceScope}, + Client, +}; /// The generic Api abstraction /// /// This abstracts over a [`Request`] and a type `K` so that @@ -37,9 +40,11 @@ use crate::Client; /// implemented by the dynamic [`Resource`]. #[cfg_attr(docsrs, doc(cfg(feature = "client")))] #[derive(Clone)] -pub struct Api { +pub struct Api { /// The request builder object with its resource dependent url pub(crate) request: Request, + pub(crate) scope: DynamicScope, + pub(crate) dyn_type: K::DynamicType, /// The client to use (from this library) pub(crate) client: Client, /// Note: Using `iter::Empty` over `PhantomData`, because we never actually keep any @@ -55,10 +60,12 @@ impl Api { /// Cluster level resources, or resources viewed across all namespaces /// /// This function accepts `K::DynamicType` so it can be used with dynamic resources. - pub fn all_with(client: Client, dyntype: &K::DynamicType) -> Self { - let url = K::url_path(dyntype, None); + pub fn all_with(client: Client, dyn_type: &K::DynamicType) -> Self { + let url = K::url_path(dyn_type, None); Self { client, + scope: DynamicScope::Cluster(ClusterScope), + dyn_type: dyn_type.clone(), request: Request::new(url), phantom: std::iter::empty(), } @@ -67,10 +74,14 @@ impl Api { /// Namespaced resource within a given namespace /// /// This function accepts `K::DynamicType` so it can be used with dynamic resources. - pub fn namespaced_with(client: Client, ns: &str, dyntype: &K::DynamicType) -> Self { - let url = K::url_path(dyntype, Some(ns)); + pub fn namespaced_with(client: Client, ns: &str, dyn_type: &K::DynamicType) -> Self { + let url = K::url_path(dyn_type, Some(ns)); Self { client, + scope: DynamicScope::Namespace(NamespaceScope { + namespace: ns.to_string(), + }), + dyn_type: dyn_type.clone(), request: Request::new(url), phantom: std::iter::empty(), } @@ -82,13 +93,9 @@ impl Api { /// /// Unless configured explicitly, the default namespace is either "default" /// out of cluster, or the service account's namespace in cluster. - pub fn default_namespaced_with(client: Client, dyntype: &K::DynamicType) -> Self { - let url = K::url_path(dyntype, Some(client.default_ns())); - Self { - client, - request: Request::new(url), - phantom: std::iter::empty(), - } + pub fn default_namespaced_with(client: Client, dyn_type: &K::DynamicType) -> Self { + let ns = client.default_ns().to_string(); + Self::namespaced_with(client, &ns, dyn_type) } /// Consume self and return the [`Client`] @@ -112,22 +119,12 @@ where { /// Cluster level resources, or resources viewed across all namespaces pub fn all(client: Client) -> Self { - let url = K::url_path(&Default::default(), None); - Self { - client, - request: Request::new(url), - phantom: std::iter::empty(), - } + Self::all_with(client, &K::DynamicType::default()) } /// Namespaced resource within a given namespace pub fn namespaced(client: Client, ns: &str) -> Self { - let url = K::url_path(&Default::default(), Some(ns)); - Self { - client, - request: Request::new(url), - phantom: std::iter::empty(), - } + Self::namespaced_with(client, ns, &K::DynamicType::default()) } /// Namespaced resource within the default namespace @@ -135,16 +132,11 @@ where /// Unless configured explicitly, the default namespace is either "default" /// out of cluster, or the service account's namespace in cluster. pub fn default_namespaced(client: Client) -> Self { - let url = K::url_path(&Default::default(), Some(client.default_ns())); - Self { - client, - request: Request::new(url), - phantom: std::iter::empty(), - } + Self::default_namespaced_with(client, &K::DynamicType::default()) } } -impl From> for Client { +impl From> for Client { fn from(api: Api) -> Self { api.client } diff --git a/kube/src/api/subresource.rs b/kube/src/api/subresource.rs index 1067eec35..8c8231a06 100644 --- a/kube/src/api/subresource.rs +++ b/kube/src/api/subresource.rs @@ -8,8 +8,8 @@ use crate::{ Result, }; -use kube_core::response::Status; pub use kube_core::subresource::{EvictParams, LogParams}; +use kube_core::{response::Status, Resource}; #[cfg(feature = "ws")] #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] @@ -22,7 +22,7 @@ pub use k8s_openapi::api::autoscaling::v1::{Scale, ScaleSpec, ScaleStatus}; /// Methods for [scale subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#scale-subresource). impl Api where - K: Clone + DeserializeOwned, + K: Clone + DeserializeOwned + Resource, { /// Fetch the scale subresource pub async fn get_scale(&self, name: &str) -> Result { @@ -58,7 +58,7 @@ where /// Methods for [status subresource](https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/#status-subresource). impl Api where - K: DeserializeOwned, + K: DeserializeOwned + Resource, { /// Get the named resource with a status subresource /// @@ -153,7 +153,7 @@ impl Log for k8s_openapi::api::core::v1::Pod {} impl Api where - K: DeserializeOwned + Log, + K: DeserializeOwned + Log + Resource, { /// Fetch logs as a string pub async fn logs(&self, name: &str, lp: &LogParams) -> Result { @@ -191,7 +191,7 @@ impl Evict for k8s_openapi::api::core::v1::Pod {} impl Api where - K: DeserializeOwned + Evict, + K: DeserializeOwned + Evict + Resource, { /// Create an eviction pub async fn evict(&self, name: &str, ep: &EvictParams) -> Result { @@ -235,7 +235,7 @@ impl Attach for k8s_openapi::api::core::v1::Pod {} #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] impl Api where - K: Clone + DeserializeOwned + Attach, + K: Clone + DeserializeOwned + Attach + Resource, { /// Attach to pod pub async fn attach(&self, name: &str, ap: &AttachParams) -> Result { @@ -281,7 +281,7 @@ impl Execute for k8s_openapi::api::core::v1::Pod {} #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] impl Api where - K: Clone + DeserializeOwned + Execute, + K: Clone + DeserializeOwned + Execute + Resource, { /// Execute a command in a pod pub async fn exec( diff --git a/kube/src/client/decoder/mod.rs b/kube/src/client/decoder/mod.rs index 842f0cbc8..1367198bf 100644 --- a/kube/src/client/decoder/mod.rs +++ b/kube/src/client/decoder/mod.rs @@ -1,3 +1,7 @@ +//! Decode the result of a [`Verb`] +//! +//! You typically don't need to interact with this directly, unless you are implementing a custom [`Verb`] + pub mod single; pub mod stream; diff --git a/kube/src/client/decoder/single.rs b/kube/src/client/decoder/single.rs index fdd210b5c..2542b2fb3 100644 --- a/kube/src/client/decoder/single.rs +++ b/kube/src/client/decoder/single.rs @@ -1,3 +1,5 @@ +//! Single-value decoder + use bytes::{Buf, Bytes}; use futures::{ready, Future, StreamExt}; use http::Response; @@ -7,15 +9,20 @@ use snafu::{ResultExt, Snafu}; use std::{io::Read, marker::PhantomData, task::Poll}; #[derive(Debug, Snafu)] +#[allow(missing_docs)] +/// Failed to decode body pub enum Error { + /// Failed to read body #[snafu(display("read failed: {}", source))] ReadFailed { source: hyper::Error }, + /// Failed to deserialize body #[snafu(display("deserialize failed: {}", source))] DeserializeFailed { source: serde_json::Error }, } +/// Decode a single JSON value pub struct DecodeSingle { - tpe: PhantomData<*const K>, + tpe: PhantomData K>, chunks: Vec, body: Body, } @@ -33,10 +40,7 @@ impl From> for DecodeSingle { impl Future for DecodeSingle { type Output = Result; - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll { + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { loop { break match ready!(self.body.poll_next_unpin(cx)) { Some(Ok(chunk)) => { diff --git a/kube/src/client/decoder/stream.rs b/kube/src/client/decoder/stream.rs index 6bcb18558..49ea097c0 100644 --- a/kube/src/client/decoder/stream.rs +++ b/kube/src/client/decoder/stream.rs @@ -1,3 +1,5 @@ +//! Stream decoder + use bytes::Bytes; use futures::{ready, stream::MapErr, Future, Stream, StreamExt, TryStreamExt}; use http::Response; @@ -11,25 +13,27 @@ use tokio_util::{ }; #[derive(Debug, Snafu)] +#[allow(missing_docs)] +/// Failed to decode body pub enum Error { + /// Failed to read body #[snafu(display("read failed: {}", source))] ReadFailed { source: LinesCodecError }, + /// Failed to deserialize body #[snafu(display("deserialize failed: {}", source))] DeserializeFailed { source: serde_json::Error }, } +/// Decode a stream of newline-separate JSON values pub struct DecodeStream { - tpe: PhantomData<*const K>, + tpe: PhantomData K>, body: Option, } impl Future for DecodeStream { type Output = Result, Infallible>; - fn poll( - mut self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll { + fn poll(mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll { Poll::Ready(Ok(DecodeStreamStream { tpe: self.tpe, body: FramedRead::new( @@ -54,13 +58,11 @@ impl From> for DecodeStream { } } +/// Helper stream returned by [`DecodeStream`] pub struct DecodeStreamStream { - tpe: PhantomData<*const K>, + tpe: PhantomData K>, #[allow(clippy::type_complexity)] - body: FramedRead< - StreamReader std::io::Error>, Bytes>, - LinesCodec, - >, + body: FramedRead std::io::Error>, Bytes>, LinesCodec>, } impl Stream for DecodeStreamStream { diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index 3c6e7b232..a5cfba86f 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -10,7 +10,12 @@ use std::convert::TryFrom; -use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result}; +use crate::{ + api::WatchEvent, + client::verb::{GetApiserverVersion, ListApiGroups}, + error::ErrorResponse, + Config, Error, Result, +}; use bytes::Bytes; use either::{Either, Left, Right}; use futures::{self, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt}; @@ -35,15 +40,15 @@ use tower_http::{ mod auth; mod body; -mod decoder; -mod scope; -mod verb; +pub mod decoder; +pub mod scope; +pub mod verb; // Add `into_stream()` to `http::Body` use body::BodyStreamExt; mod config_ext; pub use config_ext::ConfigExt; -use self::verb::Verb; +use self::verb::{ListApiGroupResources, ListCoreApiVersions, Verb}; pub mod middleware; #[cfg(any(feature = "native-tls", feature = "rustls-tls"))] mod tls; @@ -54,16 +59,18 @@ const WS_PROTOCOL: &str = "v4.channel.k8s.io"; #[derive(Snafu, Debug)] #[allow(missing_docs)] /// Failed to perform an API call -pub enum CallError { +pub enum CallError { /// Failed to build the API request #[snafu(display("failed to build api request: {}", source))] BuildRequestFailed { source: verb::Error }, /// API request failed #[snafu(display("kube api request failed: {}", source))] - RequestFailed { source: Error }, + RequestFailed { source: Box }, /// Failed to decode API response #[snafu(display("failed to decode api response: {}", source))] - DecodeFailed { source: DecodeErr }, + DecodeFailed { + source: Box, + }, } /// Client for connecting with a Kubernetes cluster. @@ -147,17 +154,15 @@ impl Client { } /// Perform a request described by a [`Verb`] - pub async fn call( - &self, - verb: V, - ) -> Result<::Ok, CallError<::Error>> + pub async fn call(&self, verb: V) -> Result<::Ok, CallError> where - ::Error: std::error::Error + 'static, + ::Error: std::error::Error + Send + Sync + 'static, { let req = verb.to_http_request().context(BuildRequestFailed)?; - V::ResponseDecoder::from(self.send(req).await.context(RequestFailed)?) + V::ResponseDecoder::from(self.send(req).await.map_err(Box::new).context(RequestFailed)?) .into_future() .await + .map_err(|err| Box::new(err) as Box) .context(DecodeFailed) } @@ -169,17 +174,12 @@ impl Client { .map_err(Error::Service)? .call(request) .await - .map_err(|err| { - if err.is::() { - // Error decorating request - *err.downcast::().expect("kube::Error") - } else if err.is::() { - // Error requesting - Error::HyperError(*err.downcast::().expect("hyper::Error")) - } else { - // Errors from other middlewares - Error::Service(err) - } + .map_err(|err| match err.downcast::() { + Ok(err) => *err, + Err(err) => match err.downcast::() { + Ok(err) => Error::HyperError(*err), + Err(err) => Error::Service(err), + }, })?; Ok(res) } @@ -231,6 +231,7 @@ impl Client { /// Perform a raw HTTP request against the API and deserialize the response /// as JSON to some known type. + #[deprecated(note = "use Client::call instead", since = "0.59.0")] pub async fn request(&self, request: Request>) -> Result where T: DeserializeOwned, @@ -370,13 +371,12 @@ impl Client { impl Client { /// Returns apiserver version. pub async fn apiserver_version(&self) -> Result { - self.request(Request::builder().uri("/version").body(vec![])?) - .await + Ok(self.call(GetApiserverVersion).await?) } /// Lists api groups that apiserver serves. pub async fn list_api_groups(&self) -> Result { - self.request(Request::builder().uri("/apis").body(vec![])?).await + Ok(self.call(ListApiGroups).await?) } /// Lists resources served in given API group. @@ -398,19 +398,18 @@ impl Client { /// # } /// ``` pub async fn list_api_group_resources(&self, apiversion: &str) -> Result { - let url = format!("/apis/{}", apiversion); - self.request(Request::builder().uri(url).body(vec![])?).await + let (group, version) = apiversion.split_once('/').unwrap_or(("", apiversion)); + Ok(self.call(ListApiGroupResources { group, version }).await?) } /// Lists versions of `core` a.k.a. `""` legacy API group. pub async fn list_core_api_versions(&self) -> Result { - self.request(Request::builder().uri("/api").body(vec![])?).await + Ok(self.call(ListCoreApiVersions).await?) } /// Lists resources served in particular `core` group version. pub async fn list_core_api_resources(&self, version: &str) -> Result { - let url = format!("/api/{}", version); - self.request(Request::builder().uri(url).body(vec![])?).await + self.list_api_group_resources(version).await } } diff --git a/kube/src/client/scope.rs b/kube/src/client/scope.rs index 2b7d2356c..b23f27f54 100644 --- a/kube/src/client/scope.rs +++ b/kube/src/client/scope.rs @@ -1,38 +1,65 @@ +//! Scopes delimiting which objects an API call applies to + use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; +/// A scope for interacting with Kubernetes objects pub trait Scope { - fn path_segment(&self) -> String; + /// The namespace associated with the [`Scope`], if any fn namespace(&self) -> Option<&str>; } +/// Access all objects in the cluster (that the user has permission to access) +#[derive(Clone, Debug)] pub struct ClusterScope; +/// Access all objects in one namespace (that the user has permission to access) +#[derive(Clone, Debug)] pub struct NamespaceScope { + /// Namespace that access is limited to pub namespace: String, } +/// A [`Scope`] that is resolved at runtime +/// +/// NOTE: By using [`DynamicScope`] you opt out of Kube's ability to validate that the scope is valid for a given operation +#[derive(Clone, Debug)] +pub enum DynamicScope { + /// Access all objects in the cluster (that the user has permission to access) + Cluster(ClusterScope), + /// Access all objects in one namespace (that the user has permission to access) + Namespace(NamespaceScope), +} impl Scope for ClusterScope { - fn path_segment(&self) -> String { - String::new() - } - fn namespace(&self) -> Option<&str> { None } } impl Scope for NamespaceScope { - fn path_segment(&self) -> String { - format!("namespaces/{}/", self.namespace) - } - fn namespace(&self) -> Option<&str> { Some(&self.namespace) } } +impl Scope for DynamicScope { + fn namespace(&self) -> Option<&str> { + self.inner().namespace() + } +} +impl DynamicScope { + fn inner(&self) -> &dyn Scope { + match self { + DynamicScope::Cluster(scope) => scope, + DynamicScope::Namespace(scope) => scope, + } + } +} +/// Scope where a [`Resource`]'s objects can be read from or written to pub trait NativeScope: Scope {} impl> NativeScope for NamespaceScope {} impl> NativeScope for ClusterScope {} +impl NativeScope for DynamicScope {} +/// Scope where a [`Resource`]'s objects can be listed from pub trait ListScope: Scope {} impl> ListScope for NamespaceScope {} impl ListScope for ClusterScope {} +impl ListScope for DynamicScope {} diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs index 8420f4c32..4859866e5 100644 --- a/kube/src/client/verb.rs +++ b/kube/src/client/verb.rs @@ -1,37 +1,55 @@ -use std::ops::Deref; +//! Operations supported by kube use futures::TryFuture; use http::{Request, Response}; use hyper::Body; -use kube_core::{Resource, WatchEvent}; -use serde::{de::DeserializeOwned, Deserialize}; -use snafu::{ResultExt, Snafu}; +use kube_core::{object::ObjectList, Resource, WatchEvent}; +use serde::{de::DeserializeOwned, Serialize}; +use snafu::{OptionExt, ResultExt, Snafu}; use crate::client::{ decoder::{DecodeSingle, DecodeStream}, scope::{self, NativeScope}, - Config, }; #[derive(Snafu, Debug)] +#[allow(missing_docs)] +/// Failed to create a [`Request`] for a given [`Verb`] pub enum Error { + /// Verb tried to create invalid HTTP request #[snafu(display("verb created invalid http request: {}", source))] BuildRequestFailed { source: http::Error }, + /// Failed to serialize object + #[snafu(display("failed to serialize object: {}", source))] + SerializeFailed { source: serde_json::Error }, + // Object has no name + #[snafu(display("object has no name"))] + UnnamedObject, } type Result = std::result::Result; +/// An action that Kube can take pub trait Verb { + /// Decodes the response given from the server + /// Will typically be [`DecodeSingle`] type ResponseDecoder: TryFuture + From>; + /// Prepare a HTTP request that takes the action + /// + /// Should include request-specific options, but not global options (such as the base URI or authentication tokens) fn to_http_request(&self) -> Result>; } -pub struct Get { - pub name: String, - pub scope: Scope, - pub dyn_type: Kind::DynamicType, +/// Get a single object +pub struct Get<'a, Kind: Resource, Scope> { + /// The name of the object + pub name: &'a str, + /// The scope that the object will be queried from + pub scope: &'a Scope, + /// The type of the object + pub dyn_type: &'a Kind::DynamicType, } -impl> Verb for Get { +impl<'a, Kind: Resource + DeserializeOwned, Scope: NativeScope> Verb for Get<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { @@ -45,11 +63,14 @@ impl> Verb for Get { - pub scope: Scope, - pub dyn_type: Kind::DynamicType, +/// List all objects of a resource type +pub struct List<'a, Kind: Resource, Scope> { + /// The scope that the objects will be queried from + pub scope: &'a Scope, + /// The type of the objects + pub dyn_type: &'a Kind::DynamicType, } -impl Verb for List { +impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for List<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle>; fn to_http_request(&self) -> Result> { @@ -58,16 +79,15 @@ impl Verb for List { - pub items: Vec, -} -pub struct Watch { - pub scope: Scope, - pub dyn_type: Kind::DynamicType, +/// Watch all objects of a resource type for modifications +pub struct Watch<'a, Kind: Resource, Scope> { + /// The scope that the objects will be queried from + pub scope: &'a Scope, + /// The type of the objects + pub dyn_type: &'a Kind::DynamicType, } -impl Verb for Watch { +impl<'a, Kind: Resource, Scope: scope::Scope> Verb for Watch<'a, Kind, Scope> { type ResponseDecoder = DecodeStream>; fn to_http_request(&self) -> Result> { @@ -79,3 +99,83 @@ impl Verb for Watch { .context(BuildRequestFailed) } } + +/// Create a new object +pub struct Create<'a, Kind: Resource, Scope> { + /// The object to be created + pub object: &'a Kind, + /// The scope for the object to be created in + pub scope: &'a Scope, + /// The type of the object + pub dyn_type: &'a Kind::DynamicType, +} +impl<'a, Kind: Resource + Serialize, Scope: scope::Scope> Verb for Create<'a, Kind, Scope> { + type ResponseDecoder = DecodeStream; + + fn to_http_request(&self) -> Result> { + Request::post(format!( + "{}/{}", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + self.object.meta().name.as_ref().context(UnnamedObject)? + )) + .body(Body::from( + serde_json::to_vec(self.object).context(SerializeFailed)?, + )) + .context(BuildRequestFailed) + } +} + +/// Get the API server's version +pub struct GetApiserverVersion; +impl Verb for GetApiserverVersion { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::get("/version") + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +/// Get all API groups supported by the API server +pub struct ListApiGroups; +impl Verb for ListApiGroups { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::get("/apis") + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +/// Get all supported versions of the legacy core API group +pub struct ListCoreApiVersions; +impl Verb for ListCoreApiVersions { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::get("/api") + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +/// Get all resources supported by the API server for a given API group and version +pub struct ListApiGroupResources<'a> { + /// The API group, use `""` for the legacy core group + pub group: &'a str, + /// THe API version + pub version: &'a str, +} +impl<'a> Verb for ListApiGroupResources<'a> { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + let path = match self.group { + "" => format!("/api/{}", self.version), + group => format!("/apis/{}/{}", group, self.version), + }; + Request::get(path).body(Body::empty()).context(BuildRequestFailed) + } +} diff --git a/kube/src/error.rs b/kube/src/error.rs index b8346a87d..a053d6676 100644 --- a/kube/src/error.rs +++ b/kube/src/error.rs @@ -17,6 +17,10 @@ pub enum Error { #[error("ApiError: {0} ({0:?})")] Api(#[source] ErrorResponse), + /// Call failed + #[error("CallError: {0}")] + ClientCall(#[from] crate::client::CallError), + /// ConnectionError for when TcpStream fails to connect. #[error("ConnectionError: {0}")] Connection(std::io::Error), From 01b053589b434e3ec3f74f34eab857408a673d78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 26 Jul 2021 20:29:17 +0200 Subject: [PATCH 3/7] Add Neokubism versions of all remaining regular verbs --- kube/src/api/core_methods.rs | 85 +++++++++++++++++++--------- kube/src/client/mod.rs | 6 ++ kube/src/client/verb.rs | 106 ++++++++++++++++++++++++++++++++++- kube/src/error.rs | 3 + 4 files changed, 169 insertions(+), 31 deletions(-) diff --git a/kube/src/api/core_methods.rs b/kube/src/api/core_methods.rs index 1f6fd13d5..755ea5c8e 100644 --- a/kube/src/api/core_methods.rs +++ b/kube/src/api/core_methods.rs @@ -1,11 +1,11 @@ use either::Either; -use futures::Stream; +use futures::{Stream, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; use crate::{ api::Api, - client::verb::{Get, List}, + client::verb::{self, Create, Delete, DeleteCollection, Get, List, Replace, Watch}, Result, }; use kube_core::{object::ObjectList, params::*, response::Status, Resource, WatchEvent}; @@ -87,10 +87,14 @@ where where K: Serialize, { - let bytes = serde_json::to_vec(&data)?; - let mut req = self.request.create(pp, bytes)?; - req.extensions_mut().insert("create"); - self.client.request::(req).await + Ok(self + .client + .call(Create:: { + object: &data, + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await?) } /// Delete a named resource @@ -116,9 +120,15 @@ where /// } /// ``` pub async fn delete(&self, name: &str, dp: &DeleteParams) -> Result> { - let mut req = self.request.delete(name, dp)?; - req.extensions_mut().insert("delete"); - self.client.request_status::(req).await + Ok(self + .client + .call(Delete:: { + name, + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await?) + .map(Either::Left) } /// Delete a collection of resources @@ -153,9 +163,14 @@ where dp: &DeleteParams, lp: &ListParams, ) -> Result, Status>> { - let mut req = self.request.delete_collection(dp, lp)?; - req.extensions_mut().insert("delete_collection"); - self.client.request_status::>(req).await + Ok(self + .client + .call(DeleteCollection:: { + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await?) + .map(Either::Left) } /// Patch a subset of a resource's properties @@ -187,15 +202,20 @@ where /// ``` /// [`Patch`]: super::Patch /// [`PatchParams`]: super::PatchParams - pub async fn patch( - &self, - name: &str, - pp: &PatchParams, - patch: &Patch

, - ) -> Result { - let mut req = self.request.patch(name, pp, patch)?; - req.extensions_mut().insert("patch"); - self.client.request::(req).await + pub async fn patch(&self, name: &str, pp: &PatchParams, patch: &Patch) -> Result + where + K: Serialize, + Patch: Serialize, + { + Ok(self + .client + .call(verb::Patch:: { + name, + scope: &self.scope, + dyn_type: &self.dyn_type, + patch, + }) + .await?) } /// Replace a resource entirely with a new one @@ -246,10 +266,14 @@ where where K: Serialize, { - let bytes = serde_json::to_vec(&data)?; - let mut req = self.request.replace(name, pp, bytes)?; - req.extensions_mut().insert("replace"); - self.client.request::(req).await + Ok(self + .client + .call(Replace:: { + scope: &self.scope, + dyn_type: &self.dyn_type, + object: data, + }) + .await?) } /// Watch a list of resources @@ -294,8 +318,13 @@ where lp: &ListParams, version: &str, ) -> Result>>> { - let mut req = self.request.watch(lp, version)?; - req.extensions_mut().insert("watch"); - self.client.request_events::(req).await + Ok(self + .client + .call(Watch:: { + scope: &self.scope, + dyn_type: &self.dyn_type, + }) + .await? + .err_into()) } } diff --git a/kube/src/client/mod.rs b/kube/src/client/mod.rs index a5cfba86f..8c2c2719e 100644 --- a/kube/src/client/mod.rs +++ b/kube/src/client/mod.rs @@ -232,6 +232,7 @@ impl Client { /// Perform a raw HTTP request against the API and deserialize the response /// as JSON to some known type. #[deprecated(note = "use Client::call instead", since = "0.59.0")] + #[allow(deprecated)] pub async fn request(&self, request: Request>) -> Result where T: DeserializeOwned, @@ -246,6 +247,7 @@ impl Client { /// Perform a raw HTTP request against the API and get back the response /// as a string + #[deprecated(note = "use Client::call instead", since = "0.59.0")] pub async fn request_text(&self, request: Request>) -> Result { let res = self.send(request.map(Body::from)).await?; let status = res.status(); @@ -259,6 +261,7 @@ impl Client { /// Perform a raw HTTP request against the API and get back the response /// as a stream of bytes + #[deprecated(note = "use Client::call instead", since = "0.59.0")] pub async fn request_text_stream( &self, request: Request>, @@ -270,6 +273,8 @@ impl Client { /// Perform a raw HTTP request against the API and get back either an object /// deserialized as JSON or a [`Status`] Object. + #[deprecated(note = "use Client::call instead", since = "0.59.0")] + #[allow(deprecated)] pub async fn request_status(&self, request: Request>) -> Result> where T: DeserializeOwned, @@ -292,6 +297,7 @@ impl Client { } /// Perform a raw request and get back a stream of [`WatchEvent`] objects + #[deprecated(note = "use Client::call instead", since = "0.59.0")] pub async fn request_events( &self, request: Request>, diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs index 4859866e5..31ae7de00 100644 --- a/kube/src/client/verb.rs +++ b/kube/src/client/verb.rs @@ -3,7 +3,7 @@ use futures::TryFuture; use http::{Request, Response}; use hyper::Body; -use kube_core::{object::ObjectList, Resource, WatchEvent}; +use kube_core::{object::ObjectList, params, Resource, WatchEvent}; use serde::{de::DeserializeOwned, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -109,8 +109,10 @@ pub struct Create<'a, Kind: Resource, Scope> { /// The type of the object pub dyn_type: &'a Kind::DynamicType, } -impl<'a, Kind: Resource + Serialize, Scope: scope::Scope> Verb for Create<'a, Kind, Scope> { - type ResponseDecoder = DecodeStream; +impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb + for Create<'a, Kind, Scope> +{ + type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { Request::post(format!( @@ -125,6 +127,104 @@ impl<'a, Kind: Resource + Serialize, Scope: scope::Scope> Verb for Create<'a, Ki } } +/// Delete a named object +pub struct Delete<'a, Kind: Resource, Scope> { + /// The name of the object to be deleted + pub name: &'a str, + /// The scope for the object to be deleted from + pub scope: &'a Scope, + /// The type of the object + pub dyn_type: &'a Kind::DynamicType, +} +impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for Delete<'a, Kind, Scope> { + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::delete(format!( + "{}/{}", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + self.name + )) + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +/// Delete objects matching a search query +pub struct DeleteCollection<'a, Kind: Resource, Scope> { + /// The scope for the object to be deleted from + pub scope: &'a Scope, + /// The type of the objects + pub dyn_type: &'a Kind::DynamicType, +} +impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for DeleteCollection<'a, Kind, Scope> { + type ResponseDecoder = DecodeSingle>; + + fn to_http_request(&self) -> Result> { + Request::delete(Kind::url_path(&self.dyn_type, self.scope.namespace())) + .body(Body::empty()) + .context(BuildRequestFailed) + } +} + +/// Patch a named object +pub struct Patch<'a, Kind: Resource + Serialize, Scope> { + /// The name of the object to patch + pub name: &'a str, + /// The scope of the object + pub scope: &'a Scope, + /// The type of the objects + pub dyn_type: &'a Kind::DynamicType, + /// The patch to be applied + pub patch: &'a params::Patch, +} +impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb for Patch<'a, Kind, Scope> +where + params::Patch: Serialize, +{ + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::patch(format!( + "{}/{}", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + self.name + )) + .body(Body::from( + serde_json::to_vec(self.patch).context(SerializeFailed)?, + )) + .context(BuildRequestFailed) + } +} + +/// Replace a named existing object +pub struct Replace<'a, Kind: Resource, Scope> { + /// The object to replace + /// Requires `metadata.resourceVersion` to be `Some` + pub object: &'a Kind, + /// The scope of the object + pub scope: &'a Scope, + /// The type of the objects + pub dyn_type: &'a Kind::DynamicType, +} +impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb + for Replace<'a, Kind, Scope> +{ + type ResponseDecoder = DecodeSingle; + + fn to_http_request(&self) -> Result> { + Request::patch(format!( + "{}/{}", + Kind::url_path(&self.dyn_type, self.scope.namespace()), + self.object.meta().name.as_ref().context(UnnamedObject)? + )) + .body(Body::from( + serde_json::to_vec(self.object).context(SerializeFailed)?, + )) + .context(BuildRequestFailed) + } +} + /// Get the API server's version pub struct GetApiserverVersion; impl Verb for GetApiserverVersion { diff --git a/kube/src/error.rs b/kube/src/error.rs index a053d6676..d3e135741 100644 --- a/kube/src/error.rs +++ b/kube/src/error.rs @@ -20,6 +20,9 @@ pub enum Error { /// Call failed #[error("CallError: {0}")] ClientCall(#[from] crate::client::CallError), + /// Failed to decode stream + #[error("DecodeStreamError: {0}")] + DecodeStream(#[from] crate::client::decoder::stream::Error), /// ConnectionError for when TcpStream fails to connect. #[error("ConnectionError: {0}")] From 06d360b88e73e9b10535df8f1aab868351364e66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 26 Jul 2021 23:48:28 +0200 Subject: [PATCH 4/7] Carry over get/list parameters --- kube/Cargo.toml | 1 + kube/src/api/core_methods.rs | 10 ++++- kube/src/client/verb.rs | 87 +++++++++++++++++++++++++++++++----- 3 files changed, 85 insertions(+), 13 deletions(-) diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 534dd4475..71d632288 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -72,6 +72,7 @@ pin-project = { version = "1.0.4", optional = true } rand = { version = "0.8.3", optional = true } tracing = { version = "0.1.25", features = ["log"], optional = true } snafu = "0.6.10" +form_urlencoded = "1.0.1" [dependencies.k8s-openapi] version = "0.12.0" diff --git a/kube/src/api/core_methods.rs b/kube/src/api/core_methods.rs index 755ea5c8e..a795b4b2d 100644 --- a/kube/src/api/core_methods.rs +++ b/kube/src/api/core_methods.rs @@ -1,11 +1,11 @@ use either::Either; use futures::{Stream, TryStreamExt}; use serde::{de::DeserializeOwned, Serialize}; -use std::fmt::Debug; +use std::{fmt::Debug, time::Duration}; use crate::{ api::Api, - client::verb::{self, Create, Delete, DeleteCollection, Get, List, Replace, Watch}, + client::verb::{self, Create, Delete, DeleteCollection, Get, List, Query, Replace, Watch}, Result, }; use kube_core::{object::ObjectList, params::*, response::Status, Resource, WatchEvent}; @@ -63,6 +63,9 @@ where .call(List { scope: &self.scope, dyn_type: &self.dyn_type, + query: &Query::from_list_params(lp), + limit: lp.limit, + continue_token: lp.continue_token.as_deref(), }) .await?) } @@ -323,6 +326,9 @@ where .call(Watch:: { scope: &self.scope, dyn_type: &self.dyn_type, + query: &Query::from_list_params(lp), + version, + timeout: lp.timeout.map(|timeout| Duration::from_secs(timeout.into())), }) .await? .err_into()) diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs index 31ae7de00..2a2114ed7 100644 --- a/kube/src/client/verb.rs +++ b/kube/src/client/verb.rs @@ -1,9 +1,15 @@ //! Operations supported by kube +use std::{str::FromStr, time::Duration}; + use futures::TryFuture; -use http::{Request, Response}; +use http::{Request, Response, Uri}; use hyper::Body; -use kube_core::{object::ObjectList, params, Resource, WatchEvent}; +use kube_core::{ + object::ObjectList, + params::{self, ListParams}, + Resource, WatchEvent, +}; use serde::{de::DeserializeOwned, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -69,14 +75,64 @@ pub struct List<'a, Kind: Resource, Scope> { pub scope: &'a Scope, /// The type of the objects pub dyn_type: &'a Kind::DynamicType, + /// The query to filter the objects by + pub query: &'a Query<'a>, + + /// Limit the number of results. + /// + /// If there are more results, the server will respond with a continue token which can be used to fetch another page + /// of results. See the [Kubernetes API docs](https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks) + /// for pagination details. + pub limit: Option, + + /// Fetch a second page of results. + /// + /// After listing results with a `limit`, a continue token can be used to fetch another page of results. + pub continue_token: Option<&'a str>, } impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for List<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle>; fn to_http_request(&self) -> Result> { - Request::get(Kind::url_path(&self.dyn_type, self.scope.namespace())) - .body(Body::empty()) - .context(BuildRequestFailed) + let mut url = format!("{}?", Kind::url_path(&self.dyn_type, self.scope.namespace())); + let mut qp = form_urlencoded::Serializer::new(&mut url); + self.query.populate_qp(&mut qp); + if let Some(limit) = self.limit { + qp.append_pair("limit", &limit.to_string()); + } + if let Some(cont) = self.continue_token { + qp.append_pair("continue", cont); + } + Request::get(url).body(Body::empty()).context(BuildRequestFailed) + } +} +/// Common query parameters used to select multiple objects +#[derive(Default)] +pub struct Query<'a> { + /// A selector to restrict the list of returned objects by their labels. + /// + /// Defaults to everything if `None`. + pub label_selector: Option<&'a str>, + /// A selector to restrict the list of returned objects by their fields. + /// + /// Defaults to everything if `None`. + pub field_selector: Option<&'a str>, +} +impl<'a> Query<'a> { + fn populate_qp(&self, qp: &mut form_urlencoded::Serializer<&mut String>) { + if let Some(labels) = self.label_selector { + qp.append_pair("labelSelector", labels); + } + if let Some(fields) = self.field_selector { + qp.append_pair("fieldSelector", fields); + } + } + + pub(crate) fn from_list_params(lp: &'a ListParams) -> Self { + Self { + label_selector: lp.label_selector.as_deref(), + field_selector: lp.field_selector.as_deref(), + } } } @@ -86,17 +142,26 @@ pub struct Watch<'a, Kind: Resource, Scope> { pub scope: &'a Scope, /// The type of the objects pub dyn_type: &'a Kind::DynamicType, + /// The query to filter the objects by + pub query: &'a Query<'a>, + /// The `resourceVersion` to report events newer than + pub version: &'a str, + /// Upper bound on how long the watch should be active for, rounded down to the nearest second + pub timeout: Option, } impl<'a, Kind: Resource, Scope: scope::Scope> Verb for Watch<'a, Kind, Scope> { type ResponseDecoder = DecodeStream>; fn to_http_request(&self) -> Result> { - Request::get(format!( - "{}?watch=1", - Kind::url_path(&self.dyn_type, self.scope.namespace()), - )) - .body(Body::empty()) - .context(BuildRequestFailed) + let mut url = format!("{}?", Kind::url_path(&self.dyn_type, self.scope.namespace()),); + let mut qp = form_urlencoded::Serializer::new(&mut url); + qp.append_pair("watch", "1"); + qp.append_pair("resourceVersion", self.version); + qp.append_pair("allowWatchBookmarks", "1"); + if let Some(timeout) = self.timeout { + qp.append_pair("timeoutSeconds", &timeout.as_secs().to_string()); + } + Request::get(url).body(Body::empty()).context(BuildRequestFailed) } } From bcddc3b32806dae30350af33065851dceae365e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Tue, 27 Jul 2021 12:04:01 +0200 Subject: [PATCH 5/7] Fix Patch typing --- kube-core/src/params.rs | 11 ++++++----- kube/src/api/core_methods.rs | 3 +-- kube/src/client/verb.rs | 14 +++++--------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index ae8ce8002..e84652c54 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -199,7 +199,7 @@ impl PostParams { /// ``` #[non_exhaustive] #[derive(Debug)] -pub enum Patch { +pub enum Patch { /// [Server side apply](https://kubernetes.io/docs/reference/using-api/api-concepts/#server-side-apply) /// /// Requires kubernetes >= 1.16 @@ -226,12 +226,13 @@ pub enum Patch { Strategic(T), } -impl Patch { +impl Patch { pub(crate) fn is_apply(&self) -> bool { matches!(self, Patch::Apply(_)) } - pub(crate) fn content_type(&self) -> &'static str { + /// The MIME content type for the patch + pub fn content_type(&self) -> &'static str { match &self { Self::Apply(_) => "application/apply-patch+yaml", #[cfg(feature = "jsonpatch")] @@ -244,7 +245,8 @@ impl Patch { } impl Patch { - pub(crate) fn serialize(&self) -> Result> { + /// Try to serialize the patch object to JSON + pub fn serialize(&self) -> Result, serde_json::Error> { match self { Self::Apply(p) => serde_json::to_vec(p), #[cfg(feature = "jsonpatch")] @@ -253,7 +255,6 @@ impl Patch { Self::Strategic(p) => serde_json::to_vec(p), Self::Merge(p) => serde_json::to_vec(p), } - .map_err(Into::into) } } diff --git a/kube/src/api/core_methods.rs b/kube/src/api/core_methods.rs index a795b4b2d..5ac15308c 100644 --- a/kube/src/api/core_methods.rs +++ b/kube/src/api/core_methods.rs @@ -198,7 +198,7 @@ where /// } /// }); /// let params = PatchParams::apply("myapp"); - /// let patch = Patch::Apply(&patch); + /// let patch = Patch::::Apply(serde_json::from_value(patch)?); /// let o_patched = pods.patch("blog", ¶ms, &patch).await?; /// Ok(()) /// } @@ -208,7 +208,6 @@ where pub async fn patch(&self, name: &str, pp: &PatchParams, patch: &Patch) -> Result where K: Serialize, - Patch: Serialize, { Ok(self .client diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs index 2a2114ed7..681308788 100644 --- a/kube/src/client/verb.rs +++ b/kube/src/client/verb.rs @@ -3,7 +3,7 @@ use std::{str::FromStr, time::Duration}; use futures::TryFuture; -use http::{Request, Response, Uri}; +use http::{header::CONTENT_TYPE, Request, Response, Uri}; use hyper::Body; use kube_core::{ object::ObjectList, @@ -233,7 +233,7 @@ impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for Delete } /// Patch a named object -pub struct Patch<'a, Kind: Resource + Serialize, Scope> { +pub struct Patch<'a, Kind: Resource, Scope> { /// The name of the object to patch pub name: &'a str, /// The scope of the object @@ -243,10 +243,7 @@ pub struct Patch<'a, Kind: Resource + Serialize, Scope> { /// The patch to be applied pub patch: &'a params::Patch, } -impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb for Patch<'a, Kind, Scope> -where - params::Patch: Serialize, -{ +impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb for Patch<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { @@ -255,9 +252,8 @@ where Kind::url_path(&self.dyn_type, self.scope.namespace()), self.name )) - .body(Body::from( - serde_json::to_vec(self.patch).context(SerializeFailed)?, - )) + .header(CONTENT_TYPE, self.patch.content_type()) + .body(Body::from(self.patch.serialize().context(SerializeFailed)?)) .context(BuildRequestFailed) } } From 2f5c7359b79cb6d8817ae847c3d8c9ccec795265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 2 Aug 2021 04:31:31 +0200 Subject: [PATCH 6/7] Fill in the remaining object-level verb params --- examples/configmapgen_controller.rs | 12 +- kube-runtime/src/finalizer.rs | 4 +- kube-runtime/src/lib.rs | 2 + kube-runtime/src/reflector/store.rs | 4 +- kube-runtime/src/watcher.rs | 6 +- kube/src/api/core_methods.rs | 19 ++- kube/src/client/scope.rs | 10 ++ kube/src/client/verb.rs | 177 +++++++++++++++++++++------- 8 files changed, 174 insertions(+), 60 deletions(-) diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index acf7a0b6b..bf6cdbbcb 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -82,11 +82,15 @@ async fn reconcile(generator: ConfigMapGenerator, ctx: Context) -> Result< ); cm_api .patch( - cm.metadata.name.as_ref().context(MissingObjectKey { - name: ".metadata.name", - })?, + &cm.metadata + .name + .as_ref() + .context(MissingObjectKey { + name: ".metadata.name", + })? + .clone(), &PatchParams::apply("configmapgenerator.kube-rt.nullable.se"), - &Patch::Apply(&cm), + &Patch::Apply(cm), ) .await .context(ConfigMapCreationFailed)?; diff --git a/kube-runtime/src/finalizer.rs b/kube-runtime/src/finalizer.rs index 2fbd7b928..69553b485 100644 --- a/kube-runtime/src/finalizer.rs +++ b/kube-runtime/src/finalizer.rs @@ -127,7 +127,7 @@ where .context(CleanupFailed)?; // Cleanup was successful, remove the finalizer so that deletion can continue let finalizer_path = format!("/metadata/finalizers/{}", finalizer_i); - api.patch::( + api.patch( &name, &PatchParams::default(), &Patch::Json(json_patch::Patch(vec![ @@ -167,7 +167,7 @@ where value: finalizer_name.into(), })] }); - api.patch::( + api.patch( obj.meta().name.as_deref().context(UnnamedObject)?, &PatchParams::default(), &Patch::Json(patch), diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 87a858355..28acbf822 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -16,6 +16,8 @@ // Triggered by many derive macros (kube-derive, derivative) #![allow(clippy::default_trait_access)] #![allow(clippy::type_repetition_in_bounds)] +// Triggered by tokio::test macros +#![allow(clippy::semicolon_if_nothing_returned)] pub mod controller; pub mod finalizer; diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index e13d3c8e1..c287d64bd 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -50,11 +50,11 @@ where match event { watcher::Event::Applied(obj) => { self.store - .insert(ObjectRef::from_obj_with(&obj, self.dyntype.clone()), obj.clone()); + .insert(ObjectRef::from_obj_with(obj, self.dyntype.clone()), obj.clone()); } watcher::Event::Deleted(obj) => { self.store - .remove(&ObjectRef::from_obj_with(&obj, self.dyntype.clone())); + .remove(&ObjectRef::from_obj_with(obj, self.dyntype.clone())); } watcher::Event::Restarted(new_objs) => { let new_objs = new_objs diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index b01fa083b..6eb002eee 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -114,13 +114,13 @@ async fn step_trampolined, ) -> (Option>>, State) { match state { - State::Empty => match api.list(&list_params).await { + State::Empty => match api.list(list_params).await { Ok(list) => (Some(Ok(Event::Restarted(list.items))), State::InitListed { resource_version: list.metadata.resource_version.unwrap(), }), Err(err) => (Some(Err(err).context(InitialListFailed)), State::Empty), }, - State::InitListed { resource_version } => match api.watch(&list_params, &resource_version).await { + State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await { Ok(stream) => (None, State::Watching { resource_version, stream: stream.boxed(), @@ -179,7 +179,7 @@ async fn step( mut state: State, ) -> (Result>, State) { loop { - match step_trampolined(&api, &list_params, state).await { + match step_trampolined(api, list_params, state).await { (Some(result), new_state) => return (result, new_state), (None, new_state) => state = new_state, } diff --git a/kube/src/api/core_methods.rs b/kube/src/api/core_methods.rs index 5ac15308c..efe5550b5 100644 --- a/kube/src/api/core_methods.rs +++ b/kube/src/api/core_methods.rs @@ -5,7 +5,9 @@ use std::{fmt::Debug, time::Duration}; use crate::{ api::Api, - client::verb::{self, Create, Delete, DeleteCollection, Get, List, Query, Replace, Watch}, + client::verb::{ + self, Create, Delete, DeleteCollection, DeleteMode, Get, List, Query, Replace, Watch, WriteMode, + }, Result, }; use kube_core::{object::ObjectList, params::*, response::Status, Resource, WatchEvent}; @@ -92,10 +94,10 @@ where { Ok(self .client - .call(Create:: { - object: &data, - scope: &self.scope, + .call(Create:: { + object: data, dyn_type: &self.dyn_type, + write_mode: &WriteMode::from_post_params(pp), }) .await?) } @@ -129,6 +131,7 @@ where name, scope: &self.scope, dyn_type: &self.dyn_type, + delete_mode: &DeleteMode::from_delete_params(dp), }) .await?) .map(Either::Left) @@ -171,6 +174,8 @@ where .call(DeleteCollection:: { scope: &self.scope, dyn_type: &self.dyn_type, + query: &Query::from_list_params(lp), + delete_mode: &DeleteMode::from_delete_params(dp), }) .await?) .map(Either::Left) @@ -215,7 +220,12 @@ where name, scope: &self.scope, dyn_type: &self.dyn_type, + force: pp.force, patch, + write_mode: &WriteMode { + dry_run: pp.dry_run, + field_manager: pp.field_manager.as_deref(), + }, }) .await?) } @@ -274,6 +284,7 @@ where scope: &self.scope, dyn_type: &self.dyn_type, object: data, + write_mode: &WriteMode::from_post_params(pp), }) .await?) } diff --git a/kube/src/client/scope.rs b/kube/src/client/scope.rs index b23f27f54..c31474818 100644 --- a/kube/src/client/scope.rs +++ b/kube/src/client/scope.rs @@ -1,6 +1,7 @@ //! Scopes delimiting which objects an API call applies to use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; +use kube_core::Resource; /// A scope for interacting with Kubernetes objects pub trait Scope { @@ -50,6 +51,15 @@ impl DynamicScope { DynamicScope::Namespace(scope) => scope, } } + + pub(crate) fn of_object(object: &T) -> Self { + match &object.meta().namespace { + Some(ns) => DynamicScope::Namespace(NamespaceScope { + namespace: ns.to_string(), + }), + None => DynamicScope::Cluster(ClusterScope), + } + } } /// Scope where a [`Resource`]'s objects can be read from or written to diff --git a/kube/src/client/verb.rs b/kube/src/client/verb.rs index 681308788..b29b5a1a6 100644 --- a/kube/src/client/verb.rs +++ b/kube/src/client/verb.rs @@ -1,22 +1,20 @@ //! Operations supported by kube -use std::{str::FromStr, time::Duration}; - +use crate::client::{ + decoder::{DecodeSingle, DecodeStream}, + scope::{self, DynamicScope, NativeScope, Scope}, +}; use futures::TryFuture; -use http::{header::CONTENT_TYPE, Request, Response, Uri}; +use http::{header::CONTENT_TYPE, Request, Response}; use hyper::Body; use kube_core::{ object::ObjectList, - params::{self, ListParams}, + params::{self, DeleteParams, ListParams, PostParams, Preconditions, PropagationPolicy}, Resource, WatchEvent, }; use serde::{de::DeserializeOwned, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; - -use crate::client::{ - decoder::{DecodeSingle, DecodeStream}, - scope::{self, NativeScope}, -}; +use std::time::Duration; #[derive(Snafu, Debug)] #[allow(missing_docs)] @@ -61,7 +59,7 @@ impl<'a, Kind: Resource + DeserializeOwned, Scope: NativeScope> Verb for G fn to_http_request(&self) -> Result> { Request::get(format!( "{}/{}", - Kind::url_path(&self.dyn_type, self.scope.namespace()), + Kind::url_path(self.dyn_type, self.scope.namespace()), self.name )) .body(Body::empty()) @@ -94,7 +92,7 @@ impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for List<' type ResponseDecoder = DecodeSingle>; fn to_http_request(&self) -> Result> { - let mut url = format!("{}?", Kind::url_path(&self.dyn_type, self.scope.namespace())); + let mut url = format!("{}?", Kind::url_path(self.dyn_type, self.scope.namespace())); let mut qp = form_urlencoded::Serializer::new(&mut url); self.query.populate_qp(&mut qp); if let Some(limit) = self.limit { @@ -153,7 +151,7 @@ impl<'a, Kind: Resource, Scope: scope::Scope> Verb for Watch<'a, Kind, Scope> { type ResponseDecoder = DecodeStream>; fn to_http_request(&self) -> Result> { - let mut url = format!("{}?", Kind::url_path(&self.dyn_type, self.scope.namespace()),); + let mut url = format!("{}?", Kind::url_path(self.dyn_type, self.scope.namespace()),); let mut qp = form_urlencoded::Serializer::new(&mut url); qp.append_pair("watch", "1"); qp.append_pair("resourceVersion", self.version); @@ -166,29 +164,56 @@ impl<'a, Kind: Resource, Scope: scope::Scope> Verb for Watch<'a, Kind, Scope> { } /// Create a new object -pub struct Create<'a, Kind: Resource, Scope> { +pub struct Create<'a, Kind: Resource> { /// The object to be created pub object: &'a Kind, - /// The scope for the object to be created in - pub scope: &'a Scope, /// The type of the object pub dyn_type: &'a Kind::DynamicType, + /// The mode used when writing + pub write_mode: &'a WriteMode<'a>, } -impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb - for Create<'a, Kind, Scope> -{ +impl<'a, Kind: Resource + Serialize + DeserializeOwned> Verb for Create<'a, Kind> { type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { - Request::post(format!( - "{}/{}", - Kind::url_path(&self.dyn_type, self.scope.namespace()), + let mut url = format!( + "{}/{}?", + Kind::url_path(self.dyn_type, DynamicScope::of_object(self.object).namespace()), self.object.meta().name.as_ref().context(UnnamedObject)? - )) - .body(Body::from( - serde_json::to_vec(self.object).context(SerializeFailed)?, - )) - .context(BuildRequestFailed) + ); + let mut qp = form_urlencoded::Serializer::new(&mut url); + self.write_mode.populate_qp(&mut qp); + Request::post(url) + .body(Body::from( + serde_json::to_vec(self.object).context(SerializeFailed)?, + )) + .context(BuildRequestFailed) + } +} +/// Specifies how to write modifications +pub struct WriteMode<'a> { + /// When present, indicates that modifications should not be persisted. + pub dry_run: bool, + + /// fieldManager is a name of the actor that is making changes. Required for [`Patch::Apply`] + /// optional for everything else. + pub field_manager: Option<&'a str>, +} +impl<'a> WriteMode<'a> { + fn populate_qp(&self, qp: &mut form_urlencoded::Serializer<&mut String>) { + if self.dry_run { + qp.append_pair("dryRun", "All"); + } + if let Some(fm) = self.field_manager { + qp.append_pair("fieldManager", fm); + } + } + + pub(crate) fn from_post_params(pp: &'a PostParams) -> Self { + Self { + dry_run: pp.dry_run, + field_manager: pp.field_manager.as_deref(), + } } } @@ -200,6 +225,8 @@ pub struct Delete<'a, Kind: Resource, Scope> { pub scope: &'a Scope, /// The type of the object pub dyn_type: &'a Kind::DynamicType, + /// The mode used when deleting + pub delete_mode: &'a DeleteMode<'a>, } impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for Delete<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle; @@ -207,13 +234,49 @@ impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for Delete fn to_http_request(&self) -> Result> { Request::delete(format!( "{}/{}", - Kind::url_path(&self.dyn_type, self.scope.namespace()), + Kind::url_path(self.dyn_type, self.scope.namespace()), self.name )) - .body(Body::empty()) + .body(Body::from( + serde_json::to_vec(self.delete_mode).context(SerializeFailed)?, + )) .context(BuildRequestFailed) } } +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +/// Specifies how to delete objects +pub struct DeleteMode<'a> { + /// When present, indicates that modifications should not be persisted. + pub dry_run: bool, + + /// The duration before the object should be deleted, rounded down to the nearest second. + /// + /// The value zero indicates delete immediately. + /// If this value is `None`, the default grace period for the specified type will be used. + pub grace_period_seconds: Option, + + /// Whether or how garbage collection is performed. + /// + /// The default policy is decided by the existing finalizer set in + /// `metadata.finalizers`, and the resource-specific default policy. + pub propagation_policy: Option<&'a PropagationPolicy>, + + /// Condtions that must be fulfilled before a deletion is carried out + /// + /// If not possible, a `409 Conflict` status will be returned. + pub preconditions: Option<&'a Preconditions>, +} +impl<'a> DeleteMode<'a> { + pub(crate) fn from_delete_params(dp: &'a DeleteParams) -> Self { + Self { + dry_run: dp.dry_run, + grace_period_seconds: dp.grace_period_seconds.map(u64::from).map(Duration::from_secs), + propagation_policy: dp.propagation_policy.as_ref(), + preconditions: dp.preconditions.as_ref(), + } + } +} /// Delete objects matching a search query pub struct DeleteCollection<'a, Kind: Resource, Scope> { @@ -221,13 +284,22 @@ pub struct DeleteCollection<'a, Kind: Resource, Scope> { pub scope: &'a Scope, /// The type of the objects pub dyn_type: &'a Kind::DynamicType, + /// The query to filter the objects by + pub query: &'a Query<'a>, + /// The mode used when deleting + pub delete_mode: &'a DeleteMode<'a>, } impl<'a, Kind: Resource + DeserializeOwned, Scope: scope::Scope> Verb for DeleteCollection<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle>; fn to_http_request(&self) -> Result> { - Request::delete(Kind::url_path(&self.dyn_type, self.scope.namespace())) - .body(Body::empty()) + let mut url = format!("{}?", Kind::url_path(self.dyn_type, self.scope.namespace())); + let mut qp = form_urlencoded::Serializer::new(&mut url); + self.query.populate_qp(&mut qp); + Request::delete(url) + .body(Body::from( + serde_json::to_vec(self.delete_mode).context(SerializeFailed)?, + )) .context(BuildRequestFailed) } } @@ -242,19 +314,29 @@ pub struct Patch<'a, Kind: Resource, Scope> { pub dyn_type: &'a Kind::DynamicType, /// The patch to be applied pub patch: &'a params::Patch, + /// Whether to ignore conflicts where fields are owned by other field managers + pub force: bool, + /// The mode used when deleting + pub write_mode: &'a WriteMode<'a>, } impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb for Patch<'a, Kind, Scope> { type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { - Request::patch(format!( - "{}/{}", - Kind::url_path(&self.dyn_type, self.scope.namespace()), + let mut url = format!( + "{}/{}?", + Kind::url_path(self.dyn_type, self.scope.namespace()), self.name - )) - .header(CONTENT_TYPE, self.patch.content_type()) - .body(Body::from(self.patch.serialize().context(SerializeFailed)?)) - .context(BuildRequestFailed) + ); + let mut qp = form_urlencoded::Serializer::new(&mut url); + if self.force { + qp.append_pair("force", "1"); + } + self.write_mode.populate_qp(&mut qp); + Request::patch(url) + .header(CONTENT_TYPE, self.patch.content_type()) + .body(Body::from(self.patch.serialize().context(SerializeFailed)?)) + .context(BuildRequestFailed) } } @@ -267,6 +349,8 @@ pub struct Replace<'a, Kind: Resource, Scope> { pub scope: &'a Scope, /// The type of the objects pub dyn_type: &'a Kind::DynamicType, + /// The mode used when deleting + pub write_mode: &'a WriteMode<'a>, } impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Verb for Replace<'a, Kind, Scope> @@ -274,15 +358,18 @@ impl<'a, Kind: Resource + Serialize + DeserializeOwned, Scope: scope::Scope> Ver type ResponseDecoder = DecodeSingle; fn to_http_request(&self) -> Result> { - Request::patch(format!( - "{}/{}", - Kind::url_path(&self.dyn_type, self.scope.namespace()), + let mut url = format!( + "{}/{}?", + Kind::url_path(self.dyn_type, self.scope.namespace()), self.object.meta().name.as_ref().context(UnnamedObject)? - )) - .body(Body::from( - serde_json::to_vec(self.object).context(SerializeFailed)?, - )) - .context(BuildRequestFailed) + ); + let mut qp = form_urlencoded::Serializer::new(&mut url); + self.write_mode.populate_qp(&mut qp); + Request::put(url) + .body(Body::from( + serde_json::to_vec(self.object).context(SerializeFailed)?, + )) + .context(BuildRequestFailed) } } From 175d4cc5c9332a67dfd7cef27e804bf514b3bc8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 10 Sep 2021 02:14:46 +0200 Subject: [PATCH 7/7] WIP: Subresources --- kube/src/client/scope.rs | 39 +++++++++++++++++++++++++++++++-------- kube/src/lib.rs | 2 +- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/kube/src/client/scope.rs b/kube/src/client/scope.rs index c31474818..c335b9761 100644 --- a/kube/src/client/scope.rs +++ b/kube/src/client/scope.rs @@ -1,13 +1,16 @@ //! Scopes delimiting which objects an API call applies to +use std::ops::Deref; + use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; -use kube_core::Resource; +use kube_core::{subresource::Scale, DynamicObject, Resource}; /// A scope for interacting with Kubernetes objects pub trait Scope { /// The namespace associated with the [`Scope`], if any fn namespace(&self) -> Option<&str>; } +pub trait TopLevelScope: Scope {} /// Access all objects in the cluster (that the user has permission to access) #[derive(Clone, Debug)] @@ -18,6 +21,11 @@ pub struct NamespaceScope { /// Namespace that access is limited to pub namespace: String, } +#[derive(Clone, Debug)] +pub struct SubresourceScope { + pub parent: Parent, + pub dyn_type: Kind::DynamicType, +} /// A [`Scope`] that is resolved at runtime /// /// NOTE: By using [`DynamicScope`] you opt out of Kube's ability to validate that the scope is valid for a given operation @@ -27,6 +35,7 @@ pub enum DynamicScope { Cluster(ClusterScope), /// Access all objects in one namespace (that the user has permission to access) Namespace(NamespaceScope), + Subresource(Box>), } impl Scope for ClusterScope { @@ -34,21 +43,30 @@ impl Scope for ClusterScope { None } } +impl TopLevelScope for ClusterScope {} impl Scope for NamespaceScope { fn namespace(&self) -> Option<&str> { Some(&self.namespace) } } +impl TopLevelScope for NamespaceScope {} +impl self::Scope for SubresourceScope { + fn namespace(&self) -> Option<&str> { + self.parent.namespace() + } +} impl Scope for DynamicScope { fn namespace(&self) -> Option<&str> { self.inner().namespace() } } +impl TopLevelScope for DynamicScope {} impl DynamicScope { fn inner(&self) -> &dyn Scope { match self { DynamicScope::Cluster(scope) => scope, DynamicScope::Namespace(scope) => scope, + DynamicScope::Subresource(scope) => scope.deref(), } } @@ -62,14 +80,19 @@ impl DynamicScope { } } +/// Scope where a [`Resource`]'s objects can be listed from +pub trait ResourceScope: Scope { + fn group(&self) -> &str; + fn version(&self) -> &str; + fn resource(&self) -> &str; +} +impl> ResourceScope for NamespaceScope {} +impl ResourceScope for ClusterScope {} +impl ResourceScope for SubresourceScope {} +impl ResourceScope for DynamicScope {} + /// Scope where a [`Resource`]'s objects can be read from or written to -pub trait NativeScope: Scope {} +pub trait NativeScope: ResourceScope {} impl> NativeScope for NamespaceScope {} impl> NativeScope for ClusterScope {} impl NativeScope for DynamicScope {} - -/// Scope where a [`Resource`]'s objects can be listed from -pub trait ListScope: Scope {} -impl> ListScope for NamespaceScope {} -impl ListScope for ClusterScope {} -impl ListScope for DynamicScope {} diff --git a/kube/src/lib.rs b/kube/src/lib.rs index 9429b3ece..c471dcebd 100644 --- a/kube/src/lib.rs +++ b/kube/src/lib.rs @@ -71,7 +71,7 @@ //! ``` #![cfg_attr(docsrs, feature(doc_cfg))] -#![deny(missing_docs)] +#![warn(missing_docs)] #![deny(unsafe_code)] macro_rules! cfg_client {