diff --git a/.changeset/fuzzy-waves-knock.md b/.changeset/fuzzy-waves-knock.md new file mode 100644 index 0000000000..364aa4d6c2 --- /dev/null +++ b/.changeset/fuzzy-waves-knock.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Move PeerConnection logic into PCTransportManager diff --git a/src/connectionHelper/checks/webrtc.ts b/src/connectionHelper/checks/webrtc.ts index 9082d847c7..dd85da26c7 100644 --- a/src/connectionHelper/checks/webrtc.ts +++ b/src/connectionHelper/checks/webrtc.ts @@ -38,8 +38,8 @@ export class WebRTCCheck extends Checker { } }; - if (this.room.engine.subscriber) { - this.room.engine.subscriber.onIceCandidateError = (ev) => { + if (this.room.engine.pcManager) { + this.room.engine.pcManager.subscriber.onIceCandidateError = (ev) => { if (ev instanceof RTCPeerConnectionIceErrorEvent) { this.appendWarning( `error with ICE candidate: ${ev.errorCode} ${ev.errorText} ${ev.url}`, diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index d47f210f95..0cf9d3efcc 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -33,10 +33,16 @@ export default class PCTransport extends EventEmitter { private _pc: RTCPeerConnection | null; private get pc() { - if (this._pc) return this._pc; - throw new UnexpectedConnectionState('Expected peer connection to be available'); + if (!this._pc) { + this._pc = this.createPC(); + } + return this._pc; } + private config?: RTCConfiguration; + + private mediaConstraints: Record; + pendingCandidates: RTCIceCandidateInit[] = []; restartingIce: boolean = false; @@ -57,32 +63,53 @@ export default class PCTransport extends EventEmitter { onConnectionStateChange?: (state: RTCPeerConnectionState) => void; + onIceConnectionStateChange?: (state: RTCIceConnectionState) => void; + + onSignalingStatechange?: (state: RTCSignalingState) => void; + onDataChannel?: (ev: RTCDataChannelEvent) => void; onTrack?: (ev: RTCTrackEvent) => void; constructor(config?: RTCConfiguration, mediaConstraints: Record = {}) { super(); - this._pc = isChromiumBased() + this.config = config; + this.mediaConstraints = mediaConstraints; + this._pc = this.createPC(); + } + + private createPC() { + const pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor - new RTCPeerConnection(config, mediaConstraints) - : new RTCPeerConnection(config); - this._pc.onicecandidate = (ev) => { + new RTCPeerConnection(this.config, this.mediaConstraints) + : new RTCPeerConnection(this.config); + + pc.onicecandidate = (ev) => { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); }; - this._pc.onicecandidateerror = (ev) => { + pc.onicecandidateerror = (ev) => { this.onIceCandidateError?.(ev); }; - this._pc.onconnectionstatechange = () => { - this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); + + pc.oniceconnectionstatechange = () => { + this.onIceConnectionStateChange?.(pc.iceConnectionState); }; - this._pc.ondatachannel = (ev) => { + + pc.onsignalingstatechange = () => { + this.onSignalingStatechange?.(pc.signalingState); + }; + + pc.onconnectionstatechange = () => { + this.onConnectionStateChange?.(pc.connectionState); + }; + pc.ondatachannel = (ev) => { this.onDataChannel?.(ev); }; - this._pc.ontrack = (ev) => { + pc.ontrack = (ev) => { this.onTrack?.(ev); }; + return pc; } get isICEConnected(): boolean { @@ -168,7 +195,7 @@ export default class PCTransport extends EventEmitter { if (this.renegotiate) { this.renegotiate = false; - this.createAndSendOffer(); + await this.createAndSendOffer(); } else if (sd.type === 'answer') { this.emit(PCEvents.NegotiationComplete); if (sd.sdp) { @@ -183,10 +210,10 @@ export default class PCTransport extends EventEmitter { } // debounced negotiate interface - negotiate = debounce((onError?: (e: Error) => void) => { + negotiate = debounce(async (onError?: (e: Error) => void) => { this.emit(PCEvents.NegotiationStarted); try { - this.createAndSendOffer(); + await this.createAndSendOffer(); } catch (e) { if (onError) { onError(e as Error); @@ -209,11 +236,11 @@ export default class PCTransport extends EventEmitter { if (this._pc && this._pc.signalingState === 'have-local-offer') { // we're waiting for the peer to accept our offer, so we'll just wait // the only exception to this is when ICE restart is needed - const currentSD = this.pc.remoteDescription; + const currentSD = this._pc.remoteDescription; if (options?.iceRestart && currentSD) { // TODO: handle when ICE restart is needed but we don't have a remote description // the best thing to do is to recreate the peerconnection - await this.pc.setRemoteDescription(currentSD); + await this._pc.setRemoteDescription(currentSD); } else { this.renegotiate = true; return; @@ -307,7 +334,10 @@ export default class PCTransport extends EventEmitter { } addTrack(track: MediaStreamTrack) { - return this.pc.addTrack(track); + if (!this._pc) { + throw new UnexpectedConnectionState('PC closed, cannot add track'); + } + return this._pc.addTrack(track); } setTrackCodecBitrate(info: TrackBitrateInfo) { @@ -315,43 +345,46 @@ export default class PCTransport extends EventEmitter { } setConfiguration(rtcConfig: RTCConfiguration) { - return this.pc.setConfiguration(rtcConfig); + if (!this._pc) { + throw new UnexpectedConnectionState('PC closed, cannot configure'); + } + return this._pc?.setConfiguration(rtcConfig); } canRemoveTrack(): boolean { - return !!this.pc.removeTrack; + return !!this._pc?.removeTrack; } removeTrack(sender: RTCRtpSender) { - return this.pc.removeTrack(sender); + return this._pc?.removeTrack(sender); } getConnectionState() { - return this.pc.connectionState; + return this._pc?.connectionState ?? 'closed'; } getICEConnectionState() { - return this.pc.iceConnectionState; + return this._pc?.iceConnectionState ?? 'closed'; } getSignallingState() { - return this.pc.signalingState; + return this._pc?.signalingState ?? 'closed'; } getTransceivers() { - return this.pc.getTransceivers(); + return this._pc?.getTransceivers() ?? []; } getSenders() { - return this.pc.getSenders(); + return this._pc?.getSenders() ?? []; } getLocalDescription() { - return this.pc.localDescription; + return this._pc?.localDescription; } getRemoteDescription() { - return this.pc.remoteDescription; + return this.pc?.remoteDescription; } getStats() { @@ -395,7 +428,7 @@ export default class PCTransport extends EventEmitter { return candidates.get(selectedID); } - close() { + close = () => { if (!this._pc) { return; } @@ -412,7 +445,7 @@ export default class PCTransport extends EventEmitter { this._pc.onconnectionstatechange = null; this._pc.oniceconnectionstatechange = null; this._pc = null; - } + }; private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) { if (munged) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts new file mode 100644 index 0000000000..317957a2bd --- /dev/null +++ b/src/room/PCTransportManager.ts @@ -0,0 +1,336 @@ +import log from '../logger'; +import { SignalTarget } from '../proto/livekit_rtc_pb'; +import PCTransport, { PCEvents } from './PCTransport'; +import { roomConnectOptionDefaults } from './defaults'; +import { ConnectionError, ConnectionErrorReason } from './errors'; +import CriticalTimers from './timers'; +import { Mutex, sleep } from './utils'; + +export enum PCTransportState { + NEW, + CONNECTING, + CONNECTED, + FAILED, + CLOSING, + CLOSED, +} + +export class PCTransportManager { + public publisher: PCTransport; + + public subscriber: PCTransport; + + public peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; + + public get needsPublisher() { + return this.isPublisherConnectionRequired; + } + + public get needsSubscriber() { + return this.isSubscriberConnectionRequired; + } + + public get currentState() { + return this.state; + } + + public onStateChange?: ( + state: PCTransportState, + pubState: RTCPeerConnectionState, + subState: RTCPeerConnectionState, + ) => void; + + public onIceCandidate?: (ev: RTCIceCandidate, target: SignalTarget) => void; + + public onDataChannel?: (ev: RTCDataChannelEvent) => void; + + public onTrack?: (ev: RTCTrackEvent) => void; + + public onPublisherOffer?: (offer: RTCSessionDescriptionInit) => void; + + private isPublisherConnectionRequired: boolean; + + private isSubscriberConnectionRequired: boolean; + + private state: PCTransportState; + + private connectionLock: Mutex; + + constructor(rtcConfig: RTCConfiguration, subscriberPrimary: boolean) { + this.isPublisherConnectionRequired = !subscriberPrimary; + this.isSubscriberConnectionRequired = subscriberPrimary; + const googConstraints = { optional: [{ googDscp: true }] }; + this.publisher = new PCTransport(rtcConfig, googConstraints); + this.subscriber = new PCTransport(rtcConfig); + + this.publisher.onConnectionStateChange = this.updateState; + this.subscriber.onConnectionStateChange = this.updateState; + this.publisher.onIceConnectionStateChange = this.updateState; + this.subscriber.onIceConnectionStateChange = this.updateState; + this.publisher.onSignalingStatechange = this.updateState; + this.subscriber.onSignalingStatechange = this.updateState; + this.publisher.onIceCandidate = (candidate) => { + this.onIceCandidate?.(candidate, SignalTarget.PUBLISHER); + }; + this.subscriber.onIceCandidate = (candidate) => { + this.onIceCandidate?.(candidate, SignalTarget.SUBSCRIBER); + }; + // in subscriber primary mode, server side opens sub data channels. + this.subscriber.onDataChannel = (ev) => { + this.onDataChannel?.(ev); + }; + this.subscriber.onTrack = (ev) => { + this.onTrack?.(ev); + }; + this.publisher.onOffer = (offer) => { + this.onPublisherOffer?.(offer); + }; + + this.state = PCTransportState.NEW; + + this.connectionLock = new Mutex(); + } + + requirePublisher(require = true) { + this.isPublisherConnectionRequired = require; + this.updateState(); + } + + requireSubscriber(require = true) { + this.isSubscriberConnectionRequired = require; + this.updateState(); + } + + createAndSendPublisherOffer(options?: RTCOfferOptions) { + return this.publisher.createAndSendOffer(options); + } + + setPublisherAnswer(sd: RTCSessionDescriptionInit) { + return this.publisher.setRemoteDescription(sd); + } + + removeTrack(sender: RTCRtpSender) { + return this.publisher.removeTrack(sender); + } + + async close() { + if (this.publisher && this.publisher.getSignallingState() !== 'closed') { + const publisher = this.publisher; + for (const sender of publisher.getSenders()) { + try { + // TODO: react-native-webrtc doesn't have removeTrack yet. + if (publisher.canRemoveTrack()) { + publisher.removeTrack(sender); + } + } catch (e) { + log.warn('could not removeTrack', { error: e }); + } + } + } + await Promise.all([this.publisher.close(), this.subscriber.close()]); + this.updateState(); + } + + async triggerIceRestart() { + this.subscriber.restartingIce = true; + // only restart publisher if it's needed + if (this.needsPublisher) { + await this.createAndSendPublisherOffer({ iceRestart: true }); + } + } + + async addIceCandidate(candidate: RTCIceCandidateInit, target: SignalTarget) { + if (target === SignalTarget.PUBLISHER) { + await this.publisher.addIceCandidate(candidate); + } else { + await this.subscriber.addIceCandidate(candidate); + } + } + + async createSubscriberAnswerFromOffer(sd: RTCSessionDescriptionInit) { + log.debug('received server offer', { + RTCSdpType: sd.type, + signalingState: this.subscriber.getSignallingState().toString(), + }); + await this.subscriber.setRemoteDescription(sd); + + // answer the offer + const answer = await this.subscriber.createAndSetAnswer(); + return answer; + } + + updateConfiguration(config: RTCConfiguration, iceRestart?: boolean) { + this.publisher.setConfiguration(config); + this.subscriber.setConfiguration(config); + if (iceRestart) { + this.triggerIceRestart(); + } + } + + async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { + const unlock = await this.connectionLock.lock(); + try { + if ( + this.isPublisherConnectionRequired && + this.publisher.getConnectionState() !== 'connected' && + this.publisher.getConnectionState() !== 'connecting' + ) { + log.debug('negotiation required, start negotiating'); + this.publisher.negotiate(); + } + await Promise.all( + this.requiredTransports?.map((transport) => + this.ensureTransportConnected(transport, abortController, timeout), + ), + ); + } finally { + unlock(); + } + } + + async negotiate(abortController: AbortController) { + return new Promise(async (resolve, reject) => { + const negotiationTimeout = setTimeout(() => { + reject('negotiation timed out'); + }, this.peerConnectionTimeout); + + const abortHandler = () => { + clearTimeout(negotiationTimeout); + reject('negotiation aborted'); + }; + + abortController.signal.addEventListener('abort', abortHandler); + this.publisher.once(PCEvents.NegotiationStarted, () => { + if (abortController.signal.aborted) { + return; + } + this.publisher.once(PCEvents.NegotiationComplete, () => { + clearTimeout(negotiationTimeout); + resolve(); + }); + }); + + await this.publisher.negotiate((e) => { + clearTimeout(negotiationTimeout); + reject(e); + }); + }); + } + + addPublisherTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { + return this.publisher.addTransceiver(track, transceiverInit); + } + + addPublisherTrack(track: MediaStreamTrack) { + return this.publisher.addTrack(track); + } + + createPublisherDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { + return this.publisher.createDataChannel(label, dataChannelDict); + } + + /** + * Returns the first required transport's address if no explicit target is specified + */ + getConnectedAddress(target?: SignalTarget) { + if (target === SignalTarget.PUBLISHER) { + return this.publisher.getConnectedAddress(); + } else if (target === SignalTarget.SUBSCRIBER) { + return this.publisher.getConnectedAddress(); + } + return this.requiredTransports[0].getConnectedAddress(); + } + + private get requiredTransports() { + const transports: PCTransport[] = []; + if (this.isPublisherConnectionRequired) { + transports.push(this.publisher); + } + if (this.isSubscriberConnectionRequired) { + transports.push(this.subscriber); + } + return transports; + } + + private updateState = () => { + const previousState = this.state; + + const connectionStates = this.requiredTransports.map((tr) => tr.getConnectionState()); + if (connectionStates.every((st) => st === 'connected')) { + this.state = PCTransportState.CONNECTED; + } else if (connectionStates.some((st) => st === 'failed')) { + this.state = PCTransportState.FAILED; + } else if (connectionStates.some((st) => st === 'connecting')) { + this.state = PCTransportState.CONNECTING; + } else if (connectionStates.every((st) => st === 'closed')) { + this.state = PCTransportState.CLOSED; + } else if (connectionStates.some((st) => st === 'closed')) { + this.state = PCTransportState.CLOSING; + } else if (connectionStates.every((st) => st === 'new')) { + this.state = PCTransportState.NEW; + } + + if (previousState !== this.state) { + log.debug( + `pc state change: from ${PCTransportState[previousState]} to ${ + PCTransportState[this.state] + }`, + ); + this.onStateChange?.( + this.state, + this.publisher.getConnectionState(), + this.subscriber.getConnectionState(), + ); + } + }; + + private async ensureTransportConnected( + pcTransport: PCTransport, + abortController?: AbortController, + timeout: number = this.peerConnectionTimeout, + ) { + const connectionState = pcTransport.getConnectionState(); + if (connectionState === 'connected') { + return; + } + + return new Promise(async (resolve, reject) => { + const abortHandler = () => { + log.warn('abort transport connection'); + CriticalTimers.clearTimeout(connectTimeout); + + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + }; + if (abortController?.signal.aborted) { + abortHandler(); + } + abortController?.signal.addEventListener('abort', abortHandler); + + const connectTimeout = CriticalTimers.setTimeout(() => { + abortController?.signal.removeEventListener('abort', abortHandler); + reject(new ConnectionError('could not establish pc connection')); + }, timeout); + + while (this.state !== PCTransportState.CONNECTED) { + await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations + if (abortController?.signal.aborted) { + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + return; + } + } + CriticalTimers.clearTimeout(connectTimeout); + abortController?.signal.removeEventListener('abort', abortHandler); + resolve(); + }); + } +} diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 8bca219d85..4c323fad01 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; -import { SignalClient } from '../api/SignalClient'; +import { SignalClient, toProtoSessionDescription } from '../api/SignalClient'; import log from '../logger'; import type { InternalRoomOptions } from '../options'; import { @@ -19,18 +19,22 @@ import { UserPacket, } from '../proto/livekit_models_pb'; import { - AddTrackRequest, - ConnectionQualityUpdate, - JoinResponse, - LeaveRequest, - ReconnectResponse, + type AddTrackRequest, + type ConnectionQualityUpdate, + DataChannelInfo, + type JoinResponse, + type LeaveRequest, + type ReconnectResponse, SignalTarget, - StreamStateUpdate, - SubscriptionPermissionUpdate, - SubscriptionResponse, - TrackPublishedResponse, + type StreamStateUpdate, + type SubscriptionPermissionUpdate, + type SubscriptionResponse, + SyncState, + type TrackPublishedResponse, + UpdateSubscription, } from '../proto/livekit_rtc_pb'; import PCTransport, { PCEvents } from './PCTransport'; +import { PCTransportManager, PCTransportState } from './PCTransportManager'; import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy'; import type { RegionUrlProvider } from './RegionUrlProvider'; import { roomConnectOptionDefaults } from './defaults'; @@ -44,10 +48,13 @@ import { import { EngineEvent } from './events'; import CriticalTimers from './timers'; import type LocalTrack from './track/LocalTrack'; +import type LocalTrackPublication from './track/LocalTrackPublication'; import type LocalVideoTrack from './track/LocalVideoTrack'; import type { SimulcastTrackInfo } from './track/LocalVideoTrack'; +import type RemoteTrackPublication from './track/RemoteTrackPublication'; import { Track } from './track/Track'; import type { TrackPublishOptions, VideoCodec } from './track/options'; +import { getTrackPublicationInfo } from './track/utils'; import { Mutex, isVideoCodec, @@ -73,10 +80,6 @@ enum PCState { /** @internal */ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { - publisher?: PCTransport; - - subscriber?: PCTransport; - client: SignalClient; rtcConfig: RTCConfiguration = {}; @@ -85,6 +88,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit fullReconnectOnNext: boolean = false; + pcManager?: PCTransportManager; + /** * @internal */ @@ -108,8 +113,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private subscriberPrimary: boolean = false; - private primaryTransport?: PCTransport; - private pcState: PCState = PCState.New; private _isClosed: boolean = true; @@ -118,10 +121,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit [key: string]: { resolve: (info: TrackInfo) => void; reject: () => void }; } = {}; - // true if publisher connection has already been established. - // this is helpful to know if we need to restart ICE on the publisher connection - private hasPublished: boolean = false; - // keep join info around for reconnect, this could be a region url private url?: string; @@ -201,8 +200,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.latestJoinResponse = joinResponse; this.subscriberPrimary = joinResponse.subscriberPrimary; - if (!this.publisher) { - this.configure(joinResponse); + if (!this.pcManager) { + await this.configure(joinResponse); } // create offer @@ -247,28 +246,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async cleanupPeerConnections() { - if (this.publisher && this.publisher.getSignallingState() !== 'closed') { - this.publisher.getSenders().forEach((sender) => { - try { - // TODO: react-native-webrtc doesn't have removeTrack yet. - if (this.publisher?.canRemoveTrack()) { - this.publisher?.removeTrack(sender); - } - } catch (e) { - log.warn('could not removeTrack', { error: e }); - } - }); - } - if (this.publisher) { - this.publisher.close(); - this.publisher = undefined; - } - if (this.subscriber) { - this.subscriber.close(); - this.subscriber = undefined; - } - this.hasPublished = false; - this.primaryTransport = undefined; + await this.pcManager?.close(); + this.pcManager = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { if (!dc) return; @@ -336,7 +315,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit delete this.pendingTrackResolvers[sender.track.id]; } try { - this.publisher?.removeTrack(sender); + this.pcManager!.removeTrack(sender); return true; } catch (e: unknown) { log.warn('failed to remove track', { error: e, method: 'removeTrack' }); @@ -353,10 +332,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async getConnectedServerAddress(): Promise { - if (this.primaryTransport === undefined) { - return undefined; - } - return this.primaryTransport.getConnectedAddress(); + return this.pcManager?.getConnectedAddress(); } /* @internal */ @@ -364,9 +340,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.regionUrlProvider = provider; } - private configure(joinResponse: JoinResponse) { + private async configure(joinResponse: JoinResponse) { // already configured - if (this.publisher || this.subscriber) { + if (this.pcManager && this.pcManager.currentState !== PCTransportState.NEW) { return; } @@ -374,71 +350,42 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const rtcConfig = this.makeRTCConfiguration(joinResponse); - const googConstraints = { optional: [{ googDscp: true }] }; - this.publisher = new PCTransport(rtcConfig, googConstraints); - this.subscriber = new PCTransport(rtcConfig); + this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); - this.emit(EngineEvent.TransportsCreated, this.publisher, this.subscriber); + this.emit(EngineEvent.TransportsCreated, this.pcManager.publisher, this.pcManager.subscriber); - this.publisher.onIceCandidate = (candidate) => { - log.trace('adding ICE candidate for peer', candidate); - this.client.sendIceCandidate(candidate, SignalTarget.PUBLISHER); + this.pcManager.onIceCandidate = (candidate, target) => { + this.client.sendIceCandidate(candidate, target); }; - this.subscriber.onIceCandidate = (candidate) => { - this.client.sendIceCandidate(candidate, SignalTarget.SUBSCRIBER); - }; - - this.publisher.onOffer = (offer) => { + this.pcManager.onPublisherOffer = (offer) => { this.client.sendOffer(offer); }; - let primaryTransport = this.publisher; - let secondaryTransport = this.subscriber; - let subscriberPrimary = joinResponse.subscriberPrimary; - if (subscriberPrimary) { - primaryTransport = this.subscriber; - secondaryTransport = this.publisher; - // in subscriber primary mode, server side opens sub data channels. - this.subscriber.onDataChannel = this.handleDataChannel; - } - this.primaryTransport = primaryTransport; - primaryTransport.onConnectionStateChange = async (connectionState) => { + this.pcManager.onDataChannel = this.handleDataChannel; + this.pcManager.onStateChange = async (connectionState, publisherState, subscriberState) => { log.debug(`primary PC state changed ${connectionState}`); - if (connectionState === 'connected') { + if (connectionState === PCTransportState.CONNECTED) { const shouldEmit = this.pcState === PCState.New; this.pcState = PCState.Connected; if (shouldEmit) { this.emit(EngineEvent.Connected, joinResponse); } - } else if (connectionState === 'failed') { + } else if (connectionState === PCTransportState.FAILED) { // on Safari, PeerConnection will switch to 'disconnected' during renegotiation if (this.pcState === PCState.Connected) { this.pcState = PCState.Disconnected; this.handleDisconnect( - 'primary peerconnection', - subscriberPrimary + 'peerconnection failed', + subscriberState === 'failed' ? ReconnectReason.RR_SUBSCRIBER_FAILED : ReconnectReason.RR_PUBLISHER_FAILED, ); } } }; - secondaryTransport.onConnectionStateChange = async (connectionState) => { - log.debug(`secondary PC state changed ${connectionState}`); - // also reconnect if secondary peerconnection fails - if (connectionState === 'failed') { - this.handleDisconnect( - 'secondary peerconnection', - subscriberPrimary - ? ReconnectReason.RR_PUBLISHER_FAILED - : ReconnectReason.RR_SUBSCRIBER_FAILED, - ); - } - }; - - this.subscriber.onTrack = (ev: RTCTrackEvent) => { + this.pcManager.onTrack = (ev: RTCTrackEvent) => { this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; @@ -448,42 +395,30 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private setupSignalClientCallbacks() { // configure signaling client this.client.onAnswer = async (sd) => { - if (!this.publisher) { + if (!this.pcManager) { return; } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.publisher.getSignallingState().toString(), }); - await this.publisher.setRemoteDescription(sd); + await this.pcManager.setPublisherAnswer(sd); }; // add candidate on trickle this.client.onTrickle = (candidate, target) => { - if (!this.publisher || !this.subscriber) { + if (!this.pcManager) { return; } log.trace('got ICE candidate from peer', { candidate, target }); - if (target === SignalTarget.PUBLISHER) { - this.publisher.addIceCandidate(candidate); - } else { - this.subscriber.addIceCandidate(candidate); - } + this.pcManager.addIceCandidate(candidate, target); }; // when server creates an offer for the client this.client.onOffer = async (sd) => { - if (!this.subscriber) { + if (!this.pcManager) { return; } - log.debug('received server offer', { - RTCSdpType: sd.type, - signalingState: this.subscriber.getSignallingState().toString(), - }); - await this.subscriber.setRemoteDescription(sd); - - // answer the offer - const answer = await this.subscriber.createAndSetAnswer(); + const answer = await this.pcManager.createSubscriberAnswerFromOffer(sd); this.client.sendAnswer(answer); }; @@ -509,7 +444,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.onLeave = (leave?: LeaveRequest) => { if (leave?.canReconnect) { this.fullReconnectOnNext = true; - this.primaryTransport = undefined; // reconnect immediately instead of waiting for next attempt this.handleDisconnect(leaveReconnect); } else { @@ -522,6 +456,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private makeRTCConfiguration(serverResponse: JoinResponse | ReconnectResponse): RTCConfiguration { const rtcConfig = { ...this.rtcConfig }; + if (this.signalOpts?.e2eeEnabled) { log.debug('E2EE - setting up transports with insertable streams'); // this makes sure that no data is sent before the transforms are ready @@ -561,7 +496,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } private createDataChannels() { - if (!this.publisher) { + if (!this.pcManager) { return; } @@ -576,12 +511,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // create data channels - this.lossyDC = this.publisher.createDataChannel(lossyDataChannel, { + this.lossyDC = this.pcManager.createPublisherDataChannel(lossyDataChannel, { // will drop older packets that arrive ordered: true, maxRetransmits: 0, }); - this.reliableDC = this.publisher.createDataChannel(reliableDataChannel, { + this.reliableDC = this.pcManager.createPublisherDataChannel(reliableDataChannel, { ordered: true, }); @@ -747,7 +682,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } @@ -762,7 +697,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.addTransceiver( + const transceiver = await this.pcManager.addPublisherTransceiver( track.mediaStreamTrack, transceiverInit, ); @@ -780,7 +715,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } const transceiverInit: RTCRtpTransceiverInit = { direction: 'sendonly' }; @@ -788,7 +723,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.addTransceiver( + const transceiver = await this.pcManager.addPublisherTransceiver( simulcastTrack.mediaStreamTrack, transceiverInit, ); @@ -801,10 +736,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } private async createRTCRtpSender(track: MediaStreamTrack) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } - return this.publisher.addTrack(track); + return this.pcManager.addPublisherTrack(track); } // websocket reconnect behavior. if websocket is interrupted, and the PeerConnection @@ -869,7 +804,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.primaryTransport?.getSignallingState() ?? 'closed') === 'closed' + (this.pcManager?.currentState ?? PCTransportState.NEW) === PCTransportState.NEW ) { this.fullReconnectOnNext = true; } @@ -984,7 +919,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit throw new UnexpectedConnectionState('could not reconnect, url or token not saved'); } // trigger publisher reconnect - if (!this.publisher || !this.subscriber) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher and subscriber connections unset'); } @@ -996,8 +931,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const res = await this.client.reconnect(this.url, this.token, this.participantSid, reason); if (res) { const rtcConfig = this.makeRTCConfiguration(res); - this.publisher.setConfiguration(rtcConfig); - this.subscriber.setConfiguration(rtcConfig); + this.pcManager.updateConfiguration(rtcConfig); } } catch (e) { let message = ''; @@ -1017,12 +951,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit throw new Error('simulated failure'); } - this.subscriber.restartingIce = true; - - // only restart publisher if it's needed - if (this.hasPublished) { - await this.publisher.createAndSendOffer({ iceRestart: true }); - } + await this.pcManager.triggerIceRestart(); await this.waitForPCReconnected(); this.client.setReconnected(); @@ -1038,72 +967,28 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { - if (this.pcState === PCState.Connected) { - return; - } - if (this.pcState !== PCState.New) { - throw new UnexpectedConnectionState( - 'Expected peer connection to be new on initial connection', - ); + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); } - return new Promise((resolve, reject) => { - const abortHandler = () => { - log.warn('closing engine'); - CriticalTimers.clearTimeout(connectTimeout); - - reject( - new ConnectionError( - 'room connection has been cancelled', - ConnectionErrorReason.Cancelled, - ), - ); - }; - if (abortController?.signal.aborted) { - abortHandler(); - } - abortController?.signal.addEventListener('abort', abortHandler); - const onConnected = () => { - CriticalTimers.clearTimeout(connectTimeout); - abortController?.signal.removeEventListener('abort', abortHandler); - resolve(); - }; - const connectTimeout = CriticalTimers.setTimeout(() => { - this.off(EngineEvent.Connected, onConnected); - reject(new ConnectionError('could not establish pc connection')); - }, timeout ?? this.peerConnectionTimeout); - this.once(EngineEvent.Connected, onConnected); - }); + await this.pcManager.ensurePCTransportConnection(abortController, timeout); } private async waitForPCReconnected() { - const startTime = Date.now(); - let now = startTime; this.pcState = PCState.Reconnecting; log.debug('waiting for peer connection to reconnect'); - while (now - startTime < this.peerConnectionTimeout) { - if (this.primaryTransport === undefined) { - // we can abort early, connection is hosed - break; - } else if ( - // on Safari, we don't get a connectionstatechanged event during ICE restart - // this means we'd have to check its status manually and update address - // manually - now - startTime > minReconnectWait && - this.primaryTransport?.getConnectionState() === 'connected' && - (!this.hasPublished || this.publisher?.getConnectionState() === 'connected') - ) { - this.pcState = PCState.Connected; - } - if (this.pcState === PCState.Connected) { - return; + try { + await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); } - await sleep(100); - now = Date.now(); + await this.pcManager.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + this.pcState = PCState.Connected; + } catch (e: any) { + // TODO do we need a `failed` state here for the PC? + this.pcState = PCState.Disconnected; + throw new ConnectionError(`could not establish PC connection, ${e.message}`); } - - // have not reconnected, throw - throw new ConnectionError('could not establish PC connection'); } waitForRestarted = () => { @@ -1161,7 +1046,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: DataPacket_Kind, subscriber: boolean = this.subscriberPrimary, ) { - const transport = subscriber ? this.subscriber : this.publisher; + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { throw new ConnectionError(`${transportName} connection not set`); @@ -1169,8 +1057,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && - !this.publisher?.isICEConnected && - this.publisher?.getICEConnectionState() !== 'checking' + !this.pcManager.publisher.isICEConnected && + this.pcManager.publisher.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1204,30 +1092,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /* @internal */ verifyTransport(): boolean { - // primary connection - if (!this.primaryTransport) { + if (!this.pcManager) { return false; } - if ( - this.primaryTransport.getConnectionState() === 'closed' || - this.primaryTransport.getConnectionState() === 'failed' - ) { + // primary connection + if (this.pcManager.currentState !== PCTransportState.CONNECTED) { return false; } - // also verify publisher connection if it's needed or different - if (this.hasPublished && this.subscriberPrimary) { - if (!this.publisher) { - return false; - } - if ( - this.publisher.getConnectionState() === 'closed' || - this.publisher.getConnectionState() === 'failed' - ) { - return false; - } - } - // ensure signal is connected if (!this.client.ws || this.client.ws.readyState === WebSocket.CLOSED) { return false; @@ -1236,19 +1108,21 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } /** @internal */ - negotiate(): Promise { + async negotiate(): Promise { // observe signal state - return new Promise((resolve, reject) => { - if (!this.publisher) { - reject(new NegotiationError('publisher is not defined')); + return new Promise(async (resolve, reject) => { + if (!this.pcManager) { + reject(new NegotiationError('PC manager is closed')); return; } - this.hasPublished = true; + this.pcManager.requirePublisher(); + + const abortController = new AbortController(); const handleClosed = () => { + abortController.abort(); log.debug('engine disconnected while negotiation was ongoing'); - cleanup(); resolve(); return; }; @@ -1258,42 +1132,32 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } this.on(EngineEvent.Closing, handleClosed); - const negotiationTimeout = setTimeout(() => { - reject('negotiation timed out'); - this.handleDisconnect('negotiation', ReconnectReason.RR_SIGNAL_DISCONNECTED); - }, this.peerConnectionTimeout); - - const cleanup = () => { - clearTimeout(negotiationTimeout); - this.off(EngineEvent.Closing, handleClosed); - }; - - this.publisher.once(PCEvents.NegotiationStarted, () => { - this.publisher?.once(PCEvents.NegotiationComplete, () => { - cleanup(); - resolve(); - }); - }); - - this.publisher.once(PCEvents.RTPVideoPayloadTypes, (rtpTypes: MediaAttributes['rtp']) => { - const rtpMap = new Map(); - rtpTypes.forEach((rtp) => { - const codec = rtp.codec.toLowerCase(); - if (isVideoCodec(codec)) { - rtpMap.set(rtp.payload, codec); - } - }); - this.emit(EngineEvent.RTPVideoMapUpdate, rtpMap); - }); + this.pcManager.publisher.once( + PCEvents.RTPVideoPayloadTypes, + (rtpTypes: MediaAttributes['rtp']) => { + const rtpMap = new Map(); + rtpTypes.forEach((rtp) => { + const codec = rtp.codec.toLowerCase(); + if (isVideoCodec(codec)) { + rtpMap.set(rtp.payload, codec); + } + }); + this.emit(EngineEvent.RTPVideoMapUpdate, rtpMap); + }, + ); - this.publisher.negotiate((e) => { - cleanup(); - reject(e); + try { + await this.pcManager.negotiate(abortController); + resolve(); + } catch (e: any) { if (e instanceof NegotiationError) { this.fullReconnectOnNext = true; } this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN); - }); + reject(e); + } finally { + this.off(EngineEvent.Closing, handleClosed); + } }); } @@ -1315,12 +1179,80 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } + /** @internal */ + sendSyncState(remoteTracks: RemoteTrackPublication[], localTracks: LocalTrackPublication[]) { + if (!this.pcManager) { + log.warn('sync state cannot be sent without peer connection setup'); + return; + } + const previousAnswer = this.pcManager.subscriber.getLocalDescription(); + const previousOffer = this.pcManager.subscriber.getRemoteDescription(); + + /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, + in this case, we send unsub tracks, so server add all tracks to this + subscribe pc and unsub special tracks from it. + 2. autosubscribe off, we send subscribed tracks. + */ + const autoSubscribe = this.signalOpts?.autoSubscribe ?? true; + const trackSids = new Array(); + + remoteTracks.forEach((track) => { + if (track.isDesired !== autoSubscribe) { + trackSids.push(track.trackSid); + } + }); + + this.client.sendSyncState( + new SyncState({ + answer: previousAnswer + ? toProtoSessionDescription({ + sdp: previousAnswer.sdp, + type: previousAnswer.type, + }) + : undefined, + offer: previousOffer + ? toProtoSessionDescription({ + sdp: previousOffer.sdp, + type: previousOffer.type, + }) + : undefined, + subscription: new UpdateSubscription({ + trackSids, + subscribe: !autoSubscribe, + participantTracks: [], + }), + publishTracks: getTrackPublicationInfo(localTracks), + dataChannels: this.dataChannelsInfo(), + }), + ); + } + /* @internal */ failNext() { // debugging method to fail the next reconnect/resume attempt this.shouldFailNext = true; } + private dataChannelsInfo(): DataChannelInfo[] { + const infos: DataChannelInfo[] = []; + const getInfo = (dc: RTCDataChannel | undefined, target: SignalTarget) => { + if (dc?.id !== undefined && dc.id !== null) { + infos.push( + new DataChannelInfo({ + label: dc.label, + id: dc.id, + target, + }), + ); + } + }; + getInfo(this.dataChannelForKind(DataPacket_Kind.LOSSY), SignalTarget.PUBLISHER); + getInfo(this.dataChannelForKind(DataPacket_Kind.RELIABLE), SignalTarget.PUBLISHER); + getInfo(this.dataChannelForKind(DataPacket_Kind.LOSSY, true), SignalTarget.SUBSCRIBER); + getInfo(this.dataChannelForKind(DataPacket_Kind.RELIABLE, true), SignalTarget.SUBSCRIBER); + return infos; + } + private clearReconnectTimeout() { if (this.reconnectTimeout) { CriticalTimers.clearTimeout(this.reconnectTimeout); diff --git a/src/room/Room.ts b/src/room/Room.ts index fb773162b3..033670a2c7 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -2,7 +2,6 @@ import { protoInt64 } from '@bufbuild/protobuf'; import { EventEmitter } from 'events'; import type TypedEmitter from 'typed-emitter'; import 'webrtc-adapter'; -import { toProtoSessionDescription } from '../api/SignalClient'; import { EncryptionEvent } from '../e2ee'; import { E2EEManager } from '../e2ee/E2eeManager'; import log from '../logger'; @@ -35,8 +34,6 @@ import { StreamStateUpdate, SubscriptionPermissionUpdate, SubscriptionResponse, - SyncState, - UpdateSubscription, } from '../proto/livekit_rtc_pb'; import { getBrowser } from '../utils/browserParser'; import DeviceManager from './DeviceManager'; @@ -1551,49 +1548,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - const previousAnswer = this.engine.subscriber?.getLocalDescription(); - const previousOffer = this.engine.subscriber?.getRemoteDescription(); - - if (!previousAnswer) { - return; - } - - /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, - in this case, we send unsub tracks, so server add all tracks to this - subscribe pc and unsub special tracks from it. - 2. autosubscribe off, we send subscribed tracks. - */ - const autoSubscribe = this.connOptions?.autoSubscribe ?? true; - const trackSids = new Array(); - this.participants.forEach((participant) => { - participant.tracks.forEach((track) => { - if (track.isDesired !== autoSubscribe) { - trackSids.push(track.trackSid); - } - }); - }); - - this.engine.client.sendSyncState( - new SyncState({ - answer: toProtoSessionDescription({ - sdp: previousAnswer.sdp, - type: previousAnswer.type, - }), - offer: previousOffer - ? toProtoSessionDescription({ - sdp: previousOffer.sdp, - type: previousOffer.type, - }) - : undefined, - subscription: new UpdateSubscription({ - trackSids, - subscribe: !autoSubscribe, - participantTracks: [], - }), - publishTracks: this.localParticipant.publishedTracksInfo(), - dataChannels: this.localParticipant.dataChannelsInfo(), - }), - ); + const remoteTracks = Array.from(this.participants.values()).reduce((acc, participant) => { + acc.push(...(participant.getTracks() as RemoteTrackPublication[])); // FIXME would be nice to have this return RemoteTrackPublications directly instead of the type cast + return acc; + }, [] as RemoteTrackPublication[]); + const localTracks = this.localParticipant.getTracks() as LocalTrackPublication[]; // FIXME would be nice to have this return LocalTrackPublications directly instead of the type cast + this.engine.sendSyncState(remoteTracks, localTracks); } /** diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 5b317bb422..3ede1a594e 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -10,13 +10,11 @@ import { } from '../../proto/livekit_models_pb'; import { AddTrackRequest, - DataChannelInfo, - SignalTarget, SimulcastCodec, SubscribedQualityUpdate, - TrackPublishedResponse, TrackUnpublishedResponse, } from '../../proto/livekit_rtc_pb'; +import { PCTransportState } from '../PCTransportManager'; import type RTCEngine from '../RTCEngine'; import { defaultVideoCodec } from '../defaults'; import { DeviceUnsupportedError, TrackInvalidError, UnexpectedConnectionState } from '../errors'; @@ -766,8 +764,8 @@ export default class LocalParticipant extends Participant { publication.options = opts; track.sid = ti.sid; - if (!this.engine.publisher) { - throw new UnexpectedConnectionState('publisher is closed'); + if (!this.engine.pcManager) { + throw new UnexpectedConnectionState('pcManager is not ready'); } log.debug(`publishing ${track.kind} with encodings`, { encodings, trackInfo: ti }); @@ -783,21 +781,21 @@ export default class LocalParticipant extends Participant { fix the issue. */ let trackTransceiver: RTCRtpTransceiver | undefined = undefined; - for (const transceiver of this.engine.publisher.getTransceivers()) { + for (const transceiver of this.engine.pcManager.publisher.getTransceivers()) { if (transceiver.sender === track.sender) { trackTransceiver = transceiver; break; } } if (trackTransceiver) { - this.engine.publisher.setTrackCodecBitrate({ + this.engine.pcManager.publisher.setTrackCodecBitrate({ transceiver: trackTransceiver, codec: 'opus', maxbr: encodings[0]?.maxBitrate ? encodings[0].maxBitrate / 1000 : 0, }); } } else if (track.codec && isSVCCodec(track.codec) && encodings[0]?.maxBitrate) { - this.engine.publisher.setTrackCodecBitrate({ + this.engine.pcManager.publisher.setTrackCodecBitrate({ cid: req.cid, codec: track.codec, maxbr: encodings[0].maxBitrate / 1000, @@ -929,12 +927,12 @@ export default class LocalParticipant extends Participant { const trackSender = track.sender; track.sender = undefined; if ( - this.engine.publisher && - this.engine.publisher.getConnectionState() !== 'closed' && + this.engine.pcManager && + this.engine.pcManager.currentState < PCTransportState.FAILED && trackSender ) { try { - for (const transceiver of this.engine.publisher.getTransceivers()) { + for (const transceiver of this.engine.pcManager.publisher.getTransceivers()) { // if sender is not currently sending (after replaceTrack(null)) // removeTrack would have no effect. // to ensure we end up successfully removing the track, manually set @@ -1310,44 +1308,4 @@ export default class LocalParticipant extends Participant { }); return publication; } - - /** @internal */ - publishedTracksInfo(): TrackPublishedResponse[] { - const infos: TrackPublishedResponse[] = []; - this.tracks.forEach((track: LocalTrackPublication) => { - if (track.track !== undefined) { - infos.push( - new TrackPublishedResponse({ - cid: track.track.mediaStreamID, - track: track.trackInfo, - }), - ); - } - }); - return infos; - } - - /** @internal */ - dataChannelsInfo(): DataChannelInfo[] { - const infos: DataChannelInfo[] = []; - const getInfo = (dc: RTCDataChannel | undefined, target: SignalTarget) => { - if (dc?.id !== undefined && dc.id !== null) { - infos.push( - new DataChannelInfo({ - label: dc.label, - id: dc.id, - target, - }), - ); - } - }; - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.LOSSY), SignalTarget.PUBLISHER); - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.RELIABLE), SignalTarget.PUBLISHER); - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.LOSSY, true), SignalTarget.SUBSCRIBER); - getInfo( - this.engine.dataChannelForKind(DataPacket_Kind.RELIABLE, true), - SignalTarget.SUBSCRIBER, - ); - return infos; - } } diff --git a/src/room/track/utils.ts b/src/room/track/utils.ts index 5854ba560c..c17be4fcf0 100644 --- a/src/room/track/utils.ts +++ b/src/room/track/utils.ts @@ -1,6 +1,8 @@ +import { TrackPublishedResponse } from '../../proto/livekit_rtc_pb'; import { cloneDeep } from '../../utils/cloneDeep'; import { isSafari, sleep } from '../utils'; import { Track } from './Track'; +import type { TrackPublication } from './TrackPublication'; import { type AudioCaptureOptions, type CreateLocalTracksOptions, @@ -190,3 +192,20 @@ export function mimeTypeToVideoCodecString(mimeType: string) { } return codec; } + +export function getTrackPublicationInfo( + tracks: T[], +): TrackPublishedResponse[] { + const infos: TrackPublishedResponse[] = []; + tracks.forEach((track: TrackPublication) => { + if (track.track !== undefined) { + infos.push( + new TrackPublishedResponse({ + cid: track.track.mediaStreamID, + track: track.trackInfo, + }), + ); + } + }); + return infos; +} diff --git a/src/room/utils.ts b/src/room/utils.ts index 3605da678c..06e7086d1b 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -2,6 +2,7 @@ import { ClientInfo, ClientInfo_SDK } from '../proto/livekit_models_pb'; import type { DetectableBrowser } from '../utils/browserParser'; import { getBrowser } from '../utils/browserParser'; import { protocolVersion, version } from '../version'; +import CriticalTimers from './timers'; import type LocalAudioTrack from './track/LocalAudioTrack'; import type RemoteAudioTrack from './track/RemoteAudioTrack'; import { VideoCodec, videoCodecs } from './track/options'; @@ -21,7 +22,7 @@ export function unpackStreamId(packed: string): string[] { } export async function sleep(duration: number): Promise { - return new Promise((resolve) => setTimeout(resolve, duration)); + return new Promise((resolve) => CriticalTimers.setTimeout(resolve, duration)); } /** @internal */