Skip to content

Commit

Permalink
add limit for max data points
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana committed Nov 20, 2024
1 parent 77ea25a commit e2dba55
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ public class TimeSeriesAggResultsBlockMerger implements ResultsBlockMerger<TimeS
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final AggInfo _aggInfo;
private final int _maxSeriesLimit;
private final long _maxDataPointsLimit;

public TimeSeriesAggResultsBlockMerger(TimeSeriesBuilderFactory seriesBuilderFactory, AggInfo aggInfo) {
_seriesBuilderFactory = seriesBuilderFactory;
_aggInfo = aggInfo;
_maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
_maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
}

@Override
Expand All @@ -47,7 +49,12 @@ public void mergeResultsBlocks(TimeSeriesResultsBlock mergedBlock, TimeSeriesRes
BaseTimeSeriesBuilder newTimeSeriesToMerge = entry.getValue();
if (currentTimeSeriesBuilder == null) {
currentTimeSeriesBlock.getSeriesBuilderMap().put(seriesHash, newTimeSeriesToMerge);
Preconditions.checkState(currentTimeSeriesBlock.getSeriesBuilderMap().size() <= _maxSeriesLimit,
final long currentUniqueSeries = currentTimeSeriesBlock.getSeriesBuilderMap().size();
final long numBuckets = currentTimeSeriesBlock.getTimeBuckets().getNumBuckets();
Preconditions.checkState(currentUniqueSeries * numBuckets <= _maxDataPointsLimit,
"Max data points limit reached in combine operator. Limit: %s. Current count: %s",
_maxDataPointsLimit, currentUniqueSeries * numBuckets);
Preconditions.checkState(currentUniqueSeries <= _maxSeriesLimit,
"Max series limit reached in combine operator. Limit: %s. Current Size: %s",
_maxSeriesLimit, currentTimeSeriesBlock.getSeriesBuilderMap().size());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class TimeSeriesAggregationOperator extends BaseOperator<TimeSeriesResult
private final TimeBuckets _timeBuckets;
private final TimeSeriesBuilderFactory _seriesBuilderFactory;
private final int _maxSeriesLimit;
private final long _maxDataPointsLimit;
private final long _numTotalDocs;
private long _numDocsScanned = 0;

Expand All @@ -86,6 +87,7 @@ public TimeSeriesAggregationOperator(
_timeBuckets = timeBuckets;
_seriesBuilderFactory = seriesBuilderFactory;
_maxSeriesLimit = _seriesBuilderFactory.getMaxUniqueSeriesPerServerLimit();
_maxDataPointsLimit = _seriesBuilderFactory.getMaxDataPointsPerServerLimit();
_numTotalDocs = segmentMetadata.getTotalDocs();
}

Expand Down Expand Up @@ -138,6 +140,9 @@ protected TimeSeriesResultsBlock getNextBlock() {
throw new IllegalStateException(
"Don't yet support value expression of type: " + valueExpressionBlockValSet.getValueType());
}
Preconditions.checkState(seriesBuilderMap.size() * (long) _timeBuckets.getNumBuckets() <= _maxDataPointsLimit,
"Query exceed max data point limit per server. Limit: %s. Data points in current segment so far: %s",
_maxDataPointsLimit, seriesBuilderMap.size() * _timeBuckets.getNumBuckets());
Preconditions.checkState(seriesBuilderMap.size() <= _maxSeriesLimit,
"Query exceeded max unique series limit per server. Limit: %s. Series in current segment so far: %s",
_maxSeriesLimit, seriesBuilderMap.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

public abstract class TimeSeriesBuilderFactory {
private static final int DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT = 100_000;
/**
* Default limit for the total number of values across all series.
*/
private static final long DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT = 100_000_000;

public abstract BaseTimeSeriesBuilder newTimeSeriesBuilder(
AggInfo aggInfo,
Expand All @@ -38,5 +42,9 @@ public int getMaxUniqueSeriesPerServerLimit() {
return DEFAULT_MAX_UNIQUE_SERIES_PER_SERVER_LIMIT;
}

public long getMaxDataPointsPerServerLimit() {
return DEFAULT_MAX_DATA_POINTS_PER_SERVER_LIMIT;
}

public abstract void init(PinotConfiguration pinotConfiguration);
}

0 comments on commit e2dba55

Please sign in to comment.