Skip to content

Commit

Permalink
PCTransportManager (#909)
Browse files Browse the repository at this point in the history
* Make peerconnection private on PCTransport

* revert enablesimulcast for this branch

* Create moody-nails-flash.md

* fix connection check

* initial pcmanager tap in

* WIP managed transport connection

* shift more responsibilities to transport manager

* leaky PC lifecycle

* add subscriber methods to pcmanager

* wip publisher methods on pcmanager

* move primary transport handling into manager

* fix e2ee reconnect behaviour

* fix publisher reconnect scenarios

* wip transport lifecycle

* negotiation in pc manager

* fix peer connection leak

* cleanup

* more cleanup

* pass state on transport callbacks

* Log syncState failure

* More explicit method naming

* make getConnectedAddress configurable

* Rename onLocalOffer to onPublisherOffer

* Send sync state also for publisher only

* move syncState into engine

* Create fuzzy-waves-knock.md
  • Loading branch information
lukasIO authored Nov 21, 2023
1 parent 7ad002d commit 6ecf576
Show file tree
Hide file tree
Showing 8 changed files with 617 additions and 374 deletions.
5 changes: 5 additions & 0 deletions .changeset/fuzzy-waves-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Move PeerConnection logic into PCTransportManager
4 changes: 2 additions & 2 deletions src/connectionHelper/checks/webrtc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ export class WebRTCCheck extends Checker {
}
};

