Skip to content

Commit

Permalink
fix: apply returns after full completion of the inner tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrizio Demaria <[email protected]>
  • Loading branch information
fabriziodemaria committed Jan 12, 2024
1 parent 81bbf68 commit 62563e9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 57 deletions.
74 changes: 41 additions & 33 deletions Sources/ConfidenceProvider/Apply/FlagApplierWithRetries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,68 +37,66 @@ final class FlagApplierWithRetries: FlagApplier {

public func apply(flagName: String, resolveToken: String) async {
let applyTime = Date.backport.now
let (data, added) = await cacheDataInteractor.add(
async let (data, added) = await cacheDataInteractor.add(
resolveToken: resolveToken,
flagName: flagName,
applyTime: applyTime
)
guard added == true else {
guard await added == true else {
// If record is found in the cache, early return (de-duplication).
// Triggerring batch apply in case if there are any unsent events stored
await triggerBatch()
return
}

self.writeToFile(data: data)
await self.writeToFile(data: data)
await triggerBatch()
}

// MARK: private

private func triggerBatch() async {
async let cacheData = await cacheDataInteractor.cache
await cacheData.resolveEvents.forEach { resolveEvent in
await cacheData.resolveEvents.asyncForEach { resolveEvent in
let appliesToSend = resolveEvent.events.filter { $0.status == .created }
.chunk(size: 20)

guard appliesToSend.isEmpty == false else {
return
}

appliesToSend.forEach { chunk in
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sending)
executeApply(
await appliesToSend.asyncForEach { chunk in
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sending)
await executeApply(
resolveToken: resolveEvent.resolveToken,
items: chunk
) { success in
guard success else {
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .created)
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .created)
return
}
// Set 'sent' property of apply events to true
self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sent)
await self.writeStatus(resolveToken: resolveEvent.resolveToken, events: chunk, status: .sent)
}
}
}
}

private func writeStatus(resolveToken: String, events: [FlagApply], status: ApplyEventStatus) {
private func writeStatus(resolveToken: String, events: [FlagApply], status: ApplyEventStatus) async {
let lastIndex = events.count - 1
events.enumerated().forEach { index, event in
Task(priority: .medium) {
var data = await self.cacheDataInteractor.setEventStatus(
resolveToken: resolveToken,
name: event.name,
status: status
)

if index == lastIndex {
let unsentFlagApplies = data.resolveEvents.filter {
$0.isSent == false
}
data.resolveEvents = unsentFlagApplies
try? self.storage.save(data: data)
await events.enumerated().asyncForEach { index, event in
var data = await self.cacheDataInteractor.setEventStatus(
resolveToken: resolveToken,
name: event.name,
status: status
)

if index == lastIndex {
let unsentFlagApplies = data.resolveEvents.filter {
$0.isSent == false
}
data.resolveEvents = unsentFlagApplies
try? self.storage.save(data: data)
}
}
}
Expand All @@ -110,8 +108,8 @@ final class FlagApplierWithRetries: FlagApplier {
private func executeApply(
resolveToken: String,
items: [FlagApply],
completion: @escaping (Bool) -> Void
) {
completion: @escaping (Bool) async -> Void
) async {
let applyFlagRequestItems = items.map { applyEvent in
AppliedFlagRequestItem(
flag: applyEvent.name,
Expand All @@ -126,25 +124,25 @@ final class FlagApplierWithRetries: FlagApplier {
sdk: Sdk(id: metadata.name, version: metadata.version)
)

performRequest(request: request) { result in
await performRequest(request: request) { result in
switch result {
case .success:
completion(true)
await completion(true)
case .failure(let error):
self.logApplyError(error: error)
completion(false)
await completion(false)
}
}
}

private func performRequest(
request: ApplyFlagsRequest,
completion: @escaping (ApplyFlagResult) -> Void
) {
completion: @escaping (ApplyFlagResult) async -> Void
) async {
do {
try httpClient.post(path: ":apply", data: request, completion: completion)
try await httpClient.post(path: ":apply", data: request, completion: completion)
} catch {
completion(.failure(handleError(error: error)))
await completion(.failure(handleError(error: error)))
}
}

Expand All @@ -168,3 +166,13 @@ final class FlagApplierWithRetries: FlagApplier {
}
}
}

extension Sequence {
func asyncForEach(
_ transform: (Element) async throws -> Void
) async rethrows {
for element in self {
try await transform(element)
}
}
}
4 changes: 2 additions & 2 deletions Sources/ConfidenceProvider/Http/HttpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ protocol HttpClient {
func post<T: Decodable>(
path: String,
data: Codable,
completion: @escaping (HttpClientResult<T>) -> Void
) throws
completion: @escaping (HttpClientResult<T>) async -> Void
) async throws

func post<T: Decodable>(path: String, data: Codable) async throws -> HttpClientResponse<T>
}
Expand Down
32 changes: 14 additions & 18 deletions Sources/ConfidenceProvider/Http/NetworkClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,23 @@ final class NetworkClient: HttpClient {
func post<T: Decodable>(
path: String,
data: Codable,
completion: @escaping (HttpClientResult<T>) -> Void
) throws {
completion: @escaping (HttpClientResult<T>) async -> Void
) async throws {
let request = try buildRequest(path: path, data: data)
perform(request: request, retry: self.retry) { response, data, error in
if let error {
completion(.failure(error))
return
}

guard let response, let data else {
completion(.failure(ConfidenceError.internalError(message: "Bad response")))
return
}

do {
let httpClientResult: HttpClientResponse<T> =
try self.buildResponse(response: response, data: data)
completion(.success(httpClientResult))
} catch {
completion(.failure(error))
}
let (data, response) = try await URLSession.shared.data(for: request)
guard let response = response as? HTTPURLResponse, response.statusCode == 202 else {
await completion(.failure(HttpClientError.internalError))
return
}

do {
let httpClientResult: HttpClientResponse<T> =
try self.buildResponse(response: response, data: data)
await completion(.success(httpClientResult))
} catch {
await completion(.failure(error))
}
}

Expand Down
8 changes: 4 additions & 4 deletions Tests/ConfidenceProviderTests/Helpers/HttpClientMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ final class HttpClientMock: HttpClient {
func post<T>(
path: String,
data: Codable,
completion: @escaping (ConfidenceProvider.HttpClientResult<T>) -> Void
) throws where T: Decodable {
completion: @escaping (ConfidenceProvider.HttpClientResult<T>) async -> Void
) async throws where T: Decodable {
do {
let result: HttpClientResponse<T> = try handlePost(path: path, data: data)
completion(.success(result))
await completion(.success(result))
} catch {
completion(.failure(error))
await completion(.failure(error))
}
}

Expand Down

0 comments on commit 62563e9

Please sign in to comment.