Skip to content

Commit

Permalink
fix(webrtc): fix webrtc data channel max size
Browse files Browse the repository at this point in the history
  • Loading branch information
unadlib committed Jan 2, 2023
1 parent cac8bfc commit 4833075
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 6 deletions.
10 changes: 9 additions & 1 deletion examples/webrtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ import { WebRTCTransport, listen } from 'data-transport';
import SimplePeer from 'simple-peer';
import { Other, Main } from './interface';

const bigData = Array(10 ** 5 * 3)
.fill(1)
.map((_, i) => ({
i: Math.random(),
}));

class MainTransport extends WebRTCTransport<Main> implements Other {
async help() {
const response = await this.emit('help', { text: 'SOS!!!' });
// @ts-ignore
const response = await this.emit('help', { text: 'SOS!!!', bigData });
return response;
}

Expand All @@ -14,6 +21,7 @@ class MainTransport extends WebRTCTransport<Main> implements Other {
const input = document.getElementById('input') as HTMLInputElement;
return {
text: `hello ${input?.value || 'anonymous'}, ${options.num}`,
bigData,
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "data-transport",
"version": "3.3.4",
"version": "3.3.5",
"description": "A simple and responsible transport",
"main": "lib/index.js",
"unpkg": "dist/index.umd.js",
Expand Down
109 changes: 105 additions & 4 deletions src/transports/webRTCTransport.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,82 @@
import type { Instance } from 'simple-peer';
import type { ListenerOptions, TransportOptions } from '../interface';
import type {
ListenerOptions,
SendOptions,
TransportOptions,
} from '../interface';
import { Transport } from '../transport';

const MAX_CHUNK_SIZE = 1024 * 60;

const BUFFER_FULL_THRESHOLD = 1024 * 64;

export interface WebRTCTransportOptions extends Partial<TransportOptions> {
peer: Instance;
}

interface WebRTCTransportSendOptions extends SendOptions<{}> {
isLastChunk?: boolean;
chunkId?: number;
}

abstract class WebRTCTransport<T = any, P = any> extends Transport<T, P> {
private receiveBuffer = new Map<string, { data: any[]; timestamp: number }>();

constructor({
peer,
listener = (callback) => {
const handler = (data: string) => {
callback(JSON.parse(data) as ListenerOptions);
const message: WebRTCTransportSendOptions = JSON.parse(data);
const key = Object.prototype.hasOwnProperty.call(message, 'request')
? 'request'
: 'response';
const buffer = this.receiveBuffer.get(
message.__DATA_TRANSPORT_UUID__
) ?? {
data: [],
timestamp: Date.now(),
};
this.receiveBuffer.set(message.__DATA_TRANSPORT_UUID__, buffer);
buffer.data[message.chunkId!] = message[key];
buffer.timestamp = Date.now();
if (message.isLastChunk) {
const data = JSON.parse(buffer.data.join(''));
message[key] = key === 'request' ? data : data[0];
delete message.isLastChunk;
callback(message as ListenerOptions);
this.receiveBuffer.delete(message.__DATA_TRANSPORT_UUID__);
}
};
peer.on('data', handler);
return () => {
peer.off('data', handler);
};
},
sender = (message) => {
peer.send(JSON.stringify(message));
sender = (message: WebRTCTransportSendOptions) => {
const key = Object.prototype.hasOwnProperty.call(message, 'request')
? 'request'
: 'response';
message[key] = JSON.stringify(
key === 'request'
? message.request
: typeof message.response !== 'undefined'
? [message.response]
: []
);
let chunkId = 0;
while ((message[key] as string).length > 0) {
const data = {
...message,
[key]: (message[key] as string).slice(0, MAX_CHUNK_SIZE),
chunkId,
};
if ((data[key] as string).length < MAX_CHUNK_SIZE) {
data.isLastChunk = true;
}
peer.send(JSON.stringify(data));
message[key] = (message[key] as string).slice(MAX_CHUNK_SIZE);
chunkId += 1;
}
},
...options
}: WebRTCTransportOptions) {
Expand All @@ -28,6 +85,50 @@ abstract class WebRTCTransport<T = any, P = any> extends Transport<T, P> {
listener,
sender,
});

let webRTCPaused = false;
const webRTCMessageQueue: any[] = [];
const peerSend = peer.send.bind(peer);
const sendMessageQueued = () => {
webRTCPaused = false;
let message = webRTCMessageQueue.shift();
while (message) {
if (
(peer as any)._channel.bufferedAmount &&
(peer as any)._channel.bufferedAmount > BUFFER_FULL_THRESHOLD
) {
webRTCPaused = true;
webRTCMessageQueue.unshift(message);
const listener = () => {
(peer as any)._channel.removeEventListener(
'bufferedamountlow',
listener
);
sendMessageQueued();
};
(peer as any)._channel.addEventListener(
'bufferedamountlow',
listener
);
return;
}
try {
peerSend(message);
message = webRTCMessageQueue.shift();
} catch (error: any) {
throw new Error(
`Error send message to peer: ${error.message}`
);
}
}
};
peer.send = function (chunk: any) {
webRTCMessageQueue.push(chunk);
if (webRTCPaused) {
return;
}
sendMessageQueued();
};
}
}

Expand Down

0 comments on commit 4833075

Please sign in to comment.