Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-1541] Optimize UC Volume ingestion #247

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as fs from 'fs';
import * as path from 'path';
import stream from 'node:stream';
import util from 'node:util';
import { stringify, NIL, parse } from 'uuid';
import fetch, { HeadersInit } from 'node-fetch';
import {
Expand Down Expand Up @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
const pipeline = util.promisify(stream.pipeline);

const defaultMaxRows = 100000;

interface OperationResponseShape {
Expand Down Expand Up @@ -271,8 +276,10 @@ export default class DBSQLSession implements IDBSQLSession {
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
fs.writeFileSync(localFile, Buffer.from(buffer));

const fileStream = fs.createWriteStream(localFile);
// `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly
return pipeline(response.body, fileStream);
}

private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
Expand Down Expand Up @@ -301,8 +308,19 @@ export default class DBSQLSession implements IDBSQLSession {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();

const data = fs.readFileSync(localFile);
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
const fileStream = fs.createReadStream(localFile);
const fileInfo = fs.statSync(localFile, { bigint: true });

const response = await fetch(presignedUrl, {
method: 'PUT',
headers: {
...headers,
// This header is required by server
'Content-Length': fileInfo.size.toString(),
},
agent,
body: fileStream,
});
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
Expand Down
Loading