Skip to content

Commit

Permalink
NSFS | versioning | close chunkfs read stream to prevent stream being…
Browse files Browse the repository at this point in the history
… closed after stat

Signed-off-by: nadav mizrahi <[email protected]>
  • Loading branch information
nadavMiz committed Nov 13, 2024
1 parent e1bf29e commit cf76add
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1140,9 +1140,7 @@ class NamespaceFS {
// end the stream
res.end();

// in case of transform streams such as ChunkFS there is also a readable part. since we expect write stream
// and don't care about the readable part, set readable: false
await stream_utils.wait_finished(res, { readable: false, signal: object_sdk.abort_controller.signal });
await stream_utils.wait_finished(res, { signal: object_sdk.abort_controller.signal });
object_sdk.throw_if_aborted();

dbg.log0('NamespaceFS: read_object_stream completed file', file_path, {
Expand Down Expand Up @@ -1582,8 +1580,11 @@ class NamespaceFS {
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
if (copy_source) {
// ChunkFS is a Transform stream, however read_object_stream expects a write stream. call resume to close the read part
chunk_fs.resume();
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
} else if (params.source_params) {
chunk_fs.resume();
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
Expand Down
10 changes: 10 additions & 0 deletions src/test/unit_tests/test_bucketspace_versioning.js
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,16 @@ mocha.describe('bucketspace namespace_fs - versioning', function() {
const exist2 = await fs_utils.file_exists(version_path_nested);
assert.ok(exist2);
});

mocha.it('copy object version id - version matches mtime and inode', async function() {
const key = 'copied_key5.txt';
const res = await s3_uid6.copyObject({ Bucket: bucket_name, Key: key,
CopySource: `${bucket_name}/${key1}?versionId=${key1_ver1}`});
const obj_path = path.join(full_path, key);
const stat = await nb_native().fs.stat(DEFAULT_FS_CONFIG, obj_path);
const expcted_version = 'mtime-' + stat.mtimeNsBigint.toString(36) + '-ino-' + stat.ino.toString(36);
assert.equal(expcted_version, res.VersionId);
});
});

mocha.describe('copy object (latest version) - versioning suspended - nsfs copy fallback flow', function() {
Expand Down

0 comments on commit cf76add

Please sign in to comment.