Skip to content

Commit

Permalink
Deprecate TTFB, RESP_TIMEOUT, introduce MAX_CONCURRENT_REQUESTS (#8839)
Browse files Browse the repository at this point in the history
(cherry picked from commit d633566)
  • Loading branch information
StefanBratanov committed Dec 12, 2024
1 parent fc9e03e commit d3dcb3a
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ public class NetworkConstants {
public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128;

public static final int NODE_ID_BITS = 256;

// https://github.com/ethereum/consensus-specs/pull/3767
public static final int MAX_CONCURRENT_REQUESTS = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class ThrottlingTaskQueue {

private int inflightTaskCount = 0;

public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) {
return new ThrottlingTaskQueue(maximumConcurrentTasks);
}

public static ThrottlingTaskQueue create(
final int maximumConcurrentTasks,
final MetricsSystem metricsSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand All @@ -51,6 +50,8 @@
public class Eth2PeerManager implements PeerLookup, PeerHandler {
private static final Logger LOG = LogManager.getLogger();

private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10);

private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
private final Eth2PeerFactory eth2PeerFactory;
Expand All @@ -66,7 +67,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
private final int eth2RpcOutstandingPingThreshold;

private final Duration eth2StatusUpdateInterval;
private final SpecConfig specConfig;

Eth2PeerManager(
final Spec spec,
Expand Down Expand Up @@ -99,7 +99,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
this.eth2RpcPingInterval = eth2RpcPingInterval;
this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold;
this.eth2StatusUpdateInterval = eth2StatusUpdateInterval;
this.specConfig = spec.getGenesisSpecConfig();
}

public static Eth2PeerManager create(
Expand Down Expand Up @@ -237,7 +236,7 @@ private void ensureStatusReceived(final Eth2Peer peer) {
.ifExceptionGetsHereRaiseABug();
}
},
Duration.ofSeconds(specConfig.getRespTimeout()))
STATUS_RECEIVED_TIMEOUT)
.finish(
() -> {},
error -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public static BeaconChainMethods create(
final MetadataMessagesFactory metadataMessagesFactory,
final RpcEncoding rpcEncoding) {
return new BeaconChainMethods(
createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createBeaconBlocksByRoot(
spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding),
createBeaconBlocksByRange(
Expand Down Expand Up @@ -144,11 +144,10 @@ public static BeaconChainMethods create(
rpcEncoding,
recentChainData),
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
}

private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
final Spec spec,
final AsyncRunner asyncRunner,
final StatusMessageFactory statusMessageFactory,
final PeerLookup peerLookup,
Expand All @@ -165,12 +164,10 @@ private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
true,
contextCodec,
statusHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
final Spec spec,
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final PeerLookup peerLookup,
Expand All @@ -187,8 +184,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
false,
contextCodec,
goodbyeHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

private static Eth2RpcMethod<BeaconBlocksByRootRequestMessage, SignedBeaconBlock>
Expand Down Expand Up @@ -221,8 +217,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRootHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -259,8 +254,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRangeHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -299,8 +293,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRootHandler,
peerLookup,
spec.getNetworkingConfig()));
peerLookup));
}

private static Optional<Eth2RpcMethod<BlobSidecarsByRangeRequestMessage, BlobSidecar>>
Expand Down Expand Up @@ -336,8 +329,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRangeHandler,
peerLookup,
spec.getNetworkingConfig()));
peerLookup));
}

private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
Expand Down Expand Up @@ -369,8 +361,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
phase0ContextCodec,
messageHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) {
final SszSchema<MetadataMessage> altairMetadataSchema =
Expand All @@ -392,8 +383,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
altairContextCodec,
messageHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method));
} else {
Expand All @@ -402,7 +392,6 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
}

private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
final Spec spec,
final AsyncRunner asyncRunner,
final MetadataMessagesFactory metadataMessagesFactory,
final PeerLookup peerLookup,
Expand All @@ -419,8 +408,7 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
true,
contextCodec,
statusHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

public Collection<RpcMethod<?, ?, ?>> all() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStream;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;

public class Eth2IncomingRequestHandler<
TRequest extends RpcRequest & SszData, TResponse extends SszData>
implements RpcRequestHandler {
private static final Logger LOG = LogManager.getLogger();
private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);

private final PeerLookup peerLookup;
private final LocalMessageHandler<TRequest, TResponse> localMessageHandler;
Expand All @@ -45,23 +45,20 @@ public class Eth2IncomingRequestHandler<
private final String protocolId;
private final AsyncRunner asyncRunner;
private final AtomicBoolean requestHandled = new AtomicBoolean(false);
private final Duration respTimeout;

public Eth2IncomingRequestHandler(
final String protocolId,
final RpcResponseEncoder<TResponse, ?> responseEncoder,
final RpcRequestDecoder<TRequest> requestDecoder,
final AsyncRunner asyncRunner,
final PeerLookup peerLookup,
final LocalMessageHandler<TRequest, TResponse> localMessageHandler,
final NetworkingSpecConfig networkingConfig) {
final LocalMessageHandler<TRequest, TResponse> localMessageHandler) {
this.protocolId = protocolId;
this.asyncRunner = asyncRunner;
this.peerLookup = peerLookup;
this.localMessageHandler = localMessageHandler;
this.responseEncoder = responseEncoder;
this.requestDecoder = requestDecoder;
this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout());
}

@Override
Expand Down Expand Up @@ -121,15 +118,14 @@ private void handleRequest(
}

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
final Duration timeout = respTimeout;
asyncRunner
.getDelayedFuture(timeout)
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
.thenAccept(
(__) -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
timeout.toSeconds(),
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
Expand Down
Loading

0 comments on commit d3dcb3a

Please sign in to comment.