From 2af29ff0d0ea73aaecb3adbc0b7c3b942c82a095 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 25 Oct 2023 15:04:12 +0200 Subject: [PATCH 01/26] Make peerconnection private on PCTransport --- src/room/PCTransport.ts | 112 ++++++++++++++++++- src/room/RTCEngine.ts | 134 +++++++++-------------- src/room/Room.ts | 10 +- src/room/participant/LocalParticipant.ts | 7 +- 4 files changed, 167 insertions(+), 96 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 62fa4fe5ed..274b964b45 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -32,7 +32,7 @@ export const PCEvents = { export default class PCTransport extends EventEmitter { private _pc: RTCPeerConnection | null; - public get pc() { + private get pc() { if (this._pc) return this._pc; throw new UnexpectedConnectionState('Expected peer connection to be available'); } @@ -51,12 +51,33 @@ export default class PCTransport extends EventEmitter { onOffer?: (offer: RTCSessionDescriptionInit) => void; + onIceCandidate?: (candidate: RTCIceCandidate) => void; + + onConnectionStateChange?: (state: RTCPeerConnectionState) => void; + + onDataChannel?: (ev: RTCDataChannelEvent) => void; + + onTrack?: (ev: RTCTrackEvent) => void; + constructor(config?: RTCConfiguration, mediaConstraints: Record = {}) { super(); this._pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(config, mediaConstraints) : new RTCPeerConnection(config); + this._pc.onicecandidate = (ev) => { + if (!ev.candidate) return; + this.onIceCandidate?.(ev.candidate); + }; + this._pc.onconnectionstatechange = () => { + this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); + }; + this._pc.ondatachannel = (ev) => { + this.onDataChannel?.(ev); + }; + this._pc.ontrack = (ev) => { + this.onTrack?.(ev); + }; } get isICEConnected(): boolean { @@ -270,10 +291,99 @@ export default class PCTransport extends EventEmitter { return answer; } + createDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { + return this.pc.createDataChannel(label, dataChannelDict); + } + + addTransceiver(mediaStreamTrack: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { + return this.pc.addTransceiver(mediaStreamTrack, transceiverInit); + } + + addTrack(track: MediaStreamTrack) { + return this.pc.addTrack(track); + } + setTrackCodecBitrate(info: TrackBitrateInfo) { this.trackBitrates.push(info); } + setConfiguration(rtcConfig: RTCConfiguration) { + return this.pc.setConfiguration(rtcConfig); + } + + canRemoveTrack(): boolean { + return !!this.pc.removeTrack; + } + + removeTrack(sender: RTCRtpSender) { + return this.pc.removeTrack(sender); + } + + getConnectionState() { + return this.pc.connectionState; + } + + getICEConnectionState() { + return this.pc.iceConnectionState; + } + + getSignallingState() { + return this.pc.signalingState; + } + + getTransceivers() { + return this.pc.getTransceivers(); + } + + getSenders() { + return this.pc.getSenders(); + } + + getLocalDescription() { + return this.pc.localDescription; + } + + getRemoteDescription() { + return this.pc.remoteDescription; + } + + async getConnectedAddress(): Promise { + if (!this._pc) { + return; + } + let selectedCandidatePairId = ''; + const candidatePairs = new Map(); + // id -> candidate ip + const candidates = new Map(); + const stats: RTCStatsReport = await this._pc.getStats(); + stats.forEach((v) => { + switch (v.type) { + case 'transport': + selectedCandidatePairId = v.selectedCandidatePairId; + break; + case 'candidate-pair': + if (selectedCandidatePairId === '' && v.selected) { + selectedCandidatePairId = v.id; + } + candidatePairs.set(v.id, v); + break; + case 'remote-candidate': + candidates.set(v.id, `${v.address}:${v.port}`); + break; + default: + } + }); + + if (selectedCandidatePairId === '') { + return undefined; + } + const selectedID = candidatePairs.get(selectedCandidatePairId)?.remoteCandidateId; + if (selectedID === undefined) { + return undefined; + } + return candidates.get(selectedID); + } + close() { if (!this._pc) { return; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 8203dc518c..83ccf15498 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -108,7 +108,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private subscriberPrimary: boolean = false; - private primaryPC?: RTCPeerConnection; + private primaryTransport?: PCTransport; private pcState: PCState = PCState.New; @@ -247,12 +247,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async cleanupPeerConnections() { - if (this.publisher && this.publisher.pc.signalingState !== 'closed') { - this.publisher.pc.getSenders().forEach((sender) => { + if (this.publisher && this.publisher.getSignallingState() !== 'closed') { + this.publisher.getSenders().forEach((sender) => { try { // TODO: react-native-webrtc doesn't have removeTrack yet. - if (this.publisher?.pc.removeTrack) { - this.publisher?.pc.removeTrack(sender); + if (this.publisher?.canRemoveTrack()) { + this.publisher?.removeTrack(sender); } } catch (e) { log.warn('could not removeTrack', { error: e }); @@ -268,7 +268,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber = undefined; } this.hasPublished = false; - this.primaryPC = undefined; + this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { if (!dc) return; @@ -336,7 +336,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit delete this.pendingTrackResolvers[sender.track.id]; } try { - this.publisher?.pc.removeTrack(sender); + this.publisher?.removeTrack(sender); return true; } catch (e: unknown) { log.warn('failed to remove track', { error: e, method: 'removeTrack' }); @@ -353,10 +353,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async getConnectedServerAddress(): Promise { - if (this.primaryPC === undefined) { + if (this.primaryTransport === undefined) { return undefined; } - return getConnectedAddress(this.primaryPC); + return this.primaryTransport.getConnectedAddress(); } /* @internal */ @@ -387,40 +387,38 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.emit(EngineEvent.TransportsCreated, this.publisher, this.subscriber); - this.publisher.pc.onicecandidate = (ev) => { - if (!ev.candidate) return; - log.trace('adding ICE candidate for peer', ev.candidate); - this.client.sendIceCandidate(ev.candidate, SignalTarget.PUBLISHER); + this.publisher.onIceCandidate = (candidate) => { + log.trace('adding ICE candidate for peer', candidate); + this.client.sendIceCandidate(candidate, SignalTarget.PUBLISHER); }; - this.subscriber.pc.onicecandidate = (ev) => { - if (!ev.candidate) return; - this.client.sendIceCandidate(ev.candidate, SignalTarget.SUBSCRIBER); + this.subscriber.onIceCandidate = (candidate) => { + this.client.sendIceCandidate(candidate, SignalTarget.SUBSCRIBER); }; this.publisher.onOffer = (offer) => { this.client.sendOffer(offer); }; - let primaryPC = this.publisher.pc; - let secondaryPC = this.subscriber.pc; + let primaryTransport = this.publisher; + let secondaryTransport = this.subscriber; let subscriberPrimary = joinResponse.subscriberPrimary; if (subscriberPrimary) { - primaryPC = this.subscriber.pc; - secondaryPC = this.publisher.pc; + primaryTransport = this.subscriber; + secondaryTransport = this.publisher; // in subscriber primary mode, server side opens sub data channels. - this.subscriber.pc.ondatachannel = this.handleDataChannel; + this.subscriber.onDataChannel = this.handleDataChannel; } - this.primaryPC = primaryPC; - primaryPC.onconnectionstatechange = async () => { - log.debug(`primary PC state changed ${primaryPC.connectionState}`); - if (primaryPC.connectionState === 'connected') { + this.primaryTransport = primaryTransport; + primaryTransport.onConnectionStateChange = async (connectionState) => { + log.debug(`primary PC state changed ${connectionState}`); + if (connectionState === 'connected') { const shouldEmit = this.pcState === PCState.New; this.pcState = PCState.Connected; if (shouldEmit) { this.emit(EngineEvent.Connected, joinResponse); } - } else if (primaryPC.connectionState === 'failed') { + } else if (connectionState === 'failed') { // on Safari, PeerConnection will switch to 'disconnected' during renegotiation if (this.pcState === PCState.Connected) { this.pcState = PCState.Disconnected; @@ -434,10 +432,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } }; - secondaryPC.onconnectionstatechange = async () => { - log.debug(`secondary PC state changed ${secondaryPC.connectionState}`); + secondaryTransport.onConnectionStateChange = async (connectionState) => { + log.debug(`secondary PC state changed ${connectionState}`); // also reconnect if secondary peerconnection fails - if (secondaryPC.connectionState === 'failed') { + if (connectionState === 'failed') { this.handleDisconnect( 'secondary peerconnection', subscriberPrimary @@ -447,7 +445,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } }; - this.subscriber.pc.ontrack = (ev: RTCTrackEvent) => { + this.subscriber.onTrack = (ev: RTCTrackEvent) => { this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; @@ -462,7 +460,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.publisher.pc.signalingState.toString(), + signalingState: this.publisher.getSignallingState().toString(), }); await this.publisher.setRemoteDescription(sd); }; @@ -487,7 +485,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server offer', { RTCSdpType: sd.type, - signalingState: this.subscriber.pc.signalingState.toString(), + signalingState: this.subscriber.getSignallingState().toString(), }); await this.subscriber.setRemoteDescription(sd); @@ -518,7 +516,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.onLeave = (leave?: LeaveRequest) => { if (leave?.canReconnect) { this.fullReconnectOnNext = true; - this.primaryPC = undefined; + this.primaryTransport = undefined; // reconnect immediately instead of waiting for next attempt this.handleDisconnect(leaveReconnect); } else { @@ -579,12 +577,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // create data channels - this.lossyDC = this.publisher.pc.createDataChannel(lossyDataChannel, { + this.lossyDC = this.publisher.createDataChannel(lossyDataChannel, { // will drop older packets that arrive ordered: true, maxRetransmits: 0, }); - this.reliableDC = this.publisher.pc.createDataChannel(reliableDataChannel, { + this.reliableDC = this.publisher.createDataChannel(reliableDataChannel, { ordered: true, }); @@ -765,7 +763,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.pc.addTransceiver( + const transceiver = await this.publisher.addTransceiver( track.mediaStreamTrack, transceiverInit, ); @@ -791,7 +789,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.pc.addTransceiver( + const transceiver = await this.publisher.addTransceiver( simulcastTrack.mediaStreamTrack, transceiverInit, ); @@ -807,7 +805,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (!this.publisher) { throw new UnexpectedConnectionState('publisher is closed'); } - return this.publisher.pc.addTrack(track); + return this.publisher.addTrack(track); } // websocket reconnect behavior. if websocket is interrupted, and the PeerConnection @@ -872,7 +870,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.primaryPC?.signalingState ?? 'closed') === 'closed' + (this.primaryTransport?.getSignallingState() ?? 'closed') === 'closed' ) { this.fullReconnectOnNext = true; } @@ -999,8 +997,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const res = await this.client.reconnect(this.url, this.token, this.participantSid, reason); if (res) { const rtcConfig = this.makeRTCConfiguration(res); - this.publisher.pc.setConfiguration(rtcConfig); - this.subscriber.pc.setConfiguration(rtcConfig); + this.publisher.setConfiguration(rtcConfig); + this.subscriber.setConfiguration(rtcConfig); } } catch (e) { let message = ''; @@ -1084,7 +1082,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.debug('waiting for peer connection to reconnect'); while (now - startTime < this.peerConnectionTimeout) { - if (this.primaryPC === undefined) { + if (this.primaryTransport === undefined) { // we can abort early, connection is hosed break; } else if ( @@ -1092,8 +1090,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // this means we'd have to check its status manually and update address // manually now - startTime > minReconnectWait && - this.primaryPC?.connectionState === 'connected' && - (!this.hasPublished || this.publisher?.pc.connectionState === 'connected') + this.primaryTransport?.getConnectionState() === 'connected' && + (!this.hasPublished || this.publisher?.getConnectionState() === 'connected') ) { this.pcState = PCState.Connected; } @@ -1172,7 +1170,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && !this.publisher?.isICEConnected && - this.publisher?.pc.iceConnectionState !== 'checking' + this.publisher?.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1196,7 +1194,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } throw new ConnectionError( - `could not establish ${transportName} connection, state: ${transport.pc.iceConnectionState}`, + `could not establish ${transportName} connection, state: ${transport.getICEConnectionState()}`, ); } @@ -1207,12 +1205,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /* @internal */ verifyTransport(): boolean { // primary connection - if (!this.primaryPC) { + if (!this.primaryTransport) { return false; } if ( - this.primaryPC.connectionState === 'closed' || - this.primaryPC.connectionState === 'failed' + this.primaryTransport.getConnectionState() === 'closed' || + this.primaryTransport.getConnectionState() === 'failed' ) { return false; } @@ -1223,8 +1221,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return false; } if ( - this.publisher.pc.connectionState === 'closed' || - this.publisher.pc.connectionState === 'failed' + this.publisher.getConnectionState() === 'closed' || + this.publisher.getConnectionState() === 'failed' ) { return false; } @@ -1355,40 +1353,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } -async function getConnectedAddress(pc: RTCPeerConnection): Promise { - let selectedCandidatePairId = ''; - const candidatePairs = new Map(); - // id -> candidate ip - const candidates = new Map(); - const stats: RTCStatsReport = await pc.getStats(); - stats.forEach((v) => { - switch (v.type) { - case 'transport': - selectedCandidatePairId = v.selectedCandidatePairId; - break; - case 'candidate-pair': - if (selectedCandidatePairId === '' && v.selected) { - selectedCandidatePairId = v.id; - } - candidatePairs.set(v.id, v); - break; - case 'remote-candidate': - candidates.set(v.id, `${v.address}:${v.port}`); - break; - default: - } - }); - - if (selectedCandidatePairId === '') { - return undefined; - } - const selectedID = candidatePairs.get(selectedCandidatePairId)?.remoteCandidateId; - if (selectedID === undefined) { - return undefined; - } - return candidates.get(selectedID); -} - class SignalReconnectError extends Error {} export type EngineEventCallbacks = { diff --git a/src/room/Room.ts b/src/room/Room.ts index 0c49af49ee..b4688e51cf 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1546,14 +1546,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - if ( - this.engine.subscriber === undefined || - this.engine.subscriber.pc.localDescription === null - ) { + const previousAnswer = this.engine.subscriber?.getLocalDescription(); + const previousOffer = this.engine.subscriber?.getRemoteDescription(); + + if (!previousAnswer) { return; } - const previousAnswer = this.engine.subscriber.pc.localDescription; - const previousOffer = this.engine.subscriber.pc.remoteDescription; /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, in this case, we send unsub tracks, so server add all tracks to this diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 332fbf5912..5b2ce33c1f 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -799,7 +799,7 @@ export default class LocalParticipant extends Participant { fix the issue. */ let trackTransceiver: RTCRtpTransceiver | undefined = undefined; - for (const transceiver of this.engine.publisher.pc.getTransceivers()) { + for (const transceiver of this.engine.publisher.getTransceivers()) { if (transceiver.sender === track.sender) { trackTransceiver = transceiver; break; @@ -889,7 +889,6 @@ export default class LocalParticipant extends Participant { { codec: opts.videoCodec, cid: simulcastTrack.mediaStreamTrack.id, - enableSimulcastLayers: opts.simulcast, }, ], }); @@ -947,11 +946,11 @@ export default class LocalParticipant extends Participant { track.sender = undefined; if ( this.engine.publisher && - this.engine.publisher.pc.connectionState !== 'closed' && + this.engine.publisher.getConnectionState() !== 'closed' && trackSender ) { try { - for (const transceiver of this.engine.publisher.pc.getTransceivers()) { + for (const transceiver of this.engine.publisher.getTransceivers()) { // if sender is not currently sending (after replaceTrack(null)) // removeTrack would have no effect. // to ensure we end up successfully removing the track, manually set From ce056d5f3f84985bcdcef13cf564d1f61df6f878 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 25 Oct 2023 15:08:48 +0200 Subject: [PATCH 02/26] revert enablesimulcast for this branch --- src/room/participant/LocalParticipant.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 5b2ce33c1f..014f254a42 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -889,6 +889,7 @@ export default class LocalParticipant extends Participant { { codec: opts.videoCodec, cid: simulcastTrack.mediaStreamTrack.id, + enableSimulcastLayers: opts.simulcast, }, ], }); From ea5aab272977980606a37f8bd8654f7725c0f9c2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 25 Oct 2023 15:26:23 +0200 Subject: [PATCH 03/26] Create moody-nails-flash.md --- .changeset/moody-nails-flash.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/moody-nails-flash.md diff --git a/.changeset/moody-nails-flash.md b/.changeset/moody-nails-flash.md new file mode 100644 index 0000000000..ac429345f7 --- /dev/null +++ b/.changeset/moody-nails-flash.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Make peerconnection private on PCTransport From 31ded02db229a43c7ff58983785f53870bda5137 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Wed, 25 Oct 2023 15:31:43 +0200 Subject: [PATCH 04/26] fix connection check --- src/connectionHelper/checks/webrtc.ts | 2 +- src/room/PCTransport.ts | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/connectionHelper/checks/webrtc.ts b/src/connectionHelper/checks/webrtc.ts index 6461ef06a9..9082d847c7 100644 --- a/src/connectionHelper/checks/webrtc.ts +++ b/src/connectionHelper/checks/webrtc.ts @@ -39,7 +39,7 @@ export class WebRTCCheck extends Checker { }; if (this.room.engine.subscriber) { - this.room.engine.subscriber.pc.onicecandidateerror = (ev) => { + this.room.engine.subscriber.onIceCandidateError = (ev) => { if (ev instanceof RTCPeerConnectionIceErrorEvent) { this.appendWarning( `error with ICE candidate: ${ev.errorCode} ${ev.errorText} ${ev.url}`, diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 274b964b45..e6fb95ad95 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -53,6 +53,8 @@ export default class PCTransport extends EventEmitter { onIceCandidate?: (candidate: RTCIceCandidate) => void; + onIceCandidateError?: (ev: Event) => void; + onConnectionStateChange?: (state: RTCPeerConnectionState) => void; onDataChannel?: (ev: RTCDataChannelEvent) => void; @@ -69,6 +71,9 @@ export default class PCTransport extends EventEmitter { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); }; + this._pc.onicecandidateerror = (ev) => { + this.onIceCandidateError?.(ev); + }; this._pc.onconnectionstatechange = () => { this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); }; From 2999699070bf80001a8d0623acfc095f4d39d093 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 26 Oct 2023 15:36:07 +0200 Subject: [PATCH 05/26] initial pcmanager tap in --- src/room/PCTransport.ts | 32 ++++++++++-- src/room/PCTransportManager.ts | 95 ++++++++++++++++++++++++++++++++++ src/room/RTCEngine.ts | 35 +++++-------- 3 files changed, 136 insertions(+), 26 deletions(-) create mode 100644 src/room/PCTransportManager.ts diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index e6fb95ad95..46eab7ea57 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -37,6 +37,10 @@ export default class PCTransport extends EventEmitter { throw new UnexpectedConnectionState('Expected peer connection to be available'); } + private config?: RTCConfiguration; + + private mediaConstraints: Record; + pendingCandidates: RTCIceCandidateInit[] = []; restartingIce: boolean = false; @@ -57,16 +61,26 @@ export default class PCTransport extends EventEmitter { onConnectionStateChange?: (state: RTCPeerConnectionState) => void; + onIceConnectionStateChange?: () => void; + + onSignalingStatechange?: () => void; + onDataChannel?: (ev: RTCDataChannelEvent) => void; onTrack?: (ev: RTCTrackEvent) => void; constructor(config?: RTCConfiguration, mediaConstraints: Record = {}) { super(); + this.config = config; + this.mediaConstraints = mediaConstraints; + this.setupPC(); + } + + private setupPC() { this._pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor - new RTCPeerConnection(config, mediaConstraints) - : new RTCPeerConnection(config); + new RTCPeerConnection(this.config, this.mediaConstraints) + : new RTCPeerConnection(this.config); this._pc.onicecandidate = (ev) => { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); @@ -74,6 +88,15 @@ export default class PCTransport extends EventEmitter { this._pc.onicecandidateerror = (ev) => { this.onIceCandidateError?.(ev); }; + + this._pc.oniceconnectionstatechange = () => { + this.onIceConnectionStateChange?.(); + }; + + this._pc.onsignalingstatechange = () => { + this.onSignalingStatechange?.(); + }; + this._pc.onconnectionstatechange = () => { this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); }; @@ -389,7 +412,7 @@ export default class PCTransport extends EventEmitter { return candidates.get(selectedID); } - close() { + close = () => { if (!this._pc) { return; } @@ -406,7 +429,8 @@ export default class PCTransport extends EventEmitter { this._pc.onconnectionstatechange = null; this._pc.oniceconnectionstatechange = null; this._pc = null; - } + this.setupPC(); + }; private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) { if (munged) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts new file mode 100644 index 0000000000..d42acf63f3 --- /dev/null +++ b/src/room/PCTransportManager.ts @@ -0,0 +1,95 @@ +import log from '../logger'; +import PCTransport from './PCTransport'; + +export enum PCTransportState { + IDLE, + CONNECTING, + CONNECTED, + RECONNECTING, + FAILED, + CLOSED, +} + +export class PCTransportManager { + private isPublisherConnectionRequired: boolean; + + private isSubscriberConnectionRequired: boolean; + + public publisher: PCTransport; + + public subscriber: PCTransport; + + private state: PCTransportState; + + onStateChange?: (state: PCTransportState) => void; + + constructor(rtcConfig: RTCConfiguration) { + this.isPublisherConnectionRequired = true; + this.isSubscriberConnectionRequired = true; + const googConstraints = { optional: [{ googDscp: true }] }; + this.publisher = new PCTransport(rtcConfig, googConstraints); + this.subscriber = new PCTransport(rtcConfig); + + this.publisher.onConnectionStateChange = this.handleStateChanged; + this.subscriber.onConnectionStateChange = this.handleStateChanged; + this.publisher.onIceConnectionStateChange = this.handleStateChanged; + this.subscriber.onIceConnectionStateChange = this.handleStateChanged; + this.publisher.onSignalingStatechange = this.handleStateChanged; + this.subscriber.onSignalingStatechange = this.handleStateChanged; + + this.state = PCTransportState.IDLE; + } + + async ensurePCTransportConnection() { + return Promise.all(this.requiredTransports?.map(this.ensureTransportConnected)); + } + + private get requiredTransports() { + const transports: PCTransport[] = []; + if (this.isPublisherConnectionRequired) { + transports.push(this.publisher); + } + if (this.isSubscriberConnectionRequired) { + transports.push(this.subscriber); + } + return transports; + } + + private handleStateChanged = () => { + const previousState = this.state; + + const connectionStates = this.requiredTransports.map((tr) => tr.getConnectionState()); + if (connectionStates.every((st) => st === 'connected')) { + this.state = PCTransportState.CONNECTED; + } else if (connectionStates.some((st) => st === 'failed')) { + this.state = PCTransportState.FAILED; + } else if (connectionStates.some((st) => st === 'connecting')) { + this.state = PCTransportState.CONNECTING; + } else if (connectionStates.some((st) => st === 'closed')) { + this.state = PCTransportState.CLOSED; + } + + if (previousState !== this.state) { + this.onStateChange?.(this.state); + } + log.info('pc state', { + overall: this.state, + publisher: getPCState(this.publisher), + subscriber: getPCState(this.subscriber), + }); + }; + + private async ensureTransportConnected(pcTransport: PCTransport) { + if (pcTransport.getConnectionState() === 'connected') { + return true; + } + } +} + +function getPCState(pcTransport: PCTransport) { + return { + connectionState: pcTransport.getConnectionState(), + iceState: pcTransport.getICEConnectionState(), + signallingState: pcTransport.getSignallingState(), + }; +} diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 83ccf15498..d0daf7324a 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -31,6 +31,7 @@ import { TrackPublishedResponse, } from '../proto/livekit_rtc_pb'; import PCTransport, { PCEvents } from './PCTransport'; +import { PCTransportManager, PCTransportState } from './PCTransportManager'; import type { ReconnectContext, ReconnectPolicy } from './ReconnectPolicy'; import type { RegionUrlProvider } from './RegionUrlProvider'; import { roomConnectOptionDefaults } from './defaults'; @@ -85,6 +86,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit fullReconnectOnNext: boolean = false; + pcManager?: PCTransportManager; + /** * @internal */ @@ -381,9 +384,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit rtcConfig.encodedInsertableStreams = true; } - const googConstraints = { optional: [{ googDscp: true }] }; - this.publisher = new PCTransport(rtcConfig, googConstraints); - this.subscriber = new PCTransport(rtcConfig); + // this.publisher = new PCTransport(rtcConfig, googConstraints); + // this.subscriber = new PCTransport(rtcConfig); + this.pcManager = new PCTransportManager(rtcConfig); + this.publisher = this.pcManager.publisher; + this.subscriber = this.pcManager.subscriber; this.emit(EngineEvent.TransportsCreated, this.publisher, this.subscriber); @@ -401,49 +406,35 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; let primaryTransport = this.publisher; - let secondaryTransport = this.subscriber; let subscriberPrimary = joinResponse.subscriberPrimary; if (subscriberPrimary) { primaryTransport = this.subscriber; - secondaryTransport = this.publisher; // in subscriber primary mode, server side opens sub data channels. this.subscriber.onDataChannel = this.handleDataChannel; } this.primaryTransport = primaryTransport; - primaryTransport.onConnectionStateChange = async (connectionState) => { + this.pcManager.onStateChange = async (connectionState) => { log.debug(`primary PC state changed ${connectionState}`); - if (connectionState === 'connected') { + if (connectionState === PCTransportState.CONNECTED) { const shouldEmit = this.pcState === PCState.New; this.pcState = PCState.Connected; if (shouldEmit) { this.emit(EngineEvent.Connected, joinResponse); } - } else if (connectionState === 'failed') { + } else if (connectionState === PCTransportState.FAILED) { // on Safari, PeerConnection will switch to 'disconnected' during renegotiation if (this.pcState === PCState.Connected) { this.pcState = PCState.Disconnected; this.handleDisconnect( - 'primary peerconnection', - subscriberPrimary + 'peerconnection failed', + subscriberPrimary // FIXME actually determine which peer connection failed ? ReconnectReason.RR_SUBSCRIBER_FAILED : ReconnectReason.RR_PUBLISHER_FAILED, ); } } }; - secondaryTransport.onConnectionStateChange = async (connectionState) => { - log.debug(`secondary PC state changed ${connectionState}`); - // also reconnect if secondary peerconnection fails - if (connectionState === 'failed') { - this.handleDisconnect( - 'secondary peerconnection', - subscriberPrimary - ? ReconnectReason.RR_PUBLISHER_FAILED - : ReconnectReason.RR_SUBSCRIBER_FAILED, - ); - } - }; this.subscriber.onTrack = (ev: RTCTrackEvent) => { this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); From a3a4b82826850b9566177a0f902ae42581b70805 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 26 Oct 2023 18:39:06 +0200 Subject: [PATCH 06/26] WIP managed transport connection --- src/room/PCTransport.ts | 27 +++---- src/room/PCTransportManager.ts | 119 +++++++++++++++++++++++++++---- src/room/RTCEngine.ts | 126 ++++++++++++++------------------- src/room/utils.ts | 3 +- 4 files changed, 175 insertions(+), 100 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 46eab7ea57..887237790e 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -73,39 +73,40 @@ export default class PCTransport extends EventEmitter { super(); this.config = config; this.mediaConstraints = mediaConstraints; - this.setupPC(); + this._pc = this.setupPC(); } private setupPC() { - this._pc = isChromiumBased() + const pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(this.config, this.mediaConstraints) : new RTCPeerConnection(this.config); - this._pc.onicecandidate = (ev) => { + pc.onicecandidate = (ev) => { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); }; - this._pc.onicecandidateerror = (ev) => { + pc.onicecandidateerror = (ev) => { this.onIceCandidateError?.(ev); }; - this._pc.oniceconnectionstatechange = () => { + pc.oniceconnectionstatechange = () => { this.onIceConnectionStateChange?.(); }; - this._pc.onsignalingstatechange = () => { + pc.onsignalingstatechange = () => { this.onSignalingStatechange?.(); }; - this._pc.onconnectionstatechange = () => { + pc.onconnectionstatechange = () => { this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); }; - this._pc.ondatachannel = (ev) => { + pc.ondatachannel = (ev) => { this.onDataChannel?.(ev); }; - this._pc.ontrack = (ev) => { + pc.ontrack = (ev) => { this.onTrack?.(ev); }; + return pc; } get isICEConnected(): boolean { @@ -206,10 +207,10 @@ export default class PCTransport extends EventEmitter { } // debounced negotiate interface - negotiate = debounce((onError?: (e: Error) => void) => { + negotiate = debounce(async (onError?: (e: Error) => void) => { this.emit(PCEvents.NegotiationStarted); try { - this.createAndSendOffer(); + await this.createAndSendOffer(); } catch (e) { if (onError) { onError(e as Error); @@ -413,6 +414,7 @@ export default class PCTransport extends EventEmitter { } close = () => { + console.warn('closing pc transport'); if (!this._pc) { return; } @@ -428,8 +430,7 @@ export default class PCTransport extends EventEmitter { this._pc.ontrack = null; this._pc.onconnectionstatechange = null; this._pc.oniceconnectionstatechange = null; - this._pc = null; - this.setupPC(); + this._pc = this.setupPC(); }; private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index d42acf63f3..5a14dd7938 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,5 +1,9 @@ import log from '../logger'; import PCTransport from './PCTransport'; +import { roomConnectOptionDefaults } from './defaults'; +import { ConnectionError, ConnectionErrorReason } from './errors'; +import CriticalTimers from './timers'; +import { Mutex, sleep } from './utils'; export enum PCTransportState { IDLE, @@ -11,20 +15,32 @@ export enum PCTransportState { } export class PCTransportManager { + public publisher: PCTransport; + + public subscriber: PCTransport; + + public get needsPublisher() { + return this.isPublisherConnectionRequired; + } + + public get needsSubscriber() { + return this.isSubscriberConnectionRequired; + } + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; - public publisher: PCTransport; + private state: PCTransportState; - public subscriber: PCTransport; + private peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; - private state: PCTransportState; + private connectionLock: Mutex; - onStateChange?: (state: PCTransportState) => void; + public onStateChange?: (state: PCTransportState) => void; constructor(rtcConfig: RTCConfiguration) { - this.isPublisherConnectionRequired = true; + this.isPublisherConnectionRequired = false; this.isSubscriberConnectionRequired = true; const googConstraints = { optional: [{ googDscp: true }] }; this.publisher = new PCTransport(rtcConfig, googConstraints); @@ -38,10 +54,35 @@ export class PCTransportManager { this.subscriber.onSignalingStatechange = this.handleStateChanged; this.state = PCTransportState.IDLE; + + this.connectionLock = new Mutex(); + } + + requirePublisher(require = true) { + this.isPublisherConnectionRequired = require; + this.handleStateChanged(); + } + + requireSubscriber(require = true) { + this.isSubscriberConnectionRequired = require; + this.handleStateChanged(); } - async ensurePCTransportConnection() { - return Promise.all(this.requiredTransports?.map(this.ensureTransportConnected)); + createAndSendOffer(options?: RTCOfferOptions) { + return this.publisher.createAndSendOffer(options); + } + + async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { + const unlock = await this.connectionLock.lock(); + try { + await Promise.all( + this.requiredTransports?.map((transport) => + this.ensureTransportConnected(transport, abortController, timeout), + ), + ); + } finally { + unlock(); + } } private get requiredTransports() { @@ -71,18 +112,66 @@ export class PCTransportManager { if (previousState !== this.state) { this.onStateChange?.(this.state); + log.info('pc state', { + overall: this.state, + publisher: getPCState(this.publisher), + subscriber: getPCState(this.subscriber), + }); } - log.info('pc state', { - overall: this.state, - publisher: getPCState(this.publisher), - subscriber: getPCState(this.subscriber), - }); }; - private async ensureTransportConnected(pcTransport: PCTransport) { - if (pcTransport.getConnectionState() === 'connected') { - return true; + private async ensureTransportConnected( + pcTransport: PCTransport, + abortController?: AbortController, + timeout: number = this.peerConnectionTimeout, + ) { + const connectionState = pcTransport.getConnectionState(); + if (connectionState === 'connected') { + return; } + // if (this.pcState !== PCState.New) { + // throw new UnexpectedConnectionState( + // 'Expected peer connection to be new on initial connection', + // ); + // } + return new Promise(async (resolve, reject) => { + const abortHandler = () => { + log.warn('abort transport connection'); + CriticalTimers.clearTimeout(connectTimeout); + + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + }; + if (abortController?.signal.aborted) { + abortHandler(); + } + abortController?.signal.addEventListener('abort', abortHandler); + + const connectTimeout = CriticalTimers.setTimeout(() => { + abortController?.signal.removeEventListener('abort', abortHandler); + reject(new ConnectionError('could not establish pc connection')); + }, timeout); + + while (this.state !== PCTransportState.CONNECTED) { + await sleep(50); // FIXME we shouldn't rely on `sleep` in the connection paths, as it invokes `setTimeout` which can be drastically throttled by browser implementations + if (abortController?.signal.aborted) { + reject( + new ConnectionError( + 'room connection has been cancelled', + ConnectionErrorReason.Cancelled, + ), + ); + return; + } + } + CriticalTimers.clearTimeout(connectTimeout); + abortController?.signal.removeEventListener('abort', abortHandler); + resolve(); + }); } } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index d0daf7324a..655621014b 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -121,10 +121,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit [key: string]: { resolve: (info: TrackInfo) => void; reject: () => void }; } = {}; - // true if publisher connection has already been established. - // this is helpful to know if we need to restart ICE on the publisher connection - private hasPublished: boolean = false; - // keep join info around for reconnect, this could be a region url private url?: string; @@ -251,16 +247,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit async cleanupPeerConnections() { if (this.publisher && this.publisher.getSignallingState() !== 'closed') { - this.publisher.getSenders().forEach((sender) => { + const publisher = this.publisher; + for (const sender of publisher.getSenders()) { try { // TODO: react-native-webrtc doesn't have removeTrack yet. - if (this.publisher?.canRemoveTrack()) { - this.publisher?.removeTrack(sender); + if (publisher.canRemoveTrack()) { + console.log('removing track'); + await publisher.removeTrack(sender); } } catch (e) { log.warn('could not removeTrack', { error: e }); } - }); + } } if (this.publisher) { this.publisher.close(); @@ -270,7 +268,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber.close(); this.subscriber = undefined; } - this.hasPublished = false; + this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { @@ -1011,7 +1009,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriber.restartingIce = true; // only restart publisher if it's needed - if (this.hasPublished) { + if (this.pcManager?.needsPublisher) { await this.publisher.createAndSendOffer({ iceRestart: true }); } @@ -1029,72 +1027,58 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { - if (this.pcState === PCState.Connected) { - return; - } - if (this.pcState !== PCState.New) { - throw new UnexpectedConnectionState( - 'Expected peer connection to be new on initial connection', - ); - } - return new Promise((resolve, reject) => { - const abortHandler = () => { - log.warn('closing engine'); - CriticalTimers.clearTimeout(connectTimeout); - - reject( - new ConnectionError( - 'room connection has been cancelled', - ConnectionErrorReason.Cancelled, - ), - ); - }; - if (abortController?.signal.aborted) { - abortHandler(); - } - abortController?.signal.addEventListener('abort', abortHandler); - const onConnected = () => { - CriticalTimers.clearTimeout(connectTimeout); - abortController?.signal.removeEventListener('abort', abortHandler); - resolve(); - }; - const connectTimeout = CriticalTimers.setTimeout(() => { - this.off(EngineEvent.Connected, onConnected); - reject(new ConnectionError('could not establish pc connection')); - }, timeout ?? this.peerConnectionTimeout); - this.once(EngineEvent.Connected, onConnected); - }); + await this.pcManager?.ensurePCTransportConnection(abortController, timeout); + // if (this.pcState === PCState.Connected) { + // return; + // } + // if (this.pcState !== PCState.New) { + // throw new UnexpectedConnectionState( + // 'Expected peer connection to be new on initial connection', + // ); + // } + // return new Promise((resolve, reject) => { + // const abortHandler = () => { + // log.warn('closing engine'); + // CriticalTimers.clearTimeout(connectTimeout); + + // reject( + // new ConnectionError( + // 'room connection has been cancelled', + // ConnectionErrorReason.Cancelled, + // ), + // ); + // }; + // if (abortController?.signal.aborted) { + // abortHandler(); + // } + // abortController?.signal.addEventListener('abort', abortHandler); + // const onConnected = () => { + // CriticalTimers.clearTimeout(connectTimeout); + // abortController?.signal.removeEventListener('abort', abortHandler); + // resolve(); + // }; + // const connectTimeout = CriticalTimers.setTimeout(() => { + // this.off(EngineEvent.Connected, onConnected); + // reject(new ConnectionError('could not establish pc connection')); + // }, timeout ?? this.peerConnectionTimeout); + // this.once(EngineEvent.Connected, onConnected); + // }); } private async waitForPCReconnected() { - const startTime = Date.now(); - let now = startTime; + // const startTime = Date.now(); + // let now = startTime; this.pcState = PCState.Reconnecting; log.debug('waiting for peer connection to reconnect'); - while (now - startTime < this.peerConnectionTimeout) { - if (this.primaryTransport === undefined) { - // we can abort early, connection is hosed - break; - } else if ( - // on Safari, we don't get a connectionstatechanged event during ICE restart - // this means we'd have to check its status manually and update address - // manually - now - startTime > minReconnectWait && - this.primaryTransport?.getConnectionState() === 'connected' && - (!this.hasPublished || this.publisher?.getConnectionState() === 'connected') - ) { - this.pcState = PCState.Connected; - } - if (this.pcState === PCState.Connected) { - return; - } - await sleep(100); - now = Date.now(); + try { + await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path + await this.pcManager?.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + this.pcState = PCState.Connected; + } catch (e: any) { + // TODO do we need a `failed` state here for the PC? + throw new ConnectionError(`could not establish PC connection, ${e.message}`); } - - // have not reconnected, throw - throw new ConnectionError('could not establish PC connection'); } waitForRestarted = () => { @@ -1207,7 +1191,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // also verify publisher connection if it's needed or different - if (this.hasPublished && this.subscriberPrimary) { + if (this.pcManager?.needsPublisher && this.subscriberPrimary) { if (!this.publisher) { return false; } @@ -1235,7 +1219,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return; } - this.hasPublished = true; + this.pcManager?.requirePublisher(); const handleClosed = () => { log.debug('engine disconnected while negotiation was ongoing'); diff --git a/src/room/utils.ts b/src/room/utils.ts index 3605da678c..06e7086d1b 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -2,6 +2,7 @@ import { ClientInfo, ClientInfo_SDK } from '../proto/livekit_models_pb'; import type { DetectableBrowser } from '../utils/browserParser'; import { getBrowser } from '../utils/browserParser'; import { protocolVersion, version } from '../version'; +import CriticalTimers from './timers'; import type LocalAudioTrack from './track/LocalAudioTrack'; import type RemoteAudioTrack from './track/RemoteAudioTrack'; import { VideoCodec, videoCodecs } from './track/options'; @@ -21,7 +22,7 @@ export function unpackStreamId(packed: string): string[] { } export async function sleep(duration: number): Promise { - return new Promise((resolve) => setTimeout(resolve, duration)); + return new Promise((resolve) => CriticalTimers.setTimeout(resolve, duration)); } /** @internal */ From 7af52289c7d7137bf9441ac49cf8bb43c05c1b33 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 27 Oct 2023 13:12:39 +0200 Subject: [PATCH 07/26] shift more responsibilities to transport manager --- src/room/PCTransport.ts | 10 ++++--- src/room/PCTransportManager.ts | 46 +++++++++++++++++++++++-------- src/room/RTCEngine.ts | 36 +++++------------------- src/room/track/LocalAudioTrack.ts | 2 +- 4 files changed, 48 insertions(+), 46 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 887237790e..4b462329a8 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -3,7 +3,7 @@ import type { MediaDescription } from 'sdp-transform'; import { parse, write } from 'sdp-transform'; import { debounce } from 'ts-debounce'; import log from '../logger'; -import { NegotiationError, UnexpectedConnectionState } from './errors'; +import { NegotiationError } from './errors'; import { ddExtensionURI, isChromiumBased, isSVCCodec } from './utils'; /** @internal */ @@ -33,8 +33,10 @@ export default class PCTransport extends EventEmitter { private _pc: RTCPeerConnection | null; private get pc() { - if (this._pc) return this._pc; - throw new UnexpectedConnectionState('Expected peer connection to be available'); + if (!this._pc) { + this._pc = this.setupPC(); + } + return this._pc; } private config?: RTCConfiguration; @@ -430,7 +432,7 @@ export default class PCTransport extends EventEmitter { this._pc.ontrack = null; this._pc.onconnectionstatechange = null; this._pc.oniceconnectionstatechange = null; - this._pc = this.setupPC(); + this._pc = null; }; private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 5a14dd7938..abea63ed61 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -27,6 +27,10 @@ export class PCTransportManager { return this.isSubscriberConnectionRequired; } + public get currentState() { + return this.state; + } + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; @@ -39,19 +43,19 @@ export class PCTransportManager { public onStateChange?: (state: PCTransportState) => void; - constructor(rtcConfig: RTCConfiguration) { - this.isPublisherConnectionRequired = false; - this.isSubscriberConnectionRequired = true; + constructor(rtcConfig: RTCConfiguration, subscriberPrimary: boolean) { + this.isPublisherConnectionRequired = !subscriberPrimary; + this.isSubscriberConnectionRequired = subscriberPrimary; const googConstraints = { optional: [{ googDscp: true }] }; this.publisher = new PCTransport(rtcConfig, googConstraints); this.subscriber = new PCTransport(rtcConfig); - this.publisher.onConnectionStateChange = this.handleStateChanged; - this.subscriber.onConnectionStateChange = this.handleStateChanged; - this.publisher.onIceConnectionStateChange = this.handleStateChanged; - this.subscriber.onIceConnectionStateChange = this.handleStateChanged; - this.publisher.onSignalingStatechange = this.handleStateChanged; - this.subscriber.onSignalingStatechange = this.handleStateChanged; + this.publisher.onConnectionStateChange = this.updateState; + this.subscriber.onConnectionStateChange = this.updateState; + this.publisher.onIceConnectionStateChange = this.updateState; + this.subscriber.onIceConnectionStateChange = this.updateState; + this.publisher.onSignalingStatechange = this.updateState; + this.subscriber.onSignalingStatechange = this.updateState; this.state = PCTransportState.IDLE; @@ -60,18 +64,36 @@ export class PCTransportManager { requirePublisher(require = true) { this.isPublisherConnectionRequired = require; - this.handleStateChanged(); + this.updateState(); } requireSubscriber(require = true) { this.isSubscriberConnectionRequired = require; - this.handleStateChanged(); + this.updateState(); } createAndSendOffer(options?: RTCOfferOptions) { return this.publisher.createAndSendOffer(options); } + close() { + this.publisher.close(); + this.subscriber.close(); + } + + async triggerIceRestart() { + this.subscriber.restartingIce = true; + // only restart publisher if it's needed + if (this.needsPublisher) { + await this.createAndSendOffer({ iceRestart: true }); + } + } + + updateConfiguration(config: RTCConfiguration) { + this.publisher.setConfiguration(config); + this.subscriber.setConfiguration(config); + } + async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { const unlock = await this.connectionLock.lock(); try { @@ -96,7 +118,7 @@ export class PCTransportManager { return transports; } - private handleStateChanged = () => { + private updateState = () => { const previousState = this.state; const connectionStates = this.requiredTransports.map((tr) => tr.getConnectionState()); diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 5866d2bc36..2f5427c6ef 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -382,7 +382,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // this.publisher = new PCTransport(rtcConfig, googConstraints); // this.subscriber = new PCTransport(rtcConfig); - this.pcManager = new PCTransportManager(rtcConfig); + this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); this.publisher = this.pcManager.publisher; this.subscriber = this.pcManager.subscriber; @@ -972,7 +972,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit throw new UnexpectedConnectionState('could not reconnect, url or token not saved'); } // trigger publisher reconnect - if (!this.publisher || !this.subscriber) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher and subscriber connections unset'); } @@ -984,8 +984,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const res = await this.client.reconnect(this.url, this.token, this.participantSid, reason); if (res) { const rtcConfig = this.makeRTCConfiguration(res); - this.publisher.setConfiguration(rtcConfig); - this.subscriber.setConfiguration(rtcConfig); + this.pcManager.updateConfiguration(rtcConfig); } } catch (e) { let message = ''; @@ -1004,12 +1003,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit throw new Error('simulated failure'); } - this.subscriber.restartingIce = true; - - // only restart publisher if it's needed - if (this.pcManager?.needsPublisher) { - await this.publisher.createAndSendOffer({ iceRestart: true }); - } + await this.pcManager.triggerIceRestart(); await this.waitForPCReconnected(); this.client.setReconnected(); @@ -1177,30 +1171,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /* @internal */ verifyTransport(): boolean { - // primary connection - if (!this.primaryTransport) { + if (!this.pcManager) { return false; } - if ( - this.primaryTransport.getConnectionState() === 'closed' || - this.primaryTransport.getConnectionState() === 'failed' - ) { + // primary connection + if (this.pcManager.currentState !== PCTransportState.CONNECTED) { return false; } - // also verify publisher connection if it's needed or different - if (this.pcManager?.needsPublisher && this.subscriberPrimary) { - if (!this.publisher) { - return false; - } - if ( - this.publisher.getConnectionState() === 'closed' || - this.publisher.getConnectionState() === 'failed' - ) { - return false; - } - } - // ensure signal is connected if (!this.client.ws || this.client.ws.readyState === WebSocket.CLOSED) { return false; diff --git a/src/room/track/LocalAudioTrack.ts b/src/room/track/LocalAudioTrack.ts index bebb3f7bf2..530fb9b54d 100644 --- a/src/room/track/LocalAudioTrack.ts +++ b/src/room/track/LocalAudioTrack.ts @@ -138,7 +138,7 @@ export default class LocalAudioTrack extends LocalTrack { this.prevStats = stats; }; - async setProcessor(processor: TrackProcessor) { + async setProcessor(processor: TrackProcessor) { const unlock = await this.processorLock.lock(); try { if (!this.audioContext) { From e43480c963ab00d2fb8d34fcba1e1e3f1be2fb04 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 27 Oct 2023 15:34:25 +0200 Subject: [PATCH 08/26] leaky PC lifecycle --- src/room/PCTransport.ts | 8 +++--- src/room/PCTransportManager.ts | 45 ++++++++++++++++++++-------------- src/room/RTCEngine.ts | 28 ++++----------------- 3 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 4b462329a8..da0ab03ef9 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -3,7 +3,7 @@ import type { MediaDescription } from 'sdp-transform'; import { parse, write } from 'sdp-transform'; import { debounce } from 'ts-debounce'; import log from '../logger'; -import { NegotiationError } from './errors'; +import { NegotiationError, UnexpectedConnectionState } from './errors'; import { ddExtensionURI, isChromiumBased, isSVCCodec } from './utils'; /** @internal */ @@ -34,7 +34,7 @@ export default class PCTransport extends EventEmitter { private get pc() { if (!this._pc) { - this._pc = this.setupPC(); + this._pc = new RTCPeerConnection(this.config); // FIXME this seems to leak peer connections } return this._pc; } @@ -75,10 +75,10 @@ export default class PCTransport extends EventEmitter { super(); this.config = config; this.mediaConstraints = mediaConstraints; - this._pc = this.setupPC(); + this._pc = this.createPC(); } - private setupPC() { + private createPC() { const pc = isChromiumBased() ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(this.config, this.mediaConstraints) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index abea63ed61..bfbe65e08e 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -6,12 +6,10 @@ import CriticalTimers from './timers'; import { Mutex, sleep } from './utils'; export enum PCTransportState { - IDLE, + DISCONNECTED, CONNECTING, CONNECTED, - RECONNECTING, FAILED, - CLOSED, } export class PCTransportManager { @@ -19,6 +17,8 @@ export class PCTransportManager { public subscriber: PCTransport; + public peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; + public get needsPublisher() { return this.isPublisherConnectionRequired; } @@ -31,18 +31,16 @@ export class PCTransportManager { return this.state; } + public onStateChange?: (state: PCTransportState) => void; + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; private state: PCTransportState; - private peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; - private connectionLock: Mutex; - public onStateChange?: (state: PCTransportState) => void; - constructor(rtcConfig: RTCConfiguration, subscriberPrimary: boolean) { this.isPublisherConnectionRequired = !subscriberPrimary; this.isSubscriberConnectionRequired = subscriberPrimary; @@ -57,7 +55,7 @@ export class PCTransportManager { this.publisher.onSignalingStatechange = this.updateState; this.subscriber.onSignalingStatechange = this.updateState; - this.state = PCTransportState.IDLE; + this.state = PCTransportState.DISCONNECTED; this.connectionLock = new Mutex(); } @@ -76,9 +74,22 @@ export class PCTransportManager { return this.publisher.createAndSendOffer(options); } - close() { - this.publisher.close(); - this.subscriber.close(); + async close() { + if (this.publisher && this.publisher.getSignallingState() !== 'closed') { + const publisher = this.publisher; + for (const sender of publisher.getSenders()) { + try { + // TODO: react-native-webrtc doesn't have removeTrack yet. + if (publisher.canRemoveTrack()) { + publisher.removeTrack(sender); + } + } catch (e) { + log.warn('could not removeTrack', { error: e }); + } + } + } + await Promise.all([this.publisher.close(), this.subscriber.close()]); + this.updateState(); } async triggerIceRestart() { @@ -128,17 +139,15 @@ export class PCTransportManager { this.state = PCTransportState.FAILED; } else if (connectionStates.some((st) => st === 'connecting')) { this.state = PCTransportState.CONNECTING; - } else if (connectionStates.some((st) => st === 'closed')) { - this.state = PCTransportState.CLOSED; + } else if (connectionStates.every((st) => st === 'closed')) { + this.state = PCTransportState.DISCONNECTED; + } else if (connectionStates.every((st) => st === 'new')) { + this.state = PCTransportState.DISCONNECTED; } if (previousState !== this.state) { this.onStateChange?.(this.state); - log.info('pc state', { - overall: this.state, - publisher: getPCState(this.publisher), - subscriber: getPCState(this.subscriber), - }); + log.info(`pc state: ${PCTransportState[this.state]}`); } }; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 2f5427c6ef..4314d38e5c 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -201,7 +201,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.subscriberPrimary = joinResponse.subscriberPrimary; if (!this.publisher) { - this.configure(joinResponse); + await this.configure(joinResponse); } // create offer @@ -246,27 +246,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async cleanupPeerConnections() { - if (this.publisher && this.publisher.getSignallingState() !== 'closed') { - const publisher = this.publisher; - for (const sender of publisher.getSenders()) { - try { - // TODO: react-native-webrtc doesn't have removeTrack yet. - if (publisher.canRemoveTrack()) { - await publisher.removeTrack(sender); - } - } catch (e) { - log.warn('could not removeTrack', { error: e }); - } - } - } - if (this.publisher) { - this.publisher.close(); - this.publisher = undefined; - } - if (this.subscriber) { - this.subscriber.close(); - this.subscriber = undefined; - } + await this.pcManager?.close(); + this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { @@ -363,7 +344,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.regionUrlProvider = provider; } - private configure(joinResponse: JoinResponse) { + private async configure(joinResponse: JoinResponse) { // already configured if (this.publisher || this.subscriber) { return; @@ -382,6 +363,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // this.publisher = new PCTransport(rtcConfig, googConstraints); // this.subscriber = new PCTransport(rtcConfig); + await this.pcManager?.close(); this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); this.publisher = this.pcManager.publisher; this.subscriber = this.pcManager.subscriber; From 77eb319c6981eab26bf19eaa3faf2071fe5014bb Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 27 Oct 2023 17:44:18 +0200 Subject: [PATCH 09/26] add subscriber methods to pcmanager --- src/room/PCTransport.ts | 2 +- src/room/PCTransportManager.ts | 45 +++++++++++++++++++++++++++++++- src/room/RTCEngine.ts | 47 ++++++++++------------------------ 3 files changed, 59 insertions(+), 35 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index da0ab03ef9..4b2d74ec52 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -3,7 +3,7 @@ import type { MediaDescription } from 'sdp-transform'; import { parse, write } from 'sdp-transform'; import { debounce } from 'ts-debounce'; import log from '../logger'; -import { NegotiationError, UnexpectedConnectionState } from './errors'; +import { NegotiationError } from './errors'; import { ddExtensionURI, isChromiumBased, isSVCCodec } from './utils'; /** @internal */ diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index bfbe65e08e..7e9fc6d9a9 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,4 +1,5 @@ import log from '../logger'; +import { SignalTarget } from '../proto/livekit_rtc_pb'; import PCTransport from './PCTransport'; import { roomConnectOptionDefaults } from './defaults'; import { ConnectionError, ConnectionErrorReason } from './errors'; @@ -33,6 +34,12 @@ export class PCTransportManager { public onStateChange?: (state: PCTransportState) => void; + public onIceCandidate?: (ev: RTCIceCandidate, target: SignalTarget) => void; + + public onDataChannel?: (ev: RTCDataChannelEvent) => void; + + public onTrack?: (ev: RTCTrackEvent) => void; + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; @@ -54,6 +61,19 @@ export class PCTransportManager { this.subscriber.onIceConnectionStateChange = this.updateState; this.publisher.onSignalingStatechange = this.updateState; this.subscriber.onSignalingStatechange = this.updateState; + this.publisher.onIceCandidate = (candidate) => { + this.onIceCandidate?.(candidate, SignalTarget.PUBLISHER); + }; + this.subscriber.onIceCandidate = (candidate) => { + this.onIceCandidate?.(candidate, SignalTarget.SUBSCRIBER); + }; + // in subscriber primary mode, server side opens sub data channels. + this.subscriber.onDataChannel = (ev) => { + this.onDataChannel?.(ev); + }; + this.subscriber.onTrack = (ev) => { + this.onTrack?.(ev); + }; this.state = PCTransportState.DISCONNECTED; @@ -100,9 +120,32 @@ export class PCTransportManager { } } - updateConfiguration(config: RTCConfiguration) { + async addIceCandidate(candidate: RTCIceCandidateInit, target: SignalTarget) { + if (target === SignalTarget.PUBLISHER) { + await this.publisher.addIceCandidate(candidate); + } else { + await this.subscriber.addIceCandidate(candidate); + } + } + + async createAnswerFromOffer(sd: RTCSessionDescriptionInit) { + log.debug('received server offer', { + RTCSdpType: sd.type, + signalingState: this.subscriber.getSignallingState().toString(), + }); + await this.subscriber.setRemoteDescription(sd); + + // answer the offer + const answer = await this.subscriber.createAndSetAnswer(); + return answer; + } + + updateConfiguration(config: RTCConfiguration, iceRestart?: boolean) { this.publisher.setConfiguration(config); this.subscriber.setConfiguration(config); + if (iceRestart) { + this.triggerIceRestart(); + } } async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 4314d38e5c..4db320afaa 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -76,7 +76,7 @@ enum PCState { export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { publisher?: PCTransport; - subscriber?: PCTransport; + // subscriber?: PCTransport; client: SignalClient; @@ -346,7 +346,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private async configure(joinResponse: JoinResponse) { // already configured - if (this.publisher || this.subscriber) { + if (this.pcManager && this.pcManager.currentState !== PCTransportState.DISCONNECTED) { return; } @@ -363,20 +363,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // this.publisher = new PCTransport(rtcConfig, googConstraints); // this.subscriber = new PCTransport(rtcConfig); - await this.pcManager?.close(); this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); this.publisher = this.pcManager.publisher; - this.subscriber = this.pcManager.subscriber; - - this.emit(EngineEvent.TransportsCreated, this.publisher, this.subscriber); + // this.subscriber = this.pcManager.subscriber; - this.publisher.onIceCandidate = (candidate) => { - log.trace('adding ICE candidate for peer', candidate); - this.client.sendIceCandidate(candidate, SignalTarget.PUBLISHER); - }; + this.emit(EngineEvent.TransportsCreated, this.pcManager.publisher, this.pcManager.subscriber); - this.subscriber.onIceCandidate = (candidate) => { - this.client.sendIceCandidate(candidate, SignalTarget.SUBSCRIBER); + this.pcManager.onIceCandidate = (candidate, target) => { + this.client.sendIceCandidate(candidate, target); }; this.publisher.onOffer = (offer) => { @@ -386,10 +380,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit let primaryTransport = this.publisher; let subscriberPrimary = joinResponse.subscriberPrimary; if (subscriberPrimary) { - primaryTransport = this.subscriber; - // in subscriber primary mode, server side opens sub data channels. - this.subscriber.onDataChannel = this.handleDataChannel; + primaryTransport = this.pcManager.subscriber; } + this.pcManager.onDataChannel = this.handleDataChannel; this.primaryTransport = primaryTransport; this.pcManager.onStateChange = async (connectionState) => { log.debug(`primary PC state changed ${connectionState}`); @@ -413,8 +406,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } }; - - this.subscriber.onTrack = (ev: RTCTrackEvent) => { + this.pcManager.onTrack = (ev: RTCTrackEvent) => { this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; @@ -436,30 +428,19 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // add candidate on trickle this.client.onTrickle = (candidate, target) => { - if (!this.publisher || !this.subscriber) { + if (!this.pcManager) { return; } log.trace('got ICE candidate from peer', { candidate, target }); - if (target === SignalTarget.PUBLISHER) { - this.publisher.addIceCandidate(candidate); - } else { - this.subscriber.addIceCandidate(candidate); - } + this.pcManager.addIceCandidate(candidate, target); }; // when server creates an offer for the client this.client.onOffer = async (sd) => { - if (!this.subscriber) { + if (!this.pcManager) { return; } - log.debug('received server offer', { - RTCSdpType: sd.type, - signalingState: this.subscriber.getSignallingState().toString(), - }); - await this.subscriber.setRemoteDescription(sd); - - // answer the offer - const answer = await this.subscriber.createAndSetAnswer(); + const answer = await this.pcManager.createAnswerFromOffer(sd); this.client.sendAnswer(answer); }; @@ -1110,7 +1091,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: DataPacket_Kind, subscriber: boolean = this.subscriberPrimary, ) { - const transport = subscriber ? this.subscriber : this.publisher; + const transport = subscriber ? this.pcManager?.subscriber : this.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { throw new ConnectionError(`${transportName} connection not set`); From e57148cc17238b147c8f02af9b6c480f24aa7027 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 27 Oct 2023 18:08:29 +0200 Subject: [PATCH 10/26] wip publisher methods on pcmanager --- src/room/PCTransportManager.ts | 21 +++++++++++++++++++++ src/room/RTCEngine.ts | 31 +++++++++++++++---------------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 7e9fc6d9a9..e3ebcaf9b6 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -40,6 +40,8 @@ export class PCTransportManager { public onTrack?: (ev: RTCTrackEvent) => void; + public onLocalOffer?: (offer: RTCSessionDescriptionInit) => void; + private isPublisherConnectionRequired: boolean; private isSubscriberConnectionRequired: boolean; @@ -74,6 +76,9 @@ export class PCTransportManager { this.subscriber.onTrack = (ev) => { this.onTrack?.(ev); }; + this.publisher.onOffer = (offer) => { + this.onLocalOffer?.(offer); + }; this.state = PCTransportState.DISCONNECTED; @@ -94,6 +99,10 @@ export class PCTransportManager { return this.publisher.createAndSendOffer(options); } + removeTrack(sender: RTCRtpSender) { + return this.publisher.removeTrack(sender); + } + async close() { if (this.publisher && this.publisher.getSignallingState() !== 'closed') { const publisher = this.publisher; @@ -161,6 +170,18 @@ export class PCTransportManager { } } + addTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { + return this.publisher.addTransceiver(track, transceiverInit); + } + + addTrack(track: MediaStreamTrack) { + return this.publisher.addTrack(track); + } + + createDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { + return this.publisher.createDataChannel(label, dataChannelDict); + } + private get requiredTransports() { const transports: PCTransport[] = []; if (this.isPublisherConnectionRequired) { diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 4db320afaa..e7dbfcb9fc 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -18,13 +18,12 @@ import { TrackInfo, UserPacket, } from '../proto/livekit_models_pb'; -import { +import type { AddTrackRequest, ConnectionQualityUpdate, JoinResponse, LeaveRequest, ReconnectResponse, - SignalTarget, StreamStateUpdate, SubscriptionPermissionUpdate, SubscriptionResponse, @@ -74,7 +73,7 @@ enum PCState { /** @internal */ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { - publisher?: PCTransport; + // publisher?: PCTransport; // subscriber?: PCTransport; @@ -200,7 +199,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.latestJoinResponse = joinResponse; this.subscriberPrimary = joinResponse.subscriberPrimary; - if (!this.publisher) { + if (!this.pcManager) { await this.configure(joinResponse); } @@ -316,7 +315,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit delete this.pendingTrackResolvers[sender.track.id]; } try { - this.publisher?.removeTrack(sender); + this.pcManager!.removeTrack(sender); return true; } catch (e: unknown) { log.warn('failed to remove track', { error: e, method: 'removeTrack' }); @@ -373,7 +372,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.sendIceCandidate(candidate, target); }; - this.publisher.onOffer = (offer) => { + this.pcManager.onLocalOffer = (offer) => { this.client.sendOffer(offer); }; @@ -512,7 +511,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } private createDataChannels() { - if (!this.publisher) { + if (!this.pcManager) { return; } @@ -527,12 +526,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // create data channels - this.lossyDC = this.publisher.createDataChannel(lossyDataChannel, { + this.lossyDC = this.pcManager.createDataChannel(lossyDataChannel, { // will drop older packets that arrive ordered: true, maxRetransmits: 0, }); - this.reliableDC = this.publisher.createDataChannel(reliableDataChannel, { + this.reliableDC = this.pcManager.createDataChannel(reliableDataChannel, { ordered: true, }); @@ -698,7 +697,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } @@ -713,7 +712,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.addTransceiver( + const transceiver = await this.pcManager.addTransceiver( track.mediaStreamTrack, transceiverInit, ); @@ -731,7 +730,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit opts: TrackPublishOptions, encodings?: RTCRtpEncodingParameters[], ) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } const transceiverInit: RTCRtpTransceiverInit = { direction: 'sendonly' }; @@ -739,7 +738,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.publisher.addTransceiver( + const transceiver = await this.pcManager.addTransceiver( simulcastTrack.mediaStreamTrack, transceiverInit, ); @@ -752,10 +751,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } private async createRTCRtpSender(track: MediaStreamTrack) { - if (!this.publisher) { + if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } - return this.publisher.addTrack(track); + return this.pcManager.addTrack(track); } // websocket reconnect behavior. if websocket is interrupted, and the PeerConnection @@ -1091,7 +1090,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: DataPacket_Kind, subscriber: boolean = this.subscriberPrimary, ) { - const transport = subscriber ? this.pcManager?.subscriber : this.publisher; + const transport = subscriber ? this.pcManager?.subscriber : this.pcManager?.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { throw new ConnectionError(`${transportName} connection not set`); From a6a4855cef25e5f5b1d5b63f75beac0b4c0300dc Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 3 Nov 2023 11:41:41 +0200 Subject: [PATCH 11/26] move primary transport handling into manager --- src/connectionHelper/checks/webrtc.ts | 4 +- src/room/PCTransportManager.ts | 18 ++++--- src/room/RTCEngine.ts | 69 ++++++++++++------------ src/room/Room.ts | 4 +- src/room/participant/LocalParticipant.ts | 16 +++--- 5 files changed, 57 insertions(+), 54 deletions(-) diff --git a/src/connectionHelper/checks/webrtc.ts b/src/connectionHelper/checks/webrtc.ts index 9082d847c7..dd85da26c7 100644 --- a/src/connectionHelper/checks/webrtc.ts +++ b/src/connectionHelper/checks/webrtc.ts @@ -38,8 +38,8 @@ export class WebRTCCheck extends Checker { } }; - if (this.room.engine.subscriber) { - this.room.engine.subscriber.onIceCandidateError = (ev) => { + if (this.room.engine.pcManager) { + this.room.engine.pcManager.subscriber.onIceCandidateError = (ev) => { if (ev instanceof RTCPeerConnectionIceErrorEvent) { this.appendWarning( `error with ICE candidate: ${ev.errorCode} ${ev.errorText} ${ev.url}`, diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index e3ebcaf9b6..8e62ed0730 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -182,6 +182,10 @@ export class PCTransportManager { return this.publisher.createDataChannel(label, dataChannelDict); } + getConnectedAddress() { + return this.requiredTransports[0].getConnectedAddress(); + } + private get requiredTransports() { const transports: PCTransport[] = []; if (this.isPublisherConnectionRequired) { @@ -270,10 +274,10 @@ export class PCTransportManager { } } -function getPCState(pcTransport: PCTransport) { - return { - connectionState: pcTransport.getConnectionState(), - iceState: pcTransport.getICEConnectionState(), - signallingState: pcTransport.getSignallingState(), - }; -} +// function getPCState(pcTransport: PCTransport) { +// return { +// connectionState: pcTransport.getConnectionState(), +// iceState: pcTransport.getICEConnectionState(), +// signallingState: pcTransport.getSignallingState(), +// }; +// } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index e7dbfcb9fc..e37c321e10 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -110,7 +110,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private subscriberPrimary: boolean = false; - private primaryTransport?: PCTransport; + // private primaryTransport?: PCTransport; private pcState: PCState = PCState.New; @@ -247,7 +247,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit async cleanupPeerConnections() { await this.pcManager?.close(); - this.primaryTransport = undefined; + // this.primaryTransport = undefined; const dcCleanup = (dc: RTCDataChannel | undefined) => { if (!dc) return; @@ -332,10 +332,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async getConnectedServerAddress(): Promise { - if (this.primaryTransport === undefined) { - return undefined; - } - return this.primaryTransport.getConnectedAddress(); + return this.pcManager?.getConnectedAddress(); } /* @internal */ @@ -360,11 +357,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit rtcConfig.encodedInsertableStreams = true; } - // this.publisher = new PCTransport(rtcConfig, googConstraints); - // this.subscriber = new PCTransport(rtcConfig); this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); - this.publisher = this.pcManager.publisher; - // this.subscriber = this.pcManager.subscriber; this.emit(EngineEvent.TransportsCreated, this.pcManager.publisher, this.pcManager.subscriber); @@ -376,13 +369,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.sendOffer(offer); }; - let primaryTransport = this.publisher; let subscriberPrimary = joinResponse.subscriberPrimary; if (subscriberPrimary) { - primaryTransport = this.pcManager.subscriber; + this.pcManager.requireSubscriber(); + } else { + this.pcManager.requirePublisher(); } this.pcManager.onDataChannel = this.handleDataChannel; - this.primaryTransport = primaryTransport; this.pcManager.onStateChange = async (connectionState) => { log.debug(`primary PC state changed ${connectionState}`); if (connectionState === PCTransportState.CONNECTED) { @@ -415,14 +408,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private setupSignalClientCallbacks() { // configure signaling client this.client.onAnswer = async (sd) => { - if (!this.publisher) { + if (!this.pcManager) { return; } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.publisher.getSignallingState().toString(), + signalingState: this.pcManager.publisher.getSignallingState().toString(), }); - await this.publisher.setRemoteDescription(sd); + await this.pcManager.publisher.setRemoteDescription(sd); }; // add candidate on trickle @@ -465,7 +458,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.onLeave = (leave?: LeaveRequest) => { if (leave?.canReconnect) { this.fullReconnectOnNext = true; - this.primaryTransport = undefined; // reconnect immediately instead of waiting for next attempt this.handleDisconnect(leaveReconnect); } else { @@ -819,7 +811,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.primaryTransport?.getSignallingState() ?? 'closed') === 'closed' + (this.pcManager?.currentState ?? PCTransportState.DISCONNECTED) === + PCTransportState.DISCONNECTED ) { this.fullReconnectOnNext = true; } @@ -1098,8 +1091,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && - !this.publisher?.isICEConnected && - this.publisher?.getICEConnectionState() !== 'checking' + !this.pcManager?.publisher.isICEConnected && + this.pcManager?.publisher.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1138,11 +1131,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // primary connection if (this.pcManager.currentState !== PCTransportState.CONNECTED) { + console.log('pc man state', this.pcManager.currentState); return false; } // ensure signal is connected if (!this.client.ws || this.client.ws.readyState === WebSocket.CLOSED) { + console.log('ws state', this.client.ws?.readyState); + return false; } return true; @@ -1152,8 +1148,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit negotiate(): Promise { // observe signal state return new Promise((resolve, reject) => { - if (!this.publisher) { - reject(new NegotiationError('publisher is not defined')); + if (!this.pcManager) { + reject(new NegotiationError('pc manager is not defined')); return; } @@ -1181,25 +1177,28 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.off(EngineEvent.Closing, handleClosed); }; - this.publisher.once(PCEvents.NegotiationStarted, () => { - this.publisher?.once(PCEvents.NegotiationComplete, () => { + this.pcManager.publisher.once(PCEvents.NegotiationStarted, () => { + this.pcManager?.publisher?.once(PCEvents.NegotiationComplete, () => { cleanup(); resolve(); }); }); - this.publisher.once(PCEvents.RTPVideoPayloadTypes, (rtpTypes: MediaAttributes['rtp']) => { - const rtpMap = new Map(); - rtpTypes.forEach((rtp) => { - const codec = rtp.codec.toLowerCase(); - if (isVideoCodec(codec)) { - rtpMap.set(rtp.payload, codec); - } - }); - this.emit(EngineEvent.RTPVideoMapUpdate, rtpMap); - }); + this.pcManager.publisher.once( + PCEvents.RTPVideoPayloadTypes, + (rtpTypes: MediaAttributes['rtp']) => { + const rtpMap = new Map(); + rtpTypes.forEach((rtp) => { + const codec = rtp.codec.toLowerCase(); + if (isVideoCodec(codec)) { + rtpMap.set(rtp.payload, codec); + } + }); + this.emit(EngineEvent.RTPVideoMapUpdate, rtpMap); + }, + ); - this.publisher.negotiate((e) => { + this.pcManager.publisher.negotiate((e) => { cleanup(); reject(e); if (e instanceof NegotiationError) { diff --git a/src/room/Room.ts b/src/room/Room.ts index b4688e51cf..61117f4993 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1546,8 +1546,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - const previousAnswer = this.engine.subscriber?.getLocalDescription(); - const previousOffer = this.engine.subscriber?.getRemoteDescription(); + const previousAnswer = this.engine.pcManager?.subscriber.getLocalDescription(); + const previousOffer = this.engine.pcManager?.subscriber.getRemoteDescription(); if (!previousAnswer) { return; diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index e93d8024a9..27019c0c55 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -778,8 +778,8 @@ export default class LocalParticipant extends Participant { publication.options = opts; track.sid = ti.sid; - if (!this.engine.publisher) { - throw new UnexpectedConnectionState('publisher is closed'); + if (!this.engine.pcManager) { + throw new UnexpectedConnectionState('pcManager is not ready'); } log.debug(`publishing ${track.kind} with encodings`, { encodings, trackInfo: ti }); @@ -796,21 +796,21 @@ export default class LocalParticipant extends Participant { fix the issue. */ let trackTransceiver: RTCRtpTransceiver | undefined = undefined; - for (const transceiver of this.engine.publisher.getTransceivers()) { + for (const transceiver of this.engine.pcManager.publisher.getTransceivers()) { if (transceiver.sender === track.sender) { trackTransceiver = transceiver; break; } } if (trackTransceiver) { - this.engine.publisher.setTrackCodecBitrate({ + this.engine.pcManager.publisher.setTrackCodecBitrate({ transceiver: trackTransceiver, codec: 'opus', maxbr: encodings[0]?.maxBitrate ? encodings[0].maxBitrate / 1000 : 0, }); } } else if (track.codec && isSVCCodec(track.codec) && encodings[0]?.maxBitrate) { - this.engine.publisher.setTrackCodecBitrate({ + this.engine.pcManager.publisher.setTrackCodecBitrate({ cid: req.cid, codec: track.codec, maxbr: encodings[0].maxBitrate / 1000, @@ -942,12 +942,12 @@ export default class LocalParticipant extends Participant { const trackSender = track.sender; track.sender = undefined; if ( - this.engine.publisher && - this.engine.publisher.getConnectionState() !== 'closed' && + this.engine.pcManager?.publisher && + this.engine.pcManager?.publisher.getConnectionState() !== 'closed' && trackSender ) { try { - for (const transceiver of this.engine.publisher.getTransceivers()) { + for (const transceiver of this.engine.pcManager.publisher.getTransceivers()) { // if sender is not currently sending (after replaceTrack(null)) // removeTrack would have no effect. // to ensure we end up successfully removing the track, manually set From 59391fcf4be2a774075bf15ae56ce997978531cc Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 3 Nov 2023 11:43:01 +0200 Subject: [PATCH 12/26] fix e2ee reconnect behaviour --- src/room/RTCEngine.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index e37c321e10..420bb97d6d 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -350,13 +350,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const rtcConfig = this.makeRTCConfiguration(joinResponse); - if (this.signalOpts?.e2eeEnabled) { - log.debug('E2EE - setting up transports with insertable streams'); - // this makes sure that no data is sent before the transforms are ready - // @ts-ignore - rtcConfig.encodedInsertableStreams = true; - } - this.pcManager = new PCTransportManager(rtcConfig, joinResponse.subscriberPrimary); this.emit(EngineEvent.TransportsCreated, this.pcManager.publisher, this.pcManager.subscriber); @@ -471,6 +464,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private makeRTCConfiguration(serverResponse: JoinResponse | ReconnectResponse): RTCConfiguration { const rtcConfig = { ...this.rtcConfig }; + if (this.signalOpts?.e2eeEnabled) { + log.debug('E2EE - setting up transports with insertable streams'); + // this makes sure that no data is sent before the transforms are ready + // @ts-ignore + rtcConfig.encodedInsertableStreams = true; + } + // update ICE servers before creating PeerConnection if (serverResponse.iceServers && !rtcConfig.iceServers) { const rtcIceServers: RTCIceServer[] = []; From 3e0bbf216b91dd78d5f8c29f8af2df53276bf469 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 3 Nov 2023 12:51:23 +0200 Subject: [PATCH 13/26] fix publisher reconnect scenarios --- src/room/PCTransport.ts | 4 ++-- src/room/PCTransportManager.ts | 33 +++++++++++++++++++++++++++++---- src/room/RTCEngine.ts | 12 +++--------- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 4b2d74ec52..5486368fa9 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -34,7 +34,7 @@ export default class PCTransport extends EventEmitter { private get pc() { if (!this._pc) { - this._pc = new RTCPeerConnection(this.config); // FIXME this seems to leak peer connections + this._pc = this.createPC(); // FIXME this seems to leak peer connections } return this._pc; } @@ -194,7 +194,7 @@ export default class PCTransport extends EventEmitter { if (this.renegotiate) { this.renegotiate = false; - this.createAndSendOffer(); + await this.createAndSendOffer(); } else if (sd.type === 'answer') { this.emit(PCEvents.NegotiationComplete); if (sd.sdp) { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 8e62ed0730..6be5da144f 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -32,7 +32,11 @@ export class PCTransportManager { return this.state; } - public onStateChange?: (state: PCTransportState) => void; + public onStateChange?: ( + state: PCTransportState, + pubState: RTCPeerConnectionState, + subState: RTCPeerConnectionState, + ) => void; public onIceCandidate?: (ev: RTCIceCandidate, target: SignalTarget) => void; @@ -159,7 +163,22 @@ export class PCTransportManager { async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { const unlock = await this.connectionLock.lock(); + try { + if ( + this.isPublisherConnectionRequired && + this.publisher.getConnectionState() !== 'connected' && + this.publisher.getConnectionState() !== 'connecting' + ) { + console.log('negotiation required, start negotiating'); + this.publisher.negotiate(); + } else { + console.log( + 'no negotiation required', + this.isPublisherConnectionRequired, + this.publisher.getConnectionState(), + ); + } await Promise.all( this.requiredTransports?.map((transport) => this.ensureTransportConnected(transport, abortController, timeout), @@ -212,10 +231,16 @@ export class PCTransportManager { } else if (connectionStates.every((st) => st === 'new')) { this.state = PCTransportState.DISCONNECTED; } - + log.info(`pc state: ${PCTransportState[this.state]}`, { + publisher: this.publisher.getConnectionState(), + subscriber: this.subscriber.getConnectionState(), + }); if (previousState !== this.state) { - this.onStateChange?.(this.state); - log.info(`pc state: ${PCTransportState[this.state]}`); + this.onStateChange?.( + this.state, + this.publisher.getConnectionState(), + this.subscriber.getConnectionState(), + ); } }; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 420bb97d6d..7761461ba2 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -362,14 +362,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.sendOffer(offer); }; - let subscriberPrimary = joinResponse.subscriberPrimary; - if (subscriberPrimary) { - this.pcManager.requireSubscriber(); - } else { - this.pcManager.requirePublisher(); - } this.pcManager.onDataChannel = this.handleDataChannel; - this.pcManager.onStateChange = async (connectionState) => { + this.pcManager.onStateChange = async (connectionState, publisherState, subscriberState) => { log.debug(`primary PC state changed ${connectionState}`); if (connectionState === PCTransportState.CONNECTED) { const shouldEmit = this.pcState === PCState.New; @@ -384,7 +378,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.handleDisconnect( 'peerconnection failed', - subscriberPrimary // FIXME actually determine which peer connection failed + subscriberState === 'failed' ? ReconnectReason.RR_SUBSCRIBER_FAILED : ReconnectReason.RR_PUBLISHER_FAILED, ); @@ -1153,7 +1147,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit return; } - this.pcManager?.requirePublisher(); + this.pcManager.requirePublisher(); const handleClosed = () => { log.debug('engine disconnected while negotiation was ongoing'); From 7a1044ad19b73d087266f1236e75cbd48e949d37 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 3 Nov 2023 15:37:21 +0200 Subject: [PATCH 14/26] wip transport lifecycle --- src/room/PCTransport.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 5486368fa9..6c9ec803b1 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -83,6 +83,7 @@ export default class PCTransport extends EventEmitter { ? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor new RTCPeerConnection(this.config, this.mediaConstraints) : new RTCPeerConnection(this.config); + pc.onicecandidate = (ev) => { if (!ev.candidate) return; this.onIceCandidate?.(ev.candidate); @@ -235,11 +236,11 @@ export default class PCTransport extends EventEmitter { if (this._pc && this._pc.signalingState === 'have-local-offer') { // we're waiting for the peer to accept our offer, so we'll just wait // the only exception to this is when ICE restart is needed - const currentSD = this.pc.remoteDescription; + const currentSD = this._pc.remoteDescription; if (options?.iceRestart && currentSD) { // TODO: handle when ICE restart is needed but we don't have a remote description // the best thing to do is to recreate the peerconnection - await this.pc.setRemoteDescription(currentSD); + await this._pc.setRemoteDescription(currentSD); } else { this.renegotiate = true; return; From 7ba3e85ac3a98328186e8b3598a7eaa5e2de9589 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 6 Nov 2023 16:20:23 +0200 Subject: [PATCH 15/26] negotiation in pc manager --- src/room/PCTransportManager.ts | 50 ++++++++++++++++-- src/room/RTCEngine.ts | 67 +++++++++++------------- src/room/Room.ts | 7 ++- src/room/participant/LocalParticipant.ts | 5 +- 4 files changed, 85 insertions(+), 44 deletions(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 6be5da144f..32680a0f87 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,16 +1,18 @@ import log from '../logger'; import { SignalTarget } from '../proto/livekit_rtc_pb'; -import PCTransport from './PCTransport'; +import PCTransport, { PCEvents } from './PCTransport'; import { roomConnectOptionDefaults } from './defaults'; import { ConnectionError, ConnectionErrorReason } from './errors'; import CriticalTimers from './timers'; import { Mutex, sleep } from './utils'; export enum PCTransportState { - DISCONNECTED, + NEW, CONNECTING, CONNECTED, FAILED, + CLOSING, + CLOSED, } export class PCTransportManager { @@ -84,7 +86,7 @@ export class PCTransportManager { this.onLocalOffer?.(offer); }; - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.NEW; this.connectionLock = new Mutex(); } @@ -103,6 +105,10 @@ export class PCTransportManager { return this.publisher.createAndSendOffer(options); } + setAnswer(sd: RTCSessionDescriptionInit) { + return this.publisher.setRemoteDescription(sd); + } + removeTrack(sender: RTCRtpSender) { return this.publisher.removeTrack(sender); } @@ -189,6 +195,38 @@ export class PCTransportManager { } } + async negotiate(abortController: AbortController) { + console.log('negotiation requested'); + return new Promise(async (resolve, reject) => { + const negotiationTimeout = setTimeout(() => { + reject('negotiation timed out'); + }, this.peerConnectionTimeout); + + const abortHandler = () => { + clearTimeout(negotiationTimeout); + reject('negotiation aborted'); + }; + + abortController.signal.addEventListener('abort', abortHandler); + this.publisher.once(PCEvents.NegotiationStarted, () => { + console.log('negotiation started'); + if (abortController.signal.aborted) { + return; + } + this.publisher.once(PCEvents.NegotiationComplete, () => { + console.log('negotiation complete'); + clearTimeout(negotiationTimeout); + resolve(); + }); + }); + + await this.publisher.negotiate((e) => { + clearTimeout(negotiationTimeout); + reject(e); + }); + }); + } + addTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { return this.publisher.addTransceiver(track, transceiverInit); } @@ -227,9 +265,11 @@ export class PCTransportManager { } else if (connectionStates.some((st) => st === 'connecting')) { this.state = PCTransportState.CONNECTING; } else if (connectionStates.every((st) => st === 'closed')) { - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.CLOSED; + } else if (connectionStates.some((st) => st === 'closed')) { + this.state = PCTransportState.CLOSING; } else if (connectionStates.every((st) => st === 'new')) { - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.NEW; } log.info(`pc state: ${PCTransportState[this.state]}`, { publisher: this.publisher.getConnectionState(), diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index f2a3cc0e1c..4c97098d95 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -246,6 +246,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit async cleanupPeerConnections() { await this.pcManager?.close(); + this.pcManager = undefined; // this.primaryTransport = undefined; @@ -342,7 +343,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private async configure(joinResponse: JoinResponse) { // already configured - if (this.pcManager && this.pcManager.currentState !== PCTransportState.DISCONNECTED) { + if (this.pcManager && this.pcManager.currentState !== PCTransportState.NEW) { return; } @@ -400,9 +401,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.pcManager.publisher.getSignallingState().toString(), }); - await this.pcManager.publisher.setRemoteDescription(sd); + await this.pcManager.setAnswer(sd); }; // add candidate on trickle @@ -811,8 +811,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.pcManager?.currentState ?? PCTransportState.DISCONNECTED) === - PCTransportState.DISCONNECTED + (this.pcManager?.currentState ?? PCTransportState.NEW) === PCTransportState.NEW ) { this.fullReconnectOnNext = true; } @@ -975,7 +974,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { - await this.pcManager?.ensurePCTransportConnection(abortController, timeout); + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + await this.pcManager.ensurePCTransportConnection(abortController, timeout); // if (this.pcState === PCState.Connected) { // return; // } @@ -1021,7 +1023,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.debug('waiting for peer connection to reconnect'); try { await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path - await this.pcManager?.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + await this.pcManager.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); this.pcState = PCState.Connected; } catch (e: any) { // TODO do we need a `failed` state here for the PC? @@ -1084,7 +1089,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: DataPacket_Kind, subscriber: boolean = this.subscriberPrimary, ) { - const transport = subscriber ? this.pcManager?.subscriber : this.pcManager?.publisher; + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { throw new ConnectionError(`${transportName} connection not set`); @@ -1092,8 +1100,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && - !this.pcManager?.publisher.isICEConnected && - this.pcManager?.publisher.getICEConnectionState() !== 'checking' + !this.pcManager.publisher.isICEConnected && + this.pcManager.publisher.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1146,19 +1154,21 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } /** @internal */ - negotiate(): Promise { + async negotiate(): Promise { // observe signal state - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { if (!this.pcManager) { - reject(new NegotiationError('pc manager is not defined')); + reject(new NegotiationError('PC manager is closed')); return; } this.pcManager.requirePublisher(); + const abortController = new AbortController(); + const handleClosed = () => { + abortController.abort(); log.debug('engine disconnected while negotiation was ongoing'); - cleanup(); resolve(); return; }; @@ -1168,23 +1178,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } this.on(EngineEvent.Closing, handleClosed); - const negotiationTimeout = setTimeout(() => { - reject('negotiation timed out'); - this.handleDisconnect('negotiation', ReconnectReason.RR_SIGNAL_DISCONNECTED); - }, this.peerConnectionTimeout); - - const cleanup = () => { - clearTimeout(negotiationTimeout); - this.off(EngineEvent.Closing, handleClosed); - }; - - this.pcManager.publisher.once(PCEvents.NegotiationStarted, () => { - this.pcManager?.publisher?.once(PCEvents.NegotiationComplete, () => { - cleanup(); - resolve(); - }); - }); - this.pcManager.publisher.once( PCEvents.RTPVideoPayloadTypes, (rtpTypes: MediaAttributes['rtp']) => { @@ -1199,14 +1192,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }, ); - this.pcManager.publisher.negotiate((e) => { - cleanup(); - reject(e); + try { + await this.pcManager.negotiate(abortController); + resolve(); + } catch (e: any) { if (e instanceof NegotiationError) { this.fullReconnectOnNext = true; } this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN); - }); + reject(e); + } finally { + this.off(EngineEvent.Closing, handleClosed); + } }); } diff --git a/src/room/Room.ts b/src/room/Room.ts index eb2b8ff61e..f4ea5a927e 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1551,8 +1551,11 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - const previousAnswer = this.engine.pcManager?.subscriber.getLocalDescription(); - const previousOffer = this.engine.pcManager?.subscriber.getRemoteDescription(); + if (!this.engine.pcManager) { + return; + } + const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription(); + const previousOffer = this.engine.pcManager.subscriber.getRemoteDescription(); if (!previousAnswer) { return; diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index f4605cc800..d1acc4cf00 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -17,6 +17,7 @@ import { TrackPublishedResponse, TrackUnpublishedResponse, } from '../../proto/livekit_rtc_pb'; +import { PCTransportState } from '../PCTransportManager'; import type RTCEngine from '../RTCEngine'; import { defaultVideoCodec } from '../defaults'; import { DeviceUnsupportedError, TrackInvalidError, UnexpectedConnectionState } from '../errors'; @@ -929,8 +930,8 @@ export default class LocalParticipant extends Participant { const trackSender = track.sender; track.sender = undefined; if ( - this.engine.pcManager?.publisher && - this.engine.pcManager?.publisher.getConnectionState() !== 'closed' && + this.engine.pcManager && + this.engine.pcManager.currentState < PCTransportState.FAILED && trackSender ) { try { From 62cd8157716a09589f316b049da217263c5a4ee1 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 6 Nov 2023 16:40:22 +0200 Subject: [PATCH 16/26] fix peer connection leak --- src/room/PCTransport.ts | 31 +++++++++++++++++++------------ src/room/PCTransportManager.ts | 2 -- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index e406fc3de9..bb0e0d745f 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -3,7 +3,7 @@ import type { MediaDescription } from 'sdp-transform'; import { parse, write } from 'sdp-transform'; import { debounce } from 'ts-debounce'; import log from '../logger'; -import { NegotiationError } from './errors'; +import { NegotiationError, UnexpectedConnectionState } from './errors'; import { ddExtensionURI, isChromiumBased, isSVCCodec } from './utils'; /** @internal */ @@ -34,6 +34,7 @@ export default class PCTransport extends EventEmitter { private get pc() { if (!this._pc) { + console.warn('creating new peer connection'); this._pc = this.createPC(); // FIXME this seems to leak peer connections } return this._pc; @@ -334,7 +335,10 @@ export default class PCTransport extends EventEmitter { } addTrack(track: MediaStreamTrack) { - return this.pc.addTrack(track); + if (!this._pc) { + throw new UnexpectedConnectionState('PC closed, cannot add track'); + } + return this._pc.addTrack(track); } setTrackCodecBitrate(info: TrackBitrateInfo) { @@ -342,43 +346,46 @@ export default class PCTransport extends EventEmitter { } setConfiguration(rtcConfig: RTCConfiguration) { - return this.pc.setConfiguration(rtcConfig); + if (!this._pc) { + throw new UnexpectedConnectionState('PC closed, cannot configure'); + } + return this._pc?.setConfiguration(rtcConfig); } canRemoveTrack(): boolean { - return !!this.pc.removeTrack; + return !!this._pc?.removeTrack; } removeTrack(sender: RTCRtpSender) { - return this.pc.removeTrack(sender); + return this._pc?.removeTrack(sender); } getConnectionState() { - return this.pc.connectionState; + return this._pc?.connectionState ?? 'closed'; } getICEConnectionState() { - return this.pc.iceConnectionState; + return this._pc?.iceConnectionState ?? 'closed'; } getSignallingState() { - return this.pc.signalingState; + return this._pc?.signalingState ?? 'closed'; } getTransceivers() { - return this.pc.getTransceivers(); + return this._pc?.getTransceivers() ?? []; } getSenders() { - return this.pc.getSenders(); + return this._pc?.getSenders() ?? []; } getLocalDescription() { - return this.pc.localDescription; + return this._pc?.localDescription; } getRemoteDescription() { - return this.pc.remoteDescription; + return this.pc?.remoteDescription; } async getConnectedAddress(): Promise { diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 32680a0f87..3c2c8dba20 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -209,12 +209,10 @@ export class PCTransportManager { abortController.signal.addEventListener('abort', abortHandler); this.publisher.once(PCEvents.NegotiationStarted, () => { - console.log('negotiation started'); if (abortController.signal.aborted) { return; } this.publisher.once(PCEvents.NegotiationComplete, () => { - console.log('negotiation complete'); clearTimeout(negotiationTimeout); resolve(); }); From 5e455bf4c28c7dd184cf77044164edc39d4905b7 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 11:34:11 +0200 Subject: [PATCH 17/26] cleanup --- src/room/PCTransport.ts | 4 +--- src/room/PCTransportManager.ts | 34 +++++++----------------------- src/room/RTCEngine.ts | 38 ---------------------------------- 3 files changed, 9 insertions(+), 67 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index bb0e0d745f..5ec4a2d0ce 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -34,8 +34,7 @@ export default class PCTransport extends EventEmitter { private get pc() { if (!this._pc) { - console.warn('creating new peer connection'); - this._pc = this.createPC(); // FIXME this seems to leak peer connections + this._pc = this.createPC(); } return this._pc; } @@ -426,7 +425,6 @@ export default class PCTransport extends EventEmitter { } close = () => { - console.warn('closing pc transport'); if (!this._pc) { return; } diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 3c2c8dba20..1326077239 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -169,21 +169,14 @@ export class PCTransportManager { async ensurePCTransportConnection(abortController?: AbortController, timeout?: number) { const unlock = await this.connectionLock.lock(); - try { if ( this.isPublisherConnectionRequired && this.publisher.getConnectionState() !== 'connected' && this.publisher.getConnectionState() !== 'connecting' ) { - console.log('negotiation required, start negotiating'); + log.debug('negotiation required, start negotiating'); this.publisher.negotiate(); - } else { - console.log( - 'no negotiation required', - this.isPublisherConnectionRequired, - this.publisher.getConnectionState(), - ); } await Promise.all( this.requiredTransports?.map((transport) => @@ -196,7 +189,6 @@ export class PCTransportManager { } async negotiate(abortController: AbortController) { - console.log('negotiation requested'); return new Promise(async (resolve, reject) => { const negotiationTimeout = setTimeout(() => { reject('negotiation timed out'); @@ -269,11 +261,13 @@ export class PCTransportManager { } else if (connectionStates.every((st) => st === 'new')) { this.state = PCTransportState.NEW; } - log.info(`pc state: ${PCTransportState[this.state]}`, { - publisher: this.publisher.getConnectionState(), - subscriber: this.subscriber.getConnectionState(), - }); + if (previousState !== this.state) { + log.debug( + `pc state change: from ${PCTransportState[previousState]} to ${ + PCTransportState[this.state] + }`, + ); this.onStateChange?.( this.state, this.publisher.getConnectionState(), @@ -291,11 +285,7 @@ export class PCTransportManager { if (connectionState === 'connected') { return; } - // if (this.pcState !== PCState.New) { - // throw new UnexpectedConnectionState( - // 'Expected peer connection to be new on initial connection', - // ); - // } + return new Promise(async (resolve, reject) => { const abortHandler = () => { log.warn('abort transport connection'); @@ -336,11 +326,3 @@ export class PCTransportManager { }); } } - -// function getPCState(pcTransport: PCTransport) { -// return { -// connectionState: pcTransport.getConnectionState(), -// iceState: pcTransport.getICEConnectionState(), -// signallingState: pcTransport.getSignallingState(), -// }; -// } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 4c97098d95..3626d6dad7 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -978,41 +978,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit throw new UnexpectedConnectionState('PC manager is closed'); } await this.pcManager.ensurePCTransportConnection(abortController, timeout); - // if (this.pcState === PCState.Connected) { - // return; - // } - // if (this.pcState !== PCState.New) { - // throw new UnexpectedConnectionState( - // 'Expected peer connection to be new on initial connection', - // ); - // } - // return new Promise((resolve, reject) => { - // const abortHandler = () => { - // log.warn('closing engine'); - // CriticalTimers.clearTimeout(connectTimeout); - - // reject( - // new ConnectionError( - // 'room connection has been cancelled', - // ConnectionErrorReason.Cancelled, - // ), - // ); - // }; - // if (abortController?.signal.aborted) { - // abortHandler(); - // } - // abortController?.signal.addEventListener('abort', abortHandler); - // const onConnected = () => { - // CriticalTimers.clearTimeout(connectTimeout); - // abortController?.signal.removeEventListener('abort', abortHandler); - // resolve(); - // }; - // const connectTimeout = CriticalTimers.setTimeout(() => { - // this.off(EngineEvent.Connected, onConnected); - // reject(new ConnectionError('could not establish pc connection')); - // }, timeout ?? this.peerConnectionTimeout); - // this.once(EngineEvent.Connected, onConnected); - // }); } private async waitForPCReconnected() { @@ -1140,14 +1105,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // primary connection if (this.pcManager.currentState !== PCTransportState.CONNECTED) { - console.log('pc man state', this.pcManager.currentState); return false; } // ensure signal is connected if (!this.client.ws || this.client.ws.readyState === WebSocket.CLOSED) { - console.log('ws state', this.client.ws?.readyState); - return false; } return true; From 757ad86525ed0679ee059afb9deb5cf4db3f8d15 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 15:23:41 +0200 Subject: [PATCH 18/26] more cleanup --- src/room/RTCEngine.ts | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 3626d6dad7..48f5764140 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -73,10 +73,6 @@ enum PCState { /** @internal */ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { - // publisher?: PCTransport; - - // subscriber?: PCTransport; - client: SignalClient; rtcConfig: RTCConfiguration = {}; @@ -110,8 +106,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private subscriberPrimary: boolean = false; - // private primaryTransport?: PCTransport; - private pcState: PCState = PCState.New; private _isClosed: boolean = true; @@ -248,8 +242,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit await this.pcManager?.close(); this.pcManager = undefined; - // this.primaryTransport = undefined; - const dcCleanup = (dc: RTCDataChannel | undefined) => { if (!dc) return; dc.close(); @@ -457,12 +449,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private makeRTCConfiguration(serverResponse: JoinResponse | ReconnectResponse): RTCConfiguration { const rtcConfig = { ...this.rtcConfig }; - if (this.signalOpts?.e2eeEnabled) { - log.debug('E2EE - setting up transports with insertable streams'); - // this makes sure that no data is sent before the transforms are ready - // @ts-ignore - rtcConfig.encodedInsertableStreams = true; - } if (this.signalOpts?.e2eeEnabled) { log.debug('E2EE - setting up transports with insertable streams'); @@ -981,8 +967,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } private async waitForPCReconnected() { - // const startTime = Date.now(); - // let now = startTime; this.pcState = PCState.Reconnecting; log.debug('waiting for peer connection to reconnect'); @@ -995,6 +979,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.pcState = PCState.Connected; } catch (e: any) { // TODO do we need a `failed` state here for the PC? + this.pcState = PCState.Disconnected; throw new ConnectionError(`could not establish PC connection, ${e.message}`); } } From 86db583c7735f3df56c28153ec1ea66cf11d1e29 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 15:47:53 +0200 Subject: [PATCH 19/26] pass state on transport callbacks --- src/room/PCTransport.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/room/PCTransport.ts b/src/room/PCTransport.ts index 5ec4a2d0ce..277709b382 100644 --- a/src/room/PCTransport.ts +++ b/src/room/PCTransport.ts @@ -63,9 +63,9 @@ export default class PCTransport extends EventEmitter { onConnectionStateChange?: (state: RTCPeerConnectionState) => void; - onIceConnectionStateChange?: () => void; + onIceConnectionStateChange?: (state: RTCIceConnectionState) => void; - onSignalingStatechange?: () => void; + onSignalingStatechange?: (state: RTCSignalingState) => void; onDataChannel?: (ev: RTCDataChannelEvent) => void; @@ -93,15 +93,15 @@ export default class PCTransport extends EventEmitter { }; pc.oniceconnectionstatechange = () => { - this.onIceConnectionStateChange?.(); + this.onIceConnectionStateChange?.(pc.iceConnectionState); }; pc.onsignalingstatechange = () => { - this.onSignalingStatechange?.(); + this.onSignalingStatechange?.(pc.signalingState); }; pc.onconnectionstatechange = () => { - this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed'); + this.onConnectionStateChange?.(pc.connectionState); }; pc.ondatachannel = (ev) => { this.onDataChannel?.(ev); From a83e3d7b11897fefc648d54c6af90cd0cf767d9a Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 15:55:27 +0200 Subject: [PATCH 20/26] Log syncState failure --- src/room/Room.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/room/Room.ts b/src/room/Room.ts index f4ea5a927e..b5ae2ec24c 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1552,6 +1552,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) private sendSyncState() { if (!this.engine.pcManager) { + log.warn('sync state cannot be sent without peer connection setup'); return; } const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription(); From bd3c93e8de891e3241a5df4f3a446f398feecf4f Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 15:58:07 +0200 Subject: [PATCH 21/26] More explicit method naming --- src/room/PCTransportManager.ts | 14 +++++++------- src/room/RTCEngine.ts | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 1326077239..861d19fd23 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -101,11 +101,11 @@ export class PCTransportManager { this.updateState(); } - createAndSendOffer(options?: RTCOfferOptions) { + createAndSendPublisherOffer(options?: RTCOfferOptions) { return this.publisher.createAndSendOffer(options); } - setAnswer(sd: RTCSessionDescriptionInit) { + setPublisherAnswer(sd: RTCSessionDescriptionInit) { return this.publisher.setRemoteDescription(sd); } @@ -135,7 +135,7 @@ export class PCTransportManager { this.subscriber.restartingIce = true; // only restart publisher if it's needed if (this.needsPublisher) { - await this.createAndSendOffer({ iceRestart: true }); + await this.createAndSendPublisherOffer({ iceRestart: true }); } } @@ -147,7 +147,7 @@ export class PCTransportManager { } } - async createAnswerFromOffer(sd: RTCSessionDescriptionInit) { + async createSubscriberAnswerFromOffer(sd: RTCSessionDescriptionInit) { log.debug('received server offer', { RTCSdpType: sd.type, signalingState: this.subscriber.getSignallingState().toString(), @@ -217,15 +217,15 @@ export class PCTransportManager { }); } - addTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { + addPublisherTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { return this.publisher.addTransceiver(track, transceiverInit); } - addTrack(track: MediaStreamTrack) { + addPublisherTrack(track: MediaStreamTrack) { return this.publisher.addTrack(track); } - createDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { + createPublisherDataChannel(label: string, dataChannelDict: RTCDataChannelInit) { return this.publisher.createDataChannel(label, dataChannelDict); } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 48f5764140..c4427071ec 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -394,7 +394,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.debug('received server answer', { RTCSdpType: sd.type, }); - await this.pcManager.setAnswer(sd); + await this.pcManager.setPublisherAnswer(sd); }; // add candidate on trickle @@ -411,7 +411,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (!this.pcManager) { return; } - const answer = await this.pcManager.createAnswerFromOffer(sd); + const answer = await this.pcManager.createSubscriberAnswerFromOffer(sd); this.client.sendAnswer(answer); }; @@ -504,12 +504,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } // create data channels - this.lossyDC = this.pcManager.createDataChannel(lossyDataChannel, { + this.lossyDC = this.pcManager.createPublisherDataChannel(lossyDataChannel, { // will drop older packets that arrive ordered: true, maxRetransmits: 0, }); - this.reliableDC = this.pcManager.createDataChannel(reliableDataChannel, { + this.reliableDC = this.pcManager.createPublisherDataChannel(reliableDataChannel, { ordered: true, }); @@ -690,7 +690,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.pcManager.addTransceiver( + const transceiver = await this.pcManager.addPublisherTransceiver( track.mediaStreamTrack, transceiverInit, ); @@ -716,7 +716,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit transceiverInit.sendEncodings = encodings; } // addTransceiver for react-native is async. web is synchronous, but await won't effect it. - const transceiver = await this.pcManager.addTransceiver( + const transceiver = await this.pcManager.addPublisherTransceiver( simulcastTrack.mediaStreamTrack, transceiverInit, ); @@ -732,7 +732,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (!this.pcManager) { throw new UnexpectedConnectionState('publisher is closed'); } - return this.pcManager.addTrack(track); + return this.pcManager.addPublisherTrack(track); } // websocket reconnect behavior. if websocket is interrupted, and the PeerConnection From 543446eca5b70b5133e7d4159484b4fb70194ef2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 16:00:55 +0200 Subject: [PATCH 22/26] make getConnectedAddress configurable --- src/room/PCTransportManager.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 861d19fd23..47ba1aa87f 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -229,7 +229,15 @@ export class PCTransportManager { return this.publisher.createDataChannel(label, dataChannelDict); } - getConnectedAddress() { + /** + * Returns the first required transport's address if no explicit target is specified + */ + getConnectedAddress(target?: SignalTarget) { + if (target === SignalTarget.PUBLISHER) { + return this.publisher.getConnectedAddress(); + } else if (target === SignalTarget.SUBSCRIBER) { + return this.publisher.getConnectedAddress(); + } return this.requiredTransports[0].getConnectedAddress(); } From 17fa81b50f22f1454b36593ffe06e2383e781e72 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 16:03:17 +0200 Subject: [PATCH 23/26] Rename onLocalOffer to onPublisherOffer --- src/room/PCTransportManager.ts | 4 ++-- src/room/RTCEngine.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 47ba1aa87f..317957a2bd 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -46,7 +46,7 @@ export class PCTransportManager { public onTrack?: (ev: RTCTrackEvent) => void; - public onLocalOffer?: (offer: RTCSessionDescriptionInit) => void; + public onPublisherOffer?: (offer: RTCSessionDescriptionInit) => void; private isPublisherConnectionRequired: boolean; @@ -83,7 +83,7 @@ export class PCTransportManager { this.onTrack?.(ev); }; this.publisher.onOffer = (offer) => { - this.onLocalOffer?.(offer); + this.onPublisherOffer?.(offer); }; this.state = PCTransportState.NEW; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index c4427071ec..620115674c 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -351,7 +351,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.client.sendIceCandidate(candidate, target); }; - this.pcManager.onLocalOffer = (offer) => { + this.pcManager.onPublisherOffer = (offer) => { this.client.sendOffer(offer); }; From 06edd10c3efb7fbde158b4155ecac66b23ab18c4 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 13 Nov 2023 16:53:10 +0200 Subject: [PATCH 24/26] Send sync state also for publisher only --- src/room/Room.ts | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/room/Room.ts b/src/room/Room.ts index b5ae2ec24c..9b5044b9a8 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1558,10 +1558,6 @@ class Room extends (EventEmitter as new () => TypedEmitter) const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription(); const previousOffer = this.engine.pcManager.subscriber.getRemoteDescription(); - if (!previousAnswer) { - return; - } - /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, in this case, we send unsub tracks, so server add all tracks to this subscribe pc and unsub special tracks from it. @@ -1579,10 +1575,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.engine.client.sendSyncState( new SyncState({ - answer: toProtoSessionDescription({ - sdp: previousAnswer.sdp, - type: previousAnswer.type, - }), + answer: previousAnswer + ? toProtoSessionDescription({ + sdp: previousAnswer.sdp, + type: previousAnswer.type, + }) + : undefined, offer: previousOffer ? toProtoSessionDescription({ sdp: previousOffer.sdp, From f57d7c1bf68496d60afbf8184a49fc8f8258162b Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 20 Nov 2023 10:35:24 +0200 Subject: [PATCH 25/26] move syncState into engine --- src/room/RTCEngine.ts | 97 +++++++++++++++++++++--- src/room/Room.ts | 54 ++----------- src/room/participant/LocalParticipant.ts | 43 ----------- src/room/track/utils.ts | 19 +++++ 4 files changed, 111 insertions(+), 102 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 620115674c..4c323fad01 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; -import { SignalClient } from '../api/SignalClient'; +import { SignalClient, toProtoSessionDescription } from '../api/SignalClient'; import log from '../logger'; import type { InternalRoomOptions } from '../options'; import { @@ -18,16 +18,20 @@ import { TrackInfo, UserPacket, } from '../proto/livekit_models_pb'; -import type { - AddTrackRequest, - ConnectionQualityUpdate, - JoinResponse, - LeaveRequest, - ReconnectResponse, - StreamStateUpdate, - SubscriptionPermissionUpdate, - SubscriptionResponse, - TrackPublishedResponse, +import { + type AddTrackRequest, + type ConnectionQualityUpdate, + DataChannelInfo, + type JoinResponse, + type LeaveRequest, + type ReconnectResponse, + SignalTarget, + type StreamStateUpdate, + type SubscriptionPermissionUpdate, + type SubscriptionResponse, + SyncState, + type TrackPublishedResponse, + UpdateSubscription, } from '../proto/livekit_rtc_pb'; import PCTransport, { PCEvents } from './PCTransport'; import { PCTransportManager, PCTransportState } from './PCTransportManager'; @@ -44,10 +48,13 @@ import { import { EngineEvent } from './events'; import CriticalTimers from './timers'; import type LocalTrack from './track/LocalTrack'; +import type LocalTrackPublication from './track/LocalTrackPublication'; import type LocalVideoTrack from './track/LocalVideoTrack'; import type { SimulcastTrackInfo } from './track/LocalVideoTrack'; +import type RemoteTrackPublication from './track/RemoteTrackPublication'; import { Track } from './track/Track'; import type { TrackPublishOptions, VideoCodec } from './track/options'; +import { getTrackPublicationInfo } from './track/utils'; import { Mutex, isVideoCodec, @@ -1172,12 +1179,80 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } + /** @internal */ + sendSyncState(remoteTracks: RemoteTrackPublication[], localTracks: LocalTrackPublication[]) { + if (!this.pcManager) { + log.warn('sync state cannot be sent without peer connection setup'); + return; + } + const previousAnswer = this.pcManager.subscriber.getLocalDescription(); + const previousOffer = this.pcManager.subscriber.getRemoteDescription(); + + /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, + in this case, we send unsub tracks, so server add all tracks to this + subscribe pc and unsub special tracks from it. + 2. autosubscribe off, we send subscribed tracks. + */ + const autoSubscribe = this.signalOpts?.autoSubscribe ?? true; + const trackSids = new Array(); + + remoteTracks.forEach((track) => { + if (track.isDesired !== autoSubscribe) { + trackSids.push(track.trackSid); + } + }); + + this.client.sendSyncState( + new SyncState({ + answer: previousAnswer + ? toProtoSessionDescription({ + sdp: previousAnswer.sdp, + type: previousAnswer.type, + }) + : undefined, + offer: previousOffer + ? toProtoSessionDescription({ + sdp: previousOffer.sdp, + type: previousOffer.type, + }) + : undefined, + subscription: new UpdateSubscription({ + trackSids, + subscribe: !autoSubscribe, + participantTracks: [], + }), + publishTracks: getTrackPublicationInfo(localTracks), + dataChannels: this.dataChannelsInfo(), + }), + ); + } + /* @internal */ failNext() { // debugging method to fail the next reconnect/resume attempt this.shouldFailNext = true; } + private dataChannelsInfo(): DataChannelInfo[] { + const infos: DataChannelInfo[] = []; + const getInfo = (dc: RTCDataChannel | undefined, target: SignalTarget) => { + if (dc?.id !== undefined && dc.id !== null) { + infos.push( + new DataChannelInfo({ + label: dc.label, + id: dc.id, + target, + }), + ); + } + }; + getInfo(this.dataChannelForKind(DataPacket_Kind.LOSSY), SignalTarget.PUBLISHER); + getInfo(this.dataChannelForKind(DataPacket_Kind.RELIABLE), SignalTarget.PUBLISHER); + getInfo(this.dataChannelForKind(DataPacket_Kind.LOSSY, true), SignalTarget.SUBSCRIBER); + getInfo(this.dataChannelForKind(DataPacket_Kind.RELIABLE, true), SignalTarget.SUBSCRIBER); + return infos; + } + private clearReconnectTimeout() { if (this.reconnectTimeout) { CriticalTimers.clearTimeout(this.reconnectTimeout); diff --git a/src/room/Room.ts b/src/room/Room.ts index 9b5044b9a8..033670a2c7 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -2,7 +2,6 @@ import { protoInt64 } from '@bufbuild/protobuf'; import { EventEmitter } from 'events'; import type TypedEmitter from 'typed-emitter'; import 'webrtc-adapter'; -import { toProtoSessionDescription } from '../api/SignalClient'; import { EncryptionEvent } from '../e2ee'; import { E2EEManager } from '../e2ee/E2eeManager'; import log from '../logger'; @@ -35,8 +34,6 @@ import { StreamStateUpdate, SubscriptionPermissionUpdate, SubscriptionResponse, - SyncState, - UpdateSubscription, } from '../proto/livekit_rtc_pb'; import { getBrowser } from '../utils/browserParser'; import DeviceManager from './DeviceManager'; @@ -1551,51 +1548,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - if (!this.engine.pcManager) { - log.warn('sync state cannot be sent without peer connection setup'); - return; - } - const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription(); - const previousOffer = this.engine.pcManager.subscriber.getRemoteDescription(); - - /* 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, - in this case, we send unsub tracks, so server add all tracks to this - subscribe pc and unsub special tracks from it. - 2. autosubscribe off, we send subscribed tracks. - */ - const autoSubscribe = this.connOptions?.autoSubscribe ?? true; - const trackSids = new Array(); - this.participants.forEach((participant) => { - participant.tracks.forEach((track) => { - if (track.isDesired !== autoSubscribe) { - trackSids.push(track.trackSid); - } - }); - }); - - this.engine.client.sendSyncState( - new SyncState({ - answer: previousAnswer - ? toProtoSessionDescription({ - sdp: previousAnswer.sdp, - type: previousAnswer.type, - }) - : undefined, - offer: previousOffer - ? toProtoSessionDescription({ - sdp: previousOffer.sdp, - type: previousOffer.type, - }) - : undefined, - subscription: new UpdateSubscription({ - trackSids, - subscribe: !autoSubscribe, - participantTracks: [], - }), - publishTracks: this.localParticipant.publishedTracksInfo(), - dataChannels: this.localParticipant.dataChannelsInfo(), - }), - ); + const remoteTracks = Array.from(this.participants.values()).reduce((acc, participant) => { + acc.push(...(participant.getTracks() as RemoteTrackPublication[])); // FIXME would be nice to have this return RemoteTrackPublications directly instead of the type cast + return acc; + }, [] as RemoteTrackPublication[]); + const localTracks = this.localParticipant.getTracks() as LocalTrackPublication[]; // FIXME would be nice to have this return LocalTrackPublications directly instead of the type cast + this.engine.sendSyncState(remoteTracks, localTracks); } /** diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index d1acc4cf00..3ede1a594e 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -10,11 +10,8 @@ import { } from '../../proto/livekit_models_pb'; import { AddTrackRequest, - DataChannelInfo, - SignalTarget, SimulcastCodec, SubscribedQualityUpdate, - TrackPublishedResponse, TrackUnpublishedResponse, } from '../../proto/livekit_rtc_pb'; import { PCTransportState } from '../PCTransportManager'; @@ -1311,44 +1308,4 @@ export default class LocalParticipant extends Participant { }); return publication; } - - /** @internal */ - publishedTracksInfo(): TrackPublishedResponse[] { - const infos: TrackPublishedResponse[] = []; - this.tracks.forEach((track: LocalTrackPublication) => { - if (track.track !== undefined) { - infos.push( - new TrackPublishedResponse({ - cid: track.track.mediaStreamID, - track: track.trackInfo, - }), - ); - } - }); - return infos; - } - - /** @internal */ - dataChannelsInfo(): DataChannelInfo[] { - const infos: DataChannelInfo[] = []; - const getInfo = (dc: RTCDataChannel | undefined, target: SignalTarget) => { - if (dc?.id !== undefined && dc.id !== null) { - infos.push( - new DataChannelInfo({ - label: dc.label, - id: dc.id, - target, - }), - ); - } - }; - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.LOSSY), SignalTarget.PUBLISHER); - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.RELIABLE), SignalTarget.PUBLISHER); - getInfo(this.engine.dataChannelForKind(DataPacket_Kind.LOSSY, true), SignalTarget.SUBSCRIBER); - getInfo( - this.engine.dataChannelForKind(DataPacket_Kind.RELIABLE, true), - SignalTarget.SUBSCRIBER, - ); - return infos; - } } diff --git a/src/room/track/utils.ts b/src/room/track/utils.ts index 5854ba560c..c17be4fcf0 100644 --- a/src/room/track/utils.ts +++ b/src/room/track/utils.ts @@ -1,6 +1,8 @@ +import { TrackPublishedResponse } from '../../proto/livekit_rtc_pb'; import { cloneDeep } from '../../utils/cloneDeep'; import { isSafari, sleep } from '../utils'; import { Track } from './Track'; +import type { TrackPublication } from './TrackPublication'; import { type AudioCaptureOptions, type CreateLocalTracksOptions, @@ -190,3 +192,20 @@ export function mimeTypeToVideoCodecString(mimeType: string) { } return codec; } + +export function getTrackPublicationInfo( + tracks: T[], +): TrackPublishedResponse[] { + const infos: TrackPublishedResponse[] = []; + tracks.forEach((track: TrackPublication) => { + if (track.track !== undefined) { + infos.push( + new TrackPublishedResponse({ + cid: track.track.mediaStreamID, + track: track.trackInfo, + }), + ); + } + }); + return infos; +} From d857e98285062b77b21478ce1a47a947b4907ee4 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 21 Nov 2023 16:25:22 +0200 Subject: [PATCH 26/26] Create fuzzy-waves-knock.md --- .changeset/fuzzy-waves-knock.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fuzzy-waves-knock.md diff --git a/.changeset/fuzzy-waves-knock.md b/.changeset/fuzzy-waves-knock.md new file mode 100644 index 0000000000..364aa4d6c2 --- /dev/null +++ b/.changeset/fuzzy-waves-knock.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Move PeerConnection logic into PCTransportManager