Skip to content

Commit

Permalink
clean up / unify code and add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
lnkuiper committed Aug 16, 2024
1 parent ed13f27 commit 35f76fb
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 7 deletions.
6 changes: 4 additions & 2 deletions src/execution/operator/join/physical_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ void PhysicalJoin::BuildJoinPipelines(Pipeline &current, MetaPipeline &meta_pipe
// on the RHS (build side), we construct a child MetaPipeline with this operator as its sink
auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, op, MetaPipelineType::JOIN_BUILD);
child_meta_pipeline.Build(*op.children[1]);
child_meta_pipeline_ptr = &child_meta_pipeline;
// get the ptr to the last child to set up dependencies later
child_meta_pipeline_ptr = meta_pipeline.GetLastChild();
}

// continue building the current pipeline on the LHS (probe side)
op.children[0]->BuildPipelines(current, meta_pipeline);

if (build_rhs && op.children[1]->CanSaturateThreads(current.GetClientContext())) {
if (build_rhs && op.type != PhysicalOperatorType::LEFT_DELIM_JOIN &&
op.children[1]->CanSaturateThreads(current.GetClientContext())) {
// If the build side can saturate all available threads,
// we don't just make the LHS pipeline depend on the RHS, but recursively all LHS children too.
// This prevents breadth-first plan evaluation
Expand Down
7 changes: 2 additions & 5 deletions src/execution/operator/set/physical_union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ void PhysicalUnion::BuildPipelines(Pipeline &current, MetaPipeline &meta_pipelin
// If the LHS child can saturate all available threads,
// we recursively make all RHS children depend on the LHS.
// This prevents breadth-first plan evaluation
vector<shared_ptr<MetaPipeline>> child_meta_pipelines;
meta_pipeline.GetMetaPipelines(child_meta_pipelines, true, true);
if (!child_meta_pipelines.empty()) {
child_meta_pipeline_ptr = child_meta_pipelines.back().get();
}
// We do this by letting them depend on the last child meta pipeline added after building out the LHS
child_meta_pipeline_ptr = meta_pipeline.GetLastChild();
}

// build the union pipeline
Expand Down
5 changes: 5 additions & 0 deletions src/execution/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ idx_t PhysicalOperator::SumOfEstimatedCardinalities() const {
}

//! Returns whether this operator's estimated input is large enough to keep all threads busy.
//! The estimate is the summed estimated cardinality divided by the row group size (integer division),
//! which has to be strictly greater than the scheduler's thread count.
bool PhysicalOperator::CanSaturateThreads(ClientContext &context) const {
#ifdef DEBUG
	// Debug builds unconditionally answer "yes" so that every code path guarded by this check gets exercised
	return true;
#else
	auto &scheduler = TaskScheduler::GetScheduler(context);
	const auto thread_count = NumericCast<idx_t>(scheduler.NumberOfThreads());
	const auto estimated_row_groups = SumOfEstimatedCardinalities() / Storage::ROW_GROUP_SIZE;
	return estimated_row_groups > thread_count;
#endif
}

//===--------------------------------------------------------------------===//
Expand Down
2 changes: 2 additions & 0 deletions src/include/duckdb/parallel/meta_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class MetaPipeline : public enable_shared_from_this<MetaPipeline> {
void GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive);
//! Get the MetaPipeline children of this MetaPipeline
void GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip);
//! Recursively gets the last child added
optional_ptr<MetaPipeline> GetLastChild() const;
//! Get the dependencies (within this MetaPipeline) of the given Pipeline
optional_ptr<const vector<reference<Pipeline>>> GetDependencies(Pipeline &dependant) const;
//! Whether the sink of this pipeline is a join build
Expand Down
11 changes: 11 additions & 0 deletions src/parallel/meta_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bo
}
}

//! Follows the chain of last-added children downwards and returns the deepest one.
//! Returns nullptr if this MetaPipeline has no children at all.
optional_ptr<MetaPipeline> MetaPipeline::GetLastChild() const {
	optional_ptr<MetaPipeline> last_child;
	// Start at our own children; at every level step into the children of the last entry
	// until we reach a MetaPipeline whose child list is empty
	for (auto level = &children; !level->empty(); level = &level->back()->children) {
		last_child = level->back().get();
	}
	return last_child;
}

