Skip to content

Commit

Permalink
Improve cloud fetch result handler: better performance and fixed memo…
Browse files Browse the repository at this point in the history
…ry consumption issues

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Aug 3, 2023
1 parent 92cde96 commit dc8d319
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 60 deletions.
53 changes: 12 additions & 41 deletions lib/DBSQLOperation/FetchResultsHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class FetchResultsHelper {

private fetchOrientation: TFetchOrientation = TFetchOrientation.FETCH_FIRST;

private pendingResults: TFetchResultsResp[] = [];
private prefetchedResults: TFetchResultsResp[] = [];

private readonly returnOnlyPrefetchedResults: boolean;

Expand All @@ -58,7 +58,7 @@ export default class FetchResultsHelper {
this.operationHandle = operationHandle;
prefetchedResults.forEach((item) => {
if (item) {
this.prepareCloudFetchChunks(item);
this.prefetchedResults.push(item);
}
});
this.returnOnlyPrefetchedResults = returnOnlyPrefetchedResults;
Expand All @@ -68,7 +68,7 @@ export default class FetchResultsHelper {
Status.assert(response.status);
this.fetchOrientation = TFetchOrientation.FETCH_NEXT;

if (this.pendingResults.length > 0) {
if (this.prefetchedResults.length > 0) {
this.hasMoreRows = true;
} else if (this.returnOnlyPrefetchedResults) {
this.hasMoreRows = false;
Expand All @@ -80,47 +80,18 @@ export default class FetchResultsHelper {
}

public async fetch(maxRows: number) {
if (this.pendingResults.length === 0) {
const results = await this.driver.fetchResults({
operationHandle: this.operationHandle,
orientation: this.fetchOrientation,
maxRows: new Int64(maxRows),
fetchType: FetchType.Data,
});

this.prepareCloudFetchChunks(results);
const prefetchedResponse = this.prefetchedResults.shift();
if (prefetchedResponse) {
return this.processFetchResponse(prefetchedResponse);
}

const response = this.pendingResults.shift();
// This check is rather for safety and to make TS happy. In practice, such a case should not happen
if (!response) {
throw new Error('Unexpected error: no more data');
}
const response = await this.driver.fetchResults({
operationHandle: this.operationHandle,
orientation: this.fetchOrientation,
maxRows: new Int64(maxRows),
fetchType: FetchType.Data,
});

return this.processFetchResponse(response);
}

private prepareCloudFetchChunks(response: TFetchResultsResp) {
// TODO: Make it configurable. Effectively, this is a concurrent downloads limit for an operation
const maxLinkCount = 1;

if (response.results && response.results.resultLinks && response.results.resultLinks.length > 0) {
const allLinks = [...response.results.resultLinks];
while (allLinks.length > 0) {
// Shallow clone the original response object, but rewrite cloud fetch links array
// to contain the only entry
const responseFragment = {
...response,
results: {
...response.results,
resultLinks: allLinks.splice(0, maxLinkCount),
},
};

this.pendingResults.push(responseFragment);
}
} else {
this.pendingResults.push(response);
}
}
}
11 changes: 10 additions & 1 deletion lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,19 @@ export default class DBSQLOperation implements IOperation {
}

public async hasMoreRows(): Promise<boolean> {
// If operation is closed or cancelled - we should not try to get data from it
if (this._completeOperation.closed || this._completeOperation.cancelled) {
return false;
}
return this._data.hasMoreRows;

// Return early if there are still data available for fetching
if (this._data.hasMoreRows) {
return true;
}

// If we fetched all the data from server - check if there's anything buffered in result handler
const resultHandler = await this._schema.getResultHandler();
return resultHandler.hasPendingData();
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
Expand Down
8 changes: 4 additions & 4 deletions lib/result/ArrowResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ export default class ArrowResult implements IOperationResult {
this.arrowSchema = arrowSchema;
}

async hasPendingData() {
return false;
}

async getValue(data?: Array<TRowSet>) {
if (this.schema.length === 0 || !this.arrowSchema || !data) {
return [];
}

const batches = await this.getBatches(data);
return this.batchesToRows(batches);
}

protected batchesToRows(batches: Array<Buffer>) {
if (batches.length === 0) {
return [];
}
Expand Down
29 changes: 15 additions & 14 deletions lib/result/CloudFetchResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,32 @@ import fetch from 'node-fetch';
import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import ArrowResult from './ArrowResult';

export default class CloudFetchResult extends ArrowResult {
protected batchesToRows(batches: Array<Buffer>) {
if (batches.length === 1) {
return super.batchesToRows(batches);
}
const concurrentDownloads = 10;

const results: Array<Array<any>> = [];
export default class CloudFetchResult extends ArrowResult {
private pendingLinks: Array<TSparkArrowResultLink> = [];

for (const batch of batches) {
results.push(super.batchesToRows([batch]));
}
private downloadedBatches: Array<Buffer> = [];

return results.flat(1);
async hasPendingData() {
return this.pendingLinks.length > 0 || this.downloadedBatches.length > 0;
}

protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
const tasks: Array<Promise<Buffer>> = [];

data?.forEach((item) => {
item.resultLinks?.forEach((link) => {
tasks.push(this.downloadLink(link));
this.pendingLinks.push(link);
});
});

return Promise.all(tasks);
if (this.downloadedBatches.length === 0) {
const links = this.pendingLinks.splice(0, concurrentDownloads);
const tasks = links.map((link) => this.downloadLink(link));
const batches = await Promise.all(tasks);
this.downloadedBatches.push(...batches);
}

return this.downloadedBatches.splice(0, 1);
}

private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
Expand Down
2 changes: 2 additions & 0 deletions lib/result/IOperationResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ import { TRowSet } from '../../thrift/TCLIService_types';

export default interface IOperationResult {
getValue(data?: Array<TRowSet>): Promise<any>;

hasPendingData(): Promise<boolean>;
}
4 changes: 4 additions & 0 deletions lib/result/JsonResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export default class JsonResult implements IOperationResult {
this.schema = getSchemaColumns(schema);
}

async hasPendingData() {
return false;
}

async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
if (this.schema.length === 0 || !data) {
return [];
Expand Down

0 comments on commit dc8d319

Please sign in to comment.