From cd5ba56e56e434c307925726628915a09eed8ad2 Mon Sep 17 00:00:00 2001 From: Jacob Gelman <3182119+ladvoc@users.noreply.github.com> Date: Thu, 16 Jan 2025 09:06:20 -0800 Subject: [PATCH] Add watchdog timer Automatically end broadcast capture if an initial frame isn't received within a timeout window. --- .../SocketConnectionFrameReader.swift | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/Sources/LiveKit/Broadcast/SocketConnectionFrameReader.swift b/Sources/LiveKit/Broadcast/SocketConnectionFrameReader.swift index 8bed0b099..5793b9c62 100644 --- a/Sources/LiveKit/Broadcast/SocketConnectionFrameReader.swift +++ b/Sources/LiveKit/Broadcast/SocketConnectionFrameReader.swift @@ -134,19 +134,29 @@ class SocketConnectionFrameReader: NSObject { } } } - + + /// A task to automatically stop the capture if an initial frame isn't received within a timeout window. + private var watchdogTask: Task<(), any Error>? + private var message: Message? var didCapture: ((CVPixelBuffer, RTCVideoRotation) -> Void)? var didEnd: (() -> Void)? - override init() {} - func startCapture(with connection: BroadcastServerSocketConnection) { + self.connection = connection message = nil - if !connection.open() { + guard connection.open() else { stopCapture() + return + } + watchdogTask = Task { [weak self] in + let timeoutWindow: TimeInterval = .defaultCaptureStart + try await Task.sleep(nanoseconds: UInt64(timeoutWindow * 1_000_000_000)) + try Task.checkCancellation() + logger.warning("Initial frame not received before timeout") + self?.stopCaptureAndNotify() } } @@ -157,7 +167,12 @@ class SocketConnectionFrameReader: NSObject { // MARK: Private Methods - func readBytes(from stream: InputStream) { + private func stopCaptureAndNotify() { + stopCapture() + didEnd?() + } + + private func readBytes(from stream: InputStream) { if !(stream.hasBytesAvailable) { return } @@ -194,13 +209,18 @@ class SocketConnectionFrameReader: NSObject { } } - func didCaptureVideoFrame( + private func didCaptureVideoFrame( _ pixelBuffer: CVPixelBuffer?, with orientation: CGImagePropertyOrientation ) { guard let pixelBuffer else { return } + if let watchdogTask { + logger.debug("Initial frame received") + watchdogTask.cancel() + self.watchdogTask = nil + } var rotation: RTCVideoRotation switch orientation { @@ -227,8 +247,7 @@ extension SocketConnectionFrameReader: StreamDelegate { readBytes(from: aStream as! InputStream) case .endEncountered: logger.log(level: .debug, "server stream end encountered") - stopCapture() - didEnd?() + stopCaptureAndNotify() case .errorOccurred: logger.log(level: .debug, "server stream error encountered: \(aStream.streamError?.localizedDescription ?? "")") default: