From e1950c6c53e2d855fc0ffba457607f3470b530db Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Wed, 31 Jul 2024 16:46:29 +0300 Subject: [PATCH] [PECO-983] Support streaming query results via Node.js streams Signed-off-by: Levko Kravets --- lib/DBSQLOperation.ts | 19 +++++++++++++++++++ lib/contracts/IOperation.ts | 9 +++++++++ 2 files changed, 28 insertions(+) diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index e7ab4bb..5a753fd 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -1,4 +1,5 @@ import { stringify, NIL } from 'uuid'; +import { Readable } from 'node:stream'; import IOperation, { FetchOptions, FinishedOptions, @@ -7,6 +8,7 @@ import IOperation, { IteratorOptions, IOperationChunksIterator, IOperationRowsIterator, + NodeStreamOptions, } from './contracts/IOperation'; import { TGetOperationStatusResp, @@ -101,6 +103,23 @@ export default class DBSQLOperation implements IOperation { return new OperationRowsIterator(this, options); } + toNodeStream(options?: NodeStreamOptions): Readable { + let iterable: IOperationChunksIterator | IOperationRowsIterator | undefined; + + switch (options?.mode) { + case 'chunks': + iterable = this.iterateChunks(options?.iteratorOptions); + break; + case 'rows': + iterable = this.iterateRows(options?.iteratorOptions); + break; + default: + throw new Error(`IOperation.toNodeStream: unsupported mode ${options?.mode}`); + } + + return Readable.from(iterable, options?.streamOptions); + } + public get id() { const operationId = this.operationHandle?.operationId?.guid; return operationId ? stringify(operationId) : NIL; diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts index 35382a5..1d0bb9a 100644 --- a/lib/contracts/IOperation.ts +++ b/lib/contracts/IOperation.ts @@ -1,3 +1,4 @@ +import { Readable, ReadableOptions } from 'node:stream'; import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types'; import Status from '../dto/Status'; @@ -35,6 +36,12 @@ export interface IOperationRowsIterator extends AsyncIterableIterator { readonly operation: IOperation; } +export interface NodeStreamOptions { + mode?: 'chunks' | 'rows'; // defaults to 'chunks' + iteratorOptions?: IteratorOptions; + streamOptions?: ReadableOptions; +} + export default interface IOperation { /** * Operation identifier @@ -86,4 +93,6 @@ export default interface IOperation { iterateChunks(options?: IteratorOptions): IOperationChunksIterator; iterateRows(options?: IteratorOptions): IOperationRowsIterator; + + toNodeStream(options?: NodeStreamOptions): Readable; }