Skip to content

Commit

Permalink
api: itunnel transplants (#1065)
Browse files Browse the repository at this point in the history
  • Loading branch information
wukko authored Jan 20, 2025
2 parents 39752b2 + 600c769 commit 0ab3fe4
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 16 deletions.
2 changes: 1 addition & 1 deletion api/src/core/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
...Object.entries(req.headers)
]);

return stream(res, { type: 'internal', ...streamInfo });
return stream(res, { type: 'internal', data: streamInfo });
};

app.get('/itunnel', itunnelHandler);
Expand Down
4 changes: 4 additions & 0 deletions api/src/misc/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ export function splitFilenameExtension(filename) {
return [ parts.join('.'), ext ]
}
}

export function zip(a, b) {
return a.map((value, i) => [ value, b[i] ]);
}
3 changes: 2 additions & 1 deletion api/src/processing/match-action.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab
filename: r.filenameAttributes ?
createFilename(r.filenameAttributes, filenameStyle, isAudioOnly, isAudioMuted) : r.filename,
fileMetadata: !disableMetadata ? r.fileMetadata : false,
requestIP
requestIP,
originalRequest: r.originalRequest
},
params = {};

Expand Down
38 changes: 28 additions & 10 deletions api/src/processing/services/youtube.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export default async function (o) {
useHLS = false;
}

let innertubeClient = "ANDROID";
let innertubeClient = o.innertubeClient || "ANDROID";

