diff --git a/packages/p2p-media-loader-core/src/http-loader.ts b/packages/p2p-media-loader-core/src/http-loader.ts index fb58d23f..10011317 100644 --- a/packages/p2p-media-loader-core/src/http-loader.ts +++ b/packages/p2p-media-loader-core/src/http-loader.ts @@ -35,15 +35,9 @@ export async function fulfillHttpSegmentRequest( }); if (fetchResponse.ok) { - const totalBytesString = fetchResponse.headers.get("Content-Length"); - if (!fetchResponse.body) { - fetchResponse.arrayBuffer().then((data) => { - requestControls.addLoadedChunk(data); - requestControls.completeOnSuccess(); - }); - return; - } + if (!fetchResponse.body) return; + const totalBytesString = fetchResponse.headers.get("Content-Length"); if (totalBytesString) request.setTotalBytes(+totalBytesString); const reader = fetchResponse.body.getReader(); diff --git a/packages/p2p-media-loader-core/src/hybrid-loader.ts b/packages/p2p-media-loader-core/src/hybrid-loader.ts index f0f4f415..3f5cc42a 100644 --- a/packages/p2p-media-loader-core/src/hybrid-loader.ts +++ b/packages/p2p-media-loader-core/src/hybrid-loader.ts @@ -4,14 +4,12 @@ import { SegmentsMemoryStorage } from "./segments-storage"; import { Settings, CoreEventHandlers } from "./types"; import { BandwidthApproximator } from "./bandwidth-approximator"; import { Playback, QueueItem } from "./internal-types"; -import { - RequestsContainer, - EngineCallbacks, - HybridLoaderRequest, - Request, -} from "./request-container"; +import { RequestsContainer } from "./request-container"; +import { Request, EngineCallbacks, HybridLoaderRequest } from "./request"; import * as QueueUtils from "./utils/queue"; import * as LoggerUtils from "./utils/logger"; +import * as StreamUtils from "./utils/stream"; +import * as Utils from "./utils/utils"; import { P2PLoadersContainer } from "./p2p/loaders-container"; import { PeerRequestError } from "./p2p/peer"; import debug from "debug"; @@ -40,7 +38,7 @@ export class HybridLoader { this.lastRequestedSegment = requestedSegment; const activeStream = requestedSegment.stream; this.playback = { position: requestedSegment.startTime, rate: 1 }; - this.segmentAvgDuration = getSegmentAvgDuration(activeStream); + this.segmentAvgDuration = StreamUtils.getSegmentAvgDuration(activeStream); this.requests = new RequestsContainer( requestedSegment.stream.type, this.bandwidthApproximator @@ -159,23 +157,6 @@ export class HybridLoader { if (statuses.isHighDemand) { if (request?.type === "http") continue; - - if (request?.type === "p2p") { - const timeToPlayback = getTimeToSegmentPlayback( - segment, - this.playback - ); - const remainingDownloadTime = - getPredictedRemainingDownloadTime(request); - if ( - remainingDownloadTime === undefined || - remainingDownloadTime > timeToPlayback - ) { - request.abort(); - } else { - continue; - } - } if (this.requests.executingHttpCount < simultaneousHttpDownloads) { void this.loadThroughHttp(item); continue; @@ -218,8 +199,11 @@ export class HybridLoader { // api method for engines abortSegment(segment: Segment) { + const request = this.requests.get(segment); + if (!request) return; + request.abort(); + this.createProcessQueueMicrotask(); this.logger.engine("abort: ", LoggerUtils.getSegmentString(segment)); - this.requests.abortEngineRequest(segment); } private async loadThroughHttp(item: QueueItem, isRandom = false) { @@ -286,7 +270,7 @@ export class HybridLoader { const shouldLoad = Math.random() < probability; if (!shouldLoad) return; - const item = queue[Math.floor(Math.random() * queue.length)]; + const item = Utils.getRandomItem(queue); void this.loadThroughHttp(item, true); this.logger.loader( @@ -387,36 +371,3 @@ function* arrayBackwards(arr: T[]) { yield arr[i]; } } - -function getTimeToSegmentPlayback(segment: Segment, playback: Playback) { - return Math.max(segment.startTime - playback.position, 0) / playback.rate; -} - -function getPredictedRemainingDownloadTime(request: HybridLoaderRequest) { - const { progress } = request; - if (!progress || progress.lastLoadedChunkTimestamp === undefined) { - return undefined; - } - - const now = performance.now(); - const bandwidth = - progress.percent / - (progress.lastLoadedChunkTimestamp - progress.startTimestamp); - const remainingDownloadPercent = 100 - progress.percent; - const predictedRemainingTimeFromLastDownload = - remainingDownloadPercent / bandwidth; - const timeFromLastDownload = now - progress.lastLoadedChunkTimestamp; - return (predictedRemainingTimeFromLastDownload - timeFromLastDownload) / 1000; -} - -function getSegmentAvgDuration(stream: StreamWithSegments) { - const { segments } = stream; - let sumDuration = 0; - const size = segments.size; - for (const segment of segments.values()) { - const duration = segment.endTime - segment.startTime; - sumDuration += duration; - } - - return sumDuration / size; -} diff --git a/packages/p2p-media-loader-core/src/p2p/loader.ts b/packages/p2p-media-loader-core/src/p2p/loader.ts index 619492c6..a8e04762 100644 --- a/packages/p2p-media-loader-core/src/p2p/loader.ts +++ b/packages/p2p-media-loader-core/src/p2p/loader.ts @@ -4,10 +4,12 @@ import * as PeerUtil from "../utils/peer"; import { Segment, Settings, StreamWithSegments } from "../types"; import { QueueItem } from "../internal-types"; import { SegmentsMemoryStorage } from "../segments-storage"; -import * as Utils from "../utils/utils"; import * as LoggerUtils from "../utils/logger"; +import * as StreamUtils from "../utils/stream"; +import * as Utils from "../utils/utils"; import { PeerSegmentStatus } from "../enums"; -import { Request, RequestsContainer } from "../request-container"; +import { RequestsContainer } from "../request-container"; +import { Request } from "../request"; import debug from "debug"; export class P2PLoader { @@ -26,7 +28,7 @@ export class P2PLoader { private readonly settings: Settings ) { this.peerId = PeerUtil.generatePeerId(); - const streamExternalId = Utils.getStreamExternalId( + const streamExternalId = StreamUtils.getStreamExternalId( this.streamManifestUrl, this.stream ); @@ -108,7 +110,7 @@ export class P2PLoader { } const peer = untestedPeers.length - ? getRandomItem(untestedPeers) + ? Utils.getRandomItem(untestedPeers) : fastestPeer; if (!peer) return; @@ -182,7 +184,7 @@ export class P2PLoader { }; private async onSegmentRequested(peer: Peer, segmentExternalId: string) { - const segment = Utils.getSegmentFromStreamByExternalId( + const segment = StreamUtils.getSegmentFromStreamByExternalId( this.stream, segmentExternalId ); @@ -245,7 +247,3 @@ function utf8ToHex(utf8String: string) { return result; } - -function getRandomItem(items: T[]): T { - return items[Math.floor(Math.random() * items.length)]; -} diff --git a/packages/p2p-media-loader-core/src/p2p/peer.ts b/packages/p2p-media-loader-core/src/p2p/peer.ts index cb9da03f..d3d52594 100644 --- a/packages/p2p-media-loader-core/src/p2p/peer.ts +++ b/packages/p2p-media-loader-core/src/p2p/peer.ts @@ -8,7 +8,7 @@ import { } from "../internal-types"; import { PeerCommandType, PeerSegmentStatus } from "../enums"; import * as PeerUtil from "../utils/peer"; -import { Request, RequestControls } from "../request-container"; +import { Request, RequestControls } from "../request"; import { Segment, Settings } from "../types"; import * as Utils from "../utils/utils"; import debug from "debug"; diff --git a/packages/p2p-media-loader-core/src/request-container.ts b/packages/p2p-media-loader-core/src/request-container.ts index 2185bf26..03f41338 100644 --- a/packages/p2p-media-loader-core/src/request-container.ts +++ b/packages/p2p-media-loader-core/src/request-container.ts @@ -1,182 +1,8 @@ -import { Segment, SegmentResponse, StreamType } from "./types"; -import { PeerRequestError } from "./p2p/peer"; -import { HttpLoaderError } from "./http-loader"; +import { Segment, StreamType } from "./types"; import Debug from "debug"; import { EventDispatcher } from "./event-dispatcher"; -import * as Utils from "./utils/utils"; import { BandwidthApproximator } from "./bandwidth-approximator"; - -export type EngineCallbacks = { - onSuccess: (response: SegmentResponse) => void; - onError: (reason: "failed" | "abort") => void; -}; - -export type LoadProgress = { - startTimestamp: number; - lastLoadedChunkTimestamp?: number; - loadedBytes: number; - totalBytes?: number; -}; - -type HybridLoaderRequestBase = { - abort: () => void; - progress: LoadProgress; -}; - -export type HttpRequest = HybridLoaderRequestBase & { - type: "http"; - error?: HttpLoaderError; -}; - -export type P2PRequest = HybridLoaderRequestBase & { - type: "p2p"; - error?: PeerRequestError; -}; - -export type HybridLoaderRequest = HttpRequest | P2PRequest; - -type RequestEvents = { - onCompleted: (request: Request, data: ArrayBuffer) => void; - onError: (request: Request, data: Error) => void; -}; - -type RequestStatus = - | "not-started" - | "loading" - | "succeed" - | "failed" - | "aborted"; - -export class Request extends EventDispatcher { - readonly id: string; - private _engineCallbacks?: EngineCallbacks; - private hybridLoaderRequest?: HybridLoaderRequest; - private prevAttempts: HybridLoaderRequest[] = []; - private chunks: Uint8Array[] = []; - private _loadedBytes = 0; - private _totalBytes?: number; - private _status: RequestStatus = "not-started"; - - constructor( - readonly segment: Segment, - private readonly bandwidthApproximator: BandwidthApproximator - ) { - super(); - this.id = getRequestItemId(segment); - } - - get status() { - return this._status; - } - - get isSegmentRequestedByEngine(): boolean { - return !!this._engineCallbacks; - } - - get type() { - return this.hybridLoaderRequest?.type; - } - - get loadedBytes() { - return this._loadedBytes; - } - - set engineCallbacks(callbacks: EngineCallbacks) { - if (this._engineCallbacks) { - throw new Error("Segment is already requested by engine"); - } - this._engineCallbacks = callbacks; - } - - get totalBytes(): number | undefined { - return this._totalBytes; - } - - setTotalBytes(value: number) { - if (this._totalBytes !== undefined) { - throw new Error("Request total bytes value is already set"); - } - this._totalBytes = value; - } - - get loadedPercent() { - if (!this._totalBytes) return; - return Utils.getPercent(this.loadedBytes, this._totalBytes); - } - - start(type: "http" | "p2p", abortLoading: () => void): RequestControls { - if (this._status === "loading") { - throw new Error("Request has been already started."); - } - - this._status = "loading"; - this.hybridLoaderRequest = { - type, - abort: abortLoading, - progress: { - loadedBytes: 0, - startTimestamp: performance.now(), - }, - }; - - return { - addLoadedChunk: this.addLoadedChunk, - completeOnSuccess: this.completeOnSuccess, - cancelOnError: this.cancelOnError, - }; - } - - abort() { - if (!this.hybridLoaderRequest) return; - this.hybridLoaderRequest.abort(); - this._status = "aborted"; - } - - private completeOnSuccess = () => { - this.throwErrorIfNotLoadingStatus(); - const data = Utils.joinChunks(this.chunks); - this._status = "succeed"; - this._engineCallbacks?.onSuccess({ - data, - bandwidth: this.bandwidthApproximator.getBandwidth(), - }); - this.dispatch("onCompleted", this, data); - }; - - private addLoadedChunk = (chunk: Uint8Array) => { - this.throwErrorIfNotLoadingStatus(); - const { hybridLoaderRequest: request } = this; - if (!request) return; - this.chunks.push(chunk); - request.progress.lastLoadedChunkTimestamp = performance.now(); - this._loadedBytes += chunk.length; - }; - - private cancelOnError = (error: Error) => { - this.throwErrorIfNotLoadingStatus(); - if (!this.hybridLoaderRequest) return; - this._status = "failed"; - this.hybridLoaderRequest.error = error; - this.prevAttempts.push(this.hybridLoaderRequest); - this.dispatch("onError", this, error); - }; - - private throwErrorIfNotLoadingStatus() { - if (this._status !== "loading") { - throw new Error("Request has been already completed/aborted/failed."); - } - } -} - -export type RequestControls = { - addLoadedChunk: Request["addLoadedChunk"]; - completeOnSuccess: Request["completeOnSuccess"]; - cancelOnError: Request["cancelOnError"]; -}; - -function getRequestItemId(segment: Segment) { - return segment.localId; -} +import { Request, RequestEvents } from "./request"; type RequestsContainerEvents = { httpRequestsUpdated: () => void; @@ -212,12 +38,12 @@ export class RequestsContainer { } get(segment: Segment) { - const id = getRequestItemId(segment); + const id = Request.getRequestItemId(segment); return this.requests.get(id); } getOrCreateRequest(segment: Segment) { - const id = getRequestItemId(segment); + const id = Request.getRequestItemId(segment); let request = this.requests.get(id); if (!request) { request = new Request(segment, this.bandwidthApproximator); @@ -232,7 +58,8 @@ export class RequestsContainer { }; remove(value: Segment | Request) { - const id = value instanceof Request ? value.id : getRequestItemId(value); + const id = + value instanceof Request ? value.id : Request.getRequestItemId(value); this.requests.delete(id); } @@ -253,17 +80,17 @@ export class RequestsContainer { } isHttpRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); + const id = Request.getRequestItemId(segment); return this.requests.get(id)?.type === "http"; } isP2PRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); + const id = Request.getRequestItemId(segment); return this.requests.get(id)?.type === "p2p"; } isHybridLoaderRequested(segment: Segment): boolean { - const id = getRequestItemId(segment); + const id = Request.getRequestItemId(segment); return !!this.requests.get(id)?.type; } diff --git a/packages/p2p-media-loader-core/src/request.ts b/packages/p2p-media-loader-core/src/request.ts new file mode 100644 index 00000000..b20a1057 --- /dev/null +++ b/packages/p2p-media-loader-core/src/request.ts @@ -0,0 +1,183 @@ +import { EventDispatcher } from "./event-dispatcher"; +import { Segment, SegmentResponse } from "./types"; +import { BandwidthApproximator } from "./bandwidth-approximator"; +import * as Utils from "./utils/utils"; +import { HttpLoaderError } from "./http-loader"; +import { PeerRequestError } from "./p2p/peer"; + +export type EngineCallbacks = { + onSuccess: (response: SegmentResponse) => void; + onError: (reason: "failed" | "abort") => void; +}; + +export type LoadProgress = { + startTimestamp: number; + lastLoadedChunkTimestamp?: number; + loadedBytes: number; + totalBytes?: number; +}; + +type HybridLoaderRequestBase = { + abort: () => void; + progress: LoadProgress; +}; + +type HttpRequest = HybridLoaderRequestBase & { + type: "http"; + error?: HttpLoaderError; +}; + +type P2PRequest = HybridLoaderRequestBase & { + type: "p2p"; + error?: PeerRequestError; +}; + +export type HybridLoaderRequest = HttpRequest | P2PRequest; + +export type RequestEvents = { + onCompleted: (request: Request, data: ArrayBuffer) => void; + onError: (request: Request, data: Error) => void; +}; + +export type RequestControls = { + addLoadedChunk: Request["addLoadedChunk"]; + completeOnSuccess: Request["completeOnSuccess"]; + cancelOnError: Request["cancelOnError"]; +}; + +type RequestStatus = + | "not-started" + | "loading" + | "succeed" + | "failed" + | "aborted"; + +export class Request extends EventDispatcher { + readonly id: string; + private _engineCallbacks?: EngineCallbacks; + private hybridLoaderRequest?: HybridLoaderRequest; + private prevAttempts: HybridLoaderRequest[] = []; + private chunks: Uint8Array[] = []; + private _loadedBytes = 0; + private _totalBytes?: number; + private _status: RequestStatus = "not-started"; + + constructor( + readonly segment: Segment, + private readonly bandwidthApproximator: BandwidthApproximator + ) { + super(); + this.id = Request.getRequestItemId(segment); + } + + get status() { + return this._status; + } + + get isSegmentRequestedByEngine(): boolean { + return !!this._engineCallbacks; + } + + get type() { + return this.hybridLoaderRequest?.type; + } + + get loadedBytes() { + return this._loadedBytes; + } + + set engineCallbacks(callbacks: EngineCallbacks) { + if (this._engineCallbacks) { + throw new Error("Segment is already requested by engine"); + } + this._engineCallbacks = callbacks; + } + + get totalBytes(): number | undefined { + return this._totalBytes; + } + + setTotalBytes(value: number) { + if (this._totalBytes !== undefined) { + throw new Error("Request total bytes value is already set"); + } + this._totalBytes = value; + } + + get loadedPercent() { + if (!this._totalBytes) return; + return Utils.getPercent(this.loadedBytes, this._totalBytes); + } + + start(type: "http" | "p2p", abortLoading: () => void): RequestControls { + if (this._status === "loading") { + throw new Error("Request has been already started."); + } + + this._status = "loading"; + this.hybridLoaderRequest = { + type, + abort: abortLoading, + progress: { + loadedBytes: 0, + startTimestamp: performance.now(), + }, + }; + + return { + addLoadedChunk: this.addLoadedChunk, + completeOnSuccess: this.completeOnSuccess, + cancelOnError: this.cancelOnError, + }; + } + + abort() { + if (!this.hybridLoaderRequest) return; + this.hybridLoaderRequest.abort(); + this._status = "aborted"; + } + + abortEngineRequest() { + this._engineCallbacks?.onError("abort"); + this._engineCallbacks = undefined; + } + + private completeOnSuccess = () => { + this.throwErrorIfNotLoadingStatus(); + const data = Utils.joinChunks(this.chunks); + this._status = "succeed"; + this._engineCallbacks?.onSuccess({ + data, + bandwidth: this.bandwidthApproximator.getBandwidth(), + }); + this.dispatch("onCompleted", this, data); + }; + + private addLoadedChunk = (chunk: Uint8Array) => { + this.throwErrorIfNotLoadingStatus(); + const { hybridLoaderRequest: request } = this; + if (!request) return; + this.chunks.push(chunk); + request.progress.lastLoadedChunkTimestamp = performance.now(); + this._loadedBytes += chunk.length; + }; + + private cancelOnError = (error: Error) => { + this.throwErrorIfNotLoadingStatus(); + if (!this.hybridLoaderRequest) return; + this._status = "failed"; + this.hybridLoaderRequest.error = error; + this.prevAttempts.push(this.hybridLoaderRequest); + this.dispatch("onError", this, error); + }; + + private throwErrorIfNotLoadingStatus() { + if (this._status !== "loading") { + throw new Error("Request has been already completed/aborted/failed."); + } + } + + static getRequestItemId(segment: Segment) { + return segment.localId; + } +} diff --git a/packages/p2p-media-loader-core/src/utils/stream.ts b/packages/p2p-media-loader-core/src/utils/stream.ts index 5845e0ac..39d63cba 100644 --- a/packages/p2p-media-loader-core/src/utils/stream.ts +++ b/packages/p2p-media-loader-core/src/utils/stream.ts @@ -1,4 +1,5 @@ import { Segment, Stream, StreamWithSegments } from "../types"; +import { Playback } from "../internal-types"; const PEER_PROTOCOL_VERSION = "V1"; @@ -32,3 +33,19 @@ export function getSegmentFromStreamByExternalId( export function getStreamShortId(stream: Stream) { return `${stream.type}-${stream.index}`; } + +export function getSegmentAvgDuration(stream: StreamWithSegments) { + const { segments } = stream; + let sumDuration = 0; + const size = segments.size; + for (const segment of segments.values()) { + const duration = segment.endTime - segment.startTime; + sumDuration += duration; + } + + return sumDuration / size; +} + +export function getTimeToSegmentPlayback(segment: Segment, playback: Playback) { + return Math.max(segment.startTime - playback.position, 0) / playback.rate; +} diff --git a/packages/p2p-media-loader-core/src/utils/utils.ts b/packages/p2p-media-loader-core/src/utils/utils.ts index 8db9b2bb..a47d2a18 100644 --- a/packages/p2p-media-loader-core/src/utils/utils.ts +++ b/packages/p2p-media-loader-core/src/utils/utils.ts @@ -35,3 +35,7 @@ export function joinChunks( export function getPercent(numerator: number, denominator: number): number { return (numerator / denominator) * 100; } + +export function getRandomItem(items: T[]): T { + return items[Math.floor(Math.random() * items.length)]; +}