Skip to content

Commit

Permalink
[PECO-1541] Optimize UC Volume ingestion (#247)
Browse files Browse the repository at this point in the history
* [PECO-1541] Optimize UC Volume ingestion

Signed-off-by: Levko Kravets <[email protected]>

* Refine the code

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Apr 19, 2024
1 parent c00f72e commit c52f2ba
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as fs from 'fs';
import * as path from 'path';
import stream from 'node:stream';
import util from 'node:util';
import { stringify, NIL, parse } from 'uuid';
import fetch, { HeadersInit } from 'node-fetch';
import {
Expand Down Expand Up @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
const pipeline = util.promisify(stream.pipeline);

const defaultMaxRows = 100000;

interface OperationResponseShape {
Expand Down Expand Up @@ -271,8 +276,10 @@ export default class DBSQLSession implements IDBSQLSession {
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
const buffer = await response.arrayBuffer();
fs.writeFileSync(localFile, Buffer.from(buffer));

const fileStream = fs.createWriteStream(localFile);
// `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly
return pipeline(response.body, fileStream);
}

private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
Expand Down Expand Up @@ -301,8 +308,19 @@ export default class DBSQLSession implements IDBSQLSession {
const connectionProvider = await this.context.getConnectionProvider();
const agent = await connectionProvider.getAgent();

const data = fs.readFileSync(localFile);
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
const fileStream = fs.createReadStream(localFile);
const fileInfo = fs.statSync(localFile, { bigint: true });

const response = await fetch(presignedUrl, {
method: 'PUT',
headers: {
...headers,
// This header is required by server
'Content-Length': fileInfo.size.toString(),
},
agent,
body: fileStream,
});
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
Expand Down

0 comments on commit c52f2ba

Please sign in to comment.