diff --git a/Sources/Confidence/EventSenderEngine.swift b/Sources/Confidence/EventSenderEngine.swift new file mode 100644 index 00000000..3cebf628 --- /dev/null +++ b/Sources/Confidence/EventSenderEngine.swift @@ -0,0 +1,106 @@ +import Combine +import Foundation + +protocol EventsUploader { + func upload(request: [Event]) async -> Bool +} + +struct Event: Encodable, Equatable { + let name: String + let payload: [String: ConfidenceValue] + let eventTime: Date +} + +protocol FlushPolicy { + func reset() + func hit(event: Event) + func shouldFlush() -> Bool +} + +protocol Clock { + func now() -> Date +} + +protocol EventSenderEngine { + func send(name: String, message: [String: ConfidenceValue]) + func shutdown() +} + +final class EventSenderEngineImpl: EventSenderEngine { + private static let sendSignalName: String = "FLUSH" + private let storage: any EventStorage + private let writeReqChannel = PassthroughSubject() + private let uploadReqChannel = PassthroughSubject() + private var cancellables = Set() + private let flushPolicies: [FlushPolicy] + private let uploader: EventsUploader + private let clientSecret: String + private let clock: Clock + + init( + clientSecret: String, + uploader: EventsUploader, + clock: Clock, + storage: EventStorage, + flushPolicies: [FlushPolicy] + ) { + self.clock = clock + self.uploader = uploader + self.clientSecret = clientSecret + self.storage = storage + self.flushPolicies = flushPolicies + + writeReqChannel.sink(receiveValue: { [weak self] event in + guard let self = self else { return } + do { + try self.storage.writeEvent(event: event) + } catch { + + } + + self.flushPolicies.forEach({ policy in policy.hit(event: event) }) + let shouldFlush = self.flushPolicies.contains(where: { policy in policy.shouldFlush() }) + + if shouldFlush { + self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName) + self.flushPolicies.forEach({ policy in policy.reset() }) + } + + }).store(in: &cancellables) + + uploadReqChannel.sink(receiveValue: { [weak self] _ in + do { + guard let self = self else { return } + try self.storage.startNewBatch() + let ids = storage.batchReadyIds() + for id in ids { + let events = try self.storage.eventsFrom(id: id) + let shouldCleanup = await self.uploader.upload(request: events) + if shouldCleanup { + try storage.remove(id: id) + } + } + } catch { + + } + }).store(in: &cancellables) + } + + func send(name: String, message: [String: ConfidenceValue]) { + writeReqChannel.send(Event(name: name, payload: message, eventTime: Date())) + } + + func shutdown() { + cancellables.removeAll() + } +} + +private extension Publisher where Self.Failure == Never { + func sink(receiveValue: @escaping ((Self.Output) async -> Void)) -> AnyCancellable { + sink { value in + Task { + await receiveValue(value) + } + } + } +} diff --git a/Sources/Confidence/EventSenderStorage.swift b/Sources/Confidence/EventSenderStorage.swift new file mode 100644 index 00000000..d25333bf --- /dev/null +++ b/Sources/Confidence/EventSenderStorage.swift @@ -0,0 +1,15 @@ +import Foundation + +struct EventBatchRequest: Encodable { + let clientSecret: String + let sendTime: Date + let events: [Event] +} + +internal protocol EventStorage { + func startNewBatch() throws + func writeEvent(event: Event) throws + func batchReadyIds() -> [String] + func eventsFrom(id: String) throws -> [Event] + func remove(id: String) throws +} diff --git a/Tests/ConfidenceProviderTests/EventSenderEngineTest.swift b/Tests/ConfidenceProviderTests/EventSenderEngineTest.swift new file mode 100644 index 00000000..e9ca010b --- /dev/null +++ b/Tests/ConfidenceProviderTests/EventSenderEngineTest.swift @@ -0,0 +1,57 @@ +import Foundation +import XCTest + +@testable import Confidence + +final class MinSizeFlushPolicy: FlushPolicy { + private var maxSize = 5 + private var size = 0 + func reset() { + size = 0 + } + + func hit(event: Event) { + size += 1 + } + + func shouldFlush() -> Bool { + return size >= maxSize + } + + +} + +final class EventSenderEngineTest: XCTestCase { + func testAddingEventsWithSizeFlushPolicyWorks() throws { + let flushPolicies = [MinSizeFlushPolicy()] + let uploader = EventUploaderMock() + let eventSenderEngine = EventSenderEngineImpl( + clientSecret: "CLIENT_SECRET", + uploader: uploader, + clock: ClockMock(), + storage: EventStorageMock(), + flushPolicies: flushPolicies + ) + + let expectation = XCTestExpectation(description: "Upload finished") + let cancellable = uploader.subject.sink { value in + expectation.fulfill() + } + + var events: [Event] = [] + for i in 0..<5 { + events.append(Event(name: "\(i)", payload: [:], eventTime: Date())) + eventSenderEngine.send(name: "\(i)", message: [:]) + } + + wait(for: [expectation], timeout: 5) + let uploadRequest = try XCTUnwrap(uploader.calledRequest) + XCTAssertTrue(uploadRequest.map { $0.name } == events.map { $0.name }) + + uploader.reset() + eventSenderEngine.send(name: "Hello", message: [:]) + XCTAssertNil(uploader.calledRequest) + cancellable.cancel() + } +} + diff --git a/Tests/ConfidenceProviderTests/EventUploaderMock.swift b/Tests/ConfidenceProviderTests/EventUploaderMock.swift new file mode 100644 index 00000000..f7008d02 --- /dev/null +++ b/Tests/ConfidenceProviderTests/EventUploaderMock.swift @@ -0,0 +1,49 @@ +import Foundation +import Combine +@testable import Confidence + +final class EventUploaderMock: EventsUploader { + var calledRequest: [Event]? = nil + let subject: PassthroughSubject = PassthroughSubject() + func upload(request: [Event]) async -> Bool { + calledRequest = request + subject.send(1) + return true + } + + func reset() { + calledRequest = nil + } +} + +final class ClockMock: Clock { + func now() -> Date { + return Date() + } +} + +final class EventStorageMock: EventStorage { + private var events: [Event] = [] + private var batches: [String: [Event]] = [:] + func startNewBatch() throws { + batches[("\(batches.count)")] = events + events.removeAll() + } + + func writeEvent(event: Event) throws { + events.append(event) + } + + func batchReadyIds() -> [String] { + return batches.map({ batch in batch.0}) + } + + func eventsFrom(id: String) throws -> [Event] { + return batches[id]! + } + + func remove(id: String) throws { + batches.removeValue(forKey: id) + } + +}