Skip to content

Commit

Permalink
negotiation in pc manager
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Nov 6, 2023
1 parent f47326b commit 7ba3e85
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 44 deletions.
50 changes: 45 additions & 5 deletions src/room/PCTransportManager.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import log from '../logger';
import { SignalTarget } from '../proto/livekit_rtc_pb';
import PCTransport from './PCTransport';
import PCTransport, { PCEvents } from './PCTransport';
import { roomConnectOptionDefaults } from './defaults';
import { ConnectionError, ConnectionErrorReason } from './errors';
import CriticalTimers from './timers';
import { Mutex, sleep } from './utils';

export enum PCTransportState {
DISCONNECTED,
NEW,
CONNECTING,
CONNECTED,
FAILED,
CLOSING,
CLOSED,
}

export class PCTransportManager {
Expand Down Expand Up @@ -84,7 +86,7 @@ export class PCTransportManager {
this.onLocalOffer?.(offer);
};

this.state = PCTransportState.DISCONNECTED;
this.state = PCTransportState.NEW;

this.connectionLock = new Mutex();
}
Expand All @@ -103,6 +105,10 @@ export class PCTransportManager {
return this.publisher.createAndSendOffer(options);
}

setAnswer(sd: RTCSessionDescriptionInit) {
return this.publisher.setRemoteDescription(sd);
}

removeTrack(sender: RTCRtpSender) {
return this.publisher.removeTrack(sender);
}
Expand Down Expand Up @@ -189,6 +195,38 @@ export class PCTransportManager {
}
}

async negotiate(abortController: AbortController) {
console.log('negotiation requested');
return new Promise<void>(async (resolve, reject) => {
const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
}, this.peerConnectionTimeout);

const abortHandler = () => {
clearTimeout(negotiationTimeout);
reject('negotiation aborted');
};

abortController.signal.addEventListener('abort', abortHandler);
this.publisher.once(PCEvents.NegotiationStarted, () => {
console.log('negotiation started');
if (abortController.signal.aborted) {
return;
}
this.publisher.once(PCEvents.NegotiationComplete, () => {
console.log('negotiation complete');
clearTimeout(negotiationTimeout);
resolve();
});
});

await this.publisher.negotiate((e) => {
clearTimeout(negotiationTimeout);
reject(e);
});
});
}

