Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved graph query to /datebin #422

Merged
merged 8 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/@types/parseable/api/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ export type LogsQuery = {
access: string[] | null;
};

export type GraphQueryOpts = {
stream: string;
praveen5959 marked this conversation as resolved.
Show resolved Hide resolved
startTime: string;
endTime: string;
numBins: number;
};

export enum SortOrder {
ASCENDING = 1,
DESCENDING = -1,
Expand Down
1 change: 1 addition & 0 deletions src/api/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const parseParamsToQueryString = (params: Params) => {
// Streams Management
export const LOG_STREAM_LIST_URL = `${API_V1}/logstream`;
export const LOG_STREAMS_SCHEMA_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/schema`;
export const GRAPH_DATA_URL = `${API_V1}/counts`;
export const LOG_QUERY_URL = (params?: Params, resourcePath = 'query') =>
`${API_V1}/${resourcePath}` + parseParamsToQueryString(params);
export const LOG_STREAMS_ALERTS_URL = (streamName: string) => `${LOG_STREAM_LIST_URL}/${streamName}/alert`;
Expand Down
8 changes: 6 additions & 2 deletions src/api/query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Axios } from './axios';
import { LOG_QUERY_URL } from './constants';
import { Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import { GRAPH_DATA_URL, LOG_QUERY_URL } from './constants';
import { GraphQueryOpts, Log, LogsQuery, LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { CorrelationQueryBuilder, QueryBuilder } from '@/utils/queryBuilder';

Expand Down Expand Up @@ -91,3 +91,7 @@ export const getQueryResultWithHeaders = (logsQuery: LogsQuery, query = '') => {
const endPoint = LOG_QUERY_URL({ fields: true }, queryBuilder.getResourcePath());
return Axios().post<LogsResponseWithHeaders>(endPoint, makeCustomQueryRequestData(logsQuery, query), {});
};

export const getGraphData = (data: GraphQueryOpts) => {
return Axios().post<LogsResponseWithHeaders>(GRAPH_DATA_URL, data);
};
27 changes: 25 additions & 2 deletions src/hooks/useQueryResult.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getQueryResultWithHeaders, getQueryResult } from '@/api/query';
import { LogsQuery } from '@/@types/parseable/api/query';
import { getQueryResultWithHeaders, getQueryResult, getGraphData } from '@/api/query';
import { GraphQueryOpts, LogsQuery } from '@/@types/parseable/api/query';
import { notifications } from '@mantine/notifications';
import { isAxiosError, AxiosError } from 'axios';
import { IconCheck } from '@tabler/icons-react';
Expand Down Expand Up @@ -52,6 +52,29 @@ export const useQueryResult = () => {
return { fetchQueryMutation };
};

export const useGraphData = () => {
const fetchGraphDataHandler = async (data: GraphQueryOpts) => {
const response = await getGraphData(data);
if (response.status !== 200) {
throw new Error(response.statusText);
}
return response.data;
};

const fetchGraphDataMutation = useMutation(fetchGraphDataHandler, {
onError: (data: AxiosError) => {
if (isAxiosError(data) && data.response) {
const error = data.response?.data as string;
typeof error === 'string' && notifyError({ message: error });
} else if (data.message && typeof data.message === 'string') {
notifyError({ message: data.message });
}
},
});

return { fetchGraphDataMutation };
};

export const useFetchCount = () => {
const [currentStream] = useAppStore((store) => store.currentStream);
const { setTotalCount } = logsStoreReducers;
Expand Down
194 changes: 49 additions & 145 deletions src/pages/Correlation/components/MultiEventTimeLineGraph.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Paper, Skeleton, Stack, Text } from '@mantine/core';
import classes from '../styles/Correlation.module.css';
import { useQueryResult } from '@/hooks/useQueryResult';
import { useGraphData } from '@/hooks/useQueryResult';
import { useCallback, useEffect, useMemo, useState } from 'react';
import dayjs from 'dayjs';
import { AreaChart } from '@mantine/charts';
Expand All @@ -9,20 +9,13 @@ import { appStoreReducers, useAppStore } from '@/layouts/MainLayout/providers/Ap
import { LogsResponseWithHeaders } from '@/@types/parseable/api/query';
import _ from 'lodash';
import timeRangeUtils from '@/utils/timeRangeUtils';
import { filterStoreReducers, useFilterStore } from '@/pages/Stream/providers/FilterProvider';
import { useCorrelationStore } from '../providers/CorrelationProvider';

const { parseQuery } = filterStoreReducers;
const { makeTimeRangeLabel } = timeRangeUtils;
const { setTimeRange } = appStoreReducers;

type CompactInterval = 'minute' | 'day' | 'hour' | 'quarter-hour' | 'half-hour' | 'month';

function removeOffsetFromQuery(query: string): string {
const offsetRegex = /\sOFFSET\s+\d+/i;
return query.replace(offsetRegex, '');
}

const getCompactType = (interval: number): CompactInterval => {
const totalMinutes = interval / (1000 * 60);
if (totalMinutes <= 60) {
Expand All @@ -44,74 +37,6 @@ const getCompactType = (interval: number): CompactInterval => {
}
};

const getStartOfTs = (time: Date, compactType: CompactInterval): Date => {
if (compactType === 'minute') {
return time;
} else if (compactType === 'hour') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate(), time.getHours());
} else if (compactType === 'quarter-hour') {
const roundOff = 1000 * 60 * 15;
return new Date(Math.floor(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'half-hour') {
const roundOff = 1000 * 60 * 30;
return new Date(Math.floor(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'day') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate());
} else {
return new Date(time.getFullYear(), time.getMonth());
}
};

const getEndOfTs = (time: Date, compactType: CompactInterval): Date => {
if (compactType === 'minute') {
return time;
} else if (compactType === 'hour') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate(), time.getHours() + 1);
} else if (compactType === 'quarter-hour') {
const roundOff = 1000 * 60 * 15;
return new Date(Math.round(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'half-hour') {
const roundOff = 1000 * 60 * 30;
return new Date(Math.round(time.getTime() / roundOff) * roundOff);
} else if (compactType === 'day') {
return new Date(time.getFullYear(), time.getMonth(), time.getDate() + 1);
} else {
return new Date(time.getFullYear(), time.getMonth() + 1);
}
};

const getModifiedTimeRange = (
startTime: Date,
endTime: Date,
interval: number,
): { modifiedStartTime: Date; modifiedEndTime: Date; compactType: CompactInterval } => {
const compactType = getCompactType(interval);
const modifiedStartTime = getStartOfTs(startTime, compactType);
const modifiedEndTime = getEndOfTs(endTime, compactType);
return { modifiedEndTime, modifiedStartTime, compactType };
};

const compactTypeIntervalMap = {
minute: '1 minute',
hour: '1 hour',
day: '24 hour',
'quarter-hour': '15 minute',
'half-hour': '30 minute',
month: '1 month',
};

const generateCountQuery = (
streamName: string,
startTime: Date,
endTime: Date,
compactType: CompactInterval,
whereClause: string,
) => {
const range = compactTypeIntervalMap[compactType];
/* eslint-disable no-useless-escape */
return `SELECT DATE_BIN('${range}', p_timestamp, '${startTime.toISOString()}') AS date_bin_timestamp, COUNT(*) AS log_count FROM \"${streamName}\" WHERE p_timestamp BETWEEN '${startTime.toISOString()}' AND '${endTime.toISOString()}' AND ${whereClause} GROUP BY date_bin_timestamp ORDER BY date_bin_timestamp`;
};

const NoDataView = (props: { isError: boolean }) => {
return (
<Stack style={{ width: '100%', height: '100%', alignItems: 'center', justifyContent: 'center' }}>
Expand All @@ -136,38 +61,6 @@ const calcAverage = (data: LogsResponseWithHeaders | undefined) => {
return parseInt(Math.abs(total / records.length).toFixed(0));
};

const getAllIntervals = (start: Date, end: Date, compactType: CompactInterval): Date[] => {
const result = [];
let currentDate = new Date(start);

while (currentDate <= end) {
result.push(new Date(currentDate));
currentDate = incrementDateByCompactType(currentDate, compactType);
}

return result;
};

const incrementDateByCompactType = (date: Date, type: CompactInterval): Date => {
const tempDate = new Date(date);
if (type === 'minute') {
tempDate.setMinutes(tempDate.getMinutes() + 1);
} else if (type === 'hour') {
tempDate.setHours(tempDate.getHours() + 1);
} else if (type === 'day') {
tempDate.setDate(tempDate.getDate() + 1);
} else if (type === 'quarter-hour') {
tempDate.setMinutes(tempDate.getMinutes() + 15);
} else if (type === 'half-hour') {
tempDate.setMinutes(tempDate.getMinutes() + 30);
} else if (type === 'month') {
tempDate.setMonth(tempDate.getMonth() + 1);
} else {
tempDate;
}
return new Date(tempDate);
};

type GraphTickItem = {
events: number;
minute: Date;
Expand Down Expand Up @@ -243,6 +136,11 @@ function ChartTooltip({ payload, series }: ChartTooltipProps) {
);
}

type LogRecord = {
counts_timestamp: string;
log_count: number;
};

// date_bin removes tz info
// filling data with empty values where there is no rec
const parseGraphData = (
Expand All @@ -256,39 +154,53 @@ const parseGraphData = (
const firstResponse = dataSets[0]?.records || [];
const secondResponse = dataSets[1]?.records || [];

const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const allTimestamps = getAllIntervals(modifiedStartTime, modifiedEndTime, compactType);
const compactType = getCompactType(interval);
const ticksCount = interval < 10 * 60 * 1000 ? interval / (60 * 1000) : interval < 60 * 60 * 1000 ? 10 : 60;
const intervalDuration = (endTime.getTime() - startTime.getTime()) / ticksCount;

const allTimestamps = Array.from(
{ length: ticksCount },
(_, index) => new Date(startTime.getTime() + index * intervalDuration),
);

const hasSecondDataset = dataSets[1] !== undefined;

const isValidRecord = (record: any): record is LogRecord => {
return typeof record.counts_timestamp === 'string' && typeof record.log_count === 'number';
};

const secondResponseMap =
secondResponse.length > 0
? new Map(
secondResponse.map((entry) => [new Date(`${entry.date_bin_timestamp}Z`).toISOString(), entry.log_count]),
secondResponse
.filter((entry) => isValidRecord(entry))
.map((entry) => {
const timestamp = entry.counts_timestamp;
if (timestamp != null) {
return [new Date(timestamp).getTime(), entry.log_count];
}
return null;
})
.filter((entry): entry is [number, number] => entry !== null),
)
: new Map();
const calculateTimeRange = (timestamp: Date | string) => {
const startTime = dayjs(timestamp);
const endTimeByCompactType = incrementDateByCompactType(startTime.toDate(), compactType);
const endTime = dayjs(endTimeByCompactType);
return { startTime, endTime };
};

const combinedData = allTimestamps.map((ts) => {
const firstRecord = firstResponse.find((record) => {
const recordTimestamp = new Date(`${record.date_bin_timestamp}Z`).toISOString();
const tsISO = ts.toISOString();
if (!isValidRecord(record)) return false;
const recordTimestamp = new Date(record.counts_timestamp).getTime();
const tsISO = ts.getTime();
return recordTimestamp === tsISO;
});

const secondCount = secondResponseMap?.get(ts.toISOString()) ?? 0;
const { startTime, endTime } = calculateTimeRange(ts);

const defaultOpts: Record<string, any> = {
stream: firstRecord?.log_count || 0,
minute: ts,
compactType,
startTime,
endTime,
startTime: dayjs(ts),
endTime: dayjs(new Date(ts.getTime() + intervalDuration)),
};

if (hasSecondDataset) {
Expand All @@ -302,9 +214,8 @@ const parseGraphData = (
};

const MultiEventTimeLineGraph = () => {
const { fetchQueryMutation } = useQueryResult();
const { fetchGraphDataMutation } = useGraphData();
const [fields] = useCorrelationStore((store) => store.fields);
const [appliedQuery] = useFilterStore((store) => store.appliedQuery);
const [streamData] = useCorrelationStore((store) => store.streamData);
const [timeRange] = useAppStore((store) => store.timeRange);
const [multipleStreamData, setMultipleStreamData] = useState<{ [key: string]: any }>({});
Expand Down Expand Up @@ -334,30 +245,23 @@ const MultiEventTimeLineGraph = () => {

const streamNames = Object.keys(fields);
const streamsToFetch = streamNames.filter((streamName) => !Object.keys(streamData).includes(streamName));
const queries = streamsToFetch.map((streamKey) => {
const { modifiedEndTime, modifiedStartTime, compactType } = getModifiedTimeRange(startTime, endTime, interval);
const totalMinutes = interval / (1000 * 60);
const numBins = Math.trunc(totalMinutes < 10 ? totalMinutes : totalMinutes < 60 ? 10 : 60);
const eventTimeLineGraphOpts = streamsToFetch.map((streamKey) => {
const logsQuery = {
startTime: modifiedStartTime,
endTime: modifiedEndTime,
access: [],
};
const whereClause = parseQuery(appliedQuery, streamKey).where;
const query = generateCountQuery(streamKey, modifiedStartTime, modifiedEndTime, compactType, whereClause);
const graphQuery = removeOffsetFromQuery(query);

return {
queryEngine: 'Parseable',
logsQuery,
query: graphQuery,
streamKey,
stream: streamKey,
startTime: dayjs(startTime).toISOString(),
endTime: dayjs(endTime).add(1, 'minute').toISOString(),
numBins,
};
return logsQuery;
});
Promise.all(queries.map((queryData: any) => fetchQueryMutation.mutateAsync(queryData)))
Promise.all(eventTimeLineGraphOpts.map((queryData: any) => fetchGraphDataMutation.mutateAsync(queryData)))
.then((results) => {
setMultipleStreamData((prevData: any) => {
const newData = { ...prevData };
results.forEach((result, index) => {
newData[queries[index].streamKey] = result;
newData[eventTimeLineGraphOpts[index].stream] = result;
});
return newData;
});
Expand All @@ -367,8 +271,8 @@ const MultiEventTimeLineGraph = () => {
});
}, [fields, timeRange]);

const isLoading = fetchQueryMutation.isLoading;
const avgEventCount = useMemo(() => calcAverage(fetchQueryMutation?.data), [fetchQueryMutation?.data]);
const isLoading = fetchGraphDataMutation.isLoading;
const avgEventCount = useMemo(() => calcAverage(fetchGraphDataMutation?.data), [fetchGraphDataMutation?.data]);
const graphData = useMemo(() => {
if (!streamGraphData || streamGraphData.length === 0 || streamGraphData.length !== Object.keys(fields).length)
return [];
Expand All @@ -394,7 +298,7 @@ const MultiEventTimeLineGraph = () => {
return (
<Stack className={classes.graphContainer}>
<Skeleton
visible={fetchQueryMutation.isLoading}
visible={fetchGraphDataMutation.isLoading}
h="100%"
w={isLoading ? '98%' : '100%'}
style={isLoading ? { marginLeft: '1.8rem', alignSelf: 'center' } : !hasData ? { marginLeft: '1rem' } : {}}>
Expand Down Expand Up @@ -433,7 +337,7 @@ const MultiEventTimeLineGraph = () => {
dotProps={{ strokeWidth: 1, r: 2.5 }}
/>
) : (
<NoDataView isError={fetchQueryMutation.isError} />
<NoDataView isError={fetchGraphDataMutation.isError} />
)}
</Skeleton>
</Stack>
Expand Down
Loading
Loading