Skip to content

Commit

Permalink
Add streaming request errors
Browse files Browse the repository at this point in the history
Throw errors for when a streaming request either provides too much data or not enough data
  • Loading branch information
adam-fowler committed Nov 9, 2020
1 parent 3a17f27 commit f088c5b
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 17 deletions.
5 changes: 5 additions & 0 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class AWSClient {
case alreadyShutdown
case invalidURL
case tooMuchData
case notEnoughData
}

let error: Error
Expand All @@ -45,6 +46,8 @@ public final class AWSClient {
public static var invalidURL: ClientError { .init(error: .invalidURL) }
/// Too much data has been supplied for the Request
public static var tooMuchData: ClientError { .init(error: .tooMuchData) }
/// Not enough data has been supplied for the Request
public static var notEnoughData: ClientError { .init(error: .notEnoughData) }
}

/// Specifies how `HTTPClient` will be created and establishes lifecycle ownership.
Expand Down Expand Up @@ -549,6 +552,8 @@ extension AWSClient.ClientError: CustomStringConvertible {
"""
case .tooMuchData:
return "You have supplied too much data for the Request."
case .notEnoughData:
return "You have not supplied enough data for the Request."
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions Sources/SotoCore/HTTP/S3StreamReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,19 @@ class S3ChunkedStreamReader: StreamReader {
self.read(eventLoop).map { (result) -> Void in
// check if a byte buffer was returned. If not then it must have been `.end`
guard case .byteBuffer(var buffer) = result else {
// if we have `.end` then return what we have in the working buffer
promise.succeed(self.workingBuffer)
if self.bytesLeftToRead == self.workingBuffer.readableBytes {
promise.succeed(self.workingBuffer)
} else {
promise.fail(AWSClient.ClientError.notEnoughData)
}
return
}
self.bytesLeftToRead -= buffer.readableBytes

guard self.bytesLeftToRead >= 0 else {
promise.fail(AWSClient.ClientError.tooMuchData)
return
}
// if working buffer is empty and this buffer is the chunk buffer size or there is no data
// left to read and this buffer is less than the size of the chunk buffer then just return
// this buffer. This allows us to avoid the buffer copy
Expand Down
8 changes: 8 additions & 0 deletions Sources/SotoCore/HTTP/StreamWriter+write.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,16 @@ extension AsyncHTTPClient.HTTPClient.Body.StreamWriter {
// calculate amount left to write
let newAmountLeft: Int?
if let amountLeft = amountLeft {
guard byteBuffers.count > 0 else {
promise.fail(AWSClient.ClientError.notEnoughData)
return
}
let bytesToWrite = byteBuffers.reduce(0) { $0 + $1.readableBytes }
newAmountLeft = amountLeft - bytesToWrite
guard newAmountLeft! >= 0 else {
promise.fail(AWSClient.ClientError.tooMuchData)
return
}
} else {
newAmountLeft = nil
}
Expand Down
54 changes: 39 additions & 15 deletions Tests/SotoCoreTests/AWSClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,16 @@ class AWSClientTests: XCTestCase {
let payload = AWSPayload.stream(size: bufferSize) { eventLoop in
let size = min(blockSize, byteBuffer.readableBytes)
// don't ask for 0 bytes
XCTAssertNotEqual(size, 0)
if size == 0 {
return eventLoop.makeSucceededFuture(.end)
}
let buffer = byteBuffer.readSlice(length: size)!
return eventLoop.makeSucceededFuture(.byteBuffer(buffer))
}
let input = Input(payload: payload)
let response = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger)

try server.processRaw { request in
try? server.processRaw { request in
let bytes = request.body.getBytes(at: 0, length: request.body.readableBytes)
XCTAssertEqual(bytes, data)
let response = AWSTestServer.Response(httpStatus: .ok, headers: [:], body: nil)
Expand Down Expand Up @@ -287,7 +289,7 @@ class AWSClientTests: XCTestCase {
XCTAssertNoThrow(try self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 65552, blockSize: 65552))
}

func testRequestStreamingTooMuchData() {
func testRequestStreamingWithPayload(_ payload: AWSPayload) throws {
struct Input: AWSEncodableShape & AWSShapeWithPayload {
static var _payloadPath: String = "payload"
static var _payloadOptions: AWSShapePayloadOptions = [.allowStreaming]
Expand All @@ -305,19 +307,41 @@ class AWSClientTests: XCTestCase {
XCTAssertNoThrow(try client.syncShutdown())
XCTAssertNoThrow(try httpClient.syncShutdown())
}
do {
// set up stream of 8 bytes but supply more than that
let payload = AWSPayload.stream(size: 8) { eventLoop in
var buffer = ByteBufferAllocator().buffer(capacity: 0)
buffer.writeString("String longer than 8 bytes")
return eventLoop.makeSucceededFuture(.byteBuffer(buffer))
let input = Input(payload: payload)
let response = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger)
try response.wait()
}

func testRequestStreamingTooMuchData() {
// set up stream of 8 bytes but supply more than that
let payload = AWSPayload.stream(size: 8) { eventLoop in
var buffer = ByteBufferAllocator().buffer(capacity: 0)
buffer.writeString("String longer than 8 bytes")
return eventLoop.makeSucceededFuture(.byteBuffer(buffer))
}
XCTAssertThrowsError(try testRequestStreamingWithPayload(payload)) { error in
guard let error = error as? AWSClient.ClientError, error == .tooMuchData else {
XCTFail()
return
}
}
}

func testRequestStreamingNotEnoughData() {
var byteBuffer = ByteBufferAllocator().buffer(staticString: "Buffer")
let payload = AWSPayload.stream(size: byteBuffer.readableBytes+1) { eventLoop in
let size = byteBuffer.readableBytes
if size == 0 {
return eventLoop.makeSucceededFuture(.end)
}
let buffer = byteBuffer.readSlice(length: size)!
return eventLoop.makeSucceededFuture(.byteBuffer(buffer))
}
XCTAssertThrowsError(try testRequestStreamingWithPayload(payload)) { error in
guard let error = error as? AWSClient.ClientError, error == .notEnoughData else {
XCTFail()
return
}
let input = Input(payload: payload)
let response = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger)
try response.wait()
} catch let error as HTTPClientError where error == .bodyLengthMismatch {
} catch {
XCTFail("Unexpected error: \(error)")
}
}

Expand Down

0 comments on commit f088c5b

Please sign in to comment.