Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

All-fields as an argument of aggregator such as count() can be resolved after other field #814

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1122,4 +1122,166 @@ class FlintSparkPPLAggregationsITSuite

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() at the first of stats clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats count() as cnt, sum(a) as sum, avg(a) as avg
| """.stripMargin)
assertSameRows(Seq(Row(4, 4, 1.0)), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val aggregate = Aggregate(Seq.empty, Seq(count, sum, avg), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() in the middle of stats clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats sum(a) as sum, count() as cnt, avg(a) as avg
| """.stripMargin)
assertSameRows(Seq(Row(4, 4, 1.0)), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val aggregate = Aggregate(Seq.empty, Seq(sum, count, avg), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() at the end of stats clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats sum(a) as sum, avg(a) as avg, count() as cnt
| """.stripMargin)
assertSameRows(Seq(Row(4, 1.0, 4)), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val aggregate = Aggregate(Seq.empty, Seq(sum, avg, count), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() at the first of stats by clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats count() as cnt, sum(a) as sum, avg(a) as avg by country
| """.stripMargin)
assertSameRows(Seq(Row(2, 2, 1.0, "Canada"), Row(2, 2, 1.0, "USA")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val grouping =
Alias(UnresolvedAttribute("country"), "country")()
val aggregate = Aggregate(Seq(grouping), Seq(count, sum, avg, grouping), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() in the middle of stats by clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats sum(a) as sum, count() as cnt, avg(a) as avg by country
| """.stripMargin)
assertSameRows(Seq(Row(2, 2, 1.0, "Canada"), Row(2, 2, 1.0, "USA")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val grouping =
Alias(UnresolvedAttribute("country"), "country")()
val aggregate = Aggregate(Seq(grouping), Seq(sum, count, avg, grouping), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test count() at the end of stats by clause") {
val frame = sql(s"""
| source = $testTable | eval a = 1 | stats sum(a) as sum, avg(a) as avg, count() as cnt by country
| """.stripMargin)
assertSameRows(Seq(Row(2, 1.0, 2, "Canada"), Row(2, 1.0, 2, "USA")), frame)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), table)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val grouping =
Alias(UnresolvedAttribute("country"), "country")()
val aggregate = Aggregate(Seq(grouping), Seq(sum, avg, count, grouping), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,7 @@ public Expression visitCorrelationMapping(FieldsMapping node, CatalystPlanContex

@Override
public Expression visitAllFields(AllFields node, CatalystPlanContext context) {
// Case of aggregation step - no start projection can be added
if (context.getNamedParseExpressions().isEmpty()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YANG-DB could you double confirm this fixing? IMO it's safe to remove the condition. Any case do I miss? Since I don't understand the comment of L665. Another similar case is in method visitEval(), the same condition could be removed since each eval command must convert to a project list start with *.

// Create an UnresolvedStar for all-fields projection
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
return context.getNamedParseExpressions().peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,4 +959,58 @@ class PPLLogicalPlanAggregationQueriesTranslatorTestSuite

comparePlans(expectedPlan, logPlan, false)
}

test("test count() as the last aggregator in stats clause") {
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(
plan(
pplParser,
"source = table | eval a = 1 | stats sum(a) as sum, avg(a) as avg, count() as cnt"),
context)
val tableRelation = UnresolvedRelation(Seq("table"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), tableRelation)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val aggregate = Aggregate(Seq.empty, Seq(sum, avg, count), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test count() as the last aggregator in stats by clause") {
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(
plan(
pplParser,
"source = table | eval a = 1 | stats sum(a) as sum, avg(a) as avg, count() as cnt by country"),
context)
val tableRelation = UnresolvedRelation(Seq("table"))
val eval = Project(Seq(UnresolvedStar(None), Alias(Literal(1), "a")()), tableRelation)
val sum =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"sum")()
val avg =
Alias(
UnresolvedFunction(Seq("AVG"), Seq(UnresolvedAttribute("a")), isDistinct = false),
"avg")()
val count =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"cnt")()
val grouping =
Alias(UnresolvedAttribute("country"), "country")()
val aggregate = Aggregate(Seq(grouping), Seq(sum, avg, count, grouping), eval)
val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}
}
Loading