Skip to content

Commit

Permalink
Fix types errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Nov 16, 2023
1 parent 92021d3 commit 3b4dbdd
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { LoadProgress } from "./request-container";
import { LoadProgress } from "./request";

export class BandwidthApproximator {
private readonly loadings: LoadProgress[] = [];
Expand Down
18 changes: 11 additions & 7 deletions packages/p2p-media-loader-core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import {
SegmentBase,
CoreEventHandlers,
} from "./types";
import * as Utils from "./utils/utils";
import * as StreamUtils from "./utils/stream";
import { LinkedMap } from "./linked-map";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { EngineCallbacks } from "./request-container";
import { EngineCallbacks } from "./request";
import { SegmentsMemoryStorage } from "./segments-storage";

export class Core<TStream extends Stream = Stream> {
Expand All @@ -25,9 +25,10 @@ export class Core<TStream extends Stream = Stream> {
cachedSegmentExpiration: 120 * 1000,
cachedSegmentsCount: 50,
webRtcMaxMessageSize: 64 * 1024 - 1,
p2pSegmentDownloadTimeout: 5000,
p2pLoaderDestroyTimeout: 30 * 1000,
httpRequestTimeout: 5000,
p2pSegmentFirstBytesTimeoutMs: 1000,
p2pSegmentDownloadTimeoutMs: 5000,
p2pLoaderDestroyTimeoutMs: 30 * 1000,
httpDownloadTimeoutMs: 5000,
};
private readonly bandwidthApproximator = new BandwidthApproximator();
private segmentStorage?: SegmentsMemoryStorage;
Expand All @@ -41,7 +42,7 @@ export class Core<TStream extends Stream = Stream> {
}

hasSegment(segmentLocalId: string): boolean {
const segment = Utils.getSegmentFromStreamsMap(
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentLocalId
);
Expand Down Expand Up @@ -117,7 +118,10 @@ export class Core<TStream extends Stream = Stream> {
throw new Error("Manifest response url is undefined");
}

const segment = Utils.getSegmentFromStreamsMap(this.streams, segmentId);
const segment = StreamUtils.getSegmentFromStreamsMap(
this.streams,
segmentId
);
if (!segment) {
throw new Error(`Not found segment with id: ${segmentId}`);
}
Expand Down
10 changes: 5 additions & 5 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Settings, CoreEventHandlers } from "./types";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { Playback, QueueItem } from "./internal-types";
import { RequestsContainer } from "./request-container";
import { Request, EngineCallbacks, RequestError } from "./request";
import { Request, EngineCallbacks } from "./request";
import * as QueueUtils from "./utils/queue";
import * as LoggerUtils from "./utils/logger";
import * as StreamUtils from "./utils/stream";
Expand Down Expand Up @@ -209,7 +209,7 @@ export class HybridLoader {
const { segment } = item;

const request = this.requests.getOrCreateRequest(segment);
request.subscribe("onSuccess", this.onRequestSucceed);
request.subscribe("onSuccess", this.onRequestSuccess);
request.subscribe("onError", this.onRequestError);

void fulfillHttpSegmentRequest(request, this.settings);
Expand All @@ -225,18 +225,18 @@ export class HybridLoader {
const request = p2pLoader.downloadSegment(item);
if (request === undefined) return;

request.subscribe("onSuccess", this.onRequestSucceed);
request.subscribe("onSuccess", this.onRequestSuccess);
request.subscribe("onError", this.onRequestError);
}

private onRequestSucceed = (request: Request, data: ArrayBuffer) => {
private onRequestSuccess = (request: Request, data: ArrayBuffer) => {
const { segment } = request;
this.logger.loader(`http responses: ${segment.externalId}`);
this.eventHandlers?.onSegmentLoaded?.(data.byteLength, "http");
this.createProcessQueueMicrotask();
};

private onRequestError = (request: Request, error: RequestError) => {
private onRequestError = () => {
this.createProcessQueueMicrotask();
};

Expand Down
2 changes: 2 additions & 0 deletions packages/p2p-media-loader-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@

export { Core } from "./core";
export type * from "./types";
export { CoreRequestError } from "./request";
export type { EngineCallbacks } from "./request";
2 changes: 1 addition & 1 deletion packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class P2PLoader {
statuses
)}`
);
request.subscribe("onCompleted", () => {
request.subscribe("onSuccess", () => {
this.logger(`p2p loaded: ${segment.externalId}`);
});

Expand Down
3 changes: 2 additions & 1 deletion packages/p2p-media-loader-core/src/p2p/loaders-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ export class P2PLoadersContainer {
}

private setLoaderDestroyTimeout(item: P2PLoaderContainerItem) {
// TODO: use Timeout class instead
item.destroyTimeoutId = window.setTimeout(
() => this.destroyAndRemoveLoader(item),
this.settings.p2pLoaderDestroyTimeout
this.settings.p2pLoaderDestroyTimeoutMs
);
}

Expand Down
5 changes: 2 additions & 3 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import {
PeerSendSegmentCommand,
} from "../internal-types";
import { PeerCommandType, PeerSegmentStatus } from "../enums";
import * as PeerUtil from "../utils/peer";
import {
Request,
RequestControls,
RequestError,
PeerRequestErrorType,
RequestInnerErrorType,
} from "../request";
import { Segment, Settings } from "../types";
import * as PeerUtil from "../utils/peer";
import * as Utils from "../utils/utils";
import debug from "debug";

Expand Down Expand Up @@ -218,7 +217,7 @@ export class Peer {
}
}

private abortRequest = (reason: RequestInnerErrorType) => {
private abortRequest = () => {
if (!this.requestContext) return;
const { request } = this.requestContext;
this.sendCancelSegmentRequestCommand(request.segment);
Expand Down
12 changes: 3 additions & 9 deletions packages/p2p-media-loader-core/src/request-container.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import { Segment, StreamType } from "./types";
import Debug from "debug";
import { EventDispatcher } from "./event-dispatcher";
import { BandwidthApproximator } from "./bandwidth-approximator";
import { Request, RequestEvents } from "./request";

type RequestsContainerEvents = {
httpRequestsUpdated: () => void;
};

export class RequestsContainer {
private readonly requests = new Map<string, Request>();
private readonly logger: Debug.Debugger;
private readonly events = new EventDispatcher<RequestsContainerEvents>();

constructor(
streamType: StreamType,
Expand Down Expand Up @@ -47,13 +41,13 @@ export class RequestsContainer {
let request = this.requests.get(id);
if (!request) {
request = new Request(segment, this.bandwidthApproximator);
request.subscribe("onCompleted", this.onRequestCompleted);
request.subscribe("onSuccess", this.onRequestCompleted);
this.requests.set(request.id, request);
}
return request;
}

private onRequestCompleted: RequestEvents["onCompleted"] = (request) => {
private onRequestCompleted: RequestEvents["onSuccess"] = (request) => {
this.requests.delete(request.id);
};

Expand Down Expand Up @@ -97,7 +91,7 @@ export class RequestsContainer {
destroy() {
for (const request of this.requests.values()) {
request.abort();
request.engineCallbacks?.onError("failed");
request.abortEngineRequest();
}
this.requests.clear();
}
Expand Down
25 changes: 15 additions & 10 deletions packages/p2p-media-loader-core/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import * as Utils from "./utils/utils";

export type EngineCallbacks = {
onSuccess: (response: SegmentResponse) => void;
// TODO: Error for engines
onError: (reason: "failed" | "abort") => void;
onError: (reason: CoreRequestError) => void;
};

export type LoadProgress = {
Expand Down Expand Up @@ -144,6 +143,7 @@ export class Request extends EventDispatcher<RequestEvents> {
loadedBytes: 0,
startTimestamp: performance.now(),
};
this.bandwidthApproximator.addLoading(this.progress);
const { firstBytesTimeoutMs, fullLoadingTimeoutMs, abort } = controls;
this._abortRequestCallback = abort;
if (firstBytesTimeoutMs !== undefined) {
Expand Down Expand Up @@ -171,7 +171,7 @@ export class Request extends EventDispatcher<RequestEvents> {
}

abortEngineRequest() {
this._engineCallbacks?.onError("abort");
this._engineCallbacks?.onError(new CoreRequestError("aborted"));
this._engineCallbacks = undefined;
}

Expand Down Expand Up @@ -268,14 +268,13 @@ export type RequestInnerErrorType = (typeof requestInnerErrorTypes)[number];
export type HttpRequestErrorType = (typeof httpRequestErrorTypes)[number];
export type PeerRequestErrorType = (typeof peerRequestErrorTypes)[number];

type RequestErrorType =
| RequestInnerErrorType
| PeerRequestErrorType
| HttpRequestErrorType;

export class RequestError<
T extends
| RequestInnerErrorType
| PeerRequestErrorType
| HttpRequestErrorType =
| RequestInnerErrorType
| PeerRequestErrorType
| HttpRequestErrorType
T extends RequestErrorType = RequestErrorType
> extends Error {
constructor(readonly type: T, message?: string) {
super(message);
Expand All @@ -300,6 +299,12 @@ export class RequestError<
}
}

export class CoreRequestError extends Error {
constructor(readonly type: "failed" | "aborted") {
super();
}
}

class Timeout {
private timeoutId?: number;

Expand Down
9 changes: 2 additions & 7 deletions packages/p2p-media-loader-core/src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { LinkedMap } from "./linked-map";
import { HybridLoaderRequest } from "./request-container";

export type { EngineCallbacks } from "./request-container";
import { RequestAttempt } from "./request";

export type StreamType = "main" | "secondary";

Expand Down Expand Up @@ -65,8 +63,5 @@ export type Settings = {
};

export type CoreEventHandlers = {
onSegmentLoaded?: (
byteLength: number,
type: HybridLoaderRequest["type"]
) => void;
onSegmentLoaded?: (byteLength: number, type: RequestAttempt["type"]) => void;
};
26 changes: 14 additions & 12 deletions packages/p2p-media-loader-hlsjs/src/fragment-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ import type {
LoaderStats,
} from "hls.js";
import * as Utils from "./utils";
import {
RequestAbortError,
Core,
FetchError,
SegmentResponse,
} from "p2p-media-loader-core";
import { Core, SegmentResponse, CoreRequestError } from "p2p-media-loader-core";

const DEFAULT_DOWNLOAD_LATENCY = 10;

Expand Down Expand Up @@ -89,7 +84,13 @@ export class FragmentLoaderBase implements Loader<FragmentLoaderContext> {
};

const onError = (error: unknown) => {
if (error instanceof RequestAbortError && this.stats.aborted) return;
if (
error instanceof CoreRequestError &&
error.type === "aborted" &&
this.stats.aborted
) {
return;
}
this.handleError(error);
};

Expand All @@ -98,15 +99,16 @@ export class FragmentLoaderBase implements Loader<FragmentLoaderContext> {

private handleError(thrownError: unknown) {
const error = { code: 0, text: "" };
let details: object | null = null;
if (thrownError instanceof FetchError) {
error.code = thrownError.code;
if (
thrownError instanceof CoreRequestError &&
thrownError.type === "failed"
) {
// error.code = thrownError.code;
error.text = thrownError.message;
details = thrownError.details;
} else if (thrownError instanceof Error) {
error.text = thrownError.message;
}
this.callbacks?.onError(error, this.context, details, this.stats);
this.callbacks?.onError(error, this.context, null, this.stats);
}

private abortInternal() {
Expand Down
43 changes: 33 additions & 10 deletions packages/p2p-media-loader-shaka/src/loading-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as Utils from "./stream-utils";
import { SegmentManager } from "./segment-manager";
import { StreamInfo } from "./types";
import { Shaka, Stream } from "./types";
import { Core, EngineCallbacks, SegmentResponse } from "p2p-media-loader-core";
import {
Core,
CoreRequestError,
SegmentResponse,
EngineCallbacks,
} from "p2p-media-loader-core";

interface LoadingHandlerInterface {
handleLoading: shaka.extern.SchemePlugin;
Expand Down Expand Up @@ -70,15 +75,33 @@ export class LoadingHandler implements LoadingHandlerInterface {

const loadSegment = async (): Promise<Response> => {
const { request, callbacks } = getSegmentRequest();
await this.core.loadSegment(segmentId, callbacks);
const { data, bandwidth } = await request;
return {
data,
headers: {},
uri: segmentUrl,
originalUri: segmentUrl,
timeMs: getLoadingDurationBasedOnBandwidth(bandwidth, data.byteLength),
};
void this.core.loadSegment(segmentId, callbacks);
try {
const { data, bandwidth } = await request;
return {
data,
headers: {},
uri: segmentUrl,
originalUri: segmentUrl,
timeMs: getLoadingDurationBasedOnBandwidth(
bandwidth,
data.byteLength
),
};
} catch (error) {
// TODO: throw Shaka Errors
if (error instanceof CoreRequestError) {
const { Error: ShakaError } = this.shaka.util;
if (error.type === "aborted") {
throw new ShakaError(
ShakaError.Severity.RECOVERABLE,
ShakaError.Category.NETWORK,
this.shaka.util.Error.Code.OPERATION_ABORTED
);
}
}
throw error;
}
};

return new this.shaka.util.AbortableOperation(loadSegment(), async () =>
Expand Down

0 comments on commit 3b4dbdd

Please sign in to comment.