Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination: Limit, Offset & downloadCSV #33

Merged
merged 10 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 51 additions & 17 deletions src/api/client.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import * as fs from "fs/promises";
import {
DuneError,
ResultsResponse,
ExecutionState,
QueryParameter,
GetStatusResponse,
ExecutionResponseCSV,
} from "../types";
import { ageInHours, sleep } from "../utils";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ExecutionAPI } from "./execution";
import { POLL_FREQUENCY_SECONDS, THREE_MONTHS_IN_HOURS } from "../constants";
import {
MAX_NUM_ROWS_PER_BATCH,
POLL_FREQUENCY_SECONDS,
THREE_MONTHS_IN_HOURS,
} from "../constants";
import { ExecutionParams } from "../types/requestPayload";
import { QueryAPI } from "./query";
import { join } from "path";

const TERMINAL_STATES = [
ExecutionState.CANCELLED,
Expand All @@ -31,6 +38,7 @@ export class DuneClient {
async runQuery(
queryID: number,
params?: ExecutionParams,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ResultsResponse> {
let { state, execution_id: jobID } = await this._runInner(
Expand All @@ -39,7 +47,17 @@ export class DuneClient {
pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
return this.exec.getExecutionResults(jobID);
let result = await this.getLatestResult(
queryID,
params?.query_parameters,
batchSize,
);
if (result.execution_id !== jobID) {
throw new DuneError(
`invalid execution ID: expected ${jobID}, got ${result.execution_id}`,
);
}
return result;
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
Expand All @@ -52,14 +70,17 @@ export class DuneClient {
queryID: number,
params?: ExecutionParams,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<string> {
): Promise<ExecutionResponseCSV> {
let { state, execution_id: jobID } = await this._runInner(
queryID,
params,
pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
return this.exec.getResultCSV(jobID);
// we can't assert that the execution ids agree here, so we use max age hours as a "safe guard"
return this.exec.getResultCSV(jobID, {
query_parameters: params?.query_parameters,
});
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
Expand All @@ -73,52 +94,65 @@ export class DuneClient {
* Here contains additional logic to refresh the results if they are too old.
* @param queryId - query to get results of.
* @param parameters - parameters for which they were called.
* @param limit - the number of rows to retrieve
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async getLatestResult(
queryId: number,
parameters?: QueryParameter[],
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<ResultsResponse> {
let results = await this.exec.getLastExecutionResults(queryId, parameters);
let results = await this.exec.getLastExecutionResults(queryId, {
query_parameters: parameters,
limit: batchSize,
});
const lastRun: Date = results.execution_ended_at!;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = await this.runQuery(queryId, { query_parameters: parameters });
results = await this.runQuery(queryId, { query_parameters: parameters }, batchSize);
}
return results;
}

/**
* Get the lastest execution results in CSV format.
* Get the lastest execution results in CSV format and saves to disk.
* @param queryId - query to get results of.
* @param outFile - location to save CSV.
* @param parameters - parameters for which they were called.
* @param batchSize - the page size when retriving results.
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async getLatestResultCSV(
async downloadCSV(
queryId: number,
parameters?: QueryParameter[],
outFile: string,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<string> {
const lastResults = await this.exec.getLastExecutionResults(queryId, parameters);
): Promise<void> {
const params = { query_parameters: parameters, limit: batchSize };
const lastResults = await this.exec.getLastExecutionResults(queryId, params);
const lastRun: Date = lastResults.execution_ended_at!;
let results: Promise<string>;
let results: Promise<ExecutionResponseCSV>;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = this.runQueryCSV(queryId, { query_parameters: parameters });
results = this.runQueryCSV(queryId, { query_parameters: parameters }, batchSize);
} else {
// TODO (user cost savings): transform the lastResults into CSV instead of refetching
results = this.exec.getLastResultCSV(queryId, parameters);
results = this.exec.getLastResultCSV(queryId, params);
}
return results;
// Wait for the results promise to resolve and then write the CSV data to the specified outFile
const csvData = (await results).data;
await fs.writeFile(outFile, csvData, "utf8");
log.info(`CSV data has been saved to ${outFile}`);
}

private async _runInner(
Expand Down Expand Up @@ -150,7 +184,7 @@ export class DuneClient {
*/
async refresh(
queryID: number,
parameters?: QueryParameter[],
parameters: QueryParameter[] = [],
pingFrequency: number = 1,
): Promise<ResultsResponse> {
return this.runQuery(queryID, { query_parameters: parameters }, pingFrequency);
Expand Down
94 changes: 78 additions & 16 deletions src/api/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,23 @@ import {
GetStatusResponse,
ResultsResponse,
QueryParameter,
ExecutionResponseCSV,
concatResultResponse,
concatResultCSV,
} from "../types";
import log from "loglevel";
import { logPrefix } from "../utils";
import { Router } from "./router";
import { ExecutionParams, ExecutionPerformance } from "../types/requestPayload";
import {
ExecutionParams,
ExecutionPerformance,
GetResultPayload,
} from "../types/requestPayload";
import {
DEFAULT_GET_PARAMS,
DUNE_CSV_NEXT_OFFSET_HEADER,
DUNE_CSV_NEXT_URI_HEADER,
} from "../constants";

// This class implements all the routes defined in the Dune API Docs: https://dune.com/docs/api/
export class ExecutionAPI extends Router {
Expand Down Expand Up @@ -46,36 +58,86 @@ export class ExecutionAPI extends Router {
return response as GetStatusResponse;
}

async getExecutionResults(executionId: string): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(`execution/${executionId}/results`);
async getExecutionResults(
executionId: string,
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(
`execution/${executionId}/results`,
params,
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response as ResultsResponse;
}

async getResultCSV(executionId: string): Promise<string> {
const response = await this._get<string>(`execution/${executionId}/results/csv`);
async getResultCSV(
executionId: string,
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ExecutionResponseCSV> {
const response = await this._get<Response>(
`execution/${executionId}/results/csv`,
params,
true,
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response;
return this.buildCSVResponse(response);
}

async getLastExecutionResults(
queryId: number,
parameters?: QueryParameter[],
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
return this._get<ResultsResponse>(`query/${queryId}/results`, {
query_parameters: parameters ? parameters : [],
});
// The first bit might only return a page.
let results = await this._get<ResultsResponse>(`query/${queryId}/results`, params);
return this._fetchEntireResult(results);
}

async getLastResultCSV(
queryId: number,
parameters?: QueryParameter[],
): Promise<string> {
return this._get<string>(`query/${queryId}/results/csv`, {
query_parameters: parameters ? parameters : [],
});
params: GetResultPayload = DEFAULT_GET_PARAMS,
): Promise<ExecutionResponseCSV> {
let response = await this._get<Response>(
`query/${queryId}/results/csv`,
params,
true,
);
return this._fetchEntireResultCSV(await this.buildCSVResponse(response));
}

private async buildCSVResponse(response: Response): Promise<ExecutionResponseCSV> {
const nextOffset = response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER);
return {
data: await response.text(),
next_uri: response.headers.get(DUNE_CSV_NEXT_URI_HEADER),
next_offset: nextOffset ? parseInt(nextOffset) : undefined,
};
}

private async _fetchEntireResult(results: ResultsResponse): Promise<ResultsResponse> {
let next_uri = results.next_uri;
let batch: ResultsResponse;
while (next_uri !== undefined) {
batch = await this._getByUrl<ResultsResponse>(next_uri);
results = concatResultResponse(results, batch);
next_uri = batch.next_uri;
}
return results;
}

private async _fetchEntireResultCSV(
results: ExecutionResponseCSV,
): Promise<ExecutionResponseCSV> {
let next_uri = results.next_uri;
let batch: ExecutionResponseCSV;
while (next_uri !== null) {
batch = await this.buildCSVResponse(
await this._getByUrl<Response>(next_uri!, undefined, true),
);
results = concatResultCSV(results, batch);
next_uri = batch.next_uri;
}
return results;
}
// TODO - add getExecutionResultsCSV

/**
* @deprecated since version 0.0.2 Use executeQuery
Expand Down
58 changes: 43 additions & 15 deletions src/api/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ import { DuneError } from "../types";
import fetch from "cross-fetch";
import log from "loglevel";
import { logPrefix } from "../utils";
import { RequestPayload, payloadJSON } from "../types/requestPayload";
import {
RequestPayload,
payloadJSON,
payloadSearchParams,
} from "../types/requestPayload";

const BASE_URL = "https://api.dune.com/api/v1";
const BASE_URL = "https://api.dune.com/api";

enum RequestMethod {
GET = "GET",
Expand All @@ -15,9 +19,11 @@ enum RequestMethod {
// This class implements all the routes defined in the Dune API Docs: https://dune.com/docs/api/
export class Router {
private apiKey: string;
private apiVersion: string;

constructor(apiKey: string) {
constructor(apiKey: string, apiVersion: string = "v1") {
this.apiKey = apiKey;
this.apiVersion = apiVersion;
}

protected async _handleResponse<T>(responsePromise: Promise<Response>): Promise<T> {
Expand Down Expand Up @@ -60,38 +66,60 @@ export class Router {
method: RequestMethod,
url: string,
payload?: RequestPayload,
raw: boolean = false,
): Promise<T> {
const payloadData = payloadJSON(payload);
log.debug(logPrefix, `${method} received input url=${url}, payload=${payloadData}`);
const response = fetch(url, {
let requestData: RequestInit = {
method,
headers: {
"x-dune-api-key": this.apiKey,
"User-Agent": `ts-client-sdk (https://www.npmjs.com/package/@duneanalytics/client-sdk)`,
},
// conditionally add the body property
...(method !== RequestMethod.GET && {
body: payloadJSON(payload),
body: payloadData,
}),
...(method === RequestMethod.GET && {
params: payloadJSON(payload),
}),
});
};
let queryParams = "";
/// Build Url Search Parameters on GET
if (method === "GET" && payload) {
const searchParams = new URLSearchParams(payloadSearchParams(payload)).toString();
queryParams = `?${searchParams}`;
}

let response = fetch(url + queryParams, requestData);
if (raw) {
return response as T;
}
return this._handleResponse<T>(response);
}

protected async _get<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.GET, this.url(route), params);
protected async _get<T>(
route: string,
params?: RequestPayload,
raw: boolean = false,
): Promise<T> {
return this._request<T>(RequestMethod.GET, this.url(route), params, raw);
}

protected async _getByUrl<T>(
url: string,
params?: RequestPayload,
raw: boolean = false,
): Promise<T> {
return this._request<T>(RequestMethod.GET, url, params, raw);
}

protected async _post<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.POST, this.url(route), params);
return this._request<T>(RequestMethod.POST, this.url(route), params);
}

protected async _patch<T>(route: string, params?: RequestPayload): Promise<T> {
return this._request(RequestMethod.PATCH, this.url(route), params);
return this._request<T>(RequestMethod.PATCH, this.url(route), params);
}

private url(route: string): string {
return `${BASE_URL}/${route}`;
url(route?: string): string {
return `${BASE_URL}/${this.apiVersion}/${route}`;
}
}
Loading