Skip to content

Commit

Permalink
fix: moved graph query to new counts API (#422)
Browse files Browse the repository at this point in the history
This PR moves all the count related graph queries
to the new counts API implemented here
parseablehq/parseable#1103
  • Loading branch information
praveen5959 authored Jan 16, 2025
1 parent 328e830 commit 2d1cd74
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 308 deletions.
7 changes: 7 additions & 0 deletions src/@types/parseable/api/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ export type LogsQuery = {
access: string[] | null;
};

export type GraphQueryOpts = {
stream: string;
startTime: string;
endTime: string;
numBins: number;
};

export enum SortOrder {
ASCENDING = 1,
DESCENDING = -1,
Expand Down
1 change: 1 addition & 0 deletions src/api/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const parseParamsToQueryString = (params: Params) => {
// Streams Management
export const LOG_STREAM_LIST_URL = `${API_V1}/logstream`;
export const LOG_STREAMS_SCHEMA_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/schema`;
export const GRAPH_DATA_URL = `${API_V1}/counts`;
export const LOG_QUERY_URL = (params?: Params, resourcePath = 'query') =>
`${API_V1}/${resourcePath}` + parseParamsToQueryString(params);
export const LOG_STREAMS_ALERTS_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/alert`;
Expand Down
8 changes: 6 additions & 2 deletions src/api/query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Axios } from './axios';
import { LOG_QUERY_URL } from './constants';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import { GRAPH_DATA_URL, LOG_QUERY_URL } from './constants';
import { GraphQueryOpts, Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { CorrelationQueryBuilder, QueryBuilder } from '@/utils/queryBuilder';

Expand Down Expand Up @@ -91,3 +91,7 @@ export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '') => {
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};

export const getGraphData = (data: GraphQueryOpts) => {
return Axios().post<LogsResponseWithHeaders>(GRAPH_DATA_URL, data);
};
27 changes: 25 additions & 2 deletions src/hooks/useQueryResult.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getQueryResultWithHeaders, getQueryResult } from '@/api/query';
import { LogsQuery } from '@/@types/parseable/api/query';
import { getQueryResultWithHeaders, getQueryResult, getGraphData } from '@/api/query';
import { GraphQueryOpts, LogsQuery } from '@/@types/parseable/api/query';
import { notifications } from '@mantine/notifications';
import { isAxiosError, AxiosError } from 'axios';
import { IconCheck } from '@tabler/icons-react';
Expand Down Expand Up @@ -52,6 +52,29 @@ export const useQueryResult = () => {
return { fetchQueryMutation };
};

export const useGraphData = () => {
const fetchGraphDataHandler = async (data: GraphQueryOpts) => {
const response = await getGraphData(data);
if (response.status !== 200) {
throw new Error(response.statusText);
}
return response.data;
};

const fetchGraphDataMutation = useMutation(fetchGraphDataHandler, {
onError: (data: AxiosError) => {
if (isAxiosError(data) && data.response) {
const error = data.response?.data as string;
typeof error === 'string' && notifyError({ message: error });
} else if (data.message && typeof data.message === 'string') {
notifyError({ message: data.message });
}
},
});

return { fetchGraphDataMutation };
};

export const useFetchCount = () => {
const [currentStream] = useAppStore((store) => store.currentStream);
const { setTotalCount } = logsStoreReducers;
Expand Down
194 changes: 49 additions & 145 deletions src/pages/Correlation/components/MultiEventTimeLineGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Paper, Skeleton, Stack, Text } from '@mantine/core';
import classes from '../styles/Correlation.module.css';
import { useQueryResult } from '@/hooks/useQueryResult';
import { useGraphData } from '@/hooks/useQueryResult';
import { useCallback, useEffect, useMemo, useState } from 'react';
import dayjs from 'dayjs';
import { AreaChart } from '@mantine/charts';
Expand All @@ -9,20 +9,13 @@ import { appStoreReducers, useAppStore } from '@/layouts/MainLayout/providers/Ap
import { LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import _ from 'lodash';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { filterStoreReducers, useFilterStore } from '@/pages/Stream/providers/FilterProvider';
import { useCorrelationStore } from '../providers/CorrelationProvider';

const { parseQuery } = filterStoreReducers;
const { makeTimeRangeLabel } = timeRangeUtils;
const { setTimeRange } = appStoreReducers;

type CompactInterval = 'minute' | 'day' | 'hour' | 'quarter-hour' | 'half-hour' | 'month';

function removeOffsetFromQuery(query: string): string {
const offsetRegex = /\sOFFSET\s+\d+/i;
return query.replace(offsetRegex, '');
}

const getCompactType = (interval: number): CompactInterval => {
const totalMinutes = interval / (1000 * 60);
if (totalMinutes <= 60) {
Expand All @@ -44,74 +37,6 @@ const getCompactType = (interval: number): CompactInterval => {
}
};

const getStartOfTs = (time: Date, compactType: CompactInterval): Date => {
if (compactType === 'minute') {
return time;
} else if (compactType === 'hour') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate(), time.getHours());
} else if (compactType === 'quarter-hour') {
const roundOff = 1000 * 60 * 15;
return new Date(Math.floor(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'half-hour') {
const roundOff = 1000 * 60 * 30;
return new Date(Math.floor(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'day') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate());
} else {
return new Date(time.getFullYear(), time.getMonth());
}
};

const getEndOfTs = (time: Date, compactType: CompactInterval): Date => {
if (compactType === 'minute') {
return time;
} else if (compactType === 'hour') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate(), time.getHours() + 1);
} else if (compactType === 'quarter-hour') {
const roundOff = 1000 * 60 * 15;
return new Date(Math.round(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'half-hour') {
const roundOff = 1000 * 60 * 30;
return new Date(Math.round(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'day') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate() + 1);
} else {
return new Date(time.getFullYear(), time.getMonth() + 1);
}
};

const getModifiedTimeRange = (
startTime: Date,
endTime: Date,
interval: number,
): { modifiedStartTime: Date; modifiedEndTime: Date; compactType: CompactInterval } => {
const compactType = getCompactType(interval);
const modifiedStartTime = getStartOfTs(startTime, compactType);
const modifiedEndTime = getEndOfTs(endTime, compactType);
return { modifiedEndTime, modifiedStartTime, compactType };
};

const compactTypeIntervalMap = {
minute: '1 minute',
hour: '1 hour',
day: '24 hour',
'quarter-hour': '15 minute',
'half-hour': '30 minute',
month: '1 month',
};

const generateCountQuery = (
streamName: string,
startTime: Date,
endTime: Date,
compactType: CompactInterval,
whereClause: string,
) => {
const range = compactTypeIntervalMap[compactType];
/* eslint-disable no-useless-escape */
return `SELECT DATE_BIN('${range}', p_timestamp, '${startTime.toISOString()}') AS date_bin_timestamp, COUNT(*) AS log_count FROM \"${streamName}\" WHERE p_timestamp BETWEEN '${startTime.toISOString()}' AND '${endTime.toISOString()}' AND ${whereClause} GROUP BY date_bin_timestamp ORDER BY date_bin_timestamp`;
};

const NoDataView = (props: { isError: boolean }) => {
return (
<Stack style={{ width: '100%', height: '100%', alignItems: 'center', justifyContent: 'center' }}>
Expand All @@ -136,38 +61,6 @@ const calcAverage = (data: LogsResponseWithHeaders | undefined) => {
return parseInt(Math.abs(total / records.length).toFixed(0));
};

const getAllIntervals = (start: Date, end: Date, compactType: CompactInterval): Date[] => {
const result = [];
let currentDate = new Date(start);

while (currentDate <= end) {
result.push(new Date(currentDate));
currentDate = incrementDateByCompactType(currentDate, compactType);
}

return result;
};

const incrementDateByCompactType = (date: Date, type: CompactInterval): Date => {
const tempDate = new Date(date);
if (type === 'minute') {
tempDate.setMinutes(tempDate.getMinutes() + 1);
} else if (type === 'hour') {
tempDate.setHours(tempDate.getHours() + 1);
} else if (type === 'day') {
tempDate.setDate(tempDate.getDate() + 1);
} else if (type === 'quarter-hour') {
tempDate.setMinutes(tempDate.getMinutes() + 15);
} else if (type === 'half-hour') {
tempDate.setMinutes(tempDate.getMinutes() + 30);
} else if (type === 'month') {
tempDate.setMonth(tempDate.getMonth() + 1);
} else {
tempDate;
}
return new Date(tempDate);
};

type GraphTickItem = {
events: number;
minute: Date;
Expand Down Expand Up @@ -243,6 +136,11 @@ function ChartTooltip({ payload, series }: ChartTooltipProps) {
);
}

type LogRecord = {
counts_timestamp: string;
log_count: number;
};

// date_bin removes tz info
// filling data with empty values where there is no rec
const parseGraphData = (
Expand All @@ -256,39 +154,53 @@ const parseGraphData = (
const firstResponse = dataSets[0]?.records || [];
const secondResponse = dataSets[1]?.records || [];

const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const allTimestamps = getAllIntervals(modifiedStartTime, modifiedEndTime, compactType);
const compactType = getCompactType(interval);
const ticksCount = interval < 10 * 60 * 1000 ? interval / (60 * 1000) : interval < 60 * 60 * 1000 ? 10 : 60;
const intervalDuration = (endTime.getTime() - startTime.getTime()) / ticksCount;

const allTimestamps = Array.from(
{ length: ticksCount },
(_, index) => new Date(startTime.getTime() + index * intervalDuration),
);

const hasSecondDataset = dataSets[1] !== undefined;

const isValidRecord = (record: any): record is LogRecord => {
return typeof record.counts_timestamp === 'string' && typeof record.log_count === 'number';
};

const secondResponseMap =
secondResponse.length > 0
? new Map(
secondResponse.map((entry) => [new Date(`${entry.date_bin_timestamp}Z`).toISOString(), entry.log_count]),
secondResponse
.filter((entry) => isValidRecord(entry))
.map((entry) => {
const timestamp = entry.counts_timestamp;
if (timestamp != null) {
return [new Date(timestamp).getTime(), entry.log_count];
}
return null;
})
.filter((entry): entry is [number, number] => entry !== null),
)
: new Map();
const calculateTimeRange = (timestamp: Date | string) => {
const startTime = dayjs(timestamp);
const endTimeByCompactType = incrementDateByCompactType(startTime.toDate(), compactType);
const endTime = dayjs(endTimeByCompactType);
return { startTime, endTime };
};

const combinedData = allTimestamps.map((ts) => {
const firstRecord = firstResponse.find((record) => {
const recordTimestamp = new Date(`${record.date_bin_timestamp}Z`).toISOString();
const tsISO = ts.toISOString();
if (!isValidRecord(record)) return false;
const recordTimestamp = new Date(record.counts_timestamp).getTime();
const tsISO = ts.getTime();
return recordTimestamp === tsISO;
});

const secondCount = secondResponseMap?.get(ts.toISOString()) ?? 0;
const { startTime, endTime } = calculateTimeRange(ts);

const defaultOpts: Record<string, any> = {
stream: firstRecord?.log_count || 0,
minute: ts,
compactType,
startTime,
endTime,
startTime: dayjs(ts),
endTime: dayjs(new Date(ts.getTime() + intervalDuration)),
};

if (hasSecondDataset) {
Expand All @@ -302,9 +214,8 @@ const parseGraphData = (
};

const MultiEventTimeLineGraph = () => {
const { fetchQueryMutation } = useQueryResult();
const { fetchGraphDataMutation } = useGraphData();
const [fields] = useCorrelationStore((store) => store.fields);
const [appliedQuery] = useFilterStore((store) => store.appliedQuery);
const [streamData] = useCorrelationStore((store) => store.streamData);
const [timeRange] = useAppStore((store) => store.timeRange);
const [multipleStreamData, setMultipleStreamData] = useState<{ [key: string]: any }>({});
Expand Down Expand Up @@ -334,30 +245,23 @@ const MultiEventTimeLineGraph = () => {

const streamNames = Object.keys(fields);
const streamsToFetch = streamNames.filter((streamName) => !Object.keys(streamData).includes(streamName));
const queries = streamsToFetch.map((streamKey) => {
const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const totalMinutes = interval / (1000 * 60);
const numBins = Math.trunc(totalMinutes < 10 ? totalMinutes : totalMinutes < 60 ? 10 : 60);
const eventTimeLineGraphOpts = streamsToFetch.map((streamKey) => {
const logsQuery = {
startTime: modifiedStartTime,
endTime: modifiedEndTime,
access: [],
};
const whereClause = parseQuery(appliedQuery, streamKey).where;
const query = generateCountQuery(streamKey, modifiedStartTime, modifiedEndTime, compactType, whereClause);
const graphQuery = removeOffsetFromQuery(query);

return {
queryEngine: 'Parseable',
logsQuery,
query: graphQuery,
streamKey,
stream: streamKey,
startTime: dayjs(startTime).toISOString(),
endTime: dayjs(endTime).add(1, 'minute').toISOString(),
numBins,
};
return logsQuery;
});
Promise.all(queries.map((queryData: any) => fetchQueryMutation.mutateAsync(queryData)))
Promise.all(eventTimeLineGraphOpts.map((queryData: any) => fetchGraphDataMutation.mutateAsync(queryData)))
.then((results) => {
setMultipleStreamData((prevData: any) => {
const newData = { ...prevData };
results.forEach((result, index) => {
newData[queries[index].streamKey] = result;
newData[eventTimeLineGraphOpts[index].stream] = result;
});
return newData;
});
Expand All @@ -367,8 +271,8 @@ const MultiEventTimeLineGraph = () => {
});
}, [fields, timeRange]);

const isLoading = fetchQueryMutation.isLoading;
const avgEventCount = useMemo(() => calcAverage(fetchQueryMutation?.data), [fetchQueryMutation?.data]);
const isLoading = fetchGraphDataMutation.isLoading;
const avgEventCount = useMemo(() => calcAverage(fetchGraphDataMutation?.data), [fetchGraphDataMutation?.data]);
const graphData = useMemo(() => {
if (!streamGraphData || streamGraphData.length === 0 || streamGraphData.length !== Object.keys(fields).length)
return [];
Expand All @@ -394,7 +298,7 @@ const MultiEventTimeLineGraph = () => {
return (
<Stack className={classes.graphContainer}>
<Skeleton
visible={fetchQueryMutation.isLoading}
visible={fetchGraphDataMutation.isLoading}
h="100%"
w={isLoading ? '98%' : '100%'}
style={isLoading ? { marginLeft: '1.8rem', alignSelf: 'center' } : !hasData ? { marginLeft: '1rem' } : {}}>
Expand Down Expand Up @@ -433,7 +337,7 @@ const MultiEventTimeLineGraph = () => {
dotProps={{ strokeWidth: 1, r: 2.5 }}
/>
) : (
<NoDataView isError={fetchQueryMutation.isError} />
<NoDataView isError={fetchGraphDataMutation.isError} />
)}
</Skeleton>
</Stack>
Expand Down
Loading

0 comments on commit 2d1cd74

Please sign in to comment.