Skip to content

Commit

Permalink
Fix issue of 'too many handles' error when downloading a large file
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmaZhu committed Feb 1, 2024
1 parent 3f64420 commit 1e0dee5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 23 deletions.
29 changes: 11 additions & 18 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
close,
createReadStream,
createWriteStream,
fdatasync,
mkdir,
Expand Down Expand Up @@ -30,6 +29,7 @@ import IExtentStore, {
} from "./IExtentStore";
import IOperationQueue from "./IOperationQueue";
import OperationQueue from "./OperationQueue";
import FileLazyReadStream from "./FileLazyReadStream";

const statAsync = promisify(stat);
const mkdirAsync = promisify(mkdir);
Expand Down Expand Up @@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore {
const op = () =>
new Promise<NodeJS.ReadableStream>((resolve, reject) => {
this.logger.verbose(
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
const stream = createReadStream(path, {
start: extentChunk.offset,
end: extentChunk.offset + extentChunk.count - 1
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
});
const stream = new FileLazyReadStream(
path,
extentChunk.offset,
extentChunk.offset + extentChunk.count - 1,
this.logger,
persistencyId,
extentChunk.id,
contextId);
resolve(stream);
});

Expand Down
72 changes: 72 additions & 0 deletions src/common/persistence/FileLazyReadStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ReadStream, createReadStream } from "fs";
import { Readable } from "stream";
import ILogger from "../ILogger";


export default class FileLazyReadStream extends Readable {
private extentStream: ReadStream | undefined;
constructor(
private readonly extentPath: string,
private readonly start: number,
private readonly end: number,
private readonly logger: ILogger,
private readonly persistencyId: string,
private readonly extentId: string,
private readonly contextId?: string) {
super();
}

public _read(): void {
if (this.extentStream === undefined) {
this.extentStream = createReadStream(this.extentPath, {
start: this.start,
end: this.end
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId
} path:${this.extentPath} offset:${this.start} end:${this.end}`,
this.contextId
);
});
this.setSourceEventHandlers();
}
this.extentStream?.resume();
}

private setSourceEventHandlers() {
this.extentStream?.on("data", this.sourceDataHandler);
this.extentStream?.on("end", this.sourceErrorOrEndHandler);
this.extentStream?.on("error", this.sourceErrorOrEndHandler);
}

private removeSourceEventHandlers() {
this.extentStream?.removeListener("data", this.sourceDataHandler);
this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
}

private sourceDataHandler = (data: Buffer) => {
if (!this.push(data)) {
this.extentStream?.pause();
}
}

private sourceErrorOrEndHandler = (err?: Error) => {
if (err && err.name === "AbortError") {
this.destroy(err);
return;
}

this.removeSourceEventHandlers();
this.push(null);
this.destroy(err);
}

_destroy(error: Error | null, callback: (error?: Error) => void): void {
// remove listener from source and release source
//this.removeSourceEventHandlers();
(this.extentStream as Readable).destroy();

callback(error === null ? undefined : error);
}
}
10 changes: 5 additions & 5 deletions tests/blob/blockblob.highlevel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from "../testutils";

// Set true to enable debug log
configLogger(false);
configLogger(true);

// tslint:disable:no-empty
describe("BlockBlobHighlevel", () => {
Expand Down Expand Up @@ -179,7 +179,7 @@ describe("BlockBlobHighlevel", () => {
aborter.abort();
}
});
} catch (err) {}
} catch (err) { }
assert.ok(eventTriggered);
}).timeout(timeoutForLargeFileUploadingTest);

Expand All @@ -198,7 +198,7 @@ describe("BlockBlobHighlevel", () => {
aborter.abort();
}
});
} catch (err) {}
} catch (err) { }
assert.ok(eventTriggered);
});

Expand Down Expand Up @@ -260,7 +260,7 @@ describe("BlockBlobHighlevel", () => {
abortSignal: AbortController.timeout(1)
});
assert.fail();
} catch (err:any) {
} catch (err: any) {
assert.ok((err.message as string).toLowerCase().includes("abort"));
}
}).timeout(timeoutForLargeFileUploadingTest);
Expand Down Expand Up @@ -314,7 +314,7 @@ describe("BlockBlobHighlevel", () => {
aborter.abort();
}
});
} catch (err) {}
} catch (err) { }
assert.ok(eventTriggered);
}).timeout(timeoutForLargeFileUploadingTest);

Expand Down

0 comments on commit 1e0dee5

Please sign in to comment.