diff --git a/Sources/LiveKit/Participant/LocalParticipant.swift b/Sources/LiveKit/Participant/LocalParticipant.swift index de9f4a886..e35fca55f 100644 --- a/Sources/LiveKit/Participant/LocalParticipant.swift +++ b/Sources/LiveKit/Participant/LocalParticipant.swift @@ -446,13 +446,13 @@ extension LocalParticipant { /// /// Example: /// ```swift - /// try await room.localParticipant.registerRpcMethod("greet") { requestId, callerIdentity, payload, responseTimeout in - /// print("Received greeting from \(callerIdentity): \(payload)") - /// return "Hello, \(callerIdentity)!" + /// try await room.localParticipant.registerRpcMethod("greet") { data in + /// print("Received greeting from \(data.callerIdentity): \(data.payload)") + /// return "Hello, \(data.callerIdentity)!" /// } /// ``` /// - /// The handler receives the following parameters: + /// The handler receives an `RpcInvocationData` containing the following parameters: /// - `requestId`: A unique identifier for this RPC request /// - `callerIdentity`: The identity of the RemoteParticipant who initiated the RPC call /// - `payload`: The data sent by the caller (as a string) @@ -468,8 +468,8 @@ extension LocalParticipant { /// - Parameters: /// - method: The name of the indicated RPC method /// - handler: Will be invoked when an RPC request for this method is received - public func registerRpcMethod(_ method: String, - handler: @escaping RpcHandler) async { + public func registerRpcMethod(_ method: String, + handler: @escaping RpcHandler) async { await rpcState.registerHandler(method, handler: handler) } @@ -485,15 +485,13 @@ extension LocalParticipant { /// - destinationIdentity: The identity of the destination participant /// - method: The method name to call /// - payload: The payload to pass to the method - /// - responseTimeout: Timeout for receiving a response after initial connection. - /// Defaults to 10 seconds. Max value of UInt.max milliseconds. + /// - responseTimeout: Timeout for receiving a response after initial connection. (default 10s) /// - Returns: The response payload /// - Throws: RpcError on failure. Details in RpcError.message public func performRpc(destinationIdentity: Identity, - method: String, - payload: String, - responseTimeout: TimeInterval = 10) async throws -> String { - + method: String, + payload: String, + responseTimeout: TimeInterval = 10) async throws -> String { guard payload.byteLength <= MAX_RPC_PAYLOAD_BYTES else { throw RpcError.builtIn(.requestPayloadTooLarge) } @@ -502,12 +500,11 @@ extension LocalParticipant { let maxRoundTripLatency: TimeInterval = 2 let effectiveTimeout = responseTimeout - maxRoundTripLatency - // Publish the request first try await publishRpcRequest(destinationIdentity: destinationIdentity, - requestId: requestId, - method: method, - payload: payload, - responseTimeout: effectiveTimeout) + requestId: requestId, + method: method, + payload: payload, + responseTimeout: effectiveTimeout) return try await withThrowingTimeout(seconds: responseTimeout) { try await withCheckedThrowingContinuation { continuation in @@ -520,7 +517,7 @@ extension LocalParticipant { Task { await self.rpcState.removePendingAck(requestId) await self.rpcState.removePendingResponse(requestId) - + if let error { continuation.resume(throwing: error) } else { @@ -533,7 +530,7 @@ extension LocalParticipant { Task { try await Task.sleep(nanoseconds: UInt64(maxRoundTripLatency * 1_000_000_000)) - + if await self.rpcState.hasPendingAck(requestId) { await self.rpcState.removeAllPending(requestId) continuation.resume(throwing: RpcError.builtIn(.connectionTimeout)) @@ -544,14 +541,14 @@ extension LocalParticipant { } private func publishRpcRequest(destinationIdentity: Identity, - requestId: String, - method: String, - payload: String, - responseTimeout: TimeInterval = 10) async throws + requestId: String, + method: String, + payload: String, + responseTimeout: TimeInterval = 10) async throws { guard payload.byteLength <= MAX_RPC_PAYLOAD_BYTES else { - throw LiveKitError(.invalidParameter, - message: "cannot publish data larger than \(MAX_RPC_PAYLOAD_BYTES)") + throw LiveKitError(.invalidParameter, + message: "cannot publish data larger than \(MAX_RPC_PAYLOAD_BYTES)") } let room = try requireRoom() @@ -571,14 +568,12 @@ extension LocalParticipant { try await room.send(dataPacket: dataPacket) } - private func publishRpcResponse( - destinationIdentity: Identity, - requestId: String, - payload: String?, - error: RpcError? - ) async throws { + private func publishRpcResponse(destinationIdentity: Identity, + requestId: String, + payload: String?, + error: RpcError?) async throws { let room = try requireRoom() - + let dataPacket = Livekit_DataPacket.with { $0.destinationIdentities = [destinationIdentity.stringValue] $0.kind = .reliable @@ -595,12 +590,10 @@ extension LocalParticipant { try await room.send(dataPacket: dataPacket) } - private func publishRpcAck( - destinationIdentity: Identity, - requestId: String - ) async throws { + private func publishRpcAck(destinationIdentity: Identity, + requestId: String) async throws { let room = try requireRoom() - + let dataPacket = Livekit_DataPacket.with { $0.destinationIdentities = [destinationIdentity.stringValue] $0.kind = .reliable @@ -613,15 +606,15 @@ extension LocalParticipant { } func handleIncomingRpcRequest(callerIdentity: Identity, - requestId: String, - method: String, - payload: String, - responseTimeout: TimeInterval, - version: Int) async + requestId: String, + method: String, + payload: String, + responseTimeout: TimeInterval, + version: Int) async { do { try await publishRpcAck(destinationIdentity: callerIdentity, - requestId: requestId) + requestId: requestId) } catch { log("[Rpc] Failed to publish RPC ack for \(requestId)", .error) } @@ -629,9 +622,9 @@ extension LocalParticipant { guard version == 1 else { do { try await publishRpcResponse(destinationIdentity: callerIdentity, - requestId: requestId, - payload: nil, - error: RpcError.builtIn(.unsupportedVersion)) + requestId: requestId, + payload: nil, + error: RpcError.builtIn(.unsupportedVersion)) } catch { log("[Rpc] Failed to publish RPC error response for \(requestId)", .error) } @@ -641,9 +634,9 @@ extension LocalParticipant { guard let handler = await rpcState.getHandler(for: method) else { do { try await publishRpcResponse(destinationIdentity: callerIdentity, - requestId: requestId, - payload: nil, - error: RpcError.builtIn(.unsupportedMethod)) + requestId: requestId, + payload: nil, + error: RpcError.builtIn(.unsupportedMethod)) } catch { log("[Rpc] Failed to publish RPC error response for \(requestId)", .error) } @@ -674,9 +667,9 @@ extension LocalParticipant { do { try await publishRpcResponse(destinationIdentity: callerIdentity, - requestId: requestId, - payload: responsePayload, - error: responseError) + requestId: requestId, + payload: responsePayload, + error: responseError) } catch { log("[Rpc] Failed to publish RPC response for \(requestId)", .error) } @@ -689,8 +682,8 @@ extension LocalParticipant { } func handleIncomingRpcResponse(requestId: String, - payload: String?, - error: RpcError?) + payload: String?, + error: RpcError?) { Task { guard let handler = await rpcState.removePendingResponse(requestId) else {