Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: itunnel transplants #1065

Merged
merged 7 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/core/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ export const runAPI = async (express, app, __dirname, isPrimary = true) => {
...Object.entries(req.headers)
]);

return stream(res, { type: 'internal', ...streamInfo });
return stream(res, { type: 'internal', data: streamInfo });
};

app.get('/itunnel', itunnelHandler);
Expand Down
4 changes: 4 additions & 0 deletions api/src/misc/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ export function splitFilenameExtension(filename) {
return [ parts.join('.'), ext ]
}
}

export function zip(a, b) {
return a.map((value, i) => [ value, b[i] ]);
}
3 changes: 2 additions & 1 deletion api/src/processing/match-action.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ export default function({ r, host, audioFormat, isAudioOnly, isAudioMuted, disab
filename: r.filenameAttributes ?
createFilename(r.filenameAttributes, filenameStyle, isAudioOnly, isAudioMuted) : r.filename,
fileMetadata: !disableMetadata ? r.fileMetadata : false,
requestIP
requestIP,
originalRequest: r.originalRequest
},
params = {};

Expand Down
38 changes: 28 additions & 10 deletions api/src/processing/services/youtube.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export default async function (o) {
useHLS = false;
}

let innertubeClient = "ANDROID";
let innertubeClient = o.innertubeClient || "ANDROID";

if (cookie) {
useHLS = false;
Expand Down Expand Up @@ -240,12 +240,12 @@ export default async function (o) {
const quality = o.quality === "max" ? 9000 : Number(o.quality);

const normalizeQuality = res => {
const shortestSide = res.height > res.width ? res.width : res.height;
const shortestSide = Math.min(res.height, res.width);
return videoQualities.find(qual => qual >= shortestSide);
}

let video, audio, dubbedLanguage,
codec = o.format || "h264";
codec = o.format || "h264", itag = o.itag;

if (useHLS) {
const hlsManifest = info.streaming_data.hls_manifest_url;
Expand Down Expand Up @@ -351,17 +351,21 @@ export default async function (o) {
Number(b.bitrate) - Number(a.bitrate)
).forEach(format => {
Object.keys(codecList).forEach(yCodec => {
const matchingItag = slot => !itag || itag[slot] === format.itag;
const sorted = sorted_formats[yCodec];
const goodFormat = checkFormat(format, yCodec);
if (!goodFormat) return;

if (format.has_video) {
if (format.has_video && matchingItag('video')) {
sorted.video.push(format);
if (!sorted.bestVideo) sorted.bestVideo = format;
if (!sorted.bestVideo)
sorted.bestVideo = format;
}
if (format.has_audio) {

if (format.has_audio && matchingItag('audio')) {
sorted.audio.push(format);
if (!sorted.bestAudio) sorted.bestAudio = format;
if (!sorted.bestAudio)
sorted.bestAudio = format;
}
})
});
Expand Down Expand Up @@ -448,6 +452,18 @@ export default async function (o) {
youtubeDubName: dubbedLanguage || false,
}

itag = {
video: video.itag,
audio: audio.itag
};

const originalRequest = {
...o,
dispatcher: undefined,
itag,
innertubeClient
};

if (audio && o.isAudioOnly) {
let bestAudio = codec === "h264" ? "m4a" : "opus";
let urls = audio.url;
Expand All @@ -469,6 +485,7 @@ export default async function (o) {
fileMetadata,
bestAudio,
isHLS: useHLS,
originalRequest
}
}

Expand All @@ -491,12 +508,12 @@ export default async function (o) {
filenameAttributes.resolution = `${video.width}x${video.height}`;
filenameAttributes.extension = codecList[codec].container;

video = video.url;
audio = audio.url;

if (innertubeClient === "WEB" && innertube) {
video = video.decipher(innertube.session.player);
audio = audio.decipher(innertube.session.player);
} else {
video = video.url;
audio = audio.url;
}
}

Expand All @@ -512,6 +529,7 @@ export default async function (o) {
filenameAttributes,
fileMetadata,
isHLS: useHLS,
originalRequest
}
}

Expand Down
12 changes: 11 additions & 1 deletion api/src/stream/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;

async function* readChunks(streamInfo, size) {
let read = 0n;
let read = 0n, chunksSinceTransplant = 0;
while (read < size) {
if (streamInfo.controller.signal.aborted) {
throw new Error("controller aborted");
Expand All @@ -22,6 +22,16 @@ async function* readChunks(streamInfo, size) {
signal: streamInfo.controller.signal
});

if (chunk.statusCode === 403 && chunksSinceTransplant >= 3 && streamInfo.transplant) {
chunksSinceTransplant = 0;
try {
await streamInfo.transplant(streamInfo.dispatcher);
continue;
} catch {}
}

chunksSinceTransplant++;

const expected = min(CHUNK_SIZE, size - read);
const received = BigInt(chunk.headers['content-length']);

Expand Down
70 changes: 68 additions & 2 deletions api/src/stream/manage.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { env } from "../config.js";
import { closeRequest } from "./shared.js";
import { decryptStream, encryptStream } from "../misc/crypto.js";
import { hashHmac } from "../security/secrets.js";
import { zip } from "../misc/utils.js";

// optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
Expand Down Expand Up @@ -40,6 +41,7 @@ export function createStream(obj) {
audioFormat: obj.audioFormat,

isHLS: obj.isHLS || false,
originalRequest: obj.originalRequest
};

// FIXME: this is now a Promise, but it is not awaited
Expand Down Expand Up @@ -100,6 +102,7 @@ export function createInternalStream(url, obj = {}) {
controller,
dispatcher,
isHLS: obj.isHLS,
transplant: obj.transplant
});

let streamLink = new URL('/itunnel', `http://127.0.0.1:${env.tunnelPort}`);
Expand All @@ -115,23 +118,86 @@ export function createInternalStream(url, obj = {}) {
return streamLink.toString();
}

export function destroyInternalStream(url) {
function getInternalTunnelId(url) {
url = new URL(url);
if (url.hostname !== '127.0.0.1') {
return;
}

const id = url.searchParams.get('id');
return url.searchParams.get('id');
}

export function destroyInternalStream(url) {
const id = getInternalTunnelId(url);

if (internalStreamCache.has(id)) {
closeRequest(getInternalStream(id)?.controller);
internalStreamCache.delete(id);
}
}

const transplantInternalTunnels = function(tunnelUrls, transplantUrls) {
if (tunnelUrls.length !== transplantUrls.length) {
return;
}

for (const [ tun, url ] of zip(tunnelUrls, transplantUrls)) {
const id = getInternalTunnelId(tun);
const itunnel = getInternalStream(id);

if (!itunnel) continue;
itunnel.url = url;
}
}

const transplantTunnel = async function (dispatcher) {
if (this.pendingTransplant) {
await this.pendingTransplant;
return;
}

let finished;
this.pendingTransplant = new Promise(r => finished = r);

try {
const handler = await import(`../processing/services/${this.service}.js`);
const response = await handler.default({
...this.originalRequest,
dispatcher
});

if (!response.urls) {
return;
}

response.urls = [response.urls].flat();
if (this.originalRequest.isAudioOnly && response.urls.length > 1) {
response.urls = [response.urls[1]];
} else if (this.originalRequest.isAudioMuted) {
response.urls = [response.urls[0]];
}

const tunnels = [this.urls].flat();
if (tunnels.length !== response.urls.length) {
return;
}

transplantInternalTunnels(tunnels, response.urls);
}
catch {}
finally {
finished();
delete this.pendingTransplant;
}
}

function wrapStream(streamInfo) {
const url = streamInfo.urls;

if (streamInfo.originalRequest) {
streamInfo.transplant = transplantTunnel.bind(streamInfo);
}

if (typeof url === 'string') {
streamInfo.urls = createInternalStream(url, streamInfo);
} else if (Array.isArray(url)) {
Expand Down
2 changes: 1 addition & 1 deletion api/src/stream/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default async function(res, streamInfo) {
return await stream.proxy(streamInfo, res);

case "internal":
return internalStream(streamInfo, res);
return internalStream(streamInfo.data, res);

case "merge":
return stream.merge(streamInfo, res);
Expand Down
Loading