Skip to content

Commit

Permalink
use semaphore to make upload serialz
Browse files Browse the repository at this point in the history
  • Loading branch information
vahidlazio committed May 8, 2024
1 parent 4ba7df7 commit 0acf84d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 46 deletions.
59 changes: 35 additions & 24 deletions Sources/Confidence/EventSenderEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class EventSenderEngineImpl: EventSenderEngine {
private let uploader: ConfidenceClient
private let clientSecret: String
private let payloadMerger: PayloadMerger = PayloadMergerImpl()
private let queue = DispatchQueue(label: "com.confidence.engine")
private let semaphore = DispatchSemaphore(value: 1)

init(
clientSecret: String,
Expand Down Expand Up @@ -53,38 +53,49 @@ final class EventSenderEngineImpl: EventSenderEngine {
.store(in: &cancellables)

uploadReqChannel.sink { [weak self] _ in
guard let self = self else { return }
await self.upload()
}
.store(in: &cancellables)
}

func upload() async {
await withSemaphore { [weak self] in
guard let self = self else { return }
do {
guard let self = self else { return }
var batches: [[ConfidenceEvent]] = []
try queue.sync {
try self.storage.startNewBatch()
let ids = try storage.batchReadyIds()
for id in ids {
let events: [ConfidenceEvent] = try self.storage.eventsFrom(id: id)
batches.append(events)
do {
try storage.remove(id: id)
} catch {
try self.storage.startNewBatch()
let ids = try storage.batchReadyIds()
if ids.isEmpty {
return
}
for id in ids {
let events: [NetworkEvent] = try self.storage.eventsFrom(id: id)
.compactMap { event in
return NetworkEvent(
eventDefinition: event.name,
payload: NetworkStruct(fields: TypeMapper.convert(structure: event.payload).fields),
eventTime: Date.backport.toISOString(date: event.eventTime))
}
var shouldCleanup = false
if events.isEmpty {
shouldCleanup = true
} else {
shouldCleanup = try await self.uploader.upload(events: events)
}
}

for events in batches {
let batch = events.compactMap { event in
return NetworkEvent(
eventDefinition: event.name,
payload: NetworkStruct(fields: TypeMapper.convert(structure: event.payload).fields),
eventTime: Date.backport.toISOString(date: event.eventTime))
}
let shouldCleanup = try await self.uploader.upload(events: batch)
if !shouldCleanup {
try storage.writeEvents(events: events)
if shouldCleanup {
try storage.remove(id: id)
}
}
} catch {
}
}
.store(in: &cancellables)
}

func withSemaphore(callback: @escaping () async -> Void) async {
semaphore.wait()
await callback()
semaphore.signal()
}

func emit(eventName: String, message: ConfidenceStruct, context: ConfidenceStruct) {
Expand Down
18 changes: 0 additions & 18 deletions Sources/Confidence/EventStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ struct ConfidenceEvent: Codable {
internal protocol EventStorage {
func startNewBatch() throws
func writeEvent(event: ConfidenceEvent) throws
func writeEvents(events: [ConfidenceEvent]) throws
func batchReadyIds() throws -> [String]
func eventsFrom(id: String) throws -> [ConfidenceEvent]
func remove(id: String) throws
Expand Down Expand Up @@ -44,23 +43,6 @@ internal class EventStorageImpl: EventStorage {
}
}

func writeEvents(events: [ConfidenceEvent]) throws {
try storageQueue.sync {
guard let currentFileHandle = currentFileHandle else {
return
}
let encoder = JSONEncoder()
let serialied = try encoder.encode(events)
let delimiter = "\n".data(using: .utf8)
guard let delimiter else {
return
}
currentFileHandle.seekToEndOfFile()
try currentFileHandle.write(contentsOf: delimiter)
try currentFileHandle.write(contentsOf: serialied)
}
}

func writeEvent(event: ConfidenceEvent) throws {
try storageQueue.sync {
guard let currentFileHandle = currentFileHandle else {
Expand Down
4 changes: 0 additions & 4 deletions Tests/ConfidenceTests/EventUploaderMock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ final class EventStorageMock: EventStorage {
events.removeAll()
}

func writeEvents(events: [ConfidenceEvent]) throws {
self.events += events
}

func writeEvent(event: ConfidenceEvent) throws {
events.append(event)
}
Expand Down

0 comments on commit 0acf84d

Please sign in to comment.