Skip to content

Commit

Permalink
[PECO-983] Support streaming query results via Node.js streams
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Jul 31, 2024
1 parent 3c29fe2 commit e1950c6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
19 changes: 19 additions & 0 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { stringify, NIL } from 'uuid';
import { Readable } from 'node:stream';
import IOperation, {
FetchOptions,
FinishedOptions,
Expand All @@ -7,6 +8,7 @@ import IOperation, {
IteratorOptions,
IOperationChunksIterator,
IOperationRowsIterator,
NodeStreamOptions,
} from './contracts/IOperation';
import {
TGetOperationStatusResp,
Expand Down Expand Up @@ -101,6 +103,23 @@ export default class DBSQLOperation implements IOperation {
return new OperationRowsIterator(this, options);
}

toNodeStream(options?: NodeStreamOptions): Readable {
let iterable: IOperationChunksIterator | IOperationRowsIterator | undefined;

switch (options?.mode) {
case 'chunks':
iterable = this.iterateChunks(options?.iteratorOptions);
break;
case 'rows':
iterable = this.iterateRows(options?.iteratorOptions);
break;
default:
throw new Error(`IOperation.toNodeStream: unsupported mode ${options?.mode}`);
}

return Readable.from(iterable, options?.streamOptions);
}

public get id() {
const operationId = this.operationHandle?.operationId?.guid;
return operationId ? stringify(operationId) : NIL;
Expand Down
9 changes: 9 additions & 0 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Readable, ReadableOptions } from 'node:stream';
import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types';
import Status from '../dto/Status';

Expand Down Expand Up @@ -35,6 +36,12 @@ export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
readonly operation: IOperation;
}

export interface NodeStreamOptions {
mode?: 'chunks' | 'rows'; // defaults to 'chunks'
iteratorOptions?: IteratorOptions;
streamOptions?: ReadableOptions;
}

export default interface IOperation {
/**
* Operation identifier
Expand Down Expand Up @@ -86,4 +93,6 @@ export default interface IOperation {
iterateChunks(options?: IteratorOptions): IOperationChunksIterator;

iterateRows(options?: IteratorOptions): IOperationRowsIterator;

toNodeStream(options?: NodeStreamOptions): Readable;
}

0 comments on commit e1950c6

Please sign in to comment.