Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watchdog timer #549

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions Sources/LiveKit/Broadcast/SocketConnectionFrameReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,29 @@ class SocketConnectionFrameReader: NSObject {
}
}
}


/// A task to automatically stop the capture if an initial frame isn't received within a timeout window.
private var watchdogTask: Task<(), any Error>?

private var message: Message?
var didCapture: ((CVPixelBuffer, RTCVideoRotation) -> Void)?
var didEnd: (() -> Void)?

override init() {}

func startCapture(with connection: BroadcastServerSocketConnection) {

self.connection = connection
message = nil

if !connection.open() {
guard connection.open() else {
stopCapture()
return
}
watchdogTask = Task { [weak self] in
let timeoutWindow: TimeInterval = .defaultCaptureStart
try await Task.sleep(nanoseconds: UInt64(timeoutWindow * 1_000_000_000))
try Task.checkCancellation()
logger.warning("Initial frame not received before timeout")
self?.stopCaptureAndNotify()
}
}

Expand All @@ -157,7 +167,12 @@ class SocketConnectionFrameReader: NSObject {

// MARK: Private Methods

func readBytes(from stream: InputStream) {
private func stopCaptureAndNotify() {
stopCapture()
didEnd?()
}

private func readBytes(from stream: InputStream) {
if !(stream.hasBytesAvailable) {
return
}
Expand Down Expand Up @@ -194,13 +209,18 @@ class SocketConnectionFrameReader: NSObject {
}
}

func didCaptureVideoFrame(
private func didCaptureVideoFrame(
_ pixelBuffer: CVPixelBuffer?,
with orientation: CGImagePropertyOrientation
) {
guard let pixelBuffer else {
return
}
if let watchdogTask {
logger.debug("Initial frame received")
watchdogTask.cancel()
self.watchdogTask = nil
}

var rotation: RTCVideoRotation
switch orientation {
Expand All @@ -227,8 +247,7 @@ extension SocketConnectionFrameReader: StreamDelegate {
readBytes(from: aStream as! InputStream)
case .endEncountered:
logger.log(level: .debug, "server stream end encountered")
stopCapture()
didEnd?()
stopCaptureAndNotify()
case .errorOccurred:
logger.log(level: .debug, "server stream error encountered: \(aStream.streamError?.localizedDescription ?? "")")
default:
Expand Down
Loading