Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement RPC feature #544

Merged
merged 20 commits into from
Jan 17, 2025
14 changes: 8 additions & 6 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,21 @@ class DataChannelPair: NSObject, Loggable {
}

public func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) throws {
guard isOpen else {
throw LiveKitError(.invalidState, message: "Data channel is not open")
}

let packet = Livekit_DataPacket.with {
try send(dataPacket: .with {
$0.kind = kind
$0.user = userPacket
})
}

public func send(dataPacket packet: Livekit_DataPacket) throws {
guard isOpen else {
throw LiveKitError(.invalidState, message: "Data channel is not open")
}

let serializedData = try packet.serializedData()
let rtcData = RTC.createDataBuffer(data: serializedData)

let channel = _state.read { kind == .reliable ? $0.reliable : $0.lossy }
let channel = _state.read { packet.kind == .reliable ? $0.reliable : $0.lossy }
guard let sendDataResult = channel?.sendData(rtcData), sendDataResult else {
throw LiveKitError(.invalidState, message: "sendData failed")
}
Expand Down
187 changes: 187 additions & 0 deletions Sources/LiveKit/Core/RPC.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2025 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Foundation

/// Specialized error handling for RPC methods.
///
/// Instances of this type, when thrown in a RPC method handler, will have their `message`
/// serialized and sent across the wire. The sender will receive an equivalent error on the other side.
///
/// Built-in types are included but developers may use any message string, with a max length of 256 bytes.
struct RpcError: Error {
/// The error code of the RPC call. Error codes 1001-1999 are reserved for built-in errors.
///
/// See `RpcError.BuiltInError` for built-in error information.
let code: Int

/// A message to include. Strings over 256 bytes will be truncated.
let message: String

/// An optional data payload. Must be smaller than 15KB in size, or else will be truncated.
let data: String

enum BuiltInError {
case applicationError
case connectionTimeout
case responseTimeout
case recipientDisconnected
case responsePayloadTooLarge
case sendFailed
case unsupportedMethod
case recipientNotFound
case requestPayloadTooLarge
case unsupportedServer
case unsupportedVersion

var code: Int {
switch self {
case .applicationError: return 1500
case .connectionTimeout: return 1501
case .responseTimeout: return 1502
case .recipientDisconnected: return 1503
case .responsePayloadTooLarge: return 1504
case .sendFailed: return 1505
case .unsupportedMethod: return 1400
case .recipientNotFound: return 1401
case .requestPayloadTooLarge: return 1402
case .unsupportedServer: return 1403
case .unsupportedVersion: return 1404
}
}

var message: String {
switch self {
case .applicationError: return "Application error in method handler"
case .connectionTimeout: return "Connection timeout"
case .responseTimeout: return "Response timeout"
case .recipientDisconnected: return "Recipient disconnected"
case .responsePayloadTooLarge: return "Response payload too large"
case .sendFailed: return "Failed to send"
case .unsupportedMethod: return "Method not supported at destination"
case .recipientNotFound: return "Recipient not found"
case .requestPayloadTooLarge: return "Request payload too large"
case .unsupportedServer: return "RPC not supported by server"
case .unsupportedVersion: return "Unsupported RPC version"
}
}

func create(data: String = "") -> RpcError {
RpcError(code: code, message: message, data: data)
}
}

static func builtIn(_ key: BuiltInError, data: String = "") -> RpcError {
RpcError(code: key.code, message: key.message, data: data)
}

static let MAX_MESSAGE_BYTES = 256
static let MAX_DATA_BYTES = 15360 // 15 KB

static func fromProto(_ proto: Livekit_RpcError) -> RpcError {
RpcError(
code: Int(proto.code),
message: (proto.message).truncate(maxBytes: MAX_MESSAGE_BYTES),
data: proto.data.truncate(maxBytes: MAX_DATA_BYTES)
)
}

func toProto() -> Livekit_RpcError {
Livekit_RpcError.with {
$0.code = UInt32(code)
$0.message = message
$0.data = data
}
}
}

/*
* Maximum payload size for RPC requests and responses. If a payload exceeds this size,
* the RPC call will fail with a REQUEST_PAYLOAD_TOO_LARGE(1402) or RESPONSE_PAYLOAD_TOO_LARGE(1504) error.
*/
let MAX_RPC_PAYLOAD_BYTES = 15360 // 15 KB

/// A handler that processes an RPC request and returns a string
/// that will be sent back to the requester.
///
/// Throwing an `RpcError` will send the error back to the requester.
///
/// - SeeAlso: `LocalParticipant.registerRpcMethod`
public typealias RpcHandler = (RpcInvocationData) async throws -> String

public struct RpcInvocationData {
/// A unique identifier for this RPC request
let requestId: String

/// The identity of the RemoteParticipant who initiated the RPC call
let callerIdentity: Participant.Identity

/// The data sent by the caller (as a string)
let payload: String

/// The maximum time available to return a response
let responseTimeout: TimeInterval
}

struct PendingRpcResponse {
let participantIdentity: Participant.Identity
let onResolve: (_ payload: String?, _ error: RpcError?) -> Void
}

actor RpcStateManager {
private var handlers: [String: RpcHandler] = [:] // methodName to handler
private var pendingAcks: Set<String> = Set()
private var pendingResponses: [String: PendingRpcResponse] = [:] // requestId to pending response

func registerHandler(_ method: String, handler: @escaping RpcHandler) {
handlers[method] = handler
}

func unregisterHandler(_ method: String) {
handlers.removeValue(forKey: method)
}

func getHandler(for method: String) -> RpcHandler? {
handlers[method]
}

func addPendingAck(_ requestId: String) {
pendingAcks.insert(requestId)
}

@discardableResult
func removePendingAck(_ requestId: String) -> Bool {
pendingAcks.remove(requestId) != nil
}

func hasPendingAck(_ requestId: String) -> Bool {
pendingAcks.contains(requestId)
}

func setPendingResponse(_ requestId: String, response: PendingRpcResponse) {
pendingResponses[requestId] = response
}

@discardableResult
func removePendingResponse(_ requestId: String) -> PendingRpcResponse? {
pendingResponses.removeValue(forKey: requestId)
}

func removeAllPending(_ requestId: String) async {
pendingAcks.remove(requestId)
pendingResponses.removeValue(forKey: requestId)
}
}
9 changes: 8 additions & 1 deletion Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ extension Room {
}

func send(userPacket: Livekit_UserPacket, kind: Livekit_DataPacket.Kind) async throws {
try await send(dataPacket: .with {
$0.user = userPacket
$0.kind = kind
})
}

func send(dataPacket packet: Livekit_DataPacket) async throws {
func ensurePublisherConnected() async throws {
guard _state.isSubscriberPrimary else { return }

Expand Down Expand Up @@ -96,7 +103,7 @@ extension Room {
}

// Should return true if successful
try publisherDataChannel.send(userPacket: userPacket, kind: kind)
try publisherDataChannel.send(dataPacket: packet)
}
}

Expand Down
35 changes: 35 additions & 0 deletions Sources/LiveKit/Core/Room+EngineDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,39 @@ extension Room {
$0.participant?(participant, trackPublication: publication, didReceiveTranscriptionSegments: segments)
}
}

func room(didReceiveRpcResponse response: Livekit_RpcResponse) {
let (payload, error): (String?, RpcError?) = switch response.value {
case let .payload(v): (v, nil)
case let .error(e): (nil, RpcError.fromProto(e))
default: (nil, nil)
}

localParticipant.handleIncomingRpcResponse(requestId: response.requestID,
payload: payload,
error: error)
}

func room(didReceiveRpcAck ack: Livekit_RpcAck) {
let requestId = ack.requestID
localParticipant.handleIncomingRpcAck(requestId: requestId)
}

func room(didReceiveRpcRequest request: Livekit_RpcRequest, from participantIdentity: String) {
let callerIdentity = Participant.Identity(from: participantIdentity)
let requestId = request.id
let method = request.method
let payload = request.payload
let responseTimeout = TimeInterval(UInt64(request.responseTimeoutMs) / MSEC_PER_SEC)
let version = Int(request.version)

Task {
await localParticipant.handleIncomingRpcRequest(callerIdentity: callerIdentity,
requestId: requestId,
method: method,
payload: payload,
responseTimeout: responseTimeout,
version: version)
}
}
}
3 changes: 3 additions & 0 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,9 @@ extension Room: DataChannelDelegate {
case let .speaker(update): engine(self, didUpdateSpeakers: update.speakers)
case let .user(userPacket): engine(self, didReceiveUserPacket: userPacket)
case let .transcription(packet): room(didReceiveTranscriptionPacket: packet)
case let .rpcResponse(response): room(didReceiveRpcResponse: response)
case let .rpcAck(ack): room(didReceiveRpcAck: ack)
case let .rpcRequest(request): room(didReceiveRpcRequest: request, from: dataPacket.participantIdentity)
default: return
}
}
Expand Down
3 changes: 3 additions & 0 deletions Sources/LiveKit/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum LiveKitErrorType: Int, Sendable {
case failedToParseUrl = 102
case failedToConvertData = 103
case invalidState = 104
case invalidParameter = 105

case webRTC = 201

Expand Down Expand Up @@ -66,6 +67,8 @@ extension LiveKitErrorType: CustomStringConvertible {
return "Failed to convert data"
case .invalidState:
return "Invalid state"
case .invalidParameter:
return "Invalid parameter"
case .webRTC:
return "WebRTC error"
case .network:
Expand Down
25 changes: 25 additions & 0 deletions Sources/LiveKit/Extensions/String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,29 @@ extension String {
var nilIfEmpty: String? {
isEmpty ? nil : self
}

var byteLength: Int {
data(using: .utf8)?.count ?? 0
}

func truncate(maxBytes: Int) -> String {
if byteLength <= maxBytes {
return self
}

var low = 0
var high = count

while low < high {
let mid = (low + high + 1) / 2
let substring = String(prefix(mid))
if substring.byteLength <= maxBytes {
low = mid
} else {
high = mid - 1
}
}

return String(prefix(low))
}
}
Loading
Loading