-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add event sender engine basics * handle throwing from straoge * add test to support engine implementation * add tests * fix lint * revert deleted files * only pass list of events to the uploader * add event time and add test to support concurrent upload task
- Loading branch information
1 parent
444a191
commit b223804
Showing
4 changed files
with
227 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import Combine | ||
import Foundation | ||
|
||
protocol EventsUploader { | ||
func upload(request: [Event]) async -> Bool | ||
} | ||
|
||
struct Event: Encodable, Equatable { | ||
let name: String | ||
let payload: [String: ConfidenceValue] | ||
let eventTime: Date | ||
} | ||
|
||
protocol FlushPolicy { | ||
func reset() | ||
func hit(event: Event) | ||
func shouldFlush() -> Bool | ||
} | ||
|
||
protocol Clock { | ||
func now() -> Date | ||
} | ||
|
||
protocol EventSenderEngine { | ||
func send(name: String, message: [String: ConfidenceValue]) | ||
func shutdown() | ||
} | ||
|
||
final class EventSenderEngineImpl: EventSenderEngine { | ||
private static let sendSignalName: String = "FLUSH" | ||
private let storage: any EventStorage | ||
private let writeReqChannel = PassthroughSubject<Event, Never>() | ||
private let uploadReqChannel = PassthroughSubject<String, Never>() | ||
private var cancellables = Set<AnyCancellable>() | ||
private let flushPolicies: [FlushPolicy] | ||
private let uploader: EventsUploader | ||
private let clientSecret: String | ||
private let clock: Clock | ||
|
||
init( | ||
clientSecret: String, | ||
uploader: EventsUploader, | ||
clock: Clock, | ||
storage: EventStorage, | ||
flushPolicies: [FlushPolicy] | ||
) { | ||
self.clock = clock | ||
self.uploader = uploader | ||
self.clientSecret = clientSecret | ||
self.storage = storage | ||
self.flushPolicies = flushPolicies | ||
|
||
writeReqChannel.sink(receiveValue: { [weak self] event in | ||
guard let self = self else { return } | ||
do { | ||
try self.storage.writeEvent(event: event) | ||
} catch { | ||
|
||
} | ||
|
||
self.flushPolicies.forEach({ policy in policy.hit(event: event) }) | ||
let shouldFlush = self.flushPolicies.contains(where: { policy in policy.shouldFlush() }) | ||
|
||
if shouldFlush { | ||
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName) | ||
self.flushPolicies.forEach({ policy in policy.reset() }) | ||
} | ||
|
||
}).store(in: &cancellables) | ||
|
||
uploadReqChannel.sink(receiveValue: { [weak self] _ in | ||
do { | ||
guard let self = self else { return } | ||
try self.storage.startNewBatch() | ||
let ids = storage.batchReadyIds() | ||
for id in ids { | ||
let events = try self.storage.eventsFrom(id: id) | ||
let shouldCleanup = await self.uploader.upload(request: events) | ||
if shouldCleanup { | ||
try storage.remove(id: id) | ||
} | ||
} | ||
} catch { | ||
|
||
} | ||
}).store(in: &cancellables) | ||
} | ||
|
||
func send(name: String, message: [String: ConfidenceValue]) { | ||
writeReqChannel.send(Event(name: name, payload: message, eventTime: Date())) | ||
} | ||
|
||
func shutdown() { | ||
cancellables.removeAll() | ||
} | ||
} | ||
|
||
private extension Publisher where Self.Failure == Never { | ||
func sink(receiveValue: @escaping ((Self.Output) async -> Void)) -> AnyCancellable { | ||
sink { value in | ||
Task { | ||
await receiveValue(value) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
import Foundation | ||
|
||
struct EventBatchRequest: Encodable { | ||
let clientSecret: String | ||
let sendTime: Date | ||
let events: [Event] | ||
} | ||
|
||
internal protocol EventStorage { | ||
func startNewBatch() throws | ||
func writeEvent(event: Event) throws | ||
func batchReadyIds() -> [String] | ||
func eventsFrom(id: String) throws -> [Event] | ||
func remove(id: String) throws | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import Foundation | ||
import XCTest | ||
|
||
@testable import Confidence | ||
|
||
final class MinSizeFlushPolicy: FlushPolicy { | ||
private var maxSize = 5 | ||
private var size = 0 | ||
func reset() { | ||
size = 0 | ||
} | ||
|
||
func hit(event: Event) { | ||
size += 1 | ||
} | ||
|
||
func shouldFlush() -> Bool { | ||
return size >= maxSize | ||
} | ||
|
||
|
||
} | ||
|
||
final class EventSenderEngineTest: XCTestCase { | ||
func testAddingEventsWithSizeFlushPolicyWorks() throws { | ||
let flushPolicies = [MinSizeFlushPolicy()] | ||
let uploader = EventUploaderMock() | ||
let eventSenderEngine = EventSenderEngineImpl( | ||
clientSecret: "CLIENT_SECRET", | ||
uploader: uploader, | ||
clock: ClockMock(), | ||
storage: EventStorageMock(), | ||
flushPolicies: flushPolicies | ||
) | ||
|
||
let expectation = XCTestExpectation(description: "Upload finished") | ||
let cancellable = uploader.subject.sink { value in | ||
expectation.fulfill() | ||
} | ||
|
||
var events: [Event] = [] | ||
for i in 0..<5 { | ||
events.append(Event(name: "\(i)", payload: [:], eventTime: Date())) | ||
eventSenderEngine.send(name: "\(i)", message: [:]) | ||
} | ||
|
||
wait(for: [expectation], timeout: 5) | ||
let uploadRequest = try XCTUnwrap(uploader.calledRequest) | ||
XCTAssertTrue(uploadRequest.map { $0.name } == events.map { $0.name }) | ||
|
||
uploader.reset() | ||
eventSenderEngine.send(name: "Hello", message: [:]) | ||
XCTAssertNil(uploader.calledRequest) | ||
cancellable.cancel() | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import Foundation | ||
import Combine | ||
@testable import Confidence | ||
|
||
final class EventUploaderMock: EventsUploader { | ||
var calledRequest: [Event]? = nil | ||
let subject: PassthroughSubject<Int, Never> = PassthroughSubject() | ||
func upload(request: [Event]) async -> Bool { | ||
calledRequest = request | ||
subject.send(1) | ||
return true | ||
} | ||
|
||
func reset() { | ||
calledRequest = nil | ||
} | ||
} | ||
|
||
final class ClockMock: Clock { | ||
func now() -> Date { | ||
return Date() | ||
} | ||
} | ||
|
||
final class EventStorageMock: EventStorage { | ||
private var events: [Event] = [] | ||
private var batches: [String: [Event]] = [:] | ||
func startNewBatch() throws { | ||
batches[("\(batches.count)")] = events | ||
events.removeAll() | ||
} | ||
|
||
func writeEvent(event: Event) throws { | ||
events.append(event) | ||
} | ||
|
||
func batchReadyIds() -> [String] { | ||
return batches.map({ batch in batch.0}) | ||
} | ||
|
||
func eventsFrom(id: String) throws -> [Event] { | ||
return batches[id]! | ||
} | ||
|
||
func remove(id: String) throws { | ||
batches.removeValue(forKey: id) | ||
} | ||
|
||
} |