Skip to content

Commit

Permalink
fix: apply returns after full completion of the inner tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrizio Demaria <[email protected]>
  • Loading branch information
fabriziodemaria committed Jan 12, 2024
1 parent 81bbf68 commit 64026c7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 79 deletions.
74 changes: 41 additions & 33 deletions Sources/ConfidenceProvider/Apply/FlagApplierWithRetries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,68 +37,66 @@ final class FlagApplierWithRetries: FlagApplier {

public func apply(flagName: String, resolveToken: String) async {
let applyTime = Date.backport.now
let (data, added) = await cacheDataInteractor.add(
async let (data, added) = await cacheDataInteractor.add(
resolveToken: resolveToken,
flagName: flagName,
applyTime: applyTime
)
guard added == true else {
guard await added == true else {
// If record is found in the cache, early return (de-duplication).
// Triggerring batch apply in case if there are any unsent events stored
await triggerBatch()
return
}

self.writeToFile(data: data)
await self.writeToFile(data: data)
await triggerBatch()
}

// MARK: private

private func triggerBatch() async {
async let cacheData = await cacheDataInteractor.cache
await cacheData.resolveEvents.forEach { resolveEvent in
await cacheData.resolveEvents.asyncForEach { resolveEvent in
let appliesToSend = resolveEvent.events.filter { $0.status == .created }
.chunk(size: 20)

guard appliesToSend.isEmpty == false else {
return
}

appliesToSend.forEach { chunk in
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sending)
executeApply(
await appliesToSend.asyncForEach { chunk in
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sending)
await executeApply(
resolveToken: resolveEvent.resolveToken,
items: chunk
) { success in
guard success else {
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .created)
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .created)
return
}
// Set 'sent' property of apply events to true
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sent)
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sent)
}
}
}
}

