Skip to content

Commit

Permalink
Don't trap on invalid connection state transitions (#1573)
Browse files Browse the repository at this point in the history
Motivation:

The connection manager is quite aggressive about trapping if an invalid
state is hit. It's almost impossible to know when states are truly
unreachable so in most cases we should handle them as best as we can. If
we believe them to be unreachable but cannot easily prove it then we
should crash in debug mode and handle it as best as possible in release
mode.

Modifications:

- Handle various state transitions more gently in the connection
  manager.

Result:

Gentler state handling.
  • Loading branch information
glbrntt authored Mar 3, 2023
1 parent a20cac0 commit 770629b
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 70 deletions.
150 changes: 80 additions & 70 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ internal final class ConnectionManager {
var scheduled: Scheduled<Void>
var reason: Error

init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error) {
init(from state: ConnectingState, scheduled: Scheduled<Void>, reason: Error?) {
self.backoffIterator = state.backoffIterator
self.readyChannelMuxPromise = state.readyChannelMuxPromise
self.scheduled = scheduled
self.reason = reason
self.reason = reason ?? GRPCStatus(
code: .unavailable,
message: "Unexpected connection drop"
)
}

init(from state: ConnectedState, scheduled: Scheduled<Void>) {
Expand Down Expand Up @@ -391,7 +394,7 @@ internal final class ConnectionManager {
self.startConnecting()
// We started connecting so we must transition to the `connecting` state.
guard case let .connecting(connecting) = self.state else {
self.invalidState()
self.unreachableState()
}
multiplexer = connecting.readyChannelMuxPromise.futureResult

Expand Down Expand Up @@ -432,7 +435,7 @@ internal final class ConnectionManager {
self.startConnecting()
// We started connecting so we must transition to the `connecting` state.
guard case let .connecting(connecting) = self.state else {
self.invalidState()
self.unreachableState()
}
return connecting.candidateMuxPromise.futureResult
case let .connecting(state):
Expand Down Expand Up @@ -674,20 +677,13 @@ internal final class ConnectionManager {
case .shutdown:
channel.close(mode: .all, promise: nil)

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .active:
self.invalidState()

case .ready:
self.invalidState()

case .transientFailure:
self.invalidState()
case .idle, .transientFailure:
// Received a channelActive when not connecting. Can happen if channelActive and
// channelInactive are reordered. Ignore.
()
case .active, .ready:
// Received a second 'channelActive', already active so ignore.
()
}
}

Expand All @@ -700,6 +696,43 @@ internal final class ConnectionManager {
])

switch self.state {
// We can hit inactive in connecting if we see channelInactive before channelActive; that's not
// common but we should tolerate it.
case let .connecting(connecting):
// Should we try connecting again?
switch connecting.reconnect {
// No, shutdown instead.
case .none:
self.logger.debug("shutting down connection")

let error = GRPCStatus(
code: .unavailable,
message: "The connection was dropped and connection re-establishment is disabled"
)

let shutdownState = ShutdownState(
closeFuture: self.eventLoop.makeSucceededFuture(()),
reason: error
)

self.state = .shutdown(shutdownState)
// Shutting down, so fail the outstanding promises.
connecting.readyChannelMuxPromise.fail(error)
connecting.candidateMuxPromise.fail(error)

// Yes, after some time.
case let .after(delay):
let error = GRPCStatus(code: .unavailable, message: "Connection closed while connecting")
// Fail the candidate mux promise. Keep the 'readyChannelMuxPromise' as we'll try again.
connecting.candidateMuxPromise.fail(error)

let scheduled = self.eventLoop.scheduleTask(in: .seconds(timeInterval: delay)) {
self.startConnecting()
}
self.logger.debug("scheduling connection attempt", metadata: ["delay_secs": "\(delay)"])
self.state = .transientFailure(.init(from: connecting, scheduled: scheduled, reason: nil))
}

// The channel is `active` but not `ready`. Should we try again?
case let .active(active):
switch active.reconnect {
Expand Down Expand Up @@ -766,14 +799,9 @@ internal final class ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .connecting:
self.invalidState()

// Received 'channelInactive' twice; fine, ignore.
case .transientFailure:
self.invalidState()
()
}
}

Expand All @@ -793,20 +821,20 @@ internal final class ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .transientFailure:
self.invalidState()
case .idle, .transientFailure:
// No connection or connection attempt exists but connection was marked as ready. This is
// strange. Ignore it in release mode as there's nothing to close and nowhere to fire an
// error to.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")

case .connecting:
self.invalidState()
// No channel exists to receive initial HTTP/2 SETTINGS frame on... weird. Ignore in release
// mode.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")

case .ready:
self.invalidState()
// Already received initial HTTP/2 SETTINGS frame; ignore in release mode.
assertionFailure("received initial HTTP/2 SETTINGS frame in \(self.state.label) state")
}
}

