Skip to content

Commit

Permalink
Fix watcher not fully paginating on Init (#1525)
Browse files Browse the repository at this point in the history
* Fix watcher not fully paginating on Init

cannot believe i missed this :(
fixes #1524

Signed-off-by: clux <[email protected]>

* upgrade mock test (these fail on master)

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored Jun 19, 2024
1 parent f553c4e commit 5320d8f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
9 changes: 6 additions & 3 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,12 @@ where
last_bookmark,
});
}
if let Some(resource_version) = last_bookmark {
// we have drained the last page - move on to next stage
return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
// check if we need to perform more pages
if continue_token.is_none() {
if let Some(resource_version) = last_bookmark {
// we have drained the last page - move on to next stage
return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
}
}
let mut lp = wc.to_list_params();
lp.continue_token = continue_token;
Expand Down
26 changes: 24 additions & 2 deletions kube/src/mock_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ impl Hack {
#[tokio::test]
async fn watchers_respect_pagination_limits() {
let (client, fakeserver) = testcontext();
// NB: scenario only responds responds to TWO paginated list calls with two objects
// NB: page scenario which responds to 3 paginated list calls with 3 object (one per page).
// This ensures the watcher internal paging mechanism is not bypassed
// and that each page is actually drained before starting the long watch.
let mocksrv = fakeserver.run(Scenario::PaginatedList);

let api: Api<Hack> = Api::all(client);
Expand All @@ -39,6 +41,8 @@ async fn watchers_respect_pagination_limits() {
assert_eq!(first.spec.num, 1);
let second: Hack = stream.try_next().await.unwrap().unwrap();
assert_eq!(second.spec.num, 2);
let third: Hack = stream.try_next().await.unwrap().unwrap();
assert_eq!(third.spec.num, 3);
assert!(poll!(stream.next()).is_pending());
timeout_after_1s(mocksrv).await;
}
Expand Down Expand Up @@ -117,14 +121,32 @@ impl ApiServerVerifier {
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "",
"continue": "second",
"resourceVersion": "2"
},
"items": [Hack::test(2)]
});
let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have
send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
{
// we expect a final list GET because we included a continue token
let (request, send) = self.0.next_request().await.expect("service not called 3");
assert_eq!(request.method(), http::Method::GET);
let req_uri = request.uri().to_string();
assert!(req_uri.contains("&continue=second"));
let respdata = json!({
"kind": "HackList",
"apiVersion": "kube.rs/v1",
"metadata": {
"continue": "",
"resourceVersion": "2"
},
"items": [Hack::test(3)]
});
let response = serde_json::to_vec(&respdata).unwrap(); // respond as the apiserver would have
send.send_response(Response::builder().body(Body::from(response)).unwrap());
}
Ok(self)
}
}
Expand Down

0 comments on commit 5320d8f

Please sign in to comment.