Skip to content

Commit

Permalink
Reduce buffering between watcher and Store (#1494)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Fabrizio Sestito <[email protected]>

* wip tests

Signed-off-by: Fabrizio Sestito <[email protected]>

* rename events

Signed-off-by: Fabrizio Sestito <[email protected]>

* avoid locking on the buffer

Signed-off-by: Fabrizio Sestito <[email protected]>

* fix EventFlatten RestartDelete match

Signed-off-by: Fabrizio Sestito <[email protected]>

* improve docs on event variants

Signed-off-by: Fabrizio Sestito <[email protected]>

* filter out Restart and RestartInit from watch_object

Signed-off-by: Fabrizio Sestito <[email protected]>

* add comment to clarify why we are not using .clear()

Signed-off-by: Fabrizio Sestito <[email protected]>

* cleanup store / apply suggestions

Signed-off-by: Fabrizio Sestito <[email protected]>

* broadcast when receiving a Restart event directly from the store

Signed-off-by: Fabrizio Sestito <[email protected]>

* do not dispatch RestartApply(obj) event

Signed-off-by: Fabrizio Sestito <[email protected]>

* fix runtime tests after paging

Signed-off-by: clux <[email protected]>
Signed-off-by: Fabrizio Sestito <[email protected]>

---------

Signed-off-by: Fabrizio Sestito <[email protected]>
Signed-off-by: clux <[email protected]>
Co-authored-by: clux <[email protected]>
  • Loading branch information
fabriziosestito and clux authored May 23, 2024
1 parent c30ce34 commit 92a012f
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 179 deletions.
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rust.unsafe_code = "forbid"
#rust.missing_docs = "warn"

[dependencies]
futures.workspace = true
futures = { workspace = true, features = ["async-await"] }
kube-client = { path = "../kube-client", version = "=0.91.0", default-features = false, features = ["jsonpatch", "client"] }
derivative.workspace = true
serde.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1753,7 +1753,7 @@ mod tests {
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
println!("reconciling {:?}", obj.metadata.name);
//println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
})
},
Expand All @@ -1763,6 +1763,7 @@ mod tests {
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
));
store_tx.apply_watcher_event(&watcher::Event::Restart);
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
Expand All @@ -1772,7 +1773,7 @@ mod tests {
},
..Default::default()
};
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}

Expand Down
104 changes: 75 additions & 29 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo, bar])),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo, bar])),
Ok(Event::Restart),
]);

let (reader, writer) = reflector::store_shared(10);
Expand All @@ -178,7 +180,7 @@ pub(crate) mod test {
assert_eq!(reader.len(), 0);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));

// Make progress and assert all events are seen
Expand All @@ -190,7 +192,15 @@ pub(crate) mod test {
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_))))));
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartInit)))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::RestartPage(_))))));
assert_eq!(reader.len(), 1);

let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restart)))));
assert_eq!(reader.len(), 2);

assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -206,10 +216,12 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Deleted(foo.clone())),
Ok(Event::Applied(foo.clone())),
Ok(Event::Delete(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::TooManyObjects),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
]);

let foo = Arc::new(foo);
Expand All @@ -224,13 +236,13 @@ pub(crate) mod test {
// Deleted events should be skipped by subscriber.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Deleted(_))))
Poll::Ready(Some(Ok(Event::Delete(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

Expand All @@ -242,12 +254,25 @@ pub(crate) mod test {
assert!(matches!(poll!(subscriber.next()), Poll::Pending));

// Restart event will yield all objects in the list

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartInit)))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
));

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
));

// these don't come back in order atm:
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

// When main channel is closed, it is propagated to subscribers
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
Expand All @@ -261,12 +286,14 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::Apply(foo.clone())),
Ok(Event::RestartInit),
Ok(Event::RestartPage(vec![foo.clone(), bar.clone()])),
Ok(Event::Restart),
]);

let foo = Arc::new(foo);
let bar = Arc::new(bar);
let _bar = Arc::new(bar);

let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
Expand All @@ -275,7 +302,7 @@ pub(crate) mod test {

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

Expand All @@ -284,14 +311,28 @@ pub(crate) mod test {
//
// First, subscribers should be pending.
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::RestartInit)))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::RestartPage(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);

assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restart)))
));
drop(reflect);

assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
// we will get foo and bar here, but we dont have a guaranteed ordering on page events
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
}

Expand All @@ -305,8 +346,9 @@ pub(crate) mod test {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Applied(foo.clone())),
Ok(Event::Restarted(vec![foo.clone(), bar.clone()])),
Ok(Event::Apply(foo.clone())),
Ok(Event::Apply(bar.clone())),
Ok(Event::Apply(foo.clone())),
]);

let foo = Arc::new(foo);
Expand All @@ -325,13 +367,14 @@ pub(crate) mod test {

// Poll first subscriber, but not the second.
//
// The buffer can hold one value, so even if we have a slow subscriber,
// The buffer can hold one object value, so even if we have a slow subscriber,
// we will still get an event from the root.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Applied(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

// One subscriber is not reading, so we need to apply backpressure until
// channel has capacity.
//
Expand All @@ -348,18 +391,21 @@ pub(crate) mod test {

// We now have room for only one more item. In total, the previous event
// had two. We repeat the same pattern.
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Restarted(_))))
Poll::Ready(Some(Ok(Event::Apply(_))))
));
// Poll again to drain the queue.
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
Expand Down
29 changes: 14 additions & 15 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ mod tests {
},
..ConfigMap::default()
};
reflector(
store_w,
stream::iter(vec![Ok(watcher::Event::Applied(cm.clone()))]),
)
.map(|_| ())
.collect::<()>()
.await;
reflector(store_w, stream::iter(vec![Ok(watcher::Event::Apply(cm.clone()))]))
.map(|_| ())
.collect::<()>()
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}

Expand All @@ -189,8 +186,8 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm.clone())),
Ok(watcher::Event::Applied(updated_cm.clone())),
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Apply(updated_cm.clone())),
]),
)
.map(|_| ())
Expand All @@ -213,8 +210,8 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm.clone())),
Ok(watcher::Event::Deleted(cm.clone())),
Ok(watcher::Event::Apply(cm.clone())),
Ok(watcher::Event::Delete(cm.clone())),
]),
)
.map(|_| ())
Expand Down Expand Up @@ -244,8 +241,10 @@ mod tests {
reflector(
store_w,
stream::iter(vec![
Ok(watcher::Event::Applied(cm_a.clone())),
Ok(watcher::Event::Restarted(vec![cm_b.clone()])),
Ok(watcher::Event::Apply(cm_a.clone())),
Ok(watcher::Event::RestartInit),
Ok(watcher::Event::RestartPage(vec![cm_b.clone()])),
Ok(watcher::Event::Restart),
]),
)
.map(|_| ())
Expand Down Expand Up @@ -276,9 +275,9 @@ mod tests {
..ConfigMap::default()
};
Ok(if deleted {
watcher::Event::Deleted(obj)
watcher::Event::Delete(obj)
} else {
watcher::Event::Applied(obj)
watcher::Event::Apply(obj)
})
})),
)
Expand Down
Loading

0 comments on commit 92a012f

Please sign in to comment.