From 7ba3e85ac3a98328186e8b3598a7eaa5e2de9589 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 6 Nov 2023 16:20:23 +0200 Subject: [PATCH] negotiation in pc manager --- src/room/PCTransportManager.ts | 50 ++++++++++++++++-- src/room/RTCEngine.ts | 67 +++++++++++------------- src/room/Room.ts | 7 ++- src/room/participant/LocalParticipant.ts | 5 +- 4 files changed, 85 insertions(+), 44 deletions(-) diff --git a/src/room/PCTransportManager.ts b/src/room/PCTransportManager.ts index 6be5da144f..32680a0f87 100644 --- a/src/room/PCTransportManager.ts +++ b/src/room/PCTransportManager.ts @@ -1,16 +1,18 @@ import log from '../logger'; import { SignalTarget } from '../proto/livekit_rtc_pb'; -import PCTransport from './PCTransport'; +import PCTransport, { PCEvents } from './PCTransport'; import { roomConnectOptionDefaults } from './defaults'; import { ConnectionError, ConnectionErrorReason } from './errors'; import CriticalTimers from './timers'; import { Mutex, sleep } from './utils'; export enum PCTransportState { - DISCONNECTED, + NEW, CONNECTING, CONNECTED, FAILED, + CLOSING, + CLOSED, } export class PCTransportManager { @@ -84,7 +86,7 @@ export class PCTransportManager { this.onLocalOffer?.(offer); }; - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.NEW; this.connectionLock = new Mutex(); } @@ -103,6 +105,10 @@ export class PCTransportManager { return this.publisher.createAndSendOffer(options); } + setAnswer(sd: RTCSessionDescriptionInit) { + return this.publisher.setRemoteDescription(sd); + } + removeTrack(sender: RTCRtpSender) { return this.publisher.removeTrack(sender); } @@ -189,6 +195,38 @@ export class PCTransportManager { } } + async negotiate(abortController: AbortController) { + console.log('negotiation requested'); + return new Promise(async (resolve, reject) => { + const negotiationTimeout = setTimeout(() => { + reject('negotiation timed out'); + }, this.peerConnectionTimeout); + + const abortHandler = () => { + clearTimeout(negotiationTimeout); + reject('negotiation aborted'); + }; + + abortController.signal.addEventListener('abort', abortHandler); + this.publisher.once(PCEvents.NegotiationStarted, () => { + console.log('negotiation started'); + if (abortController.signal.aborted) { + return; + } + this.publisher.once(PCEvents.NegotiationComplete, () => { + console.log('negotiation complete'); + clearTimeout(negotiationTimeout); + resolve(); + }); + }); + + await this.publisher.negotiate((e) => { + clearTimeout(negotiationTimeout); + reject(e); + }); + }); + } + addTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) { return this.publisher.addTransceiver(track, transceiverInit); } @@ -227,9 +265,11 @@ export class PCTransportManager { } else if (connectionStates.some((st) => st === 'connecting')) { this.state = PCTransportState.CONNECTING; } else if (connectionStates.every((st) => st === 'closed')) { - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.CLOSED; + } else if (connectionStates.some((st) => st === 'closed')) { + this.state = PCTransportState.CLOSING; } else if (connectionStates.every((st) => st === 'new')) { - this.state = PCTransportState.DISCONNECTED; + this.state = PCTransportState.NEW; } log.info(`pc state: ${PCTransportState[this.state]}`, { publisher: this.publisher.getConnectionState(), diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index f2a3cc0e1c..4c97098d95 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -246,6 +246,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit async cleanupPeerConnections() { await this.pcManager?.close(); + this.pcManager = undefined; // this.primaryTransport = undefined; @@ -342,7 +343,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private async configure(joinResponse: JoinResponse) { // already configured - if (this.pcManager && this.pcManager.currentState !== PCTransportState.DISCONNECTED) { + if (this.pcManager && this.pcManager.currentState !== PCTransportState.NEW) { return; } @@ -400,9 +401,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } log.debug('received server answer', { RTCSdpType: sd.type, - signalingState: this.pcManager.publisher.getSignallingState().toString(), }); - await this.pcManager.publisher.setRemoteDescription(sd); + await this.pcManager.setAnswer(sd); }; // add candidate on trickle @@ -811,8 +811,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED || // signaling state could change to closed due to hardware sleep // those connections cannot be resumed - (this.pcManager?.currentState ?? PCTransportState.DISCONNECTED) === - PCTransportState.DISCONNECTED + (this.pcManager?.currentState ?? PCTransportState.NEW) === PCTransportState.NEW ) { this.fullReconnectOnNext = true; } @@ -975,7 +974,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) { - await this.pcManager?.ensurePCTransportConnection(abortController, timeout); + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + await this.pcManager.ensurePCTransportConnection(abortController, timeout); // if (this.pcState === PCState.Connected) { // return; // } @@ -1021,7 +1023,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit log.debug('waiting for peer connection to reconnect'); try { await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path - await this.pcManager?.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + await this.pcManager.ensurePCTransportConnection(undefined, this.peerConnectionTimeout); this.pcState = PCState.Connected; } catch (e: any) { // TODO do we need a `failed` state here for the PC? @@ -1084,7 +1089,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: DataPacket_Kind, subscriber: boolean = this.subscriberPrimary, ) { - const transport = subscriber ? this.pcManager?.subscriber : this.pcManager?.publisher; + if (!this.pcManager) { + throw new UnexpectedConnectionState('PC manager is closed'); + } + const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher; const transportName = subscriber ? 'Subscriber' : 'Publisher'; if (!transport) { throw new ConnectionError(`${transportName} connection not set`); @@ -1092,8 +1100,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if ( !subscriber && - !this.pcManager?.publisher.isICEConnected && - this.pcManager?.publisher.getICEConnectionState() !== 'checking' + !this.pcManager.publisher.isICEConnected && + this.pcManager.publisher.getICEConnectionState() !== 'checking' ) { // start negotiation this.negotiate(); @@ -1146,19 +1154,21 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } /** @internal */ - negotiate(): Promise { + async negotiate(): Promise { // observe signal state - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { if (!this.pcManager) { - reject(new NegotiationError('pc manager is not defined')); + reject(new NegotiationError('PC manager is closed')); return; } this.pcManager.requirePublisher(); + const abortController = new AbortController(); + const handleClosed = () => { + abortController.abort(); log.debug('engine disconnected while negotiation was ongoing'); - cleanup(); resolve(); return; }; @@ -1168,23 +1178,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } this.on(EngineEvent.Closing, handleClosed); - const negotiationTimeout = setTimeout(() => { - reject('negotiation timed out'); - this.handleDisconnect('negotiation', ReconnectReason.RR_SIGNAL_DISCONNECTED); - }, this.peerConnectionTimeout); - - const cleanup = () => { - clearTimeout(negotiationTimeout); - this.off(EngineEvent.Closing, handleClosed); - }; - - this.pcManager.publisher.once(PCEvents.NegotiationStarted, () => { - this.pcManager?.publisher?.once(PCEvents.NegotiationComplete, () => { - cleanup(); - resolve(); - }); - }); - this.pcManager.publisher.once( PCEvents.RTPVideoPayloadTypes, (rtpTypes: MediaAttributes['rtp']) => { @@ -1199,14 +1192,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }, ); - this.pcManager.publisher.negotiate((e) => { - cleanup(); - reject(e); + try { + await this.pcManager.negotiate(abortController); + resolve(); + } catch (e: any) { if (e instanceof NegotiationError) { this.fullReconnectOnNext = true; } this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN); - }); + reject(e); + } finally { + this.off(EngineEvent.Closing, handleClosed); + } }); } diff --git a/src/room/Room.ts b/src/room/Room.ts index eb2b8ff61e..f4ea5a927e 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -1551,8 +1551,11 @@ class Room extends (EventEmitter as new () => TypedEmitter) } private sendSyncState() { - const previousAnswer = this.engine.pcManager?.subscriber.getLocalDescription(); - const previousOffer = this.engine.pcManager?.subscriber.getRemoteDescription(); + if (!this.engine.pcManager) { + return; + } + const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription(); + const previousOffer = this.engine.pcManager.subscriber.getRemoteDescription(); if (!previousAnswer) { return; diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index f4605cc800..d1acc4cf00 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -17,6 +17,7 @@ import { TrackPublishedResponse, TrackUnpublishedResponse, } from '../../proto/livekit_rtc_pb'; +import { PCTransportState } from '../PCTransportManager'; import type RTCEngine from '../RTCEngine'; import { defaultVideoCodec } from '../defaults'; import { DeviceUnsupportedError, TrackInvalidError, UnexpectedConnectionState } from '../errors'; @@ -929,8 +930,8 @@ export default class LocalParticipant extends Participant { const trackSender = track.sender; track.sender = undefined; if ( - this.engine.pcManager?.publisher && - this.engine.pcManager?.publisher.getConnectionState() !== 'closed' && + this.engine.pcManager && + this.engine.pcManager.currentState < PCTransportState.FAILED && trackSender ) { try {