From 0acf84d21e260cf4b0f24322ab612a8c644a3e5d Mon Sep 17 00:00:00 2001 From: vahid torkaman Date: Wed, 8 May 2024 15:15:02 +0200 Subject: [PATCH] use semaphore to make upload serialz --- Sources/Confidence/EventSenderEngine.swift | 59 +++++++++++-------- Sources/Confidence/EventStorage.swift | 18 ------ Tests/ConfidenceTests/EventUploaderMock.swift | 4 -- 3 files changed, 35 insertions(+), 46 deletions(-) diff --git a/Sources/Confidence/EventSenderEngine.swift b/Sources/Confidence/EventSenderEngine.swift index dd4cb245..46bfec33 100644 --- a/Sources/Confidence/EventSenderEngine.swift +++ b/Sources/Confidence/EventSenderEngine.swift @@ -22,7 +22,7 @@ final class EventSenderEngineImpl: EventSenderEngine { private let uploader: ConfidenceClient private let clientSecret: String private let payloadMerger: PayloadMerger = PayloadMergerImpl() - private let queue = DispatchQueue(label: "com.confidence.engine") + private let semaphore = DispatchSemaphore(value: 1) init( clientSecret: String, @@ -53,38 +53,49 @@ final class EventSenderEngineImpl: EventSenderEngine { .store(in: &cancellables) uploadReqChannel.sink { [weak self] _ in + guard let self = self else { return } + await self.upload() + } + .store(in: &cancellables) + } + + func upload() async { + await withSemaphore { [weak self] in + guard let self = self else { return } do { - guard let self = self else { return } - var batches: [[ConfidenceEvent]] = [] - try queue.sync { - try self.storage.startNewBatch() - let ids = try storage.batchReadyIds() - for id in ids { - let events: [ConfidenceEvent] = try self.storage.eventsFrom(id: id) - batches.append(events) - do { - try storage.remove(id: id) - } catch { + try self.storage.startNewBatch() + let ids = try storage.batchReadyIds() + if ids.isEmpty { + return + } + for id in ids { + let events: [NetworkEvent] = try self.storage.eventsFrom(id: id) + .compactMap { event in + return NetworkEvent( + eventDefinition: event.name, + payload: NetworkStruct(fields: TypeMapper.convert(structure: event.payload).fields), + eventTime: Date.backport.toISOString(date: event.eventTime)) } + var shouldCleanup = false + if events.isEmpty { + shouldCleanup = true + } else { + shouldCleanup = try await self.uploader.upload(events: events) } - } - for events in batches { - let batch = events.compactMap { event in - return NetworkEvent( - eventDefinition: event.name, - payload: NetworkStruct(fields: TypeMapper.convert(structure: event.payload).fields), - eventTime: Date.backport.toISOString(date: event.eventTime)) - } - let shouldCleanup = try await self.uploader.upload(events: batch) - if !shouldCleanup { - try storage.writeEvents(events: events) + if shouldCleanup { + try storage.remove(id: id) } } } catch { } } - .store(in: &cancellables) + } + + func withSemaphore(callback: @escaping () async -> Void) async { + semaphore.wait() + await callback() + semaphore.signal() } func emit(eventName: String, message: ConfidenceStruct, context: ConfidenceStruct) { diff --git a/Sources/Confidence/EventStorage.swift b/Sources/Confidence/EventStorage.swift index 7fe5ed26..41d2825d 100644 --- a/Sources/Confidence/EventStorage.swift +++ b/Sources/Confidence/EventStorage.swift @@ -10,7 +10,6 @@ struct ConfidenceEvent: Codable { internal protocol EventStorage { func startNewBatch() throws func writeEvent(event: ConfidenceEvent) throws - func writeEvents(events: [ConfidenceEvent]) throws func batchReadyIds() throws -> [String] func eventsFrom(id: String) throws -> [ConfidenceEvent] func remove(id: String) throws @@ -44,23 +43,6 @@ internal class EventStorageImpl: EventStorage { } } - func writeEvents(events: [ConfidenceEvent]) throws { - try storageQueue.sync { - guard let currentFileHandle = currentFileHandle else { - return - } - let encoder = JSONEncoder() - let serialied = try encoder.encode(events) - let delimiter = "\n".data(using: .utf8) - guard let delimiter else { - return - } - currentFileHandle.seekToEndOfFile() - try currentFileHandle.write(contentsOf: delimiter) - try currentFileHandle.write(contentsOf: serialied) - } - } - func writeEvent(event: ConfidenceEvent) throws { try storageQueue.sync { guard let currentFileHandle = currentFileHandle else { diff --git a/Tests/ConfidenceTests/EventUploaderMock.swift b/Tests/ConfidenceTests/EventUploaderMock.swift index 24539a80..ffffeb46 100644 --- a/Tests/ConfidenceTests/EventUploaderMock.swift +++ b/Tests/ConfidenceTests/EventUploaderMock.swift @@ -27,10 +27,6 @@ final class EventStorageMock: EventStorage { events.removeAll() } - func writeEvents(events: [ConfidenceEvent]) throws { - self.events += events - } - func writeEvent(event: ConfidenceEvent) throws { events.append(event) }