Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle outgoing requests by both peer and protocol id #8969

Merged
merged 11 commits into from
Jan 9, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@
- Remove delay when fetching blobs from the local EL on block arrival

### Bug Fixes
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
- Fix `--version` command output [#8960](https://github.com/Consensys/teku/issues/8960)
- Fix issue (introduced in `24.12.1`) with peer stability when the upperbound is set to a high number
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,5 @@ public interface Eth2RpcMethod<TRequest extends RpcRequest & SszData, TResponse

@Override
Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHandler(
String protocolId,
final TRequest request,
Eth2RpcResponseHandler<TResponse, ?> responseHandler);
String protocolId, TRequest request, Eth2RpcResponseHandler<TResponse, ?> responseHandler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_OPEN_STREAMS_RATE_LIMIT;
import static tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork.REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT;
import static tech.pegasys.teku.spec.constants.NetworkConstants.MAX_CONCURRENT_REQUESTS;

import com.google.common.base.Preconditions;
import identify.pb.IdentifyOuterClass;
Expand Down Expand Up @@ -153,9 +152,7 @@ public P2PNetwork<Peer> build() {
}

protected List<? extends RpcHandler<?, ?, ?>> createRpcHandlers() {
return rpcMethods.stream()
.map(m -> new RpcHandler<>(asyncRunner, m, MAX_CONCURRENT_REQUESTS))
.toList();
return rpcMethods.stream().map(m -> new RpcHandler<>(asyncRunner, m)).toList();
}

protected LibP2PGossipNetwork createGossipNetwork() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.libp2p.core.PeerId;
import io.libp2p.core.crypto.PubKey;
import io.libp2p.protocol.Identify;
import io.libp2p.protocol.IdentifyController;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -28,6 +29,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
Expand All @@ -41,11 +43,12 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeer implements Peer {
private static final Logger LOG = LogManager.getLogger();

private final Map<RpcMethod<?, ?, ?>, RpcHandler<?, ?, ?>> rpcHandlers;
private final Map<RpcMethod<?, ?, ?>, ThrottlingRpcHandler<?, ?, ?>> rpcHandlers;
private final ReputationManager reputationManager;
private final Function<PeerId, Double> peerScoreFunction;
private final Connection connection;
Expand All @@ -71,7 +74,8 @@ public LibP2PPeer(
final Function<PeerId, Double> peerScoreFunction) {
this.connection = connection;
this.rpcHandlers =
rpcHandlers.stream().collect(Collectors.toMap(RpcHandler::getRpcMethod, h -> h));
rpcHandlers.stream()
.collect(Collectors.toMap(RpcHandler::getRpcMethod, ThrottlingRpcHandler::new));
this.reputationManager = reputationManager;
this.peerScoreFunction = peerScoreFunction;
this.peerId = connection.secureSession().getRemoteId();
Expand Down Expand Up @@ -109,10 +113,6 @@ private PeerClientType getPeerTypeFromAgentString(final String agentVersion) {
return EnumUtils.getEnumIgnoreCase(PeerClientType.class, agent, PeerClientType.UNKNOWN);
}

public Optional<String> getMaybeAgentString() {
return maybeAgentString;
}

public PubKey getPubKey() {
return pubKey;
}
Expand Down Expand Up @@ -161,7 +161,7 @@ private SafeFuture<IdentifyOuterClass.Identify> getIdentify() {
.muxerSession()
.createStream(new Identify())
.getController()
.thenCompose(controller -> controller.id()))
.thenCompose(IdentifyController::id))
.exceptionallyCompose(
error -> {
LOG.debug("Failed to get peer identity", error);
Expand Down Expand Up @@ -208,8 +208,8 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final TRequest request,
final RespHandler responseHandler) {
@SuppressWarnings("unchecked")
RpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(RpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
final ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(ThrottlingRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
if (rpcHandler == null) {
throw new IllegalArgumentException(
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));
Expand Down Expand Up @@ -240,4 +240,25 @@ public void adjustReputation(final ReputationAdjustment adjustment) {
disconnectCleanly(DisconnectReason.REMOTE_FAULT).ifExceptionGetsHereRaiseABug();
}
}

private static class ThrottlingRpcHandler<
TOutgoingHandler extends RpcRequestHandler,
TRequest,
TRespHandler extends RpcResponseHandler<?>> {

private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;

private final ThrottlingTaskQueue requestsQueue =
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);

ThrottlingRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
StefanBratanov marked this conversation as resolved.
Show resolved Hide resolved
this.delegate = delegate;
}

SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return requestsQueue.queueTask(
() -> delegate.sendRequest(connection, request, responseHandler));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.SafeFuture.Interruptor;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler.Controller;
Expand All @@ -63,15 +62,12 @@ public class RpcHandler<

private final AsyncRunner asyncRunner;
private final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod;
private final ThrottlingTaskQueue concurrentRequestsQueue;

public RpcHandler(
final AsyncRunner asyncRunner,
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod,
final int maxConcurrentRequests) {
final RpcMethod<TOutgoingHandler, TRequest, TRespHandler> rpcMethod) {
this.asyncRunner = asyncRunner;
this.rpcMethod = rpcMethod;
concurrentRequestsQueue = ThrottlingTaskQueue.create(maxConcurrentRequests);
}

public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {
Expand All @@ -80,13 +76,6 @@ public RpcMethod<TOutgoingHandler, TRequest, TRespHandler> getRpcMethod() {

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return concurrentRequestsQueue.queueTask(
() -> sendRequestInternal(connection, request, responseHandler));
}

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequestInternal(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {

final Bytes initialPayload;
try {
initialPayload = rpcMethod.encodeRequest(request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.p2p.libp2p;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.libp2p.core.Connection;
import io.libp2p.core.security.SecureChannel.Session;
import java.util.List;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeerTest {

private final Connection connection = mock(Connection.class);

@SuppressWarnings("unchecked")
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcHandler =
mock(RpcHandler.class);

@SuppressWarnings("unchecked")
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
mock(RpcMethod.class);

private LibP2PPeer libP2PPeer;

@BeforeEach
public void init() {
when(rpcHandler.getRpcMethod()).thenReturn(rpcMethod);
final Session secureSession = mock(Session.class);
when(connection.secureSession()).thenReturn(secureSession);
when(connection.closeFuture()).thenReturn(new SafeFuture<>());
libP2PPeer =
new LibP2PPeer(connection, List.of(rpcHandler), ReputationManager.NOOP, peer -> 0.0);
}

@SuppressWarnings({"unchecked", "FutureReturnValueIgnored"})
@Test
public void sendRequest_throttlesRequests() {

// fill the queue with incomplete futures
final List<SafeFuture<RpcStreamController<RpcRequestHandler>>> queuedFutures =
IntStream.range(0, NetworkConstants.MAX_CONCURRENT_REQUESTS)
.mapToObj(
__ -> {
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
new SafeFuture<>();
when(rpcHandler.sendRequest(connection, null, null)).thenReturn(future);
libP2PPeer.sendRequest(rpcMethod, null, null);
return future;
})
.toList();

when(rpcHandler.sendRequest(connection, null, null))
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
libP2PPeer.sendRequest(rpcMethod, null, null);

// completed request should be throttled
assertThat(throttledRequest).isNotDone();

// empty the queue
queuedFutures.forEach(future -> future.complete(mock(RpcStreamController.class)));

// throttled request should have completed now
assertThat(throttledRequest).isDone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.libp2p.core.mux.StreamMuxer.Session;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import kotlin.Unit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -53,9 +52,8 @@ public class RpcHandlerTest {

StubAsyncRunner asyncRunner = new StubAsyncRunner();
RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcMethod = mock(RpcMethod.class);
int maxConcurrentRequests = 2;
RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<?>> rpcHandler =
new RpcHandler<>(asyncRunner, rpcMethod, maxConcurrentRequests);
new RpcHandler<>(asyncRunner, rpcMethod);

Connection connection = mock(Connection.class);
Session session = mock(Session.class);
Expand Down Expand Up @@ -249,39 +247,6 @@ void sendRequest_interruptBeforeInitialPayloadWritten(
verify(stream).close();
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
void requestIsThrottledIfQueueIsFull() {
// fill the queue
IntStream.range(0, maxConcurrentRequests)
.forEach(__ -> rpcHandler.sendRequest(connection, request, responseHandler));

final StreamPromise<Controller<RpcRequestHandler>> streamPromise1 =
new StreamPromise<>(new CompletableFuture<>(), new CompletableFuture<>());
when(session.createStream((ProtocolBinding<Controller<RpcRequestHandler>>) any()))
.thenReturn(streamPromise1);
final Stream stream1 = mock(Stream.class);
streamPromise1.getStream().complete(stream1);
streamPromise1.getController().complete(controller);
final CompletableFuture<String> protocolIdFuture1 = new CompletableFuture<>();
when(stream1.getProtocol()).thenReturn(protocolIdFuture1);
protocolIdFuture1.complete("test");

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledResult =
rpcHandler.sendRequest(connection, request, responseHandler);

assertThat(throttledResult).isNotDone();

// empty the queue
streamPromise.getStream().complete(stream);
streamPromise.getController().complete(controller);
stream.getProtocol().complete("test");
writeFuture.complete(null);

// throttled request should have completed now
assertThat(throttledResult).isCompleted();
}

@SuppressWarnings("UnnecessaryAsync")
private Class<? extends Exception> executeInterrupts(
final boolean closeStream, final boolean exceedTimeout) {
Expand Down
Loading