Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experiment with a Store<K> parameter for Reflectors - #102 #220

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 82 additions & 71 deletions kube/src/runtime/reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

use std::{collections::BTreeMap, sync::Arc, time::Duration};

/// A Reflector with a default MapCache
pub type Reflector<K> = GenericReflector<K, MapCache<K>>;

/// A reflection of state for a Kubernetes ['Api'] resource
///
/// This builds on top of the ['Informer'] by tracking the events received,
Expand All @@ -20,30 +23,34 @@
/// It is prone to the same desync problems as an informer, but it will self-heal,
/// as best as possible - though this means that you might occasionally see a full
/// reset (boot equivalent) when network issues are encountered.
/// During a reset, the state is cleared before it is rebuilt.
/// During a reset, the state is cleared and rebuilt in an atomic operation.
///
/// The internal state is exposed readably through a getter.
#[derive(Clone)]
pub struct Reflector<K>
pub struct GenericReflector<K, S>
where
K: Clone + DeserializeOwned + Meta,
S: Store<K> + Default,
{
state: Arc<Mutex<State<K>>>,
store: Arc<Mutex<S>>,
version: Arc<Mutex<String>>,
params: ListParams,
api: Api<K>,
}

impl<K> Reflector<K>
impl<K, S> GenericReflector<K, S>
where
K: Clone + DeserializeOwned + Meta,
S: Store<K> + Default,
{
/// Create a reflector on an api resource
pub fn new(api: Api<K>) -> Self {
Reflector {
api,
params: ListParams::default(),
state: Default::default(),
version: Arc::new(Mutex::new(0.to_string())),
store: Default::default(),
}

Check failure on line 53 in kube/src/runtime/reflector.rs

View workflow job for this annotation

GitHub Actions / clippy

mismatched types

error[E0308]: mismatched types --> kube/src/runtime/reflector.rs:48:9 | 41 | impl<K, S> GenericReflector<K, S> | - expected this type parameter ... 47 | pub fn new(api: Api<K>) -> Self { | ---- expected `runtime::reflector::GenericReflector<K, S>` because of return type 48 | / Reflector { 49 | | api, 50 | | params: ListParams::default(), 51 | | version: Arc::new(Mutex::new(0.to_string())), 52 | | store: Default::default(), 53 | | } | |_________^ expected `GenericReflector<K, S>`, found `GenericReflector<K, ...>` | = note: expected struct `runtime::reflector::GenericReflector<_, S>` found struct `runtime::reflector::GenericReflector<_, std::collections::BTreeMap<runtime::reflector::ObjectId, K>>`
}

/// Modify the default watch parameters for the underlying watch
Expand All @@ -69,11 +76,11 @@
// Then pin then futures to the stack, and wait for any of them
pin_mut!(ctrlc_fut, sigterm_fut, poll_fut);
select! {
ctrlc = ctrlc_fut => {

Check warning on line 79 in kube/src/runtime/reflector.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `ctrlc`

warning: unused variable: `ctrlc` --> kube/src/runtime/reflector.rs:79:17 | 79 | ctrlc = ctrlc_fut => { | ^^^^^ help: if this is intentional, prefix it with an underscore: `_ctrlc` | = note: `#[warn(unused_variables)]` on by default
warn!("Intercepted ctrl_c signal");
return Ok(());
},
sigterm = sigterm_fut => {

Check warning on line 83 in kube/src/runtime/reflector.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `sigterm`

warning: unused variable: `sigterm` --> kube/src/runtime/reflector.rs:83:17 | 83 | sigterm = sigterm_fut => { | ^^^^^^^ help: if this is intentional, prefix it with an underscore: `_sigterm`
warn!("Intercepted SIGTERM");
return Ok(());
}
Expand All @@ -94,14 +101,13 @@
/// A single poll call to modify the internal state
async fn poll(&self) -> Result<()> {
let kind = &self.api.resource.kind;
let resource_version = self.state.lock().await.version.clone();
let resource_version = self.version.lock().await.clone();
trace!("Polling {} from resourceVersion={}", kind, resource_version);
let stream = self.api.watch(&self.params, &resource_version).await?;
pin_mut!(stream);

// For every event, modify our state
while let Some(ev) = stream.try_next().await? {
let mut state = self.state.lock().await;
// Informer-like version tracking:
match &ev {
WatchEvent::Added(o)
Expand All @@ -111,85 +117,64 @@
// always store the last seen resourceVersion
if let Some(nv) = Meta::resource_ver(o) {
trace!("Updating reflector version for {} to {}", kind, nv);
state.version = nv.clone();
*self.version.lock().await = nv.clone();
}
}
_ => {}
WatchEvent::Error(e) => {
warn!("Failed to watch {}: {:?}", kind, e);
return Err(Error::Api(e.to_owned()));
}
}

let data = &mut state.data;
// Core Reflector logic
let mut store = self.store.lock().await;
match ev {
WatchEvent::Added(o) => {
debug!("Adding {} to {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone());
}
WatchEvent::Modified(o) => {
debug!("Modifying {} in {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone());
}
WatchEvent::Deleted(o) => {
debug!("Removing {} from {}", Meta::name(&o), kind);
data.remove(&ObjectId::key_for(&o));
}
WatchEvent::Bookmark(o) => {
debug!("Bookmarking {} from {}", Meta::name(&o), kind);
}
WatchEvent::Error(e) => {
warn!("Failed to watch {}: {:?}", kind, e);
return Err(Error::Api(e));
}
WatchEvent::Added(o) => store.add(o),
WatchEvent::Modified(o) => store.modify(o),
WatchEvent::Deleted(o) => store.delete(o),
_ => {}
}
}
Ok(())
}

/// Reset the state of the underlying informer and clear the cache
pub async fn reset(&self) -> Result<()> {
/// Reset the resource version and clear cache
async fn reset(&self) -> Result<()> {
trace!("Resetting {}", self.api.resource.kind);
// Simplified for k8s >= 1.16
//*self.state.lock().await = Default::default();
//self.informer.reset().await

// For now:
let (data, version) = self.get_full_resource_entries().await?;
*self.state.lock().await = State { data, version };
*self.version.lock().await = version;
let mut store = self.store.lock().await;
store.clear();
for d in data {
store.add(d);
}
Ok(())
}

/// Legacy helper for kubernetes < 1.16
///
/// Needed to do an initial list operation because of https://github.com/clux/kube-rs/issues/219
/// Soon, this goes away as we drop support for k8s < 1.16
async fn get_full_resource_entries(&self) -> Result<(Cache<K>, String)> {
async fn get_full_resource_entries(&self) -> Result<(Vec<K>, String)> {
let res = self.api.list(&self.params).await?;
debug!("Initializing {}", K::KIND);
let version = res.metadata.resource_version.unwrap_or_default();
trace!(
"Got {} {} at resourceVersion={:?}",
debug!(
"Initialized {} with {} objects at {}",
K::KIND,
res.items.len(),
self.api.resource.kind,
version
);
let mut data = BTreeMap::new();
for i in res.items {
// The non-generic parts we care about are spec + status
data.insert(ObjectId::key_for(&i), i);
}
let keys = data
.keys()
.map(ObjectId::to_string)
.collect::<Vec<_>>()
.join(", ");
debug!("Initialized with: [{}]", keys);
Ok((data, version))
Ok((res.items, version))
}

/// Read data for users of the reflector
///
/// This is instant if you are reading and writing from the same context.
pub async fn state(&self) -> Result<Vec<K>> {
let state = self.state.lock().await;
Ok(state.data.values().cloned().collect::<Vec<K>>())
let store = self.store.lock().await;
Ok(store.values())
}

/// Read a single entry by name
Expand All @@ -203,7 +188,7 @@
namespace: self.api.resource.namespace.clone(),
};

Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
Ok(self.store.lock().await.get(&id).map(Clone::clone))
}

/// Read a single entry by name within a specific namespace
Expand All @@ -215,15 +200,15 @@
name: name.into(),
namespace: Some(ns.into()),
};
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
Ok(self.store.lock().await.get(&id).map(Clone::clone))
}
}

/// ObjectId represents an object by name and namespace (if any)
///
/// This is an internal subset of ['k8s_openapi::api::core::v1::ObjectReference']
#[derive(Ord, PartialOrd, Hash, Eq, PartialEq, Clone)]
struct ObjectId {
pub struct ObjectId {
name: String,
namespace: Option<String>,
}
Expand All @@ -246,21 +231,47 @@
}
}

/// Internal shared state of Reflector
///
/// Can remove this in k8s >= 1.16 once this uses Informer
struct State<K> {
data: Cache<K>,
version: String,
/// A store that can be plugged into a Reflector
pub trait Store<K> {
fn clear(&mut self);
fn get(&self, id: &ObjectId) -> Option<&K>;
fn add(&mut self, k: K);
fn values(&self) -> Vec<K>;
fn modify(&mut self, k: K);
fn delete(&mut self, k: K);
}

impl<K> Default for State<K> {
fn default() -> Self {
State {
data: Default::default(),
version: 0.to_string(),
}
/// Default Store for a Reflector
pub type MapCache<K> = BTreeMap<ObjectId, K>;

impl<K> Store<K> for MapCache<K>
where
K: Clone + DeserializeOwned + Meta,
{
fn clear(&mut self) {
self.clear()
}

fn get(&self, id: &ObjectId) -> Option<&K> {
self.get(id)
}

fn values(&self) -> Vec<K> {
self.values().cloned().collect()
}

fn add(&mut self, k: K) {
debug!("Adding {} to {}", Meta::name(&k), K::KIND);
self.entry(ObjectId::key_for(&k)).or_insert_with(|| k);
}

fn modify(&mut self, k: K) {
debug!("Modifying {} in {}", Meta::name(&k), K::KIND);
self.entry(ObjectId::key_for(&k)).and_modify(|e| *e = k);
}

fn delete(&mut self, k: K) {
debug!("Removing {} from {}", Meta::name(&k), K::KIND);
self.remove(&ObjectId::key_for(&k));
}
}
/// Internal representation for Reflector
type Cache<K> = BTreeMap<ObjectId, K>;
Loading