Skip to content

Commit

Permalink
Add FederatedQueryPlanner (#2216)
Browse files Browse the repository at this point in the history
* Enrich 'i' and 'ri' rows in metadata table with event date

Modify the generation of 'i' (indexed rows) and 'ri' (reverse indexed
rows) in the metadata table such that the column qualifier contains the
event date. This is required as a first step to support efforts for
issue #825 so that we can identify dates when an event was ingested and
included in a frequency count for an associated 'f' row, but was not
indexed.

* Add counts to 'i' and 'ri' rows

* Initial federated query planner implementation

* code formatting

* Fixed issues with FederatedQueryIterable

* Fix test failures

* Fix failing tests

* Additional test fixes

* pr feedback

* Use new MetadataHelper function version

* Extract fields to filter index holes

* Correct logic for determining sub date ranges

* Remove unnecessary check

* code formatting

* Add check for null query model

* Limit config arg to function scope

* Update metadata-utils submodule commit

* code formatting

* Fix failing tests

* Additional test fixes

* Ensure all original tests pass

* Add federated planner tests and chained schedulers

* pr feedback

* metadata-utils 3.0.3 tag

* Fixed the index hole data ingest to set appropriate time stamps on the keys
Removed some of the code which I believe was trying to diagnose the test issues

* Updated applyModel to use the passed in script

* Remove unneeded changes

* Make FederatedQueryPlanner the default

* Restore original log4j.properties

* code formatting

* Fix QueryPlanTest

* Updated to test with teardown

* Test debugging edits

* Updated formatting

* Concatenate sub-plans

* Make FederatedQueryPlanner implement Cloneable

* code formatting

* * Updated with metadata-utils 4.0.5 (index markers and avoid non-indexed fields for holes)
* Fixed test cases with correct responses and periodic failing test cases
* Updated AncestorQueryLogic to handle federated query planner

* * Allow subclasses of ShardQueryConfiguration

* Updated to throw a NoResultsException for an empty query.

* import reorg

* Updated to avoid expanding unfielded if disabled, and to assume no index holes if no query fields.

* Add tests for default query planner with ne and not-eq

* Revert changes to test data format

* Revert changes to log4j.properties

* Ensure query plan updated after any exception type

* Revert all changes to test data format

---------

Co-authored-by: Ivan Bella <[email protected]>
Co-authored-by: hgklohr <[email protected]>
  • Loading branch information
3 people authored Sep 9, 2024
1 parent 6ff65b9 commit 04336c4
Show file tree
Hide file tree
Showing 27 changed files with 3,105 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,36 @@ public GenericQueryConfiguration(BaseQueryLogic<?> configuredLogic) {
this(configuredLogic.getConfig());
}

public GenericQueryConfiguration(GenericQueryConfiguration genericConfig) {
this.setQuery(genericConfig.getQuery());
this.setCheckpointable(genericConfig.isCheckpointable());
this.setBaseIteratorPriority(genericConfig.getBaseIteratorPriority());
this.setBypassAccumulo(genericConfig.getBypassAccumulo());
this.setAccumuloPassword(genericConfig.getAccumuloPassword());
this.setConnPoolName(genericConfig.getConnPoolName());
this.setAuthorizations(genericConfig.getAuthorizations());
this.setBeginDate(genericConfig.getBeginDate());
this.setClient(genericConfig.getClient());
this.setEndDate(genericConfig.getEndDate());
this.setMaxWork(genericConfig.getMaxWork());
this.setQueries(genericConfig.getQueries());
this.setQueriesIter(genericConfig.getQueriesIter());
this.setQueryString(genericConfig.getQueryString());
this.setTableName(genericConfig.getTableName());
this.setReduceResults(genericConfig.isReduceResults());
this.setTableConsistencyLevels(genericConfig.getTableConsistencyLevels());
this.setTableHints(genericConfig.getTableHints());
/**
* Copy constructor. Populates this instance by delegating to {@link #copyFrom(GenericQueryConfiguration)}.
*
* @param other
* the {@link GenericQueryConfiguration} to copy values from
*/
@SuppressWarnings("CopyConstructorMissesField")
public GenericQueryConfiguration(GenericQueryConfiguration other) {
copyFrom(other);
}

/**
* Copies the configuration values of the given {@link GenericQueryConfiguration} into this {@link GenericQueryConfiguration} by passing the result of each
* getter on {@code other} to the matching setter on this instance.
* <p>
* NOTE(review): this method hands values from getter to setter without copying them itself, so whether the result is a deep copy depends on what the
* individual getters and setters do -- confirm before relying on deep-copy semantics.
*
* @param other
* the {@link GenericQueryConfiguration} to copy values from; it is dereferenced unconditionally, so it must not be null
*/
public void copyFrom(GenericQueryConfiguration other) {
// The setters are invoked in a fixed order. Keep that order: a setter may touch related state (e.g. setQueries is followed by setQueriesIter).
this.setQuery(other.getQuery());
this.setCheckpointable(other.isCheckpointable());
this.setBaseIteratorPriority(other.getBaseIteratorPriority());
this.setBypassAccumulo(other.getBypassAccumulo());
this.setAccumuloPassword(other.getAccumuloPassword());
this.setConnPoolName(other.getConnPoolName());
this.setAuthorizations(other.getAuthorizations());
this.setBeginDate(other.getBeginDate());
this.setClient(other.getClient());
this.setEndDate(other.getEndDate());
this.setMaxWork(other.getMaxWork());
this.setQueries(other.getQueries());
this.setQueriesIter(other.getQueriesIter());
this.setQueryString(other.getQueryString());
this.setTableName(other.getTableName());
this.setReduceResults(other.isReduceResults());
this.setTableConsistencyLevels(other.getTableConsistencyLevels());
this.setTableHints(other.getTableHints());
}

public Collection<QueryData> getQueries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ public class ShardQueryConfiguration extends GenericQueryConfiguration implement
*/
private boolean sortQueryByCounts = false;

/**
* The minimum percentage threshold that the count for an index row must meet compared to the count for the corresponding frequency row in the metadata
* table in order to NOT be considered a field index hole. The value must be between 0.0-1.0, where 1.0 is equivalent to 100%.
*/
private double fieldIndexHoleMinThreshold = 1.0d;

/**
* Default constructor
*/
Expand All @@ -506,10 +512,20 @@ public ShardQueryConfiguration() {
* @param other
* - another ShardQueryConfiguration instance
*/
@SuppressWarnings("CopyConstructorMissesField")
public ShardQueryConfiguration(ShardQueryConfiguration other) {
copyFrom(other);
}

/**
* Deeply copies over all fields from the given {@link ShardQueryConfiguration} to this {@link ShardQueryConfiguration}.
*
* @param other
* the {@link ShardQueryConfiguration} to copy values from
*/
public void copyFrom(ShardQueryConfiguration other) {
// GenericQueryConfiguration copy first
super(other);
super.copyFrom(other);

// ShardQueryConfiguration copy
this.setCheckpointable(other.isCheckpointable());
Expand Down Expand Up @@ -716,6 +732,7 @@ public ShardQueryConfiguration(ShardQueryConfiguration other) {
this.setUseTermCounts(other.getUseTermCounts());
this.setSortQueryBeforeGlobalIndex(other.isSortQueryBeforeGlobalIndex());
this.setSortQueryByCounts(other.isSortQueryByCounts());
this.setFieldIndexHoleMinThreshold(other.getFieldIndexHoleMinThreshold());
}

/**
Expand Down Expand Up @@ -2055,6 +2072,14 @@ public QueryStopwatch getTimers() {
return timers;
}

/**
* Replaces the {@link QueryStopwatch} held by this configuration with the given instance.
*
* @param timers
* the {@link QueryStopwatch} to store; no null check is performed here
*/
public void setTimers(QueryStopwatch timers) {
this.timers = timers;
}

/**
* Appends the given timers to the {@link QueryStopwatch} currently held by this configuration by delegating to {@code QueryStopwatch.appendTimers}.
* <p>
* The held stopwatch is dereferenced unconditionally, so this call fails with a {@link NullPointerException} if the held stopwatch is null.
*
* @param timers
* the {@link QueryStopwatch} whose timers should be appended
*/
public void appendTimers(QueryStopwatch timers) {
this.timers.appendTimers(timers);
}

public ASTJexlScript getQueryTree() {
return queryTree;
}
Expand Down Expand Up @@ -2688,6 +2713,14 @@ public void setRebuildDatatypeFilterPerShard(boolean rebuildDatatypeFilterPerSha
this.rebuildDatatypeFilterPerShard = rebuildDatatypeFilterPerShard;
}

/**
* Returns the configured field index hole minimum threshold.
*
* @return the current value of {@code fieldIndexHoleMinThreshold}
*/
public double getFieldIndexHoleMinThreshold() {
return fieldIndexHoleMinThreshold;
}

/**
* Sets the field index hole minimum threshold.
* <p>
* The value is stored as given; this setter performs no range validation, so callers are responsible for supplying a value in the documented 0.0-1.0 range
* (where 1.0 is equivalent to 100%).
*
* @param fieldIndexHoleMinThreshold
* the threshold to store
*/
public void setFieldIndexHoleMinThreshold(double fieldIndexHoleMinThreshold) {
this.fieldIndexHoleMinThreshold = fieldIndexHoleMinThreshold;
}

public boolean getReduceIngestTypes() {
return reduceIngestTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ protected DefaultQueryPlanner(DefaultQueryPlanner other) {
setSourceLimit(other.sourceLimit);
setPushdownThreshold(other.getPushdownThreshold());
setVisitorManager(other.getVisitorManager());
setTransformRules(other.getTransformRules() == null ? null : new ArrayList<>(other.transformRules));
}

public void setMetadataHelper(final MetadataHelper metadataHelper) {
Expand Down Expand Up @@ -1923,17 +1924,23 @@ protected ASTJexlScript parseQueryAndValidatePattern(String query, TraceStopwatc
if (log.isTraceEnabled()) {
log.trace("Stack trace for overflow " + soe);
}
stopwatch.stop();
if (stopwatch != null) {
stopwatch.stop();
}
PreConditionFailedQueryException qe = new PreConditionFailedQueryException(DatawaveErrorCode.QUERY_DEPTH_OR_TERM_THRESHOLD_EXCEEDED, soe);
log.warn(qe);
throw new DatawaveFatalQueryException(qe);
} catch (ParseException e) {
stopwatch.stop();
if (stopwatch != null) {
stopwatch.stop();
}
BadRequestQueryException qe = new BadRequestQueryException(DatawaveErrorCode.UNPARSEABLE_JEXL_QUERY, e, MessageFormat.format("Query: {0}", query));
log.warn(qe);
throw new DatawaveFatalQueryException(qe);
} catch (PatternSyntaxException e) {
stopwatch.stop();
if (stopwatch != null) {
stopwatch.stop();
}
BadRequestQueryException qe = new BadRequestQueryException(DatawaveErrorCode.INVALID_REGEX, e, MessageFormat.format("Query: {0}", query));
log.warn(qe);
throw new DatawaveFatalQueryException(qe);
Expand Down Expand Up @@ -2977,6 +2984,10 @@ public String getPlannedScript() {
return plannedScript;
}

/**
* Replaces the planned script string held by this planner, i.e. the value returned by {@code getPlannedScript()}.
*
* @param plannedScript
* the planned script to store; no validation is performed here
*/
public void setPlannedScript(String plannedScript) {
this.plannedScript = plannedScript;
}

protected Multimap<String,Type<?>> configureIndexedAndNormalizedFields(MetadataHelper metadataHelper, ShardQueryConfiguration config,
ASTJexlScript queryTree) throws DatawaveQueryException {
// Fetch the mapping of fields to Types from the DatawaveMetadata table
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package datawave.query.planner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import datawave.core.query.configuration.QueryData;
import datawave.query.CloseableIterable;

/**
 * Implementation of {@link CloseableIterable} intended to be used by {@link FederatedQueryPlanner}. This iterable chains together any number of
 * sub-iterables and presents their {@link QueryData} elements as one sequence, in the order in which the sub-iterables were added.
 * <p>
 * Each iterator returned by {@link #iterator()} calls {@code iterator()} on every sub-iterable it reaches, so requesting more than one iterator from this
 * instance will call {@code iterator()} on the sub-iterables more than once.
 */
public class FederatedQueryIterable implements CloseableIterable<QueryData> {

    private final List<CloseableIterable<QueryData>> iterables = new ArrayList<>();

    /**
     * Add an iterable to this {@link FederatedQueryIterable}. Null iterables are ignored.
     *
     * @param iterable
     *            the iterable to add
     */
    public void addIterable(CloseableIterable<QueryData> iterable) {
        if (iterable != null) {
            iterables.add(iterable);
        }
    }

    /**
     * Closes and clears each iterable in this {@link FederatedQueryIterable}.
     * <p>
     * Every iterable is given the chance to close even if an earlier one fails with an {@link IOException}. The first failure is rethrown once all
     * iterables have been attempted, with any later failures attached to it as suppressed exceptions. The list of iterables is always cleared.
     *
     * @throws IOException
     *             if an error occurred when closing an iterable
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        try {
            for (CloseableIterable<QueryData> iterable : iterables) {
                try {
                    iterable.close();
                } catch (IOException e) {
                    // Remember the failure but keep going so the remaining iterables are not leaked.
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
        } finally {
            iterables.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Returns an iterator that will iterate over the {@link QueryData} returned by each iterable in this {@link FederatedQueryIterable}.
     *
     * @return the iterator
     */
    @Override
    public Iterator<QueryData> iterator() {
        return new Iter();
    }

    /**
     * Iterator implementation that provides the ability to iterate over each {@link QueryData} of the iterables in {@link #iterables}.
     */
    private class Iter implements Iterator<QueryData> {

        // Iterator that traverses over the iterables.
        private final Iterator<CloseableIterable<QueryData>> iterableIterator = iterables.iterator();

        // The current sub iterator, or null if no sub iterator has been opened yet.
        private Iterator<QueryData> currentSubIterator = null;

        @Override
        public boolean hasNext() {
            seekToNextAvailableQueryData();
            return currentSubIterator != null && currentSubIterator.hasNext();
        }

        /**
         * Returns the next {@link QueryData}, advancing to the next non-empty sub iterator when required.
         *
         * @return the next {@link QueryData}
         * @throws NoSuchElementException
         *             if no sub iterator has any elements remaining
         */
        @Override
        public QueryData next() {
            // Seek here as well so that next() is correct even when the caller did not call hasNext() first.
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return currentSubIterator.next();
        }

        /**
         * Seek to the next sub-iterator that has a {@link QueryData} remaining in it. If there is none, the current sub iterator is left as the last one that
         * was opened (or null if no iterables exist).
         */
        private void seekToNextAvailableQueryData() {
            while (currentSubIterator == null || !currentSubIterator.hasNext()) {
                if (!iterableIterator.hasNext()) {
                    return;
                }
                // We must ensure we only ever call iterator() once on each sub-iterable, so it is only called as we advance onto it.
                currentSubIterator = iterableIterator.next().iterator();
            }
        }
    }
}
Loading

0 comments on commit 04336c4

Please sign in to comment.