Skip to content

Commit

Permalink
Better handle new streams when server is quiescing (#481)
Browse files Browse the repository at this point in the history
Motivation:

When the server is in the locally quiescing state and a client attempts
to open a new stream, the connection state machine treats receiving the
headers as a connection error. This results in the connection being
closed and in-flight requests being failed.

This can happen because of the inherent race between the server sending
a GOAWAY frame and the client receiving it, during which time the client
can open a stream not knowing it's doomed to failure.

Rather than the server treating this as a connection error, it should
reject the stream with a RST_STREAM frame and emit a stream error.

Modifications:

- Emit a stream error when receiving HEADERS in the locally quiesced
state

Result:

Better handling of new streams when the server is quiescing
  • Loading branch information
glbrntt authored Jan 20, 2025
1 parent 1e0c823 commit 124e859
Show file tree
Hide file tree
Showing 13 changed files with 462 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"1k_requests_inline_noninterleaved": 29100,
"1k_requests_interleaved": 36150,
"1k_requests_noninterleaved": 35100,
"client_server_h1_request_response": 276050,
"client_server_h1_request_response_inline": 261050,
"client_server_request_response": 245050,
"client_server_request_response_inline": 236050,
"client_server_request_response_many": 1190050,
"client_server_request_response_many_inline": 881050,
"client_server_h1_request_response": 278050,
"client_server_h1_request_response_inline": 263050,
"client_server_request_response": 247050,
"client_server_request_response_inline": 238050,
"client_server_request_response_many": 1192050,
"client_server_request_response_many_inline": 883050,
"create_client_stream_channel": 35050,
"create_client_stream_channel_inline": 35050,
"create_client_stream_channel_inline_no_promise_based_API": 35050,
Expand All @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 252400,
"stream_teardown_100_concurrent_inline": 252400
}
"stream_teardown_100_concurrent": 252550,
"stream_teardown_100_concurrent_inline": 251650
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"1k_requests_inline_noninterleaved": 29100,
"1k_requests_interleaved": 36150,
"1k_requests_noninterleaved": 35100,
"client_server_h1_request_response": 276050,
"client_server_h1_request_response_inline": 261050,
"client_server_request_response": 245050,
"client_server_request_response_inline": 236050,
"client_server_request_response_many": 1190050,
"client_server_request_response_many_inline": 881050,
"client_server_h1_request_response": 278050,
"client_server_h1_request_response_inline": 263050,
"client_server_request_response": 247050,
"client_server_request_response_inline": 238050,
"client_server_request_response_many": 1192050,
"client_server_request_response_many_inline": 883050,
"create_client_stream_channel": 35050,
"create_client_stream_channel_inline": 35050,
"create_client_stream_channel_inline_no_promise_based_API": 35050,
Expand All @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 252400,
"stream_teardown_100_concurrent_inline": 252400
}
"stream_teardown_100_concurrent": 252550,
"stream_teardown_100_concurrent_inline": 251650
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"1k_requests_inline_noninterleaved": 29100,
"1k_requests_interleaved": 36150,
"1k_requests_noninterleaved": 35100,
"client_server_h1_request_response": 276050,
"client_server_h1_request_response_inline": 261050,
"client_server_request_response": 245050,
"client_server_request_response_inline": 236050,
"client_server_request_response_many": 1190050,
"client_server_request_response_many_inline": 881050,
"client_server_h1_request_response": 278050,
"client_server_h1_request_response_inline": 263050,
"client_server_request_response": 247050,
"client_server_request_response_inline": 238050,
"client_server_request_response_many": 1192050,
"client_server_request_response_many_inline": 883050,
"create_client_stream_channel": 35050,
"create_client_stream_channel_inline": 35050,
"create_client_stream_channel_inline_no_promise_based_API": 35050,
Expand All @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
}
"stream_teardown_100_concurrent": 252550,
"stream_teardown_100_concurrent_inline": 251650
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"1k_requests_inline_noninterleaved": 29100,
"1k_requests_interleaved": 36150,
"1k_requests_noninterleaved": 35100,
"client_server_h1_request_response": 276050,
"client_server_h1_request_response_inline": 261050,
"client_server_request_response": 245050,
"client_server_request_response_inline": 236050,
"client_server_request_response_many": 1190050,
"client_server_request_response_many_inline": 881050,
"client_server_h1_request_response": 278050,
"client_server_h1_request_response_inline": 263050,
"client_server_request_response": 247050,
"client_server_request_response_inline": 238050,
"client_server_request_response_many": 1192050,
"client_server_request_response_many_inline": 883050,
"create_client_stream_channel": 35050,
"create_client_stream_channel_inline": 35050,
"create_client_stream_channel_inline_no_promise_based_API": 35050,
Expand All @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
}
"stream_teardown_100_concurrent": 252550,
"stream_teardown_100_concurrent_inline": 251650
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
"1k_requests_inline_noninterleaved": 29100,
"1k_requests_interleaved": 36150,
"1k_requests_noninterleaved": 35100,
"client_server_h1_request_response": 276050,
"client_server_h1_request_response_inline": 261050,
"client_server_request_response": 245050,
"client_server_request_response_inline": 236050,
"client_server_request_response_many": 1190050,
"client_server_request_response_many_inline": 881050,
"client_server_h1_request_response": 278050,
"client_server_h1_request_response_inline": 263050,
"client_server_request_response": 247050,
"client_server_request_response_inline": 238050,
"client_server_request_response_many": 1192050,
"client_server_request_response_many_inline": 883050,
"create_client_stream_channel": 35050,
"create_client_stream_channel_inline": 35050,
"create_client_stream_channel_inline_no_promise_based_API": 35050,
Expand All @@ -18,6 +18,6 @@
"get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 300050,
"get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 200050,
"hpack_decoding": 5050,
"stream_teardown_100_concurrent": 252450,
"stream_teardown_100_concurrent_inline": 251550
"stream_teardown_100_concurrent": 252550,
"stream_teardown_100_concurrent_inline": 251650
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,23 @@ struct ConnectionStreamState {
/// - streamID: The ID of the stream to modify.
/// - ignoreRecentlyReset: Whether a recently reset stream should be ignored. Should be set to `true` when receiving frames.
/// - ignoreClosed: Whether a closed stream should be ignored. Should be set to `true` when receiving window update or reset stream frames.
/// - isLocallyQuiescing: Whether the connection is quiescing and the quiescing was initiated locally.
/// - modifier: A block that will be invoked to modify the stream state, if present.
/// - Returns: The result of the state modification, as well as any state change that occurred to the stream.
mutating func modifyStreamState(
streamID: HTTP2StreamID,
ignoreRecentlyReset: Bool,
ignoreClosed: Bool = false,
isLocallyQuiescing: Bool = false,
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect
) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
return StateMachineResultWithStreamEffect(
result: self.streamMissing(
streamID: streamID,
ignoreRecentlyReset: ignoreRecentlyReset,
ignoreClosed: ignoreClosed
ignoreClosed: ignoreClosed,
isLocallyQuiescing: isLocallyQuiescing
),
effect: nil
)
Expand Down Expand Up @@ -218,11 +221,9 @@ struct ConnectionStreamState {
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect
) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
// We never ignore recently reset streams here, as this should only ever be used when *sending* frames.
return StateMachineResultWithStreamEffect(
result: self.streamMissing(streamID: streamID, ignoreRecentlyReset: false, ignoreClosed: false),
effect: nil
)
// Always allow RST_STREAM to be sent locally, even if the stream doesn't exist. This can happen
// when a stream is rejected so the state machine won't know about the stream.
return StateMachineResultWithStreamEffect(result: .succeed)
}

