From 5320d8f9f235289baba791796a0e7aaa23af1b23 Mon Sep 17 00:00:00 2001 From: Eirik A Date: Wed, 19 Jun 2024 22:47:06 +0100 Subject: [PATCH] Fix watcher not fully paginating on Init (#1525) * Fix watcher not fully paginating on Init cannot believe i missed this :( fixes #1524 Signed-off-by: clux * upgrade mock test (these fail on master) Signed-off-by: clux --------- Signed-off-by: clux --- kube-runtime/src/watcher.rs | 9 ++++++--- kube/src/mock_tests.rs | 26 ++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index b9833f1db..22c26be57 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -499,9 +499,12 @@ where last_bookmark, }); } - if let Some(resource_version) = last_bookmark { - // we have drained the last page - move on to next stage - return (Some(Ok(Event::InitDone)), State::InitListed { resource_version }); + // check if we need to perform more pages + if continue_token.is_none() { + if let Some(resource_version) = last_bookmark { + // we have drained the last page - move on to next stage + return (Some(Ok(Event::InitDone)), State::InitListed { resource_version }); + } } let mut lp = wc.to_list_params(); lp.continue_token = continue_token; diff --git a/kube/src/mock_tests.rs b/kube/src/mock_tests.rs index e58b05e22..c651a885a 100644 --- a/kube/src/mock_tests.rs +++ b/kube/src/mock_tests.rs @@ -29,7 +29,9 @@ impl Hack { #[tokio::test] async fn watchers_respect_pagination_limits() { let (client, fakeserver) = testcontext(); - // NB: scenario only responds responds to TWO paginated list calls with two objects + // NB: page scenario which responds to 3 paginated list calls with 3 object (one per page). + // This ensures the watcher internal paging mechanism is not bypassed + // and that each page is actually drained before starting the long watch. let mocksrv = fakeserver.run(Scenario::PaginatedList); let api: Api = Api::all(client); @@ -39,6 +41,8 @@ async fn watchers_respect_pagination_limits() { assert_eq!(first.spec.num, 1); let second: Hack = stream.try_next().await.unwrap().unwrap(); assert_eq!(second.spec.num, 2); + let third: Hack = stream.try_next().await.unwrap().unwrap(); + assert_eq!(third.spec.num, 3); assert!(poll!(stream.next()).is_pending()); timeout_after_1s(mocksrv).await; } @@ -117,7 +121,7 @@ impl ApiServerVerifier { "kind": "HackList", "apiVersion": "kube.rs/v1", "metadata": { - "continue": "", + "continue": "second", "resourceVersion": "2" }, "items": [Hack::test(2)] @@ -125,6 +129,24 @@ impl ApiServerVerifier { let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have send.send_response(Response::builder().body(Body::from(response)).unwrap()); } + { + // we expect a final list GET because we included a continue token + let (request, send) = self.0.next_request().await.expect("service not called 3"); + assert_eq!(request.method(), http::Method::GET); + let req_uri = request.uri().to_string(); + assert!(req_uri.contains("&continue=second")); + let respdata = json!({ + "kind": "HackList", + "apiVersion": "kube.rs/v1", + "metadata": { + "continue": "", + "resourceVersion": "2" + }, + "items": [Hack::test(3)] + }); + let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have + send.send_response(Response::builder().body(Body::from(response)).unwrap()); + } Ok(self) } }