-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Event sender engine #88
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
384c23c
add event sender engine basics
vahidlazio 843cc0b
handle throwing from straoge
vahidlazio dbd94e4
add test to support engine implementation
vahidlazio 00c233a
add tests
vahidlazio 322da37
fix lint
vahidlazio feaaa71
revert deleted files
vahidlazio 7c12583
only pass list of events to the uploader
vahidlazio 033b8e9
add event time and add test to support concurrent upload task
vahidlazio File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import Combine | ||
import Foundation | ||
|
||
protocol EventsUploader { | ||
func upload(request: [Event]) async -> Bool | ||
} | ||
|
||
struct Event: Encodable, Equatable { | ||
let name: String | ||
let payload: [String: ConfidenceValue] | ||
let eventTime: Date | ||
} | ||
|
||
protocol FlushPolicy { | ||
func reset() | ||
func hit(event: Event) | ||
func shouldFlush() -> Bool | ||
} | ||
|
||
protocol Clock { | ||
func now() -> Date | ||
} | ||
|
||
protocol EventSenderEngine { | ||
func send(name: String, message: [String: ConfidenceValue]) | ||
func shutdown() | ||
} | ||
|
||
final class EventSenderEngineImpl: EventSenderEngine { | ||
private static let sendSignalName: String = "FLUSH" | ||
private let storage: any EventStorage | ||
private let writeReqChannel = PassthroughSubject<Event, Never>() | ||
private let uploadReqChannel = PassthroughSubject<String, Never>() | ||
private var cancellables = Set<AnyCancellable>() | ||
private let flushPolicies: [FlushPolicy] | ||
private let uploader: EventsUploader | ||
private let clientSecret: String | ||
private let clock: Clock | ||
|
||
init( | ||
clientSecret: String, | ||
uploader: EventsUploader, | ||
clock: Clock, | ||
storage: EventStorage, | ||
flushPolicies: [FlushPolicy] | ||
) { | ||
self.clock = clock | ||
self.uploader = uploader | ||
self.clientSecret = clientSecret | ||
self.storage = storage | ||
self.flushPolicies = flushPolicies | ||
|
||
writeReqChannel.sink(receiveValue: { [weak self] event in | ||
guard let self = self else { return } | ||
do { | ||
try self.storage.writeEvent(event: event) | ||
} catch { | ||
|
||
} | ||
|
||
self.flushPolicies.forEach({ policy in policy.hit(event: event) }) | ||
let shouldFlush = self.flushPolicies.contains(where: { policy in policy.shouldFlush() }) | ||
|
||
if shouldFlush { | ||
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName) | ||
self.flushPolicies.forEach({ policy in policy.reset() }) | ||
} | ||
|
||
}).store(in: &cancellables) | ||
|
||
uploadReqChannel.sink(receiveValue: { [weak self] _ in | ||
do { | ||
guard let self = self else { return } | ||
try self.storage.startNewBatch() | ||
let ids = storage.batchReadyIds() | ||
for id in ids { | ||
let events = try self.storage.eventsFrom(id: id) | ||
let shouldCleanup = await self.uploader.upload(request: events) | ||
if shouldCleanup { | ||
try storage.remove(id: id) | ||
} | ||
} | ||
} catch { | ||
|
||
} | ||
}).store(in: &cancellables) | ||
} | ||
|
||
func send(name: String, message: [String: ConfidenceValue]) { | ||
writeReqChannel.send(Event(name: name, payload: message, eventTime: Date())) | ||
} | ||
|
||
func shutdown() { | ||
cancellables.removeAll() | ||
} | ||
} | ||
|
||
private extension Publisher where Self.Failure == Never { | ||
func sink(receiveValue: @escaping ((Self.Output) async -> Void)) -> AnyCancellable { | ||
sink { value in | ||
Task { | ||
await receiveValue(value) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
import Foundation | ||
|
||
struct EventBatchRequest: Encodable { | ||
let clientSecret: String | ||
let sendTime: Date | ||
let events: [Event] | ||
} | ||
|
||
internal protocol EventStorage { | ||
func startNewBatch() throws | ||
func writeEvent(event: Event) throws | ||
func batchReadyIds() -> [String] | ||
func eventsFrom(id: String) throws -> [Event] | ||
func remove(id: String) throws | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import Foundation | ||
import XCTest | ||
|
||
@testable import Confidence | ||
|
||
final class MinSizeFlushPolicy: FlushPolicy { | ||
private var maxSize = 5 | ||
private var size = 0 | ||
func reset() { | ||
size = 0 | ||
} | ||
|
||
func hit(event: Event) { | ||
size += 1 | ||
} | ||
|
||
func shouldFlush() -> Bool { | ||
return size >= maxSize | ||
} | ||
|
||
|
||
} | ||
|
||
final class EventSenderEngineTest: XCTestCase { | ||
func testAddingEventsWithSizeFlushPolicyWorks() throws { | ||
let flushPolicies = [MinSizeFlushPolicy()] | ||
let uploader = EventUploaderMock() | ||
let eventSenderEngine = EventSenderEngineImpl( | ||
clientSecret: "CLIENT_SECRET", | ||
uploader: uploader, | ||
clock: ClockMock(), | ||
storage: EventStorageMock(), | ||
flushPolicies: flushPolicies | ||
) | ||
|
||
let expectation = XCTestExpectation(description: "Upload finished") | ||
let cancellable = uploader.subject.sink { value in | ||
expectation.fulfill() | ||
} | ||
|
||
var events: [Event] = [] | ||
for i in 0..<5 { | ||
events.append(Event(name: "\(i)", payload: [:], eventTime: Date())) | ||
eventSenderEngine.send(name: "\(i)", message: [:]) | ||
} | ||
|
||
wait(for: [expectation], timeout: 5) | ||
let uploadRequest = try XCTUnwrap(uploader.calledRequest) | ||
XCTAssertTrue(uploadRequest.map { $0.name } == events.map { $0.name }) | ||
|
||
uploader.reset() | ||
eventSenderEngine.send(name: "Hello", message: [:]) | ||
XCTAssertNil(uploader.calledRequest) | ||
cancellable.cancel() | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import Foundation | ||
import Combine | ||
@testable import Confidence | ||
|
||
final class EventUploaderMock: EventsUploader { | ||
var calledRequest: [Event]? = nil | ||
let subject: PassthroughSubject<Int, Never> = PassthroughSubject() | ||
func upload(request: [Event]) async -> Bool { | ||
calledRequest = request | ||
subject.send(1) | ||
return true | ||
} | ||
|
||
func reset() { | ||
calledRequest = nil | ||
} | ||
} | ||
|
||
final class ClockMock: Clock { | ||
func now() -> Date { | ||
return Date() | ||
} | ||
} | ||
|
||
final class EventStorageMock: EventStorage { | ||
private var events: [Event] = [] | ||
private var batches: [String: [Event]] = [:] | ||
func startNewBatch() throws { | ||
batches[("\(batches.count)")] = events | ||
events.removeAll() | ||
} | ||
|
||
func writeEvent(event: Event) throws { | ||
events.append(event) | ||
} | ||
|
||
func batchReadyIds() -> [String] { | ||
return batches.map({ batch in batch.0}) | ||
} | ||
|
||
func eventsFrom(id: String) throws -> [Event] { | ||
return batches[id]! | ||
} | ||
|
||
func remove(id: String) throws { | ||
batches.removeValue(forKey: id) | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about test cases and what do you think about something pseudo like this:
We don't have support for this yet but I think we need it (on both android and ios).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean introducing flush intervals?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes 👍 but for later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ticket is created here