diff --git a/src/echo.rs b/src/echo.rs index b0b585b..c74458f 100644 --- a/src/echo.rs +++ b/src/echo.rs @@ -1,8 +1,9 @@ use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec}; use k8s_openapi::api::core::v1::{Container, ContainerPort, PodSpec, PodTemplateSpec}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; -use kube::api::{DeleteParams, ObjectMeta, PostParams}; +use kube::api::{DeleteParams, ObjectMeta, Patch, PatchParams, PostParams}; use kube::{Api, Client, Error}; +use serde_json::json; use std::collections::BTreeMap; /// Creates a new deployment of `n` pods with the `inanimate/echo-server:latest` docker image inside, @@ -68,6 +69,31 @@ pub async fn deploy( .await } +pub async fn update( + client: Client, + name: &str, + replicas: i32, + namespace: &str, +) -> Result { + // Get the existing deployment + let deployment_api: Api = Api::namespaced(client, namespace); + let patch = json!({ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "name": name, + }, + "spec": { + "replicas": replicas, + } + }); + let params = PatchParams::apply("echo-operator").force(); + let patch = Patch::Apply(&patch); + let deployment = deployment_api.patch(name, ¶ms, &patch).await?; + + Ok(deployment) +} + /// Deletes an existing deployment. /// /// # Arguments: diff --git a/src/main.rs b/src/main.rs index 06de0e6..73f88fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use futures::stream::StreamExt; +use k8s_openapi::api::apps::v1::Deployment; use kube::runtime::watcher::Config; use kube::Resource; use kube::ResourceExt; @@ -69,6 +70,8 @@ enum EchoAction { Create, /// Delete all subresources created in the `Create` phase Delete, + /// Update existing subresources to match the desired state + Update, /// This `Echo` resource is in desired state and requires no actions to be taken NoOp, } @@ -94,7 +97,7 @@ async fn reconcile(echo: Arc, context: Arc) -> Result { // Creates a deployment with `n` Echo service pods, but applies a finalizer first. // Finalizer is applied first, as the operator might be shut down and restarted @@ -108,6 +111,7 @@ async fn reconcile(echo: Arc, context: Arc) -> Result { // Deletes any subresources related to this `Echo` resources. If and only if all subresources // are deleted, the finalizer is removed and Kubernetes is free to remove the `Echo` resource. @@ -123,9 +127,16 @@ async fn reconcile(echo: Arc, context: Arc) -> Result { + // Update the deployment to match the desired state + echo::update(client, &name, echo.spec.replicas, &namespace).await?; + Ok(Action::requeue(Duration::from_secs(10))) + } + // The resource is already in desired state, do nothing and re-check after 10 seconds EchoAction::NoOp => Ok(Action::requeue(Duration::from_secs(10))), - }; + } } /// Resources arrives into reconciliation queue in a certain state. This function looks at @@ -134,19 +145,33 @@ async fn reconcile(echo: Arc, context: Arc) -> Result EchoAction { - return if echo.meta().deletion_timestamp.is_some() { - EchoAction::Delete +async fn determine_action(client: Client, echo: &Echo) -> Result { + if echo.meta().deletion_timestamp.is_some() { + Ok(EchoAction::Delete) } else if echo .meta() .finalizers .as_ref() .map_or(true, |finalizers| finalizers.is_empty()) { - EchoAction::Create + Ok(EchoAction::Create) } else { - EchoAction::NoOp - }; + let deployment_api: Api = Api::namespaced( + client, + echo.meta() + .namespace + .as_ref() + .expect("expected namespace to be set"), + ); + + let deployment = deployment_api.get(&echo.name_any()).await?; + + if deployment.spec.expect("expected spec to be set").replicas != Some(echo.spec.replicas) { + Ok(EchoAction::Update) + } else { + Ok(EchoAction::NoOp) + } + } } /// Actions to be taken when a reconciliation fails - for whatever reason.