Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add monitor to watch for stale connections and evict them #373

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,22 @@
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.methods.*;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;


/**
Expand All @@ -62,6 +50,27 @@ public class DestinationHttpClient {
private static final int TIMEOUT_MILLISECONDS = (int) TimeValue.timeValueSeconds(5).millis();
private static final int SOCKET_TIMEOUT_MILLISECONDS = (int)TimeValue.timeValueSeconds(50).millis();

/**
* Watches for stale connections and evicts them.
*/
private static class IdleConnectionMonitor extends TimerTask {

private static final Logger logger = LogManager.getLogger(IdleConnectionMonitor.class);

private final PoolingHttpClientConnectionManager cm;

IdleConnectionMonitor(PoolingHttpClientConnectionManager cm) {
this.cm = cm;
}

@Override
public void run() {
logger.debug("Trying to close expired and idle connections.");
cm.closeExpiredConnections();
cm.closeIdleConnections(60, TimeUnit.SECONDS);
}
}

/**
* all valid response status
*/
Expand All @@ -70,8 +79,9 @@ public class DestinationHttpClient {
RestStatus.NON_AUTHORITATIVE_INFORMATION.getStatus(), RestStatus.NO_CONTENT.getStatus(),
RestStatus.RESET_CONTENT.getStatus(), RestStatus.PARTIAL_CONTENT.getStatus(),
RestStatus.MULTI_STATUS.getStatus())));

private static CloseableHttpClient HTTP_CLIENT = createHttpClient();
private static IdleConnectionMonitor monitor;

private static CloseableHttpClient createHttpClient() {
RequestConfig config = RequestConfig.custom()
Expand All @@ -84,12 +94,18 @@ private static CloseableHttpClient createHttpClient() {
connectionManager.setMaxTotal(MAX_CONNECTIONS);
connectionManager.setDefaultMaxPerRoute(MAX_CONNECTIONS_PER_ROUTE);

return HttpClientBuilder.create()
CloseableHttpClient client = HttpClientBuilder.create()
.setDefaultRequestConfig(config)
.setConnectionManager(connectionManager)
.setRetryHandler(new DefaultHttpRequestRetryHandler())
.useSystemProperties()
.build();

// Start up the eviction task
Timer timer = new Timer(true);
monitor = new IdleConnectionMonitor(connectionManager);
timer.schedule(monitor, 60000L, 60000L);

return client;
}

public String execute(BaseMessage message) throws Exception {
Expand Down