private func writeStatus(resolveToken: String, events: [FlagApply], status: ApplyEventStatus) {
private func writeStatus(resolveToken: String, events: [FlagApply], status: ApplyEventStatus) async {
let lastIndex = events.count - 1
events.enumerated().forEach { index, event in
Task(priority: .medium) {
var data = await self.cacheDataInteractor.setEventStatus(
resolveToken: resolveToken,
name: event.name,
status: status
)

if index == lastIndex {
let unsentFlagApplies = data.resolveEvents.filter {
$0.isSent == false
}
data.resolveEvents = unsentFlagApplies
try? self.storage.save(data: data)
await events.enumerated().asyncForEach { index, event in
var data = await self.cacheDataInteractor.setEventStatus(
resolveToken: resolveToken,
name: event.name,
status: status
)

if index == lastIndex {
let unsentFlagApplies = data.resolveEvents.filter {
$0.isSent == false
}
data.resolveEvents = unsentFlagApplies
try? self.storage.save(data: data)
}
}
}
Expand All @@ -110,8 +108,8 @@ final class FlagApplierWithRetries: FlagApplier {
private func executeApply(
resolveToken: String,
items: [FlagApply],
completion: @escaping (Bool) -> Void
) {
completion: @escaping (Bool) async -> Void
) async {
let applyFlagRequestItems = items.map { applyEvent in
AppliedFlagRequestItem(
flag: applyEvent.name,
Expand All @@ -126,25 +124,25 @@ final class FlagApplierWithRetries: FlagApplier {
sdk: Sdk(id: metadata.name, version: metadata.version)
)

performRequest(request: request) { result in
await performRequest(request: request) { result in
switch result {
case .success:
completion(true)
await completion(true)
case .failure(let error):
self.logApplyError(error: error)
completion(false)
await completion(false)
}
}
}

private func performRequest(
request: ApplyFlagsRequest,
completion: @escaping (ApplyFlagResult) -> Void
) {
completion: @escaping (ApplyFlagResult) async -> Void
) async {
do {
try httpClient.post(path: ":apply", data: request, completion: completion)
try await httpClient.post(path: ":apply", data: request, completion: completion)
} catch {
completion(.failure(handleError(error: error)))
await completion(.failure(handleError(error: error)))
}
}

Expand All @@ -168,3 +166,13 @@ final class FlagApplierWithRetries: FlagApplier {
}
}
}

extension Sequence {
func asyncForEach(
_ transform: (Element) async throws -> Void
) async rethrows {
for element in self {
try await transform(element)
}
}
}
4 changes: 2 additions & 2 deletions Sources/ConfidenceProvider/Http/HttpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ protocol HttpClient {
func post<T: Decodable>(
path: String,
data: Codable,
completion: @escaping (HttpClientResult<T>) -> Void
) throws
completion: @escaping (HttpClientResult<T>) async -> Void
) async throws

func post<T: Decodable>(path: String, data: Codable) async throws -> HttpClientResponse<T>
}
Expand Down
74 changes: 34 additions & 40 deletions Sources/ConfidenceProvider/Http/NetworkClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ final class NetworkClient: HttpClient {
retry: Retry = .none
) {
self.session =
session
?? {
let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = timeout
configuration.httpAdditionalHeaders = defaultHeaders
session
?? {
let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = timeout
configuration.httpAdditionalHeaders = defaultHeaders

return URLSession(configuration: configuration)
}()
return URLSession(configuration: configuration)
}()

self.headers = defaultHeaders
self.retry = retry
Expand All @@ -44,67 +44,61 @@ final class NetworkClient: HttpClient {
func post<T: Decodable>(
path: String,
data: Codable,
completion: @escaping (HttpClientResult<T>) -> Void
) throws {
completion: @escaping (HttpClientResult<T>) async -> Void
) async throws {
let request = try buildRequest(path: path, data: data)
perform(request: request, retry: self.retry) { response, data, error in
await perform(request: request, retry: self.retry) { response, data, error in
if let error {
completion(.failure(error))
await completion(.failure(error))
return
}

guard let response, let data else {
completion(.failure(ConfidenceError.internalError(message: "Bad response")))
await completion(.failure(ConfidenceError.internalError(message: "Bad response")))
return
}

do {
let httpClientResult: HttpClientResponse<T> =
try self.buildResponse(response: response, data: data)
completion(.success(httpClientResult))
try self.buildResponse(response: response, data: data)
await completion(.success(httpClientResult))
} catch {
completion(.failure(error))
await completion(.failure(error))
}
}
}

private func perform(
request: URLRequest,
retry: Retry,
completion: @escaping (HTTPURLResponse?, Data?, Error?) -> Void
) {
completion: @escaping (HTTPURLResponse?, Data?, Error?) async -> Void
) async {
let retryHandler = retry.handler()
let retryWait: TimeInterval? = retryHandler.retryIn()

self.session.dataTask(with: request) { data, response, error in
if let error {
if self.shouldRetry(error: error), let retryWait {
DispatchQueue.main.asyncAfter(deadline: .now() + retryWait) {
self.perform(request: request, retry: retry, completion: completion)
}
return
} else {
completion(nil, nil, error)
}
}

guard let httpResponse = response as? HTTPURLResponse else {
completion(nil, nil, HttpClientError.invalidResponse)
do {
async let (data, response) = await self.session.data(for: request)
guard let httpResponse = try? await response as? HTTPURLResponse else {
await completion(nil, nil, HttpClientError.invalidResponse)
return
}

if self.shouldRetry(httpResponse: httpResponse), let retryWait {
DispatchQueue.main.asyncAfter(deadline: .now() + retryWait) {
self.perform(request: request, retry: retry, completion: completion)
}
try? await Task.sleep(nanoseconds: UInt64(retryWait * 1_000_000_000))
await self.perform(request: request, retry: retry, completion: completion)
return
}

if let data {
completion(httpResponse, data, nil)
} else {
guard let data = try? await data else {
let error = ConfidenceError.internalError(message: "Unable to complete request")
completion(httpResponse, nil, error)
await completion(httpResponse, nil, error)
return
}
await completion(httpResponse, data, nil)
} catch {
if self.shouldRetry(error: error), let retryWait {
try? await Task.sleep(nanoseconds: UInt64(retryWait * 1_000_000_000))
await self.perform(request: request, retry: retry, completion: completion)
} else {
await completion(nil, nil, error)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions Tests/ConfidenceProviderTests/Helpers/HttpClientMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ final class HttpClientMock: HttpClient {
func post<T>(
path: String,
data: Codable,
completion: @escaping (ConfidenceProvider.HttpClientResult<T>) -> Void
) throws where T: Decodable {
completion: @escaping (ConfidenceProvider.HttpClientResult<T>) async -> Void
) async throws where T: Decodable {
do {
let result: HttpClientResponse<T> = try handlePost(path: path, data: data)
completion(.success(result))
await completion(.success(result))
} catch {
completion(.failure(error))
await completion(.failure(error))
}
}

Expand Down

0 comments on commit 64026c7

Please sign in to comment.