Skip to content

Commit

Permalink
[Enhancement] generate runtime filter only for tuples with conjunct (#…
Browse files Browse the repository at this point in the history
…8745)

Remove useless runtime filters in some primary-key/foreign-key join scenarios of the TPC-H test cases.
  • Loading branch information
starocean999 authored Jul 18, 2022
1 parent 2b6cdcf commit 5c88a74
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.analysis.TupleIsNullPredicate;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TRuntimeFilterDesc;
import org.apache.doris.thrift.TRuntimeFilterType;

Expand All @@ -40,6 +42,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -225,9 +228,9 @@ public HashJoinNode getBuilderNode() {
* to the join node 'filterSrcNode'. Returns an instance of RuntimeFilter
* or null if a runtime filter cannot be generated from the specified predicate.
*/
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer,
Expr joinPredicate, int exprOrder, HashJoinNode filterSrcNode,
TRuntimeFilterType type, RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits) {
public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer analyzer, Expr joinPredicate,
int exprOrder, HashJoinNode filterSrcNode, TRuntimeFilterType type,
RuntimeFilterGenerator.FilterSizeLimits filterSizeLimits, HashSet<TupleId> tupleHasConjuncts) {
Preconditions.checkNotNull(idGen);
Preconditions.checkNotNull(joinPredicate);
Preconditions.checkNotNull(filterSrcNode);
Expand Down Expand Up @@ -268,6 +271,20 @@ public static RuntimeFilter create(IdGenerator<RuntimeFilterId> idGen, Analyzer
if (LOG.isTraceEnabled()) {
LOG.trace("Generating runtime filter from predicate " + joinPredicate);
}
if (ConnectContext.get().getSessionVariable().enableRemoveNoConjunctsRuntimeFilterPolicy) {
if (srcExpr instanceof SlotRef) {
if (!tupleHasConjuncts.contains(((SlotRef) srcExpr).getDesc().getParent().getId())) {
// src tuple has no conjunct, don't create runtime filter
return null;
} else {
// runtime filter itself is a valid conjunct, add all the target tuple ids
for (TupleId tupleId : targetSlots.keySet()) {
tupleHasConjuncts.add(tupleId);
}
}
}
}

return new RuntimeFilter(idGen.getNextId(), filterSrcNode, srcExpr, exprOrder,
targetExpr, targetSlots, type, filterSizeLimits);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public final class RuntimeFilterGenerator {
// Generator for filter ids
private final IdGenerator<RuntimeFilterId> filterIdGenerator = RuntimeFilterId.createGenerator();

// Ids of the tuples found to have conjuncts (on the node itself or on a descendant).
// Stays null until findAllTuplesHavingConjuncts() is called; handed to RuntimeFilter.create()
// while filters are generated.
private HashSet<TupleId> tupleHasConjuncts = null;

/**
* Internal class that encapsulates the max, min and default sizes used for creating
* bloom filter objects.
Expand Down Expand Up @@ -121,6 +123,31 @@ private RuntimeFilterGenerator(Analyzer analyzer) {
bloomFilterSizeLimits = new FilterSizeLimits(sessionVariable);
}

/**
 * Walks the plan tree bottom-up and records, in {@code tupleIds}, the tuple id of every
 * single-tuple node that either carries conjuncts itself or sits above a subtree that
 * contributed at least one new id to the set.
 *
 * <p>Nodes exposing more than one tuple id (e.g. join nodes) are deliberately not recorded,
 * to keep the rule simple; their children are still visited.
 *
 * @param node     root of the (sub)plan to inspect
 * @param tupleIds accumulator that receives the qualifying tuple ids
 */
private void collectAllTupleIdsHavingConjunct(PlanNode node, HashSet<TupleId> tupleIds) {
    // Snapshot the size so we can tell afterwards whether any descendant added an id.
    final int sizeBeforeDescent = tupleIds.size();

    // Children first: the decision for this node depends on what the subtree found.
    for (PlanNode descendantRoot : node.getChildren()) {
        collectAllTupleIdsHavingConjunct(descendantRoot, tupleIds);
    }

    // Only nodes with exactly one tuple id are candidates.
    if (node.getTupleIds().size() != 1) {
        return;
    }

    boolean hasOwnConjuncts = !node.conjuncts.isEmpty();
    boolean subtreeContributed = tupleIds.size() > sizeBeforeDescent;
    if (hasOwnConjuncts || subtreeContributed) {
        // The node or one of its descendants has conjuncts.
        tupleIds.add(node.getTupleIds().get(0));
    }
}

/**
 * Rebuilds {@code tupleHasConjuncts} from scratch for the given plan: the set is created on
 * first use, emptied on subsequent calls, and then filled by
 * {@code collectAllTupleIdsHavingConjunct}.
 *
 * @param node root of the plan tree to scan
 */
public void findAllTuplesHavingConjuncts(PlanNode node) {
    if (tupleHasConjuncts != null) {
        // Reuse the existing set, dropping results of any earlier scan.
        tupleHasConjuncts.clear();
    } else {
        tupleHasConjuncts = new HashSet<>();
    }
    collectAllTupleIdsHavingConjunct(node, tupleHasConjuncts);
}

/**
* Generates and assigns runtime filters to a query plan tree.
*/
Expand All @@ -133,6 +160,9 @@ public static void generateRuntimeFilters(Analyzer analyzer, PlanNode plan) {
Preconditions.checkState(runtimeFilterType >= 0, "runtimeFilterType not expected");
Preconditions.checkState(runtimeFilterType <= Arrays.stream(TRuntimeFilterType.values())
.mapToInt(TRuntimeFilterType::getValue).sum(), "runtimeFilterType not expected");
if (ConnectContext.get().getSessionVariable().enableRemoveNoConjunctsRuntimeFilterPolicy) {
filterGenerator.findAllTuplesHavingConjuncts(plan);
}
filterGenerator.generateFilters(plan);
List<RuntimeFilter> filters = filterGenerator.getRuntimeFilters();
if (filters.size() > maxNumBloomFilters) {
Expand Down Expand Up @@ -216,7 +246,7 @@ private void generateFilters(PlanNode root) {
for (int i = 0; i < joinConjuncts.size(); i++) {
Expr conjunct = joinConjuncts.get(i);
RuntimeFilter filter = RuntimeFilter.create(filterIdGenerator,
analyzer, conjunct, i, joinNode, type, bloomFilterSizeLimits);
analyzer, conjunct, i, joinNode, type, bloomFilterSizeLimits, tupleHasConjuncts);
if (filter == null) {
continue;
}
Expand Down
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_NEREIDS = "enable_nereids";

// Name of the session variable bound to the enableRemoveNoConjunctsRuntimeFilterPolicy field.
public static final String ENABLE_REMOVE_NO_CONJUNCTS_RUNTIME_FILTER =
"enable_remove_no_conjuncts_runtime_filter_policy";

// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
Expand Down Expand Up @@ -480,6 +483,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_NEREIDS)
private boolean enableNereids = false;

// When true, the runtime filter generator first collects the tuples that have conjuncts, and
// RuntimeFilter.create() returns null for a SlotRef source whose tuple is not among them.
// Disabled by default.
@VariableMgr.VarAttr(name = ENABLE_REMOVE_NO_CONJUNCTS_RUNTIME_FILTER)
public boolean enableRemoveNoConjunctsRuntimeFilterPolicy = false;

public String getBlockEncryptionMode() {
return blockEncryptionMode;
}
Expand Down Expand Up @@ -996,6 +1002,16 @@ public void setEnableNereids(boolean enableNereids) {
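* Returns whether the "remove no-conjuncts runtime filter" policy is enabled,
* i.e. the value of session variable enable_remove_no_conjuncts_runtime_filter_policy.
**/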
public boolean isEnableRemoveNoConjunctsRuntimeFilterPolicy() {
    // Plain accessor: reports the current value of the session flag, nothing else.
    return this.enableRemoveNoConjunctsRuntimeFilterPolicy;
}

/**
 * Stores a new value for the enableRemoveNoConjunctsRuntimeFilterPolicy session flag.
 *
 * @param enableRemoveNoConjunctsRuntimeFilterPolicy value to assign to the field
 */
public void setEnableRemoveNoConjunctsRuntimeFilterPolicy(
        boolean enableRemoveNoConjunctsRuntimeFilterPolicy) {
    this.enableRemoveNoConjunctsRuntimeFilterPolicy = enableRemoveNoConjunctsRuntimeFilterPolicy;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
Expand Down

0 comments on commit 5c88a74

Please sign in to comment.