Skip to content

Commit

Permalink
Pagination: Limit, Offset & downloadCSV (#33)
Browse files Browse the repository at this point in the history
This PR makes quite a few changes to the routing while adding limit and offset as "GetParameters".

Unit Tests are introduced validating that limits are respected where possible.
  • Loading branch information
bh2smith authored Mar 8, 2024
1 parent 402da9e commit 2a2efd5
Show file tree
Hide file tree
Showing 9 changed files with 563 additions and 95 deletions.
68 changes: 51 additions & 17 deletions src/api/client.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import * as fs from "fs/promises";
import {
DuneError,
ResultsResponse,
ExecutionState,
QueryParameter,
GetStatusResponse,
ExecutionResponseCSV,
} from "../types";
import { ageInHours, sleep } from "../utils";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ExecutionAPI } from "./execution";
import { POLL_FREQUENCY_SECONDS, THREE_MONTHS_IN_HOURS } from "../constants";
import {
MAX_NUM_ROWS_PER_BATCH,
POLL_FREQUENCY_SECONDS,
THREE_MONTHS_IN_HOURS,
} from "../constants";
import { ExecutionParams } from "../types/requestPayload";
import { QueryAPI } from "./query";
import { join } from "path";

const TERMINAL_STATES = [
ExecutionState.CANCELLED,
Expand All @@ -31,6 +38,7 @@ export class DuneClient {
async runQuery(
queryID: number,
params?: ExecutionParams,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ResultsResponse> {
let { state, execution_id: jobID } = await this._runInner(
Expand All @@ -39,7 +47,17 @@ export class DuneClient {
pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
return this.exec.getExecutionResults(jobID);
let result = await this.getLatestResult(
queryID,
params?.query_parameters,
batchSize,
);
if (result.execution_id !== jobID) {
throw new DuneError(
`invalid execution ID: expected ${jobID}, got ${result.execution_id}`,
);
}
return result;
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
Expand All @@ -52,14 +70,17 @@ export class DuneClient {
queryID: number,
params?: ExecutionParams,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<string> {
): Promise<ExecutionResponseCSV> {
let { state, execution_id: jobID } = await this._runInner(
queryID,
params,
pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
return this.exec.getResultCSV(jobID);
// we can't assert that the execution ids agree here, so we use max age hours as a "safe guard"
return this.exec.getResultCSV(jobID, {
query_parameters: params?.query_parameters,
});
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
Expand All @@ -73,52 +94,65 @@ export class DuneClient {
* Here contains additional logic to refresh the results if they are too old.
* @param queryId - query to get results of.
* @param parameters - parameters for which they were called.
* @param limit - the number of rows to retrieve
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async getLatestResult(
queryId: number,
parameters?: QueryParameter[],
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<ResultsResponse> {
let results = await this.exec.getLastExecutionResults(queryId, parameters);
let results = await this.exec.getLastExecutionResults(queryId, {
query_parameters: parameters,
limit: batchSize,
});
const lastRun: Date = results.execution_ended_at!;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = await this.runQuery(queryId, { query_parameters: parameters });
results = await this.runQuery(queryId, { query_parameters: parameters }, batchSize);
}
return results;
}

/**
* Get the lastest execution results in CSV format.
* Get the lastest execution results in CSV format and saves to disk.
* @param queryId - query to get results of.
* @param outFile - location to save CSV.
* @param parameters - parameters for which they were called.
* @param batchSize - the page size when retriving results.
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async getLatestResultCSV(
async downloadCSV(
queryId: number,
parameters?: QueryParameter[],
outFile: string,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<string> {
const lastResults = await this.exec.getLastExecutionResults(queryId, parameters);
): Promise<void> {
const params = { query_parameters: parameters, limit: batchSize };
const lastResults = await this.exec.getLastExecutionResults(queryId, params);
const lastRun: Date = lastResults.execution_ended_at!;
let results: Promise<string>;
let results: Promise<ExecutionResponseCSV>;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = this.runQueryCSV(queryId, { query_parameters: parameters });
results = this.runQueryCSV(queryId, { query_parameters: parameters }, batchSize);
} else {
// TODO (user cost savings): transform the lastResults into CSV instead of refetching
results = this.exec.getLastResultCSV(queryId, parameters);
results = this.exec.getLastResultCSV(queryId, params);
}
return results;
// Wait for the results promise to resolve and then write the CSV data to the specified outFile
const csvData = (await results).data;
await fs.writeFile(outFile, csvData, "utf8");
log.info(`CSV data has been saved to ${outFile}`);
}

private async _runInner(
Expand Down Expand Up @@ -150,7 +184,7 @@ export class DuneClient {
*/
async refresh(
queryID: number,
parameters?: QueryParameter[],
parameters: QueryParameter[] = [],
pingFrequency: number = 1,
): Promise<ResultsResponse> {
return this.runQuery(queryID, { query_parameters: parameters }, pingFrequency);
Expand Down
94 changes: 78 additions & 16 deletions src/api/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@ import {
GetStatusResponse,
ResultsResponse,
QueryParameter,
ExecutionResponseCSV,
concatResultResponse,
concatResultCSV,
} from "../types";
import log from "loglevel";
import { logPrefix } from "../utils";
import { Router } from "./router";
import { ExecutionParams, ExecutionPerformance } from "../types/requestPayload";
import {
ExecutionParams,
ExecutionPerformance,
GetResultPayload,
} from "../types/requestPayload";
import {
DEFAULT_GET_PARAMS,
DUNE_CSV_NEXT_OFFSET_HEADER,
DUNE_CSV_NEXT_URI_HEADER,
} from "../constants";

// This class implements all the routes defined in the Dune API Docs: https://dune.com/docs/api/
export class ExecutionAPI extends Router {
Expand Down Expand Up @@ -46,36 +58,86 @@ export class ExecutionAPI extends Router {
return response as GetStatusResponse;
}

async getExecutionResults(executionId: string): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(`execution/${executionId}/results`);
async getExecutionResults(
executionId: string,
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(
`execution/${executionId}/results`,
params,
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response as ResultsResponse;
}

async getResultCSV(executionId: string): Promise<string> {
const response = await this._get<string>(`execution/${executionId}/results/csv`);
async getResultCSV(
executionId: string,
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ExecutionResponseCSV> {
const response = await this._get<Response>(
`execution/${executionId}/results/csv`,
params,
true,
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response;
return this.buildCSVResponse(response);
}

async getLastExecutionResults(
queryId: number,
parameters?: QueryParameter[],
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
return this._get<ResultsResponse>(`query/${queryId}/results`, {
query_parameters: parameters ? parameters : [],
});
// The first bit might only return a page.
let results = await this._get<ResultsResponse>(`query/${queryId}/results`, params);
return this._fetchEntireResult(results);
}

async getLastResultCSV(
queryId: number,
parameters?: QueryParameter[],
): Promise<string> {
return this._get<string>(`query/${queryId}/results/csv`, {
query_parameters: parameters ? parameters : [],
});
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ExecutionResponseCSV> {
let response = await this._get<Response>(
`query/${queryId}/results/csv`,
params,
true,
);
return this._fetchEntireResultCSV(await this.buildCSVResponse(response));
}

private async buildCSVResponse(response: Response): Promise<ExecutionResponseCSV> {
const nextOffset = response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER);
return {
data: await response.text(),
next_uri: response.headers.get(DUNE_CSV_NEXT_URI_HEADER),
next_offset: nextOffset ? parseInt(nextOffset) : undefined,
};
}

private async _fetchEntireResult(results: ResultsResponse): Promise<ResultsResponse> {
let next_uri = results.next_uri;
let batch: ResultsResponse;
while (next_uri !== undefined) {
batch = await this._getByUrl<ResultsResponse>(next_uri);
results = concatResultResponse(results, batch);
next_uri = batch.next_uri;
}
return results;
}

private async _fetchEntireResultCSV(
results: ExecutionResponseCSV,
): Promise<ExecutionResponseCSV> {
let next_uri = results.next_uri;
let batch: ExecutionResponseCSV;
while (next_uri !== null) {
batch = await this.buildCSVResponse(
await this._getByUrl<Response>(next_uri!, undefined, true),
);
results = concatResultCSV(results, batch);
next_uri = batch.next_uri;
}
return results;
}
// TODO - add getExecutionResultsCSV

/**
* @deprecated since version 0.0.2 Use executeQuery
Expand Down
58 changes: 43 additions & 15 deletions src/api/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ import { DuneError } from "../types";
import fetch from "cross-fetch";
import log from "loglevel";
import { logPrefix } from "../utils";
import { RequestPayload, payloadJSON } from "../types/requestPayload";
import {
RequestPayload,
payloadJSON,
payloadSearchParams,
} from "../types/requestPayload";

const BASE_URL = "https://api.dune.com/api/v1";
const BASE_URL = "https://api.dune.com/api";

enum RequestMethod {
GET = "GET",
Expand All @@ -15,9 +19,11 @@ enum RequestMethod {
// This class implements all the routes defined in the Dune API Docs: https://dune.com/docs/api/
export class Router {
private apiKey: string;
private apiVersion: string;

constructor(apiKey: string) {
constructor(apiKey: string, apiVersion: string = "v1") {
this.apiKey = apiKey;
this.apiVersion = apiVersion;
}

protected async _handleResponse<T>(responsePromise: Promise<Response>): Promise<T> {
Expand Down Expand Up @@ -60,38 +66,60 @@ export class Router {
method: RequestMethod,
url: string,
payload?: RequestPayload,
raw: boolean = false,
): Promise<T> {
const payloadData = payloadJSON(payload);
log.debug(logPrefix, `${method} received input url=${url}, payload=${payloadData}`);
const response = fetch(url, {
let requestData: RequestInit = {
method,
headers: {
"x-dune-api-key": this.apiKey,
"User-Agent": `ts-client-sdk (https://www.npmjs.com/package/@duneanalytics/client-sdk)`,
},
// conditionally add the body property
...(method !== RequestMethod.GET && {
body: payloadJSON(payload),
body: payloadData,
}),
...(method === RequestMethod.GET && {
params: payloadJSON(payload),
}),
});
};
let queryParams = "";
/// Build Url Search Parameters on GET
if (method === "GET" && payload) {
const searchParams = new URLSearchParams(payloadSearchParams(payload)).toString();
queryParams = `?${searchParams}`;
}

let response = fetch(url + queryParams, requestData);
if (raw) {
return response as T;
}
return this._handleResponse<T>(response);
}

protected async _get<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.GET, this.url(route), params);
protected async _get<T>(
route: string,
params?: RequestPayload,
raw: boolean = false,
): Promise<T> {
return this._request<T>(RequestMethod.GET, this.url(route), params, raw);
}

protected async _getByUrl<T>(
url: string,
params?: RequestPayload,
raw: boolean = false,
): Promise<T> {
return this._request<T>(RequestMethod.GET, url, params, raw);
}

protected async _post<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.POST, this.url(route), params);
return this._request<T>(RequestMethod.POST, this.url(route), params);
}

protected async _patch<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.PATCH, this.url(route), params);
return this._request<T>(RequestMethod.PATCH, this.url(route), params);
}

private url(route: string): string {
return `${BASE_URL}/${route}`;
url(route?: string): string {
return `${BASE_URL}/${this.apiVersion}/${route}`;
}
}
Loading

0 comments on commit 2a2efd5

Please sign in to comment.