addTransceiver(track: MediaStreamTrack, transceiverInit: RTCRtpTransceiverInit) {
return this.publisher.addTransceiver(track, transceiverInit);
}
Expand Down Expand Up @@ -227,9 +265,11 @@ export class PCTransportManager {
} else if (connectionStates.some((st) => st === 'connecting')) {
this.state = PCTransportState.CONNECTING;
} else if (connectionStates.every((st) => st === 'closed')) {
this.state = PCTransportState.DISCONNECTED;
this.state = PCTransportState.CLOSED;
} else if (connectionStates.some((st) => st === 'closed')) {
this.state = PCTransportState.CLOSING;
} else if (connectionStates.every((st) => st === 'new')) {
this.state = PCTransportState.DISCONNECTED;
this.state = PCTransportState.NEW;
}
log.info(`pc state: ${PCTransportState[this.state]}`, {
publisher: this.publisher.getConnectionState(),
Expand Down
67 changes: 32 additions & 35 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

async cleanupPeerConnections() {
await this.pcManager?.close();
this.pcManager = undefined;

// this.primaryTransport = undefined;

Expand Down Expand Up @@ -342,7 +343,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

private async configure(joinResponse: JoinResponse) {
// already configured
if (this.pcManager && this.pcManager.currentState !== PCTransportState.DISCONNECTED) {
if (this.pcManager && this.pcManager.currentState !== PCTransportState.NEW) {
return;
}

Expand Down Expand Up @@ -400,9 +401,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
log.debug('received server answer', {
RTCSdpType: sd.type,
signalingState: this.pcManager.publisher.getSignallingState().toString(),
});
await this.pcManager.publisher.setRemoteDescription(sd);
await this.pcManager.setAnswer(sd);
};

// add candidate on trickle
Expand Down Expand Up @@ -811,8 +811,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
this.clientConfiguration?.resumeConnection === ClientConfigSetting.DISABLED ||
// signaling state could change to closed due to hardware sleep
// those connections cannot be resumed
(this.pcManager?.currentState ?? PCTransportState.DISCONNECTED) ===
PCTransportState.DISCONNECTED
(this.pcManager?.currentState ?? PCTransportState.NEW) === PCTransportState.NEW
) {
this.fullReconnectOnNext = true;
}
Expand Down Expand Up @@ -975,7 +974,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

async waitForPCInitialConnection(timeout?: number, abortController?: AbortController) {
await this.pcManager?.ensurePCTransportConnection(abortController, timeout);
if (!this.pcManager) {
throw new UnexpectedConnectionState('PC manager is closed');
}
await this.pcManager.ensurePCTransportConnection(abortController, timeout);
// if (this.pcState === PCState.Connected) {
// return;
// }
Expand Down Expand Up @@ -1021,7 +1023,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
log.debug('waiting for peer connection to reconnect');
try {
await sleep(minReconnectWait); // FIXME setTimeout again not ideal for a connection critical path
await this.pcManager?.ensurePCTransportConnection(undefined, this.peerConnectionTimeout);
if (!this.pcManager) {
throw new UnexpectedConnectionState('PC manager is closed');
}
await this.pcManager.ensurePCTransportConnection(undefined, this.peerConnectionTimeout);
this.pcState = PCState.Connected;
} catch (e: any) {
// TODO do we need a `failed` state here for the PC?
Expand Down Expand Up @@ -1084,16 +1089,19 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
kind: DataPacket_Kind,
subscriber: boolean = this.subscriberPrimary,
) {
const transport = subscriber ? this.pcManager?.subscriber : this.pcManager?.publisher;
if (!this.pcManager) {
throw new UnexpectedConnectionState('PC manager is closed');
}
const transport = subscriber ? this.pcManager.subscriber : this.pcManager.publisher;
const transportName = subscriber ? 'Subscriber' : 'Publisher';
if (!transport) {
throw new ConnectionError(`${transportName} connection not set`);
}

if (
!subscriber &&
!this.pcManager?.publisher.isICEConnected &&
this.pcManager?.publisher.getICEConnectionState() !== 'checking'
!this.pcManager.publisher.isICEConnected &&
this.pcManager.publisher.getICEConnectionState() !== 'checking'
) {
// start negotiation
this.negotiate();
Expand Down Expand Up @@ -1146,19 +1154,21 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

/** @internal */
negotiate(): Promise<void> {
async negotiate(): Promise<void> {
// observe signal state
return new Promise<void>((resolve, reject) => {
return new Promise<void>(async (resolve, reject) => {
if (!this.pcManager) {
reject(new NegotiationError('pc manager is not defined'));
reject(new NegotiationError('PC manager is closed'));
return;
}

this.pcManager.requirePublisher();

const abortController = new AbortController();

const handleClosed = () => {
abortController.abort();
log.debug('engine disconnected while negotiation was ongoing');
cleanup();
resolve();
return;
};
Expand All @@ -1168,23 +1178,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
this.on(EngineEvent.Closing, handleClosed);

const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
this.handleDisconnect('negotiation', ReconnectReason.RR_SIGNAL_DISCONNECTED);
}, this.peerConnectionTimeout);

const cleanup = () => {
clearTimeout(negotiationTimeout);
this.off(EngineEvent.Closing, handleClosed);
};

this.pcManager.publisher.once(PCEvents.NegotiationStarted, () => {
this.pcManager?.publisher?.once(PCEvents.NegotiationComplete, () => {
cleanup();
resolve();
});
});

this.pcManager.publisher.once(
PCEvents.RTPVideoPayloadTypes,
(rtpTypes: MediaAttributes['rtp']) => {
Expand All @@ -1199,14 +1192,18 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
},
);

this.pcManager.publisher.negotiate((e) => {
cleanup();
reject(e);
try {
await this.pcManager.negotiate(abortController);
resolve();
} catch (e: any) {
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation', ReconnectReason.RR_UNKNOWN);
});
reject(e);
} finally {
this.off(EngineEvent.Closing, handleClosed);
}
});
}

Expand Down
7 changes: 5 additions & 2 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1551,8 +1551,11 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}

private sendSyncState() {
const previousAnswer = this.engine.pcManager?.subscriber.getLocalDescription();
const previousOffer = this.engine.pcManager?.subscriber.getRemoteDescription();
if (!this.engine.pcManager) {
return;
}
const previousAnswer = this.engine.pcManager.subscriber.getLocalDescription();
const previousOffer = this.engine.pcManager.subscriber.getRemoteDescription();

if (!previousAnswer) {
return;
Expand Down
5 changes: 3 additions & 2 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
TrackPublishedResponse,
TrackUnpublishedResponse,
} from '../../proto/livekit_rtc_pb';
import { PCTransportState } from '../PCTransportManager';
import type RTCEngine from '../RTCEngine';
import { defaultVideoCodec } from '../defaults';
import { DeviceUnsupportedError, TrackInvalidError, UnexpectedConnectionState } from '../errors';
Expand Down Expand Up @@ -929,8 +930,8 @@ export default class LocalParticipant extends Participant {
const trackSender = track.sender;
track.sender = undefined;
if (
this.engine.pcManager?.publisher &&
this.engine.pcManager?.publisher.getConnectionState() !== 'closed' &&
this.engine.pcManager &&
this.engine.pcManager.currentState < PCTransportState.FAILED &&
trackSender
) {
try {
Expand Down

0 comments on commit 7ba3e85

Please sign in to comment.