if (cookie) {
useHLS = false;
Expand Down Expand Up @@ -240,12 +240,12 @@ export default async function (o) {
const quality = o.quality === "max" ? 9000 : Number(o.quality);

const normalizeQuality = res => {
const shortestSide = res.height > res.width ? res.width : res.height;
const shortestSide = Math.min(res.height, res.width);
return videoQualities.find(qual => qual >= shortestSide);
}

let video, audio, dubbedLanguage,
codec = o.format || "h264";
codec = o.format || "h264", itag = o.itag;

if (useHLS) {
const hlsManifest = info.streaming_data.hls_manifest_url;
Expand Down Expand Up @@ -351,17 +351,21 @@ export default async function (o) {
Number(b.bitrate) - Number(a.bitrate)
).forEach(format => {
Object.keys(codecList).forEach(yCodec => {
const matchingItag = slot => !itag || itag[slot] === format.itag;
const sorted = sorted_formats[yCodec];
const goodFormat = checkFormat(format, yCodec);
if (!goodFormat) return;

if (format.has_video) {
if (format.has_video && matchingItag('video')) {
sorted.video.push(format);
if (!sorted.bestVideo) sorted.bestVideo = format;
if (!sorted.bestVideo)
sorted.bestVideo = format;
}
if (format.has_audio) {

if (format.has_audio && matchingItag('audio')) {
sorted.audio.push(format);
if (!sorted.bestAudio) sorted.bestAudio = format;
if (!sorted.bestAudio)
sorted.bestAudio = format;
}
})
});
Expand Down Expand Up @@ -448,6 +452,18 @@ export default async function (o) {
youtubeDubName: dubbedLanguage || false,
}

itag = {
video: video.itag,
audio: audio.itag
};

const originalRequest = {
...o,
dispatcher: undefined,
itag,
innertubeClient
};

if (audio && o.isAudioOnly) {
let bestAudio = codec === "h264" ? "m4a" : "opus";
let urls = audio.url;
Expand All @@ -469,6 +485,7 @@ export default async function (o) {
fileMetadata,
bestAudio,
isHLS: useHLS,
originalRequest
}
}

Expand All @@ -491,12 +508,12 @@ export default async function (o) {
filenameAttributes.resolution = `${video.width}x${video.height}`;
filenameAttributes.extension = codecList[codec].container;

video = video.url;
audio = audio.url;

if (innertubeClient === "WEB" && innertube) {
video = video.decipher(innertube.session.player);
audio = audio.decipher(innertube.session.player);
} else {
video = video.url;
audio = audio.url;
}
}

Expand All @@ -512,6 +529,7 @@ export default async function (o) {
filenameAttributes,
fileMetadata,
isHLS: useHLS,
originalRequest
}
}

Expand Down
12 changes: 11 additions & 1 deletion api/src/stream/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;

async function* readChunks(streamInfo, size) {
let read = 0n;
let read = 0n, chunksSinceTransplant = 0;
while (read < size) {
if (streamInfo.controller.signal.aborted) {
throw new Error("controller aborted");
Expand All @@ -22,6 +22,16 @@ async function* readChunks(streamInfo, size) {
signal: streamInfo.controller.signal
});

if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
chunksSinceTransplant = 0;
try {
await streamInfo.transplant(streamInfo.dispatcher);
continue;
} catch {}
}

chunksSinceTransplant++;

const expected = min(CHUNK_SIZE, size - read);
const received = BigInt(chunk.headers['content-length']);

Expand Down
70 changes: 68 additions & 2 deletions api/src/stream/manage.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { env } from "../config.js";
import { closeRequest } from "./shared.js";
import { decryptStream, encryptStream } from "../misc/crypto.js";
import { hashHmac } from "../security/secrets.js";
import { zip } from "../misc/utils.js";

// optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
Expand Down Expand Up @@ -40,6 +41,7 @@ export function createStream(obj) {
audioFormat: obj.audioFormat,

isHLS: obj.isHLS || false,
originalRequest: obj.originalRequest
};

// FIXME: this is now a Promise, but it is not awaited
Expand Down Expand Up @@ -100,6 +102,7 @@ export function createInternalStream(url, obj = {}) {
controller,
dispatcher,
isHLS: obj.isHLS,
transplant: obj.transplant
});

let streamLink = new URL('/itunnel', `http://127.0.0.1:${env.tunnelPort}`);
Expand All @@ -115,23 +118,86 @@ export function createInternalStream(url, obj = {}) {
return streamLink.toString();
}

export function destroyInternalStream(url) {
function getInternalTunnelId(url) {
url = new URL(url);
if (url.hostname !== '127.0.0.1') {
return;
}

const id = url.searchParams.get('id');
return url.searchParams.get('id');
}

export function destroyInternalStream(url) {
const id = getInternalTunnelId(url);

if (internalStreamCache.has(id)) {
closeRequest(getInternalStream(id)?.controller);
internalStreamCache.delete(id);
}
}

const transplantInternalTunnels = function(tunnelUrls, transplantUrls) {
if (tunnelUrls.length !== transplantUrls.length) {
return;
}

for (const [ tun, url ] of zip(tunnelUrls, transplantUrls)) {
const id = getInternalTunnelId(tun);
const itunnel = getInternalStream(id);

if (!itunnel) continue;
itunnel.url = url;
}
}

const transplantTunnel = async function (dispatcher) {
if (this.pendingTransplant) {
await this.pendingTransplant;
return;
}

let finished;
this.pendingTransplant = new Promise(r => finished = r);

try {
const handler = await import(`../processing/services/${this.service}.js`);
const response = await handler.default({
...this.originalRequest,
dispatcher
});

if (!response.urls) {
return;
}

response.urls = [response.urls].flat();
if (this.originalRequest.isAudioOnly && response.urls.length > 1) {
response.urls = [response.urls[1]];
} else if (this.originalRequest.isAudioMuted) {
response.urls = [response.urls[0]];
}

const tunnels = [this.urls].flat();
if (tunnels.length !== response.urls.length) {
return;
}

transplantInternalTunnels(tunnels, response.urls);
}
catch {}
finally {
finished();
delete this.pendingTransplant;
}
}

function wrapStream(streamInfo) {
const url = streamInfo.urls;

if (streamInfo.originalRequest) {
streamInfo.transplant = transplantTunnel.bind(streamInfo);
}

if (typeof url === 'string') {
streamInfo.urls = createInternalStream(url, streamInfo);
} else if (Array.isArray(url)) {
Expand Down
2 changes: 1 addition & 1 deletion api/src/stream/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default async function(res, streamInfo) {
return await stream.proxy(streamInfo, res);

case "internal":
return internalStream(streamInfo, res);
return internalStream(streamInfo.data, res);

case "merge":
return stream.merge(streamInfo, res);
Expand Down

0 comments on commit 0ab3fe4

Please sign in to comment.