Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Event sender engine #88

Merged
merged 8 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions Sources/Confidence/EventSenderEngine.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import Combine
import Foundation

protocol EventsUploader {
func upload(request: EventBatchRequest) -> Bool
vahidlazio marked this conversation as resolved.
Show resolved Hide resolved
}

struct Event: Encodable, Equatable {
let name: String
let payload: [String: ConfidenceValue]
}

protocol FlushPolicy {
func reset()
func hit(event: Event)
func shouldFlush() -> Bool
}

protocol Clock {
func now() -> Date
}

protocol EventSenderEngine {
func send(name: String, message: [String: ConfidenceValue])
func shutdown()
}

final class EventSenderEngineImpl: EventSenderEngine {
private static let sendSignalName: String = "FLUSH"
private let storage: any EventStorage
private let writeReqChannel = PassthroughSubject<Event, Never>()
private let uploadReqChannel = PassthroughSubject<String, Never>()
private var cancellables = Set<AnyCancellable>()
private let flushPolicies: [FlushPolicy]
private let uploader: EventsUploader
private let clientSecret: String
private let clock: Clock

init(
clientSecret: String,
uploader: EventsUploader,
clock: Clock,
storage: EventStorage,
flushPolicies: [FlushPolicy]
) {
self.clock = clock
self.uploader = uploader
self.clientSecret = clientSecret
self.storage = storage
self.flushPolicies = flushPolicies

writeReqChannel.sink(receiveValue: { [weak self] event in

Check warning on line 52 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Trailing Closure Violation: Trailing closure syntax should be used whenever possible (trailing_closure)
guard let self = self else { return }
do {
try self.storage.writeEvent(event: event)
} catch {

Check warning on line 57 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Vertical Whitespace before Closing Braces Violation: Don't include vertical whitespace (empty line) before closing braces (vertical_whitespace_closing_braces)
}

self.flushPolicies.forEach({ policy in policy.hit(event: event) })

Check warning on line 60 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Trailing Closure Violation: Trailing closure syntax should be used whenever possible (trailing_closure)
let shouldFlush = self.flushPolicies.contains(where: { policy in policy.shouldFlush() })

Check warning on line 61 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Trailing Closure Violation: Trailing closure syntax should be used whenever possible (trailing_closure)

if shouldFlush {
self.uploadReqChannel.send(EventSenderEngineImpl.sendSignalName)
self.flushPolicies.forEach({ policy in policy.reset() })

Check warning on line 65 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Trailing Closure Violation: Trailing closure syntax should be used whenever possible (trailing_closure)
}

Check warning on line 67 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Vertical Whitespace before Closing Braces Violation: Don't include vertical whitespace (empty line) before closing braces (vertical_whitespace_closing_braces)
}).store(in: &cancellables)

Check warning on line 68 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Multiline Function Chains Violation: Chained function calls should be either on the same line, or one per line (multiline_function_chains)

uploadReqChannel.sink(receiveValue: { [weak self] _ in

Check warning on line 70 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Trailing Closure Violation: Trailing closure syntax should be used whenever possible (trailing_closure)
do {
guard let self = self else { return }
try self.storage.startNewBatch()
let ids = storage.batchReadyIds()
for id in ids {
let events = try self.storage.eventsFrom(id: id)
let batchRequest = EventBatchRequest(
clientSecret: clientSecret, sendTime: clock.now(), events: events)
vahidlazio marked this conversation as resolved.
Show resolved Hide resolved
let shouldCleanup = self.uploader.upload(request: batchRequest)
if shouldCleanup {
try storage.remove(id: id)
}
}
} catch {

Check warning on line 85 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Vertical Whitespace before Closing Braces Violation: Don't include vertical whitespace (empty line) before closing braces (vertical_whitespace_closing_braces)
}
}).store(in: &cancellables)

Check warning on line 87 in Sources/Confidence/EventSenderEngine.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Multiline Function Chains Violation: Chained function calls should be either on the same line, or one per line (multiline_function_chains)
}

func send(name: String, message: [String: ConfidenceValue]) {
writeReqChannel.send(Event(name: name, payload: message))
vahidlazio marked this conversation as resolved.
Show resolved Hide resolved
}

func shutdown() {
cancellables.removeAll()
}
}
15 changes: 15 additions & 0 deletions Sources/Confidence/EventSenderStorage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Foundation

struct EventBatchRequest: Encodable {
let clientSecret: String
let sendTime: Date
let events: [Event]
}

internal protocol EventStorage {
func startNewBatch() throws
func writeEvent(event: Event) throws
func batchReadyIds() -> [String]
func eventsFrom(id: String) throws -> [Event]
func remove(id: String) throws
}
52 changes: 52 additions & 0 deletions Tests/ConfidenceProviderTests/EventSenderEngineTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import Foundation
import XCTest

@testable import Confidence

final class MinSizeFlushPolicy: FlushPolicy {
private var maxSize = 5
private var size = 0
func reset() {
size = 0
}

func hit(event: Event) {
size += 1
}

func shouldFlush() -> Bool {
return size >= maxSize
}


}

final class EventSenderEngineTest: XCTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about test cases and what do you think about something pseudo like this:

// given that engine is initialized with storage that contains 3 events to be uploaded

// when no "sends" happen (in some specific time?)

// expect the 3 events to be uploaded

We don't have support for this yet but I think we need it (on both android and ios).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean introducing flush intervals?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes 👍 but for later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ticket is created here

func testAddingEventsWithSizeFlushPolicyWorks() throws {
let flushPolicies = [MinSizeFlushPolicy()]
let uploader = EventUploaderMock()
let eventSenderEngine = EventSenderEngineImpl(
clientSecret: "CLIENT_SECRET",
uploader: uploader,
clock: ClockMock(),
storage: EventStorageMock(),
flushPolicies: flushPolicies
)

var events: [Event] = []
for i in 0..<5 {
events.append(Event(name: "\(i)", payload: [:]))
eventSenderEngine.send(name: "\(i)", message: [:])
}

let expectedRequest = EventBatchRequest(clientSecret: "CLIENT_SECRET", sendTime: Date(), events: events)
let uploadRequest = try XCTUnwrap(uploader.calledRequest)
XCTAssertTrue(uploadRequest.clientSecret == expectedRequest.clientSecret)
XCTAssertTrue(uploadRequest.events == expectedRequest.events)

uploader.reset()
eventSenderEngine.send(name: "Hello", message: [:])
XCTAssertNil(uploader.calledRequest)
}
}

46 changes: 46 additions & 0 deletions Tests/ConfidenceProviderTests/EventUploaderMock.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Foundation
@testable import Confidence

final class EventUploaderMock: EventsUploader {
var calledRequest: EventBatchRequest? = nil
func upload(request: EventBatchRequest) -> Bool {
calledRequest = request
return true
}

func reset() {
calledRequest = nil
}
}

final class ClockMock: Clock {
func now() -> Date {
return Date()
}
}

final class EventStorageMock: EventStorage {
private var events: [Event] = []
private var batches: [String: [Event]] = [:]
func startNewBatch() throws {
batches[("\(batches.count)")] = events
events.removeAll()
}

func writeEvent(event: Event) throws {
events.append(event)
}

func batchReadyIds() -> [String] {
return batches.map({ batch in batch.0})
}

func eventsFrom(id: String) throws -> [Event] {
return batches[id]!
}

func remove(id: String) throws {
batches.removeValue(forKey: id)
}

}
Loading