diff --git a/lib/DBSQLSession.ts b/lib/DBSQLSession.ts index 9863bc0c..a9bbbb2e 100644 --- a/lib/DBSQLSession.ts +++ b/lib/DBSQLSession.ts @@ -1,5 +1,7 @@ import * as fs from 'fs'; import * as path from 'path'; +import stream from 'node:stream'; +import util from 'node:util'; import { stringify, NIL, parse } from 'uuid'; import fetch, { HeadersInit } from 'node-fetch'; import { @@ -36,6 +38,9 @@ import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter'; import ParameterError from './errors/ParameterError'; import IClientContext, { ClientConfig } from './contracts/IClientContext'; +// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14 +const pipeline = util.promisify(stream.pipeline); + const defaultMaxRows = 100000; interface OperationResponseShape { @@ -271,8 +276,10 @@ export default class DBSQLSession implements IDBSQLSession { if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); } - const buffer = await response.arrayBuffer(); - fs.writeFileSync(localFile, Buffer.from(buffer)); + + const fileStream = fs.createWriteStream(localFile); + // `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly + return pipeline(response.body, fileStream); } private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise { @@ -301,8 +308,19 @@ export default class DBSQLSession implements IDBSQLSession { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); - const data = fs.readFileSync(localFile); - const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data }); + const fileStream = fs.createReadStream(localFile); + const fileInfo = fs.statSync(localFile, { bigint: true }); + + const response = await fetch(presignedUrl, { + method: 'PUT', + headers: { + ...headers, + // This header is required by server + 'Content-Length': fileInfo.size.toString(), + }, + agent, + body: fileStream, + }); if (!response.ok) { throw new StagingError(`HTTP error ${response.status} ${response.statusText}`); }