Expand Down Expand Up @@ -834,17 +862,14 @@ internal final class ConnectionManager {
// 'channelInactive()'.
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()
case .idle, .transientFailure:
// There's no connection to idle; ignore.
()

case .connecting:
self.invalidState()

case .transientFailure:
self.invalidState()
// The idle watchdog is started when the connection is active, this shouldn't happen
// in the connecting state. Ignore it in release mode.
assertionFailure("tried to idle a connection in the \(self.state.label) state")
}
}

Expand Down Expand Up @@ -908,22 +933,10 @@ extension ConnectionManager {
case .shutdown:
()

// We can't fail to connect if we aren't trying.
//
// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
case .idle:
self.invalidState()

case .active:
self.invalidState()

case .ready:
self.invalidState()

case .transientFailure:
self.invalidState()
// Connection attempt failed, but no connection attempt is in progress.
case .idle, .active, .ready, .transientFailure:
// Nothing we can do other than ignore in release mode.
assertionFailure("connect promise failed in \(self.state.label) state")
}
}
}
Expand Down Expand Up @@ -951,17 +964,14 @@ extension ConnectionManager {
case .shutdown:
()

// These cases are purposefully separated: some crash reporting services provide stack traces
// which don't include the precondition failure message (which contain the invalid state we were
// in). Keeping the cases separate allows us work out the state from the line number.
// We only call startConnecting() if the connection does not exist and after checking what the
// current state is, so none of these states should be reachable.
case .connecting:
self.invalidState()

self.unreachableState()
case .active:
self.invalidState()

self.unreachableState()
case .ready:
self.invalidState()
self.unreachableState()
}
}

Expand Down Expand Up @@ -1066,11 +1076,11 @@ extension ConnectionManager {
}

extension ConnectionManager {
private func invalidState(
private func unreachableState(
function: StaticString = #function,
file: StaticString = #fileID,
line: UInt = #line
) -> Never {
preconditionFailure("Invalid state \(self.state) for \(function)", file: file, line: line)
fatalError("Invalid state \(self.state) for \(function)", file: file, line: line)
}
}
113 changes: 113 additions & 0 deletions Tests/GRPCTests/ConnectionManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,119 @@ extension ConnectionManagerTests {
}
}

/// Checks that the connection manager tolerates `channelInactive` being fired before
/// `channelActive` on a freshly connected channel.
///
/// The manager is expected to move from `connecting` to `shutdown` when the out-of-order
/// `channelInactive` arrives; the `channelActive` that follows must then be ignored.
func testChannelInactiveBeforeActiveWithNoReconnect() throws {
  let embedded = EmbeddedChannel(loop: self.loop)
  let connectPromise = self.loop.makePromise(of: Channel.self)

  // The connection attempt only completes once the test fulfils `connectPromise`.
  let manager = self.makeConnectionManager { _, _ in
    connectPromise.futureResult
  }

  // Asking for a multiplexer is what kicks off the connection attempt.
  self.waitForStateChange(from: .idle, to: .connecting) {
    _ = manager.getHTTP2Multiplexer()
    self.loop.run()
  }

  // Install an idle handler that is wired to the manager, then hand the channel over.
  let multiplexer = HTTP2StreamMultiplexer(
    mode: .client,
    channel: embedded,
    inboundStreamInitializer: nil
  )
  let idleHandler = GRPCIdleHandler(
    connectionManager: manager,
    multiplexer: multiplexer,
    idleTimeout: .minutes(5),
    keepalive: .init(),
    logger: self.logger
  )
  try embedded.pipeline.syncOperations.addHandler(idleHandler)
  connectPromise.succeed(embedded)

  // Deliberately fire the events in the wrong order: inactive first...
  self.waitForStateChange(from: .connecting, to: .shutdown) {
    embedded.pipeline.fireChannelInactive()
  }

  // ...then active, which must not trap or change state.
  embedded.pipeline.fireChannelActive()
}