if (this.room.engine.subscriber) {
this.room.engine.subscriber.onIceCandidateError = (ev) => {
if (this.room.engine.pcManager) {
this.room.engine.pcManager.subscriber.onIceCandidateError = (ev) => {
if (ev instanceof RTCPeerConnectionIceErrorEvent) {
this.appendWarning(
`error with ICE candidate: ${ev.errorCode} ${ev.errorText} ${ev.url}`,
Expand Down
91 changes: 62 additions & 29 deletions src/room/PCTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ export default class PCTransport extends EventEmitter {
private _pc: RTCPeerConnection | null;

private get pc() {
if (this._pc) return this._pc;
throw new UnexpectedConnectionState('Expected peer connection to be available');
if (!this._pc) {
this._pc = this.createPC();
}
return this._pc;
}

private config?: RTCConfiguration;

private mediaConstraints: Record<string, unknown>;

pendingCandidates: RTCIceCandidateInit[] = [];

restartingIce: boolean = false;
Expand All @@ -57,32 +63,53 @@ export default class PCTransport extends EventEmitter {

onConnectionStateChange?: (state: RTCPeerConnectionState) => void;

onIceConnectionStateChange?: (state: RTCIceConnectionState) => void;

onSignalingStatechange?: (state: RTCSignalingState) => void;

onDataChannel?: (ev: RTCDataChannelEvent) => void;

onTrack?: (ev: RTCTrackEvent) => void;

constructor(config?: RTCConfiguration, mediaConstraints: Record<string, unknown> = {}) {
super();
this._pc = isChromiumBased()
this.config = config;
this.mediaConstraints = mediaConstraints;
this._pc = this.createPC();
}

private createPC() {
const pc = isChromiumBased()
? // @ts-expect-error chrome allows additional media constraints to be passed into the RTCPeerConnection constructor
new RTCPeerConnection(config, mediaConstraints)
: new RTCPeerConnection(config);
this._pc.onicecandidate = (ev) => {
new RTCPeerConnection(this.config, this.mediaConstraints)
: new RTCPeerConnection(this.config);

pc.onicecandidate = (ev) => {
if (!ev.candidate) return;
this.onIceCandidate?.(ev.candidate);
};
this._pc.onicecandidateerror = (ev) => {
pc.onicecandidateerror = (ev) => {
this.onIceCandidateError?.(ev);
};
this._pc.onconnectionstatechange = () => {
this.onConnectionStateChange?.(this._pc?.connectionState ?? 'closed');

pc.oniceconnectionstatechange = () => {
this.onIceConnectionStateChange?.(pc.iceConnectionState);
};
this._pc.ondatachannel = (ev) => {

pc.onsignalingstatechange = () => {
this.onSignalingStatechange?.(pc.signalingState);
};

pc.onconnectionstatechange = () => {
this.onConnectionStateChange?.(pc.connectionState);
};
pc.ondatachannel = (ev) => {
this.onDataChannel?.(ev);
};
this._pc.ontrack = (ev) => {
pc.ontrack = (ev) => {
this.onTrack?.(ev);
};
return pc;
}

get isICEConnected(): boolean {
Expand Down Expand Up @@ -168,7 +195,7 @@ export default class PCTransport extends EventEmitter {

if (this.renegotiate) {
this.renegotiate = false;
this.createAndSendOffer();
await this.createAndSendOffer();
} else if (sd.type === 'answer') {
this.emit(PCEvents.NegotiationComplete);
if (sd.sdp) {
Expand All @@ -183,10 +210,10 @@ export default class PCTransport extends EventEmitter {
}

// debounced negotiate interface
negotiate = debounce((onError?: (e: Error) => void) => {
negotiate = debounce(async (onError?: (e: Error) => void) => {
this.emit(PCEvents.NegotiationStarted);
try {
this.createAndSendOffer();
await this.createAndSendOffer();
} catch (e) {
if (onError) {
onError(e as Error);
Expand All @@ -209,11 +236,11 @@ export default class PCTransport extends EventEmitter {
if (this._pc && this._pc.signalingState === 'have-local-offer') {
// we're waiting for the peer to accept our offer, so we'll just wait
// the only exception to this is when ICE restart is needed
const currentSD = this.pc.remoteDescription;
const currentSD = this._pc.remoteDescription;
if (options?.iceRestart && currentSD) {
// TODO: handle when ICE restart is needed but we don't have a remote description
// the best thing to do is to recreate the peerconnection
await this.pc.setRemoteDescription(currentSD);
await this._pc.setRemoteDescription(currentSD);
} else {
this.renegotiate = true;
return;
Expand Down Expand Up @@ -307,51 +334,57 @@ export default class PCTransport extends EventEmitter {
}

addTrack(track: MediaStreamTrack) {
return this.pc.addTrack(track);
if (!this._pc) {
throw new UnexpectedConnectionState('PC closed, cannot add track');
}
return this._pc.addTrack(track);
}

setTrackCodecBitrate(info: TrackBitrateInfo) {
this.trackBitrates.push(info);
}

setConfiguration(rtcConfig: RTCConfiguration) {
return this.pc.setConfiguration(rtcConfig);
if (!this._pc) {
throw new UnexpectedConnectionState('PC closed, cannot configure');
}
return this._pc?.setConfiguration(rtcConfig);
}

canRemoveTrack(): boolean {
return !!this.pc.removeTrack;
return !!this._pc?.removeTrack;
}

removeTrack(sender: RTCRtpSender) {
return this.pc.removeTrack(sender);
return this._pc?.removeTrack(sender);
}

getConnectionState() {
return this.pc.connectionState;
return this._pc?.connectionState ?? 'closed';
}

getICEConnectionState() {
return this.pc.iceConnectionState;
return this._pc?.iceConnectionState ?? 'closed';
}

getSignallingState() {
return this.pc.signalingState;
return this._pc?.signalingState ?? 'closed';
}

getTransceivers() {
return this.pc.getTransceivers();
return this._pc?.getTransceivers() ?? [];
}

getSenders() {
return this.pc.getSenders();
return this._pc?.getSenders() ?? [];
}

getLocalDescription() {
return this.pc.localDescription;
return this._pc?.localDescription;
}

getRemoteDescription() {
return this.pc.remoteDescription;
return this.pc?.remoteDescription;
}

getStats() {
Expand Down Expand Up @@ -395,7 +428,7 @@ export default class PCTransport extends EventEmitter {
return candidates.get(selectedID);
}

close() {
close = () => {
if (!this._pc) {
return;
}
Expand All @@ -412,7 +445,7 @@ export default class PCTransport extends EventEmitter {
this._pc.onconnectionstatechange = null;
this._pc.oniceconnectionstatechange = null;
this._pc = null;
}
};

private async setMungedSDP(sd: RTCSessionDescriptionInit, munged?: string, remote?: boolean) {
if (munged) {
Expand Down
Loading

0 comments on commit 6ecf576

Please sign in to comment.