Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry committed Jan 14, 2025
1 parent 6008805 commit c45464c
Showing 1 changed file with 47 additions and 54 deletions.
101 changes: 47 additions & 54 deletions Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,13 @@ extension LocalParticipant {
///
/// Example:
/// ```swift
/// try await room.localParticipant.registerRpcMethod("greet") { requestId, callerIdentity, payload, responseTimeout in
/// print("Received greeting from \(callerIdentity): \(payload)")
/// return "Hello, \(callerIdentity)!"
/// try await room.localParticipant.registerRpcMethod("greet") { data in
/// print("Received greeting from \(data.callerIdentity): \(data.payload)")
/// return "Hello, \(data.callerIdentity)!"
/// }
/// ```
///
/// The handler receives the following parameters:
/// The handler receives an `RpcInvocationData` containing the following parameters:
/// - `requestId`: A unique identifier for this RPC request
/// - `callerIdentity`: The identity of the RemoteParticipant who initiated the RPC call
/// - `payload`: The data sent by the caller (as a string)
Expand All @@ -468,8 +468,8 @@ extension LocalParticipant {
/// - Parameters:
/// - method: The name of the indicated RPC method
/// - handler: Will be invoked when an RPC request for this method is received
public func registerRpcMethod(_ method: String,
handler: @escaping RpcHandler) async {
public func registerRpcMethod(_ method: String,
handler: @escaping RpcHandler) async {
await rpcState.registerHandler(method, handler: handler)
}

Expand All @@ -485,15 +485,13 @@ extension LocalParticipant {
/// - destinationIdentity: The identity of the destination participant
/// - method: The method name to call
/// - payload: The payload to pass to the method
/// - responseTimeout: Timeout for receiving a response after initial connection.
/// Defaults to 10 seconds. Max value of UInt.max milliseconds.
/// - responseTimeout: Timeout for receiving a response after initial connection. (default 10s)
/// - Returns: The response payload
/// - Throws: RpcError on failure. Details in RpcError.message
public func performRpc(destinationIdentity: Identity,
method: String,
payload: String,
responseTimeout: TimeInterval = 10) async throws -> String {

method: String,
payload: String,
responseTimeout: TimeInterval = 10) async throws -> String {
guard payload.byteLength <= MAX_RPC_PAYLOAD_BYTES else {
throw RpcError.builtIn(.requestPayloadTooLarge)
}
Expand All @@ -502,12 +500,11 @@ extension LocalParticipant {
let maxRoundTripLatency: TimeInterval = 2
let effectiveTimeout = responseTimeout - maxRoundTripLatency

// Publish the request first
try await publishRpcRequest(destinationIdentity: destinationIdentity,
requestId: requestId,
method: method,
payload: payload,
responseTimeout: effectiveTimeout)
requestId: requestId,
method: method,
payload: payload,
responseTimeout: effectiveTimeout)

return try await withThrowingTimeout(seconds: responseTimeout) {
try await withCheckedThrowingContinuation { continuation in
Expand All @@ -520,7 +517,7 @@ extension LocalParticipant {
Task {
await self.rpcState.removePendingAck(requestId)
await self.rpcState.removePendingResponse(requestId)

if let error {
continuation.resume(throwing: error)
} else {
Expand All @@ -533,7 +530,7 @@ extension LocalParticipant {

Task {
try await Task.sleep(nanoseconds: UInt64(maxRoundTripLatency * 1_000_000_000))

if await self.rpcState.hasPendingAck(requestId) {
await self.rpcState.removeAllPending(requestId)
continuation.resume(throwing: RpcError.builtIn(.connectionTimeout))
Expand All @@ -544,14 +541,14 @@ extension LocalParticipant {
}

private func publishRpcRequest(destinationIdentity: Identity,
requestId: String,
method: String,
payload: String,
responseTimeout: TimeInterval = 10) async throws
requestId: String,
method: String,
payload: String,
responseTimeout: TimeInterval = 10) async throws
{
guard payload.byteLength <= MAX_RPC_PAYLOAD_BYTES else {
throw LiveKitError(.invalidParameter,
message: "cannot publish data larger than \(MAX_RPC_PAYLOAD_BYTES)")
throw LiveKitError(.invalidParameter,
message: "cannot publish data larger than \(MAX_RPC_PAYLOAD_BYTES)")
}

let room = try requireRoom()
Expand All @@ -571,14 +568,12 @@ extension LocalParticipant {
try await room.send(dataPacket: dataPacket)
}

private func publishRpcResponse(
destinationIdentity: Identity,
requestId: String,
payload: String?,
error: RpcError?
) async throws {
private func publishRpcResponse(destinationIdentity: Identity,
requestId: String,
payload: String?,
error: RpcError?) async throws {
let room = try requireRoom()

let dataPacket = Livekit_DataPacket.with {
$0.destinationIdentities = [destinationIdentity.stringValue]
$0.kind = .reliable
Expand All @@ -595,12 +590,10 @@ extension LocalParticipant {
try await room.send(dataPacket: dataPacket)
}

private func publishRpcAck(
destinationIdentity: Identity,
requestId: String
) async throws {
private func publishRpcAck(destinationIdentity: Identity,
requestId: String) async throws {
let room = try requireRoom()

let dataPacket = Livekit_DataPacket.with {
$0.destinationIdentities = [destinationIdentity.stringValue]
$0.kind = .reliable
Expand All @@ -613,25 +606,25 @@ extension LocalParticipant {
}

func handleIncomingRpcRequest(callerIdentity: Identity,
requestId: String,
method: String,
payload: String,
responseTimeout: TimeInterval,
version: Int) async
requestId: String,
method: String,
payload: String,
responseTimeout: TimeInterval,
version: Int) async
{
do {
try await publishRpcAck(destinationIdentity: callerIdentity,
requestId: requestId)
requestId: requestId)
} catch {
log("[Rpc] Failed to publish RPC ack for \(requestId)", .error)
}

guard version == 1 else {
do {
try await publishRpcResponse(destinationIdentity: callerIdentity,
requestId: requestId,
payload: nil,
error: RpcError.builtIn(.unsupportedVersion))
requestId: requestId,
payload: nil,
error: RpcError.builtIn(.unsupportedVersion))
} catch {
log("[Rpc] Failed to publish RPC error response for \(requestId)", .error)
}
Expand All @@ -641,9 +634,9 @@ extension LocalParticipant {
guard let handler = await rpcState.getHandler(for: method) else {
do {
try await publishRpcResponse(destinationIdentity: callerIdentity,
requestId: requestId,
payload: nil,
error: RpcError.builtIn(.unsupportedMethod))
requestId: requestId,
payload: nil,
error: RpcError.builtIn(.unsupportedMethod))
} catch {
log("[Rpc] Failed to publish RPC error response for \(requestId)", .error)
}
Expand Down Expand Up @@ -674,9 +667,9 @@ extension LocalParticipant {

do {
try await publishRpcResponse(destinationIdentity: callerIdentity,
requestId: requestId,
payload: responsePayload,
error: responseError)
requestId: requestId,
payload: responsePayload,
error: responseError)
} catch {
log("[Rpc] Failed to publish RPC response for \(requestId)", .error)
}
Expand All @@ -689,8 +682,8 @@ extension LocalParticipant {
}

func handleIncomingRpcResponse(requestId: String,
payload: String?,
error: RpcError?)
payload: String?,
error: RpcError?)
{
Task {
guard let handler = await rpcState.removePendingResponse(requestId) else {
Expand Down

0 comments on commit c45464c

Please sign in to comment.