/// Checks that the connection manager tolerates `channelInactive` being fired before
/// `channelActive` when a connection backoff is configured, and that it then reconnects.
///
/// Expected state sequence:
/// `idle` -> `connecting` -> `transientFailure` (out-of-order inactive on the first channel)
/// -> `connecting` (after the one second backoff) -> `ready` (second channel becomes active
/// and receives its initial SETTINGS frame).
func testChannelInactiveBeforeActiveWillReconnect() throws {
  // Two channels/promises: one per connection attempt. The channel, promise and future
  // arrays are kept in the same order and are all consumed with `removeLast()`, so the
  // n-th connection attempt is always paired with the n-th channel and promise.
  var channels = [EmbeddedChannel(loop: self.loop), EmbeddedChannel(loop: self.loop)]
  var channelPromises: [EventLoopPromise<Channel>] = [
    self.loop.makePromise(),
    self.loop.makePromise(),
  ]
  var channelFutures = channelPromises.map { $0.futureResult }

  var configuration = self.defaultConfiguration
  configuration.connectionBackoff = .oneSecondFixed

  let manager = self.makeConnectionManager(configuration: configuration) { _, _ in
    return channelFutures.removeLast()
  }

  // Start the connection.
  self.waitForStateChange(from: .idle, to: .connecting) {
    // Triggers the connect.
    _ = manager.getHTTP2Multiplexer()
    self.loop.run()
  }

  // Setup the first channel.
  let channel1 = channels.removeLast()
  let channel1Promise = channelPromises.removeLast()

  try channel1.pipeline.syncOperations.addHandler(
    GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: HTTP2StreamMultiplexer(
        mode: .client,
        channel: channel1,
        inboundStreamInitializer: nil
      ),
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
  )
  channel1Promise.succeed(channel1)

  // Oops: wrong way around. We should tolerate this.
  self.waitForStateChange(from: .connecting, to: .transientFailure) {
    channel1.pipeline.fireChannelInactive()
  }

  // The late 'channelActive' must be ignored while waiting to reconnect.
  channel1.pipeline.fireChannelActive()

  // Start the next attempt: the backoff is fixed at one second.
  self.waitForStateChange(from: .transientFailure, to: .connecting) {
    self.loop.advanceTime(by: .seconds(1))
  }

  // Setup the second channel. Its multiplexer must be bound to `channel2` (the channel the
  // handler is installed on), not to the first, already inactive, channel.
  let channel2 = channels.removeLast()
  let channel2Promise = channelPromises.removeLast()
  try channel2.pipeline.syncOperations.addHandler(
    GRPCIdleHandler(
      connectionManager: manager,
      multiplexer: HTTP2StreamMultiplexer(
        mode: .client,
        channel: channel2,
        inboundStreamInitializer: nil
      ),
      idleTimeout: .minutes(5),
      keepalive: .init(),
      logger: self.logger
    )
  )

  channel2Promise.succeed(channel2)

  // This time the events arrive in the right order: active, then the initial SETTINGS frame.
  try self.waitForStateChange(from: .connecting, to: .ready) {
    channel2.pipeline.fireChannelActive()
    let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(.settings([])))
    XCTAssertNoThrow(try channel2.writeInbound(frame))
  }
}

func testIdleTimeoutWhenThereAreActiveStreams() throws {
let channelPromise = self.loop.makePromise(of: Channel.self)
let manager = self.makeConnectionManager { _, _ in
Expand Down

0 comments on commit 770629b

Please sign in to comment.