Skip to content

Commit

Permalink
run fmt and keep tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
casualjim committed Jul 17, 2023
1 parent d3f8b3d commit b3fa2d9
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 65 deletions.
2 changes: 1 addition & 1 deletion kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonp
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
base64 = { version = "0.21.2", optional = true }
base64 = { version = "0.20.0", optional = true }
chrono = { version = "0.4.23", optional = true, default-features = false }
dirs = { package = "dirs-next", optional = true, version = "2.0.0" }
serde = { version = "1.0.130", features = ["derive"] }
Expand Down
103 changes: 39 additions & 64 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,18 +458,14 @@ where
Ok(list) => {
objects.extend(list.items);
if let Some(continue_token) = list.metadata.continue_.filter(|s| !s.is_empty()) {
(
None,
State::Empty {
continue_token: Some(continue_token),
objects,
},
)
(None, State::Empty {
continue_token: Some(continue_token),
objects,
})
} else if let Some(resource_version) = list.metadata.resource_version {
(
Some(Ok(Event::Restarted(objects))),
State::InitListed { resource_version },
)
(Some(Ok(Event::Restarted(objects))), State::InitListed {
resource_version,
})
} else {
(Some(Err(Error::NoResourceVersion)), State::default())
}
Expand Down Expand Up @@ -512,21 +508,15 @@ where
Some(Ok(WatchEvent::Bookmark(bm))) => {
let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
if marks_initial_end {
(
Some(Ok(Event::Restarted(objects))),
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
(Some(Ok(Event::Restarted(objects))), State::Watching {
resource_version: bm.metadata.resource_version,
stream,
})
} else {
(
None,
State::Watching {
resource_version: bm.metadata.resource_version,
stream,
},
)
(None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
})
}
}
Some(Ok(WatchEvent::Error(err))) => {
Expand All @@ -549,23 +539,20 @@ where
} else {
debug!("watcher error: {err:?}");
}
(
Some(Err(err).map_err(Error::WatchFailed)),
State::IntialWatch { objects, stream },
)
(Some(Err(err).map_err(Error::WatchFailed)), State::IntialWatch {
objects,
stream,
})
}
None => (None, State::default()),
}
}
State::InitListed { resource_version } => {
match api.watch(&wc.to_watch_params(), &resource_version).await {
Ok(stream) => (
None,
State::Watching {
resource_version,
stream,
},
),
Ok(stream) => (None, State::Watching {
resource_version,
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
warn!("watch initlist error with 403: {err:?}");
Expand All @@ -585,31 +572,22 @@ where
} => match stream.next().await {
Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
let resource_version = obj.resource_version().unwrap();
(
Some(Ok(Event::Applied(obj))),
State::Watching {
resource_version,
stream,
},
)
(Some(Ok(Event::Applied(obj))), State::Watching {
resource_version,
stream,
})
}
Some(Ok(WatchEvent::Deleted(obj))) => {
let resource_version = obj.resource_version().unwrap();
(
Some(Ok(Event::Deleted(obj))),
State::Watching {
resource_version,
stream,
},
)
}
Some(Ok(WatchEvent::Bookmark(bm))) => (
None,
State::Watching {
resource_version: bm.metadata.resource_version,
(Some(Ok(Event::Deleted(obj))), State::Watching {
resource_version,
stream,
},
),
})
}
Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
resource_version: bm.metadata.resource_version,
stream,
}),
Some(Ok(WatchEvent::Error(err))) => {
// HTTP GONE, means we have desynced and need to start over and re-list :(
let new_state = if err.code == 410 {
Expand All @@ -633,13 +611,10 @@ where
} else {
debug!("watcher error: {err:?}");
}
(
Some(Err(err).map_err(Error::WatchFailed)),
State::Watching {
resource_version,
stream,
},
)
(Some(Err(err).map_err(Error::WatchFailed)), State::Watching {
resource_version,
stream,
})
}
None => (None, State::InitListed { resource_version }),
},
Expand Down

0 comments on commit b3fa2d9

Please sign in to comment.