Skip to content

Commit

Permalink
Dont retry streamed requests (#428)
Browse files Browse the repository at this point in the history
* Don't retry request when request body is streamed

* Add test to verify retry is not called on streamed requests

* swift format
  • Loading branch information
adam-fowler authored Mar 10, 2021
1 parent 69d9bf8 commit 95209ce
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 5 deletions.
26 changes: 21 additions & 5 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -453,25 +453,32 @@ extension AWSClient {
) -> EventLoopFuture<Output> {
let eventLoop = eventLoop ?? eventLoopGroup.next()
let logger = logger.attachingRequestId(Self.globalRequestID.add(1), operation: operationName, service: config.service)

// get credentials
let future: EventLoopFuture<Output> = credentialProvider.getCredential(on: eventLoop, logger: logger)
.flatMapThrowing { credential in
.flatMapThrowing { credential -> AWSHTTPRequest in
// construct signer
let signer = AWSSigner(credentials: credential, name: config.signingName, region: config.region.rawValue)
// create request and sign with signer
let awsRequest = try createRequest()
return try awsRequest
.applyMiddlewares(config.middlewares + self.middlewares, config: config)
.createHTTPRequest(signer: signer, byteBufferAllocator: config.byteBufferAllocator)
}.flatMap { request in
}.flatMap { request -> EventLoopFuture<Output> in
// send request to AWS and process result
let streaming: Bool
switch request.body.payload {
case .stream:
streaming = true
default:
streaming = false
}
return self.invoke(
with: config,
eventLoop: eventLoop,
logger: logger,
request: { eventLoop in execute(request, eventLoop, logger) },
processResponse: processResponse
processResponse: processResponse,
streaming: streaming
)
}
return recordRequest(future, service: config.service, operation: operationName, logger: logger)
Expand Down Expand Up @@ -583,7 +590,8 @@ extension AWSClient {
eventLoop: EventLoop,
logger: Logger,
request: @escaping (EventLoop) -> EventLoopFuture<AWSHTTPResponse>,
processResponse: @escaping (AWSHTTPResponse) throws -> Output
processResponse: @escaping (AWSHTTPResponse) throws -> Output,
streaming: Bool
) -> EventLoopFuture<Output> {
let promise = eventLoop.makePromise(of: Output.self)

Expand All @@ -599,6 +607,14 @@ extension AWSClient {
promise.succeed(output)
}
.flatMapErrorThrowing { (error) -> Void in
// if streaming and the error returned is an AWS error fail immediately. Do not attempt
// to retry as the streaming function will not know you are retrying
if streaming,
error is AWSErrorType || error is AWSRawError
{
promise.fail(error)
return
}
// If I get a retry wait time for this error then attempt to retry request
if case .retry(let retryTime) = self.retryPolicy.getRetryWaitTime(error: error, attempt: attempt) {
logger.debug("Retrying request", metadata: [
Expand Down
7 changes: 7 additions & 0 deletions Sources/SotoCore/Message/Body.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ extension Body {
public func asByteBuffer(byteBufferAllocator: ByteBufferAllocator) -> ByteBuffer? {
return asPayload(byteBufferAllocator: byteBufferAllocator).asByteBuffer()
}

var isStreaming: Bool {
if case .raw(let payload) = self, case .stream = payload.payload {
return true
}
return false
}
}
46 changes: 46 additions & 0 deletions Tests/SotoCoreTests/AWSClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,52 @@ class AWSClientTests: XCTestCase {
}
}

/// verify we are not calling the Retry handler when streaming a request
func testDontRetryStreamingRequests() {
class TestRetryPolicy: RetryPolicy {
func getRetryWaitTime(error: Error, attempt: Int) -> RetryStatus? {
XCTFail("This should not be called as streaming has disabled retries")
return .dontRetry
}
}
struct Input: AWSEncodableShape & AWSShapeWithPayload {
static var _payloadPath: String = "payload"
static var _payloadOptions: AWSShapePayloadOptions = [.allowStreaming, .allowChunkedStreaming, .raw]
let payload: AWSPayload
private enum CodingKeys: CodingKey {}
}
let retryPolicy = TestRetryPolicy()
do {
let httpClient = AsyncHTTPClient.HTTPClient(eventLoopGroupProvider: .createNew)
let awsServer = AWSTestServer(serviceProtocol: .json)
let config = createServiceConfig(serviceProtocol: .json(version: "1.1"), endpoint: awsServer.address)
let client = createAWSClient(credentialProvider: .empty, retryPolicy: .init(retryPolicy: retryPolicy), httpClientProvider: .shared(httpClient))
defer {
XCTAssertNoThrow(try awsServer.stop())
XCTAssertNoThrow(try client.syncShutdown())
XCTAssertNoThrow(try httpClient.syncShutdown())
}
let payload = AWSPayload.stream { eventLoop in return eventLoop.makeSucceededFuture(.end) }
let input = Input(payload: payload)
let response: EventLoopFuture<Void> = client.execute(
operation: "test",
path: "/",
httpMethod: .POST,
serviceConfig: config,
input: input,
logger: TestEnvironment.logger
)
try awsServer.processRaw { _ in
return .error(.accessDenied, continueProcessing: false)
}

try response.wait()
} catch let error as AWSClientError where error == .accessDenied {
} catch {
XCTFail("Unexpected error: \(error)")
}
}

func testClientResponseEventLoop() {
do {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5)
Expand Down

0 comments on commit 95209ce

Please sign in to comment.