Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
EmmaZhu committed Jan 9, 2024
1 parent 3f64420 commit 484420e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 18 deletions.
29 changes: 11 additions & 18 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
close,
createReadStream,
createWriteStream,
fdatasync,
mkdir,
Expand Down Expand Up @@ -30,6 +29,7 @@ import IExtentStore, {
} from "./IExtentStore";
import IOperationQueue from "./IOperationQueue";
import OperationQueue from "./OperationQueue";
import FileLazyReadStream from "./FileLazyReadStream";

const statAsync = promisify(stat);
const mkdirAsync = promisify(mkdir);
Expand Down Expand Up @@ -333,26 +333,19 @@ export default class FSExtentStore implements IExtentStore {
const op = () =>
new Promise<NodeJS.ReadableStream>((resolve, reject) => {
this.logger.verbose(
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
`FSExtentStore:readExtent() Creating read stream. LocationId:${persistencyId} extentId:${extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
const stream = createReadStream(path, {
start: extentChunk.offset,
end: extentChunk.offset + extentChunk.count - 1
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${persistencyId} extentId:${
extentChunk.id
} path:${path} offset:${extentChunk.offset} count:${
extentChunk.count
} end:${extentChunk.offset + extentChunk.count - 1}`,
contextId
);
});
const stream = new FileLazyReadStream(
path,
extentChunk.offset,
extentChunk.offset + extentChunk.count - 1,
this.logger,
persistencyId,
extentChunk.id,
contextId);
resolve(stream);
});

Expand Down
72 changes: 72 additions & 0 deletions src/common/persistence/FileLazyReadStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { ReadStream, createReadStream } from "fs";
import { Readable } from "stream";
import ILogger from "../ILogger";


export default class FileLazyReadStream extends Readable {
private extentStream: ReadStream | undefined;
constructor(
private readonly extentPath: string,
private readonly start: number,
private readonly end: number,
private readonly logger: ILogger,
private readonly persistencyId: string,
private readonly extentId: string,
private readonly contextId?: string) {
super();
}

public _read(): void {
if (this.extentStream === undefined) {
this.extentStream = createReadStream(this.extentPath, {
start: this.start,
end: this.end
}).on("close", () => {
this.logger.verbose(
`FSExtentStore:readExtent() Read stream closed. LocationId:${this.persistencyId} extentId:${this.extentId
} path:${this.extentPath} offset:${this.start} end:${this.end}`,
this.contextId
);
});
this.setSourceEventHandlers();
}
this.extentStream?.resume();
}

private setSourceEventHandlers() {
this.extentStream?.on("data", this.sourceDataHandler);
this.extentStream?.on("end", this.sourceErrorOrEndHandler);
this.extentStream?.on("error", this.sourceErrorOrEndHandler);
}

private removeSourceEventHandlers() {
this.extentStream?.removeListener("data", this.sourceDataHandler);
this.extentStream?.removeListener("end", this.sourceErrorOrEndHandler);
this.extentStream?.removeListener("error", this.sourceErrorOrEndHandler);
}

private sourceDataHandler = (data: Buffer) => {
if (!this.push(data)) {
this.extentStream?.pause();
}
}

private sourceErrorOrEndHandler = (err?: Error) => {
if (err && err.name === "AbortError") {
this.destroy(err);
return;
}

this.removeSourceEventHandlers();
this.push(null);
this.destroy(err);
}

_destroy(error: Error | null, callback: (error?: Error) => void): void {
// remove listener from source and release source
//this.removeSourceEventHandlers();
(this.extentStream as Readable).destroy();

callback(error === null ? undefined : error);
}
}

0 comments on commit 484420e

Please sign in to comment.