Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect unexpected response leaks for multi-address client instances #3096

Merged
merged 3 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public StreamingHttpClient buildStreaming() {
urlClient = redirectConfig == null ? urlClient :
new RedirectingHttpRequesterFilter(redirectConfig).create(urlClient);

// Detect leaks that can be caused by unexpected exceptions
urlClient = HttpMessageDiscardWatchdogClientFilter.CLIENT_CLEANER.create(urlClient);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this results in 2 cleaners. Maybe it's not a big deal, but just wanted to be sure I understood what the structure will be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, because from a single address client we don't know if it's a standalone instance or part of the multi-address client group we can not remove it from there. So, for multi-address flow we will have 2 cleaners for onError case:

  1. Last one before returning from single-address level.
  2. Last one before returning from multi-address level to the users.

This should not be a big deal because the watchdog overhead mostly comes from the connection level filter that instantiates AtomicReference for each request and transforms response message body for each response. The overhead of CleanerStreamingHttpClientFilterFactory is negligible bcz onError handling is super cheap and happens only in case of exceptions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sent a new commit to also replace onErrorResume with whenOnError. It will help to reduce overhead (avoids subscribe sequence for returned failed single).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggesting we could make the Single client aware if it gets instantiated from a multi client, but your explanation makes me think it's probably not a big deal. Let's be sure and check the perf stats after merge.


LOGGER.debug("Multi-address client created with base strategy {}", executionContext.executionStrategy());
return new FilterableClientToClient(urlClient, executionContext);
} catch (final Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
* Copyright © 2023-2024 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -107,17 +107,16 @@ protected Single<StreamingHttpResponse> request(final StreamingHttpRequester del
final StreamingHttpRequest request) {
return delegate
.request(request)
.onErrorResume(cause -> {
.beforeOnError(cause -> {
final AtomicReference<?> maybePublisher = request.context().get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null && maybePublisher.getAndSet(null) != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// tell the user to clean it up.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Response payload (message) body must " +
"be fully consumed before discarding.");
"be fully consumed before discarding.", cause);
}
return Single.<StreamingHttpResponse>failed(cause).shareContextOnSubscribe();
});
}
};
Expand Down
Loading