Skip to content

Commit

Permalink
Add Array.reduce style paginate function. (#385)
Browse files Browse the repository at this point in the history
* Add Array.reduce style paginate function.

Re-implement original functions using new `reduce` style functions

* swift format
  • Loading branch information
adam-fowler authored Oct 13, 2020
1 parent 8a8ff16 commit ecc3479
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 34 deletions.
162 changes: 129 additions & 33 deletions Sources/SotoCore/AWSClient+Paginate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,49 @@ public protocol AWSPaginateToken: AWSShape {
extension AWSClient {
/// If an AWS command is returning an arbituary sized array sometimes it adds support for paginating this array
/// ie it will return the array in blocks of a defined size, each block also includes a token which can be used to access
/// the next block. This function loads each block and calls a closure with each block as parameter.
/// the next block. This function loads each block and calls a closure with each block as parameter. This function returns
/// the result of combining all of these blocks using the given closure,
///
/// - Parameters:
/// - input: Input for request
/// - initialValue: The value to use as the initial accumulating value. `initialValue` is passed to `onPage` the first time it is called.
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - onPage: closure called with each block of entries
public func paginate<Input: AWSPaginateToken, Output: AWSShape>(
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. It combines an accumulating result with the contents of response from the call to AWS. This combined result is then returned
/// along with a boolean indicating if the paginate operation should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape, Result>(
input: Input,
initialValue: Result,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Output, EventLoop) -> EventLoopFuture<Bool>
) -> EventLoopFuture<Void> {
onPage: @escaping (Result, Output, EventLoop) -> EventLoopFuture<(Bool, Result)>
) -> EventLoopFuture<Result> {
let eventLoop = eventLoop ?? eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)
let promise = eventLoop.makePromise(of: Result.self)

func paginatePart(input: Input) {
func paginatePart(input: Input, currentValue: Result) {
let responseFuture = command(input, eventLoop, logger)
.flatMap { response in
return onPage(response, eventLoop)
.map { (rt) -> Void in
guard rt == true else { return promise.succeed(()) }
return onPage(currentValue, response, eventLoop)
.map { continuePaginate, result -> Void in
guard continuePaginate == true else { return promise.succeed(result) }
// get next block token and construct a new input with this token
guard let token = response[keyPath: tokenKey] else { return promise.succeed(()) }
guard let token = response[keyPath: tokenKey] else { return promise.succeed(result) }

let input = input.usingPaginationToken(token)
paginatePart(input: input)
paginatePart(input: input, currentValue: result)
}
}
responseFuture.whenFailure { error in
promise.fail(error)
}
}

paginatePart(input: input)
paginatePart(input: input, currentValue: initialValue)

return promise.futureResult
}
Expand All @@ -74,39 +80,70 @@ extension AWSClient {
/// - input: Input for request
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - onPage: closure called with each block of entries
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. Returns boolean indicating whether we should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape>(
input: Input,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
moreResultsKey: KeyPath<Output, Bool>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Output, EventLoop) -> EventLoopFuture<Bool>
) -> EventLoopFuture<Void> {
self.paginate(input: input, initialValue: (), command: command, tokenKey: tokenKey, on: eventLoop, logger: logger) { _, output, eventLoop in
return onPage(output, eventLoop).map { rt in (rt, ()) }
}
}

/// If an AWS command is returning an arbituary sized array sometimes it adds support for paginating this array
/// ie it will return the array in blocks of a defined size, each block also includes a token which can be used to access
/// the next block. This function loads each block and calls a closure with each block as parameter. This function returns
/// the result of combining all of these blocks using the given closure,
///
/// - Parameters:
/// - input: Input for request
/// - initialValue: The value to use as the initial accumulating value. `initialValue` is passed to `onPage` the first time it is called.
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - moreResultsKey: The KeyPath for the member of the output that indicates whether we should ask for more data
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. It combines an accumulating result with the contents of response from the call to AWS. This combined result is then returned
/// along with a boolean indicating if the paginate operation should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape, Result>(
input: Input,
initialValue: Result,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
moreResultsKey: KeyPath<Output, Bool>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Result, Output, EventLoop) -> EventLoopFuture<(Bool, Result)>
) -> EventLoopFuture<Result> {
let eventLoop = eventLoop ?? eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)
let promise = eventLoop.makePromise(of: Result.self)

func paginatePart(input: Input) {
func paginatePart(input: Input, currentValue: Result) {
let responseFuture = command(input, eventLoop, logger)
.flatMap { response in
return onPage(response, eventLoop)
.map { (rt) -> Void in
guard rt == true else { return promise.succeed(()) }
return onPage(currentValue, response, eventLoop)
.map { continuePaginate, result -> Void in
guard continuePaginate == true else { return promise.succeed(result) }
// get next block token and construct a new input with this token
guard let token = response[keyPath: tokenKey],
response[keyPath: moreResultsKey] else { return promise.succeed(()) }
response[keyPath: moreResultsKey] else { return promise.succeed(result) }

let input = input.usingPaginationToken(token)
paginatePart(input: input)
paginatePart(input: input, currentValue: result)
}
}
responseFuture.whenFailure { error in
promise.fail(error)
}
}

paginatePart(input: input)
paginatePart(input: input, currentValue: initialValue)

return promise.futureResult
}
Expand All @@ -119,40 +156,99 @@ extension AWSClient {
/// - input: Input for request
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - onPage: closure called with each block of entries
/// - moreResultsKey: The KeyPath for the member of the output that indicates whether we should ask for more data
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. Returns boolean indicating whether we should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape>(
input: Input,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
moreResultsKey: KeyPath<Output, Bool?>,
moreResultsKey: KeyPath<Output, Bool>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Output, EventLoop) -> EventLoopFuture<Bool>
) -> EventLoopFuture<Void> {
self.paginate(input: input, initialValue: (), command: command, tokenKey: tokenKey, moreResultsKey: moreResultsKey, on: eventLoop, logger: logger) { _, output, eventLoop in
return onPage(output, eventLoop).map { rt in (rt, ()) }
}
}

/// If an AWS command is returning an arbituary sized array sometimes it adds support for paginating this array
/// ie it will return the array in blocks of a defined size, each block also includes a token which can be used to access
/// the next block. This function loads each block and calls a closure with each block as parameter. This function returns
/// the result of combining all of these blocks using the given closure,
///
/// - Parameters:
/// - input: Input for request
/// - initialValue: The value to use as the initial accumulating value. `initialValue` is passed to `onPage` the first time it is called.
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - moreResultsKey: The KeyPath for the member of the output that indicates whether we should ask for more data
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. It combines an accumulating result with the contents of response from the call to AWS. This combined result is then returned
/// along with a boolean indicating if the paginate operation should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape, Result>(
input: Input,
initialValue: Result,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
moreResultsKey: KeyPath<Output, Bool?>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Result, Output, EventLoop) -> EventLoopFuture<(Bool, Result)>
) -> EventLoopFuture<Result> {
let eventLoop = eventLoop ?? eventLoopGroup.next()
let promise = eventLoop.makePromise(of: Void.self)
let promise = eventLoop.makePromise(of: Result.self)

func paginatePart(input: Input) {
func paginatePart(input: Input, currentValue: Result) {
let responseFuture = command(input, eventLoop, logger)
.flatMap { response in
return onPage(response, eventLoop)
.map { (rt) -> Void in
guard rt == true else { return promise.succeed(()) }
return onPage(currentValue, response, eventLoop)
.map { continuePaginate, result -> Void in
guard continuePaginate == true else { return promise.succeed(result) }
// get next block token and construct a new input with this token
guard let token = response[keyPath: tokenKey],
response[keyPath: moreResultsKey] == true else { return promise.succeed(()) }
response[keyPath: moreResultsKey] == true else { return promise.succeed(result) }

let input = input.usingPaginationToken(token)
paginatePart(input: input)
paginatePart(input: input, currentValue: result)
}
}
responseFuture.whenFailure { error in
promise.fail(error)
}
}

paginatePart(input: input)
paginatePart(input: input, currentValue: initialValue)

return promise.futureResult
}

/// If an AWS command is returning an arbituary sized array sometimes it adds support for paginating this array
/// ie it will return the array in blocks of a defined size, each block also includes a token which can be used to access
/// the next block. This function loads each block and calls a closure with each block as parameter.
///
/// - Parameters:
/// - input: Input for request
/// - command: Command to be paginated
/// - tokenKey: The name of token in the response object to continue pagination
/// - moreResultsKey: The KeyPath for the member of the output that indicates whether we should ask for more data
/// - eventLoop: EventLoop to run this process on
/// - logger: Logger used flot logging
/// - onPage: closure called with each block of entries. Returns boolean indicating whether we should continue.
public func paginate<Input: AWSPaginateToken, Output: AWSShape>(
input: Input,
command: @escaping (Input, EventLoop?, Logger) -> EventLoopFuture<Output>,
tokenKey: KeyPath<Output, Input.Token?>,
moreResultsKey: KeyPath<Output, Bool?>,
on eventLoop: EventLoop? = nil,
logger: Logger = AWSClient.loggingDisabled,
onPage: @escaping (Output, EventLoop) -> EventLoopFuture<Bool>
) -> EventLoopFuture<Void> {
self.paginate(input: input, initialValue: (), command: command, tokenKey: tokenKey, moreResultsKey: moreResultsKey, on: eventLoop, logger: logger) { _, output, eventLoop in
return onPage(output, eventLoop).map { rt in (rt, ()) }
}
}
}
56 changes: 55 additions & 1 deletion Tests/SotoCoreTests/PaginateTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ class PaginateTests: XCTestCase {
let moreResults: Bool?
}

// conform to Encodable so server can encode these
struct StringList2Output: AWSDecodableShape, Encodable {
let array: [String]
let outputToken: String?
let moreResults: Bool
}

func stringList(_ input: StringListInput, on eventLoop: EventLoop? = nil, logger: Logger) -> EventLoopFuture<StringListOutput> {
return self.client.execute(
operation: "TestOperation",
Expand All @@ -169,6 +176,31 @@ class PaginateTests: XCTestCase {
)
}

func stringList2(_ input: StringListInput, on eventLoop: EventLoop? = nil, logger: Logger) -> EventLoopFuture<StringList2Output> {
return self.client.execute(
operation: "TestOperation",
path: "/",
httpMethod: .POST,
serviceConfig: self.config,
input: input,
on: eventLoop,
logger: logger
)
}

func stringListPaginator<Result>(_ input: StringListInput, _ initialValue: Result, on eventLoop: EventLoop? = nil, onPage: @escaping (Result, StringList2Output, EventLoop) -> EventLoopFuture<(Bool, Result)>) -> EventLoopFuture<Result> {
return self.client.paginate(
input: input,
initialValue: initialValue,
command: self.stringList2,
tokenKey: \StringList2Output.outputToken,
moreResultsKey: \StringList2Output.moreResults,
on: eventLoop,
logger: TestEnvironment.logger,
onPage: onPage
)
}

// create list of unique strings
let stringList = Set("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.".split(separator: " ").map { String($0) }).map { $0 }

Expand All @@ -185,7 +217,7 @@ class PaginateTests: XCTestCase {
array.append(self.stringList[i])
}
var outputToken: String?
var moreResults: Bool?
var moreResults: Bool = false
var continueProcessing = false
if endIndex < self.stringList.count {
outputToken = self.stringList[endIndex]
Expand Down Expand Up @@ -219,6 +251,28 @@ class PaginateTests: XCTestCase {
}
}

func testStringTokenReducePaginate() throws {
// paginate input
let input = StringListInput(inputToken: nil, pageSize: 5)
let future = self.stringListPaginator(input, []) { current, result, eventloop in
// collate results into array
return eventloop.makeSucceededFuture((true, current + result.array))
}

// aws server process
XCTAssertNoThrow(try self.awsServer.process(self.stringListServerProcess))

// wait for response
var array: [String]?
XCTAssertNoThrow(array = try future.wait())
let finalArray = try XCTUnwrap(array)
// verify contents of array
XCTAssertEqual(finalArray.count, self.stringList.count)
for i in 0..<finalArray.count {
XCTAssertEqual(finalArray[i], self.stringList[i])
}
}

struct ErrorOutput: AWSShape {
let error: String
}
Expand Down

0 comments on commit ecc3479

Please sign in to comment.