Skip to content

Commit

Permalink
Move some global constants to client config
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Apr 23, 2024
1 parent c5f9037 commit 2e6c094
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 20 deletions.
3 changes: 3 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private static getDefaultConfig(): ClientConfig {
return {
directResultsDefaultMaxRows: 100000,
fetchChunkDefaultMaxRows: 100000,

arrowEnabled: true,
useArrowNativeTypes: true,
socketTimeout: 15 * 60 * 1000, // 15 minutes
Expand Down
8 changes: 4 additions & 4 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import { definedOrError } from './utils';
import HiveDriverError from './errors/HiveDriverError';
import IClientContext from './contracts/IClientContext';

const defaultMaxRows = 100000;

interface DBSQLOperationConstructorOptions {
handle: TOperationHandle;
directResults?: TSparkDirectResults;
Expand Down Expand Up @@ -164,8 +162,10 @@ export default class DBSQLOperation implements IOperation {
setTimeout(resolve, 0);
});

const defaultMaxRows = this.context.getConfig().fetchChunkDefaultMaxRows;

const result = resultHandler.fetchNext({
limit: options?.maxRows || defaultMaxRows,
limit: options?.maxRows ?? defaultMaxRows,
disableBuffering: options?.disableBuffering,
});
await this.failIfClosed();
Expand All @@ -174,7 +174,7 @@ export default class DBSQLOperation implements IOperation {
.getLogger()
.log(
LogLevel.debug,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.id}`,
`Fetched chunk of size: ${options?.maxRows ?? defaultMaxRows} from operation with id: ${this.id}`,
);
return result;
}
Expand Down
38 changes: 22 additions & 16 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ import IClientContext, { ClientConfig } from './contracts/IClientContext';
// Explicitly promisify a callback-style `pipeline` because `node:stream/promises` is not available in Node 14
const pipeline = util.promisify(stream.pipeline);

const defaultMaxRows = 100000;

interface OperationResponseShape {
status: TStatus;
operationHandle?: TOperationHandle;
Expand All @@ -64,14 +62,14 @@ export function numberToInt64(value: number | bigint | Int64): Int64 {
return new Int64(value);
}

function getDirectResultsOptions(maxRows: number | bigint | Int64 | null = defaultMaxRows) {
function getDirectResultsOptions(maxRows: number | bigint | Int64 | null | undefined, config: ClientConfig) {
if (maxRows === null) {
return {};
}

return {
getDirectResults: {
maxRows: numberToInt64(maxRows),
maxRows: numberToInt64(maxRows ?? config.directResultsDefaultMaxRows),
},
};
}
Expand Down Expand Up @@ -101,7 +99,6 @@ function getArrowOptions(config: ClientConfig): {
}

function getQueryParameters(
sessionHandle: TSessionHandle,
namedParameters?: Record<string, DBSQLParameter | DBSQLParameterValue>,
ordinalParameters?: Array<DBSQLParameter | DBSQLParameterValue>,
): Array<TSparkParameter> {
Expand Down Expand Up @@ -201,10 +198,10 @@ export default class DBSQLSession implements IDBSQLSession {
statement,
queryTimeout: options.queryTimeout ? numberToInt64(options.queryTimeout) : undefined,
runAsync: true,
...getDirectResultsOptions(options.maxRows),
...getDirectResultsOptions(options.maxRows, clientConfig),
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
parameters: getQueryParameters(options.namedParameters, options.ordinalParameters),
canDecompressLZ4Result: clientConfig.useLZ4Compression && Boolean(LZ4),
});
const response = await this.handleResponse(operationPromise);
Expand Down Expand Up @@ -354,10 +351,11 @@ export default class DBSQLSession implements IDBSQLSession {
public async getTypeInfo(request: TypeInfoRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getTypeInfo({
sessionHandle: this.sessionHandle,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -372,10 +370,11 @@ export default class DBSQLSession implements IDBSQLSession {
public async getCatalogs(request: CatalogsRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getCatalogs({
sessionHandle: this.sessionHandle,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -390,12 +389,13 @@ export default class DBSQLSession implements IDBSQLSession {
public async getSchemas(request: SchemasRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getSchemas({
sessionHandle: this.sessionHandle,
catalogName: request.catalogName,
schemaName: request.schemaName,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -410,14 +410,15 @@ export default class DBSQLSession implements IDBSQLSession {
public async getTables(request: TablesRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getTables({
sessionHandle: this.sessionHandle,
catalogName: request.catalogName,
schemaName: request.schemaName,
tableName: request.tableName,
tableTypes: request.tableTypes,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -432,10 +433,11 @@ export default class DBSQLSession implements IDBSQLSession {
public async getTableTypes(request: TableTypesRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getTableTypes({
sessionHandle: this.sessionHandle,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -450,14 +452,15 @@ export default class DBSQLSession implements IDBSQLSession {
public async getColumns(request: ColumnsRequest = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getColumns({
sessionHandle: this.sessionHandle,
catalogName: request.catalogName,
schemaName: request.schemaName,
tableName: request.tableName,
columnName: request.columnName,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -472,13 +475,14 @@ export default class DBSQLSession implements IDBSQLSession {
public async getFunctions(request: FunctionsRequest): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getFunctions({
sessionHandle: this.sessionHandle,
catalogName: request.catalogName,
schemaName: request.schemaName,
functionName: request.functionName,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -487,13 +491,14 @@ export default class DBSQLSession implements IDBSQLSession {
public async getPrimaryKeys(request: PrimaryKeysRequest): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getPrimaryKeys({
sessionHandle: this.sessionHandle,
catalogName: request.catalogName,
schemaName: request.schemaName,
tableName: request.tableName,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand All @@ -508,6 +513,7 @@ export default class DBSQLSession implements IDBSQLSession {
public async getCrossReference(request: CrossReferenceRequest): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.getCrossReference({
sessionHandle: this.sessionHandle,
parentCatalogName: request.parentCatalogName,
Expand All @@ -517,7 +523,7 @@ export default class DBSQLSession implements IDBSQLSession {
foreignSchemaName: request.foreignSchemaName,
foreignTableName: request.foreignTableName,
runAsync: true,
...getDirectResultsOptions(request.maxRows),
...getDirectResultsOptions(request.maxRows, clientConfig),
});
const response = await this.handleResponse(operationPromise);
return this.createOperation(response);
Expand Down
3 changes: 3 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import IConnectionProvider from '../connection/contracts/IConnectionProvider';
import TCLIService from '../../thrift/TCLIService';

export interface ClientConfig {
directResultsDefaultMaxRows: number;
fetchChunkDefaultMaxRows: number;

arrowEnabled?: boolean;
useArrowNativeTypes?: boolean;
socketTimeout: number;
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/DBSQLOperation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const { expect, AssertionError } = require('chai');
const sinon = require('sinon');
const { DBSQLLogger, LogLevel } = require('../../dist');
const { TStatusCode, TOperationState, TTypeId, TSparkRowSetType } = require('../../thrift/TCLIService_types');
const DBSQLClient = require('../../dist/DBSQLClient').default;
const DBSQLOperation = require('../../dist/DBSQLOperation').default;
const StatusError = require('../../dist/errors/StatusError').default;
const OperationStateError = require('../../dist/errors/OperationStateError').default;
Expand Down Expand Up @@ -109,6 +110,10 @@ class ClientContextMock {
this.driver = new DriverMock();
}

getConfig() {
return DBSQLClient.getDefaultConfig();
}

getLogger() {
return this.logger;
}
Expand Down

0 comments on commit 2e6c094

Please sign in to comment.