optional_ptr<const vector<reference<Pipeline>>> MetaPipeline::GetDependencies(Pipeline &dependant) const {
const auto it = dependencies.find(dependant);
return it == dependencies.end() ? nullptr : &it->second;
Expand Down
206 changes: 206 additions & 0 deletions test/sql/parallelism/intraquery/depth_first_evaluation.test_slow
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# name: test/sql/parallelism/intraquery/depth_first_evaluation.test_slow
# description: Test that query plans are evaluated in a depth-first fashion
# group: [intraquery]

# we need a persistent DB because we want to compress the table that we're working with
load __TEST_DIR__/depth_first_evaluation.db

# we don't want any disk spilling because we're testing memory pressure
statement ok
SET temp_directory = ''

# 1GiB is pretty tight
statement ok
SET memory_limit = '1GiB'

statement ok
SET threads = 4

# 10M integers but the table is tiny because of delta compression
statement ok
CREATE TABLE integers AS SELECT range i FROM range(10_000_000)

# one of these should easily fit in memory
query I
SELECT count(*) c FROM (SELECT DISTINCT i FROM integers)
----
10000000

# the next query performs 10 of the same distinct aggregations and unions them together
# each distinct aggregation has a different limit (which doesn't do anything)
# so that this test is future-proof (in case DuckDB does any common sub-plan elimination in the future)

# the idea here is that if DuckDB would do breadth-first plan evaluation (like it did before)
# DuckDB would first perform the 'Sink' for every distinct aggregation one by one
# this would create HUGE temporary intermediates
# only after that DuckDB would perform the 'Finalize' for every distinct aggregation one by one
# the 'Finalize' reduces the data size to a single row
# so, this used to throw an OOM exception given the current memory limit

# with depth-first plan evaluation, DuckDB performs 'Finalize' for every distinct aggregation,
# before starting 'Sink' on the next distinct aggregation
# now this query completes without much memory pressure!
query I
SELECT sum(c)
FROM (
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_000))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_001))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_002))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_003))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_004))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_005))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_006))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_007))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_008))
UNION ALL
SELECT count(*) c FROM (SELECT DISTINCT i FROM (SELECT i FROM integers LIMIT 100_000_009))
)
----
100000000

statement ok
DROP TABLE integers

# column i has 0, 100000, 200000, etc., i.e., 100 whole-number values spread out over the range 0 to 10 million
# all other values in column i are equal to range + 0.5
# columns j and k are just ranges from 0 to 10 million
# we have to do this so our statistics propagation and dynamic join filters don't trivialise the query
statement ok
CREATE TABLE doubles AS
SELECT
CASE WHEN range % 100_000 = 0 THEN range ELSE range + 0.5 END i,
range::DOUBLE j,
range::DOUBLE k
FROM range(10_000_000)

# one of these should always fit in memory
# the idea is that the cte is a large join (10m x 10m)
# but it's really selective, only 100 tuples come out of it

# then, we join with doubles union'ed with itself, so that it becomes the probe pipeline,
# i.e., it has a higher cardinality than the selective join, which goes into a build
query I
WITH c AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
)
SELECT count(*)
FROM (
SELECT k FROM doubles
UNION ALL
SELECT k FROM doubles
) d
JOIN c
ON (d.k = c.k)
----
200

# now we just crank up the number of ctes that we're joining with to 10

# again, if DuckDB would do breadth-first plan evaluation (like it did before)
# DuckDB would 'Sink' into all of the builds in the cte's one by one, creating huge intermediates
# only after that it would perform all the selective joins and reduce the size of the intermediates
# so, this used to throw an OOM exception

# with depth-first plan evaluation, DuckDB performs the selective joins one by one,
# reducing the size of intermediates immediately, and the query completes!
query I
WITH c0 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_000
), c1 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_001
), c2 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_002
), c3 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_003
), c4 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_004
), c5 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_005
), c6 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_006
), c7 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_007
), c8 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_008
), c9 AS NOT MATERIALIZED (
SELECT d0.k
FROM doubles d0
JOIN doubles d1
ON (d0.i = d1.j)
LIMIT 100_000_009
)
SELECT count(*)
FROM (
SELECT k FROM doubles
UNION ALL
SELECT k FROM doubles
) d
JOIN c0
ON (d.k = c0.k)
JOIN c1
ON (d.k = c1.k)
JOIN c2
ON (d.k = c2.k)
JOIN c3
ON (d.k = c3.k)
JOIN c4
ON (d.k = c4.k)
JOIN c5
ON (d.k = c5.k)
JOIN c6
ON (d.k = c6.k)
JOIN c7
ON (d.k = c7.k)
JOIN c8
ON (d.k = c8.k)
JOIN c9
ON (d.k = c9.k)
----
200

0 comments on commit 35f76fb

Please sign in to comment.