guard let effect = result.effect, effect.closedStream else {
Expand Down Expand Up @@ -320,11 +321,13 @@ struct ConnectionStreamState {
/// - streamID: The ID of the missing stream.
/// - ignoreRecentlyReset: Whether a recently reset stream should be ignored.
/// - ignoreClosed: Whether a closed stream should be ignored.
/// - isLocallyQuiescing: Whether the connection is quiescing and the quiescing was initiated locally.
/// - Returns: A `StateMachineResult` for this frame error.
private func streamMissing(
streamID: HTTP2StreamID,
ignoreRecentlyReset: Bool,
ignoreClosed: Bool
ignoreClosed: Bool,
isLocallyQuiescing: Bool = false
) -> StateMachineResult {
if ignoreRecentlyReset && self.recentlyResetStreams.contains(streamID) {
return .ignoreFrame
Expand All @@ -333,11 +336,22 @@ struct ConnectionStreamState {
switch streamID.mayBeInitiatedBy(.client) {
case true where streamID > self.lastClientStreamID,
false where streamID > self.lastServerStreamID:
// The stream in question is idle.
return .connectionError(
underlyingError: NIOHTTP2Errors.noSuchStream(streamID: streamID),
type: .protocolError
)
if isLocallyQuiescing {
return .streamError(
streamID: streamID,
underlyingError: NIOHTTP2Errors.streamError(
streamID: streamID,
baseError: NIOHTTP2Errors.createdStreamAfterGoaway()
),
type: .refusedStream
)
} else {
// The stream in question is idle.
return .connectionError(
underlyingError: NIOHTTP2Errors.noSuchStream(streamID: streamID),
type: .protocolError
)
}
default:
// This stream must have already been closed.
if ignoreClosed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ extension ReceivingHeadersState where Self: LocallyQuiescingState {
}

// At this stage we've quiesced, so the remote peer is not allowed to create new streams.
let result = self.streamState.modifyStreamState(streamID: streamID, ignoreRecentlyReset: true) {
let result = self.streamState.modifyStreamState(
streamID: streamID,
ignoreRecentlyReset: true,
isLocallyQuiescing: true
) {
$0.receiveHeaders(
headers: headers,
validateHeaderBlock: validateHeaderBlock,
Expand Down
42 changes: 29 additions & 13 deletions Sources/NIOHTTP2/DOSHeuristics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ struct DOSHeuristics<DeadlineClock: NIODeadlineClock> {
/// The maximum number of "empty" data frames we're willing to tolerate.
private let maximumSequentialEmptyDataFrames: Int

private var resetFrameRateControlStateMachine: HTTP2ResetFrameRateControlStateMachine
private var resetFrameRateControlStateMachine: RateLimitStateMachine
private var streamErrorRateControlStateMachine: RateLimitStateMachine

internal init(
maximumSequentialEmptyDataFrames: Int,
maximumResetFrameCount: Int,
resetFrameCounterWindow: TimeAmount,
maximumStreamErrorCount: Int,
streamErrorCounterWindow: TimeAmount,
clock: DeadlineClock = RealNIODeadlineClock()
) {
precondition(
Expand All @@ -47,6 +50,11 @@ struct DOSHeuristics<DeadlineClock: NIODeadlineClock> {
timeWindow: resetFrameCounterWindow,
clock: clock
)
self.streamErrorRateControlStateMachine = .init(
countThreshold: maximumStreamErrorCount,
timeWindow: streamErrorCounterWindow,
clock: clock
)
}
}

Expand All @@ -64,7 +72,7 @@ extension DOSHeuristics {
case .headers:
self.receivedEmptyDataFrames = 0
case .rstStream:
switch self.resetFrameRateControlStateMachine.resetReceived() {
switch self.resetFrameRateControlStateMachine.recordEvent() {
case .rateTooHigh:
throw NIOHTTP2Errors.excessiveRSTFrames()
case .noneReceived, .ratePermitted:
Expand All @@ -80,13 +88,21 @@ extension DOSHeuristics {
throw NIOHTTP2Errors.excessiveEmptyDataFrames()
}
}

mutating func processStreamError() throws {
switch self.streamErrorRateControlStateMachine.recordEvent() {
case .rateTooHigh:
throw NIOHTTP2Errors.excessiveStreamErrors()
case .noneReceived, .ratePermitted:
()
}
}
}

extension DOSHeuristics {
// protect against excessive numbers of stream RST frames being issued
struct HTTP2ResetFrameRateControlStateMachine {

enum ResetFrameRateControlState: Hashable {
struct RateLimitStateMachine {
enum RateState: Hashable {
case noneReceived
case ratePermitted
case rateTooHigh
Expand All @@ -96,28 +112,28 @@ extension DOSHeuristics {
private let timeWindow: TimeAmount
private let clock: DeadlineClock

private var resetTimestamps: Deque<NIODeadline>
private var _state: ResetFrameRateControlState = .noneReceived
private var timestamps: Deque<NIODeadline>
private var _state: RateState = .noneReceived

init(countThreshold: Int, timeWindow: TimeAmount, clock: DeadlineClock = RealNIODeadlineClock()) {
self.countThreshold = countThreshold
self.timeWindow = timeWindow
self.clock = clock

self.resetTimestamps = .init(minimumCapacity: self.countThreshold)
self.timestamps = .init(minimumCapacity: self.countThreshold)
}

mutating func resetReceived() -> ResetFrameRateControlState {
mutating func recordEvent() -> RateState {
self.garbageCollect()
self.resetTimestamps.append(self.clock.now())
self.timestamps.append(self.clock.now())
self.evaluateState()
return self._state
}

private mutating func garbageCollect() {
let now = self.clock.now()
while let first = self.resetTimestamps.first, now - first > self.timeWindow {
_ = self.resetTimestamps.popFirst()
while let first = self.timestamps.first, now - first > self.timeWindow {
_ = self.timestamps.popFirst()
}
}

Expand All @@ -126,7 +142,7 @@ extension DOSHeuristics {
case .noneReceived:
self._state = .ratePermitted
case .ratePermitted:
if self.resetTimestamps.count > self.countThreshold {
if self.timestamps.count > self.countThreshold {
self._state = .rateTooHigh
}
case .rateTooHigh:
Expand Down
Loading

0 comments on commit 124e859

Please sign in to comment.