From fb817b5b2283908532c9bcd3ef3d9dedeb41935f Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Thu, 9 May 2024 13:15:08 +0300 Subject: [PATCH] Iterable interface for IOperation (#252) * Iterable interface for IOperation Signed-off-by: Levko Kravets * Chore: split `utils` unit tests into few files Signed-off-by: Levko Kravets * Add tests Signed-off-by: Levko Kravets * Add visibility modifiers Signed-off-by: Levko Kravets * Fixes after merge Signed-off-by: Levko Kravets * Fix import Signed-off-by: Levko Kravets --------- Signed-off-by: Levko Kravets --- lib/DBSQLOperation.ts | 12 ++ lib/contracts/IOperation.ts | 16 ++ lib/utils/OperationIterator.ts | 85 ++++++++ tests/e2e/iterators.test.js | 87 ++++++++ .../CloseableCollection.test.js} | 94 +-------- tests/unit/utils/OperationIterator.test.js | 185 ++++++++++++++++++ tests/unit/utils/utils.test.js | 98 ++++++++++ 7 files changed, 484 insertions(+), 93 deletions(-) create mode 100644 lib/utils/OperationIterator.ts create mode 100644 tests/e2e/iterators.test.js rename tests/unit/{utils.test.js => utils/CloseableCollection.test.js} (58%) create mode 100644 tests/unit/utils/OperationIterator.test.js create mode 100644 tests/unit/utils/utils.test.js diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts index 91b3bdf..84bde91 100644 --- a/lib/DBSQLOperation.ts +++ b/lib/DBSQLOperation.ts @@ -4,6 +4,9 @@ import IOperation, { FinishedOptions, GetSchemaOptions, WaitUntilReadyOptions, + IteratorOptions, + IOperationChunksIterator, + IOperationRowsIterator, } from './contracts/IOperation'; import { TGetOperationStatusResp, @@ -26,6 +29,7 @@ import CloudFetchResultHandler from './result/CloudFetchResultHandler'; import ArrowResultConverter from './result/ArrowResultConverter'; import ResultSlicer from './result/ResultSlicer'; import { definedOrError } from './utils'; +import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator'; import HiveDriverError from './errors/HiveDriverError'; import IClientContext from './contracts/IClientContext'; @@ -89,6 +93,14 @@ export default class DBSQLOperation implements IOperation { this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`); } + public iterateChunks(options?: IteratorOptions): IOperationChunksIterator { + return new OperationChunksIterator(this, options); + } + + public iterateRows(options?: IteratorOptions): IOperationRowsIterator { + return new OperationRowsIterator(this, options); + } + public get id() { const operationId = this.operationHandle?.operationId?.guid; return operationId ? stringify(operationId) : NIL; diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts index 4677b06..35382a5 100644 --- a/lib/contracts/IOperation.ts +++ b/lib/contracts/IOperation.ts @@ -23,6 +23,18 @@ export interface GetSchemaOptions extends WaitUntilReadyOptions { // no other options } +export interface IteratorOptions extends FetchOptions { + autoClose?: boolean; // defaults to `false` +} + +export interface IOperationChunksIterator extends AsyncIterableIterator> { + readonly operation: IOperation; +} + +export interface IOperationRowsIterator extends AsyncIterableIterator { + readonly operation: IOperation; +} + export default interface IOperation { /** * Operation identifier @@ -70,4 +82,8 @@ export default interface IOperation { * Fetch schema */ getSchema(options?: GetSchemaOptions): Promise; + + iterateChunks(options?: IteratorOptions): IOperationChunksIterator; + + iterateRows(options?: IteratorOptions): IOperationRowsIterator; } diff --git a/lib/utils/OperationIterator.ts b/lib/utils/OperationIterator.ts new file mode 100644 index 0000000..ab2cc86 --- /dev/null +++ b/lib/utils/OperationIterator.ts @@ -0,0 +1,85 @@ +import IOperation, { IOperationChunksIterator, IOperationRowsIterator, IteratorOptions } from '../contracts/IOperation'; + +abstract class OperationIterator implements AsyncIterableIterator { + public readonly operation: IOperation; + + protected readonly options?: IteratorOptions; + + constructor(operation: IOperation, options?: IteratorOptions) { + this.operation = operation; + this.options = options; + } + + protected abstract getNext(): Promise>; + + public [Symbol.asyncIterator]() { + return this; + } + + public async next() { + const result = await this.getNext(); + + if (result.done && this.options?.autoClose) { + await this.operation.close(); + } + + return result; + } + + // This method is intended for a cleanup when the caller does not intend to make any more + // reads from iterator (e.g. when using `break` in a `for ... of` loop) + public async return(value?: any) { + if (this.options?.autoClose) { + await this.operation.close(); + } + + return { done: true, value }; + } +} + +export class OperationChunksIterator extends OperationIterator> implements IOperationChunksIterator { + protected async getNext(): Promise>> { + const hasMoreRows = await this.operation.hasMoreRows(); + if (hasMoreRows) { + const value = await this.operation.fetchChunk(this.options); + return { done: false, value }; + } + + return { done: true, value: undefined }; + } +} + +export class OperationRowsIterator extends OperationIterator implements IOperationRowsIterator { + private chunk: Array = []; + + private index: number = 0; + + constructor(operation: IOperation, options?: IteratorOptions) { + super(operation, { + ...options, + // Tell slicer to return raw chunks. We're going to process rows one by one anyway, + // so no need to additionally buffer and slice chunks returned by server + disableBuffering: true, + }); + } + + protected async getNext(): Promise> { + if (this.index < this.chunk.length) { + const value = this.chunk[this.index]; + this.index += 1; + return { done: false, value }; + } + + const hasMoreRows = await this.operation.hasMoreRows(); + if (hasMoreRows) { + this.chunk = await this.operation.fetchChunk(this.options); + this.index = 0; + // Note: this call is not really a recursion. Since this method is + // async - the call will be actually scheduled for processing on + // the next event loop cycle + return this.getNext(); + } + + return { done: true, value: undefined }; + } +} diff --git a/tests/e2e/iterators.test.js b/tests/e2e/iterators.test.js new file mode 100644 index 0000000..ee69af2 --- /dev/null +++ b/tests/e2e/iterators.test.js @@ -0,0 +1,87 @@ +const { expect } = require('chai'); +const sinon = require('sinon'); +const config = require('./utils/config'); +const { DBSQLClient } = require('../../lib'); + +async function openSession(customConfig) { + const client = new DBSQLClient(); + + const clientConfig = client.getConfig(); + sinon.stub(client, 'getConfig').returns({ + ...clientConfig, + ...customConfig, + }); + + const connection = await client.connect({ + host: config.host, + path: config.path, + token: config.token, + }); + + return connection.openSession({ + initialCatalog: config.database[0], + initialSchema: config.database[1], + }); +} + +function arrayChunks(arr, chunkSize) { + const result = []; + + while (arr.length > 0) { + const chunk = arr.splice(0, chunkSize); + result.push(chunk); + } + + return result; +} + +describe('Iterators', () => { + it('should iterate over all chunks', async () => { + const session = await openSession({ arrowEnabled: false }); + sinon.spy(session.context.driver, 'fetchResults'); + try { + const expectedRowsCount = 10; + + // set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults` + const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, { + maxRows: null, + }); + + const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id })); + const chunkSize = 4; + const expectedChunks = arrayChunks(expectedRows, chunkSize); + + let index = 0; + for await (const chunk of operation.iterateChunks({ maxRows: chunkSize })) { + expect(chunk).to.deep.equal(expectedChunks[index]); + index += 1; + } + + expect(index).to.equal(expectedChunks.length); + } finally { + await session.close(); + } + }); + + it('should iterate over all rows', async () => { + const session = await openSession({ arrowEnabled: false }); + sinon.spy(session.context.driver, 'fetchResults'); + try { + const expectedRowsCount = 10; + + const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`); + + const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id })); + + let index = 0; + for await (const row of operation.iterateRows()) { + expect(row).to.deep.equal(expectedRows[index]); + index += 1; + } + + expect(index).to.equal(expectedRows.length); + } finally { + await session.close(); + } + }); +}); diff --git a/tests/unit/utils.test.js b/tests/unit/utils/CloseableCollection.test.js similarity index 58% rename from tests/unit/utils.test.js rename to tests/unit/utils/CloseableCollection.test.js index b06ba17..3a167cf 100644 --- a/tests/unit/utils.test.js +++ b/tests/unit/utils/CloseableCollection.test.js @@ -1,97 +1,5 @@ const { expect, AssertionError } = require('chai'); - -const { buildUserAgentString, definedOrError, formatProgress, ProgressUpdateTransformer } = require('../../lib/utils'); -const CloseableCollection = require('../../lib/utils/CloseableCollection').default; - -describe('buildUserAgentString', () => { - // It should follow https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 and - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent - // - // UserAgent ::= '/' '(' ')' - // ProductName ::= 'NodejsDatabricksSqlConnector' - // ::= [ ';' ] 'Node.js' ';' - // - // Examples: - // - with provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Client ID; Node.js 16.13.1; Darwin 21.5.0) - // - without provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Node.js 16.13.1; Darwin 21.5.0) - - function checkUserAgentString(ua, clientId) { - // Prefix: 'NodejsDatabricksSqlConnector/' - // Version: three period-separated digits and optional suffix - const re = - /^(?NodejsDatabricksSqlConnector)\/(?\d+\.\d+\.\d+(-[^(]+)?)\s*\((?[^)]+)\)$/i; - const match = re.exec(ua); - expect(match).to.not.be.eq(null); - - const { comment } = match.groups; - - expect(comment.split(';').length).to.be.gte(2); // at least Node and OS version should be there - - if (clientId) { - expect(comment.trim()).to.satisfy((s) => s.startsWith(`${clientId};`)); - } - } - - it('matches pattern with clientId', () => { - const clientId = 'Some Client ID'; - const ua = buildUserAgentString(clientId); - checkUserAgentString(ua, clientId); - }); - - it('matches pattern without clientId', () => { - const ua = buildUserAgentString(); - checkUserAgentString(ua); - }); -}); - -describe('formatProgress', () => { - it('formats progress', () => { - const result = formatProgress({ - headerNames: [], - rows: [], - }); - expect(result).to.be.eq('\n'); - }); -}); - -describe('ProgressUpdateTransformer', () => { - it('should have equal columns', () => { - const t = new ProgressUpdateTransformer(); - - expect(t.formatRow(['Column 1', 'Column 2'])).to.be.eq('Column 1 |Column 2 '); - }); - - it('should format response as table', () => { - const t = new ProgressUpdateTransformer({ - headerNames: ['Column 1', 'Column 2'], - rows: [ - ['value 1.1', 'value 1.2'], - ['value 2.1', 'value 2.2'], - ], - footerSummary: 'footer', - }); - - expect(String(t)).to.be.eq( - 'Column 1 |Column 2 \n' + 'value 1.1 |value 1.2 \n' + 'value 2.1 |value 2.2 \n' + 'footer', - ); - }); -}); - -describe('definedOrError', () => { - it('should return value if it is defined', () => { - const values = [null, 0, 3.14, false, true, '', 'Hello, World!', [], {}]; - for (const value of values) { - const result = definedOrError(value); - expect(result).to.be.equal(value); - } - }); - - it('should throw error if value is undefined', () => { - expect(() => { - definedOrError(undefined); - }).to.throw(); - }); -}); +const CloseableCollection = require('../../../lib/utils/CloseableCollection').default; describe('CloseableCollection', () => { it('should add item if not already added', () => { diff --git a/tests/unit/utils/OperationIterator.test.js b/tests/unit/utils/OperationIterator.test.js new file mode 100644 index 0000000..29b8292 --- /dev/null +++ b/tests/unit/utils/OperationIterator.test.js @@ -0,0 +1,185 @@ +const { expect } = require('chai'); +const { OperationChunksIterator, OperationRowsIterator } = require('../../../lib/utils/OperationIterator'); + +class OperationMock { + // `chunks` should be an array of chunks + // where each chunk is an array of values + constructor(chunks) { + this.chunks = Array.isArray(chunks) ? [...chunks] : []; + this.closed = false; + } + + async hasMoreRows() { + return !this.closed && this.chunks.length > 0; + } + + async fetchChunk() { + return this.chunks.shift() ?? []; + } + + async close() { + this.closed = true; + } + + iterateChunks(options) { + return new OperationChunksIterator(this, options); + } + + iterateRows(options) { + return new OperationRowsIterator(this, options); + } +} + +describe('OperationChunksIterator', () => { + it('should iterate over all chunks', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + let index = 0; + for await (const chunk of operation.iterateChunks()) { + expect(chunk).to.deep.equal(chunks[index]); + index += 1; + } + + expect(index).to.equal(chunks.length); + expect(operation.closed).to.be.false; + }); + + it('should iterate over all chunks and close operation', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + let index = 0; + for await (const chunk of operation.iterateChunks({ autoClose: true })) { + expect(chunk).to.deep.equal(chunks[index]); + index += 1; + } + + expect(index).to.equal(chunks.length); + expect(operation.closed).to.be.true; + }); + + it('should iterate partially', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + for await (const chunk of operation.iterateChunks()) { + expect(chunk).to.deep.equal(chunks[0]); + break; + } + + for await (const chunk of operation.iterateChunks()) { + expect(chunk).to.deep.equal(chunks[1]); + break; + } + + expect(await operation.hasMoreRows()).to.be.true; + expect(operation.closed).to.be.false; + }); + + it('should iterate partially and close operation', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + for await (const chunk of operation.iterateChunks({ autoClose: true })) { + expect(chunk).to.deep.equal(chunks[0]); + break; + } + + expect(await operation.hasMoreRows()).to.be.false; + expect(operation.closed).to.be.true; + }); +}); + +describe('OperationRowsIterator', () => { + it('should iterate over all rows', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + const rows = chunks.flat(); + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + let index = 0; + for await (const row of operation.iterateRows()) { + expect(row).to.equal(rows[index]); + index += 1; + } + + expect(index).to.equal(rows.length); + expect(operation.closed).to.be.false; + }); + + it('should iterate over all rows and close operation', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + const rows = chunks.flat(); + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + let index = 0; + for await (const row of operation.iterateRows({ autoClose: true })) { + expect(row).to.equal(rows[index]); + index += 1; + } + + expect(index).to.equal(rows.length); + expect(operation.closed).to.be.true; + }); + + it('should iterate partially', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + for await (const row of operation.iterateRows()) { + expect(row).to.equal(chunks[0][0]); + break; + } + + for await (const row of operation.iterateRows()) { + // This is a limitation of rows iterator. Since operation can only + // supply chunks of rows, when new iterator is created it will + // start with the next available chunk. Generally this should not + // be an issue, because using multiple iterators on the same operation + // is not recommended + expect(row).to.equal(chunks[1][0]); + break; + } + + expect(await operation.hasMoreRows()).to.be.true; + expect(operation.closed).to.be.false; + }); + + it('should iterate partially and close operation', async () => { + const chunks = [[1, 2, 3], [4, 5, 6, 7, 8], [9]]; + const rows = chunks.flat(); + + const operation = new OperationMock(chunks); + + expect(operation.closed).to.be.false; + + for await (const row of operation.iterateRows({ autoClose: true })) { + expect(row).to.equal(rows[0]); + break; + } + + expect(await operation.hasMoreRows()).to.be.false; + expect(operation.closed).to.be.true; + }); +}); diff --git a/tests/unit/utils/utils.test.js b/tests/unit/utils/utils.test.js new file mode 100644 index 0000000..67b3dfe --- /dev/null +++ b/tests/unit/utils/utils.test.js @@ -0,0 +1,98 @@ +const { expect } = require('chai'); + +const { + buildUserAgentString, + definedOrError, + formatProgress, + ProgressUpdateTransformer, +} = require('../../../lib/utils'); + +describe('buildUserAgentString', () => { + // It should follow https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 and + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent + // + // UserAgent ::= '/' '(' ')' + // ProductName ::= 'NodejsDatabricksSqlConnector' + // ::= [ ';' ] 'Node.js' ';' + // + // Examples: + // - with provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Client ID; Node.js 16.13.1; Darwin 21.5.0) + // - without provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Node.js 16.13.1; Darwin 21.5.0) + + function checkUserAgentString(ua, clientId) { + // Prefix: 'NodejsDatabricksSqlConnector/' + // Version: three period-separated digits and optional suffix + const re = + /^(?NodejsDatabricksSqlConnector)\/(?\d+\.\d+\.\d+(-[^(]+)?)\s*\((?[^)]+)\)$/i; + const match = re.exec(ua); + expect(match).to.not.be.eq(null); + + const { comment } = match.groups; + + expect(comment.split(';').length).to.be.gte(2); // at least Node and OS version should be there + + if (clientId) { + expect(comment.trim()).to.satisfy((s) => s.startsWith(`${clientId};`)); + } + } + + it('matches pattern with clientId', () => { + const clientId = 'Some Client ID'; + const ua = buildUserAgentString(clientId); + checkUserAgentString(ua, clientId); + }); + + it('matches pattern without clientId', () => { + const ua = buildUserAgentString(); + checkUserAgentString(ua); + }); +}); + +describe('formatProgress', () => { + it('formats progress', () => { + const result = formatProgress({ + headerNames: [], + rows: [], + }); + expect(result).to.be.eq('\n'); + }); +}); + +describe('ProgressUpdateTransformer', () => { + it('should have equal columns', () => { + const t = new ProgressUpdateTransformer(); + + expect(t.formatRow(['Column 1', 'Column 2'])).to.be.eq('Column 1 |Column 2 '); + }); + + it('should format response as table', () => { + const t = new ProgressUpdateTransformer({ + headerNames: ['Column 1', 'Column 2'], + rows: [ + ['value 1.1', 'value 1.2'], + ['value 2.1', 'value 2.2'], + ], + footerSummary: 'footer', + }); + + expect(String(t)).to.be.eq( + 'Column 1 |Column 2 \n' + 'value 1.1 |value 1.2 \n' + 'value 2.1 |value 2.2 \n' + 'footer', + ); + }); +}); + +describe('definedOrError', () => { + it('should return value if it is defined', () => { + const values = [null, 0, 3.14, false, true, '', 'Hello, World!', [], {}]; + for (const value of values) { + const result = definedOrError(value); + expect(result).to.be.equal(value); + } + }); + + it('should throw error if value is undefined', () => { + expect(() => { + definedOrError(undefined); + }).to.throw(); + }); +});