Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/planner/core/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestMain(m *testing.M) {
testDataMap.LoadTestSuiteData("testdata", "index_merge_suite", true)
testDataMap.LoadTestSuiteData("testdata", "runtime_filter_generator_suite")
testDataMap.LoadTestSuiteData("testdata", "plan_cache_suite")
testDataMap.LoadTestSuiteData("testdata", "decorrelate_limit_suite", true)

indexMergeSuiteData = testDataMap["index_merge_suite"]
planSuiteUnexportedData = testDataMap["plan_suite_unexported"]
Expand Down Expand Up @@ -72,3 +73,11 @@ func GetIndexMergeSuiteData() testdata.TestData {
// GetRuntimeFilterGeneratorData returns the test data registered in
// testDataMap under the "runtime_filter_generator_suite" key.
func GetRuntimeFilterGeneratorData() testdata.TestData {
	const suiteName = "runtime_filter_generator_suite"
	return testDataMap[suiteName]
}

// GetDecorrelateLimitSuiteData returns the test data registered in
// testDataMap under the "decorrelate_limit_suite" key.
func GetDecorrelateLimitSuiteData() testdata.TestData {
	const suiteName = "decorrelate_limit_suite"
	return testDataMap[suiteName]
}

// GetCascadesSuiteData returns the test data registered in testDataMap under
// the "cascades_suite" key.
//
// NOTE(review): a map lookup yields the zero value when the key is absent, so
// this only returns useful data if "cascades_suite" is loaded into testDataMap
// beforehand — confirm that TestMain loads it.
func GetCascadesSuiteData() testdata.TestData {
	const suiteName = "cascades_suite"
	return testDataMap[suiteName]
}
25 changes: 25 additions & 0 deletions pkg/planner/core/operator/logicalop/logical_plans_misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,31 @@ func HasMaxOneRow(p base.LogicalPlan, childMaxOneRow []bool) bool {
return false
}

// CanGenerateMultipleRows reports whether p is an operator that may emit more
// than one output row for a single input row. Callers use it to decide whether
// a uniqueness property of the input can still be trusted above p.
//
// The operators treated as row-expanding are:
//   - LogicalJoin: reported for every join type. Semi/anti joins preserve the
//     row count, but the answer is deliberately conservative; a caller may
//     refine it.
//   - LogicalUnionAll and LogicalPartitionUnionAll: they concatenate several
//     inputs.
//   - LogicalExpand: it splits rows for GROUPING SETS/ROLLUP.
//
// Any other operator (including a nil plan) yields false.
//
// TODO: cover the unnest function once it is implemented.
//
// NOTE(review): the type switch matches the exact dynamic type only, so a
// plan type that merely embeds one of the listed structs (presumably
// LogicalApply embeds LogicalJoin — verify) is NOT reported here.
func CanGenerateMultipleRows(p base.LogicalPlan) bool {
	switch p.(type) {
	case *LogicalJoin, // Cartesian-product effect; semi/anti included for safety.
		*LogicalUnionAll,
		*LogicalPartitionUnionAll,
		*LogicalExpand:
		return true
	default:
		return false
	}
}

// AddSelection adds a LogicalSelection to the given LogicalPlan.
func AddSelection(p base.LogicalPlan, child base.LogicalPlan, conditions []expression.Expression, chIdx int) {
if len(conditions) == 0 {
Expand Down
24 changes: 24 additions & 0 deletions pkg/planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/util/coretestsdk"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testdata"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/plancodec"
Expand Down Expand Up @@ -732,3 +733,26 @@ func TestImportIntoBuildPlan(t *testing.T) {
require.ErrorIs(t, tk.ExecToErr("IMPORT INTO t3 FROM select * from t2"),
infoschema.ErrTableNotExists)
}

// TestDecorrelateLimitOptimization runs every statement of the
// "decorrelate_limit_suite" test data through
// testkit.RunTestUnderCascadesWithDomain and compares the returned rows with
// the plan stored for that case.
func TestDecorrelateLimitOptimization(t *testing.T) {
	// Tables referenced by the statements in the suite.
	ddls := []string{
		"CREATE TABLE IF NOT EXISTS employees (\n id INT PRIMARY KEY,\n name VARCHAR(50),\n dept_id INT,\n salary DECIMAL(10, 2),\n alias VARCHAR(50)\n)",
		"CREATE TABLE IF NOT EXISTS employee_notes (\n id INT PRIMARY KEY,\n employee_id INT,\n note TEXT,\n created_at TIMESTAMP,\n INDEX idx_employee_id (employee_id)\n)",
	}
	testkit.RunTestUnderCascadesWithDomain(t, func(t *testing.T, tk *testkit.TestKit, _ *domain.Domain, cascades, caller string) {
		tk.MustExec("use test")
		for _, ddl := range ddls {
			tk.MustExec(ddl)
		}

		var (
			input  []string
			output []struct {
				SQL  string
				Plan []string
			}
		)
		suite := core.GetDecorrelateLimitSuiteData()
		suite.LoadTestCases(t, &input, &output, cascades, caller)

		for idx := range input {
			stmt := input[idx]
			result := tk.MustQuery(stmt)
			// Presumably only invoked in record mode, where it refreshes the
			// stored expectation for this case — verify against testdata.OnRecord.
			testdata.OnRecord(func() {
				output[idx].SQL = stmt
				output[idx].Plan = testdata.ConvertRowsToStrings(result.Rows())
			})
			result.Check(testkit.Rows(output[idx].Plan...))
		}
	})
}
171 changes: 171 additions & 0 deletions pkg/planner/core/rule_decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,50 @@ func (s *DecorrelateSolver) optimize(ctx context.Context, p base.LogicalPlan, gr
apply.SetChildren(outerPlan, innerPlan)
return s.optimize(ctx, p, groupByColumn)
} else if m, ok := innerPlan.(*logicalop.LogicalMaxOneRow); ok {
// Check if MaxOneRow's child is Limit or TopN, and if we can remove it for LeftOuterJoin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this PR doesn't handle the TopN case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this stage, topn is still just a LIMIT, so it doesn’t matter.

// Also handle the case where there's a Projection between MaxOneRow and Limit: MaxOneRow -> Projection -> Limit
if apply.JoinType == base.LeftOuterJoin {
mChild := m.Children()[0]
var removePlan base.LogicalPlan
var canRemove bool

if li, ok := mChild.(*logicalop.LogicalLimit); ok {
// Limit with non-0 offset cannot be removed, but we still check for redundant MaxOneRow
if li.Offset != 0 {
canRemove = false
} else {
// Check if join key is unique key
removePlan = li.Children()[0]
if isJoinKeyUniqueKey(apply, removePlan) {
canRemove = true
}
}
} else if proj, ok := mChild.(*logicalop.LogicalProjection); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen? Can you provide some cases for this situation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Check if Projection's child is Limit: MaxOneRow -> Projection -> Limit
// This pattern occurs when subqueries contain some clauses like ORDER BY or HAVING clauses that require projection.
// Examples:
// - HAVING clause: SELECT ... (SELECT AVG(...) FROM ... GROUP BY ... HAVING ... LIMIT 1)
// - ORDER BY clause: SELECT ... (SELECT ... FROM ... ORDER BY ... LIMIT 1)
if li, ok := proj.Children()[0].(*logicalop.LogicalLimit); ok {
// Limit with non-0 offset cannot be removed, but we still check for redundant MaxOneRow
if li.Offset != 0 {
canRemove = false
} else {
// Check if join key is unique key
removePlan = li.Children()[0]
if isJoinKeyUniqueKey(apply, removePlan) {
canRemove = true
}
}
}
}
// If LIMIT can be removed (join key is unique key), remove it and re-enter decorrelate solver
if canRemove {
apply.SetChildren(outerPlan, removePlan)
return s.optimize(ctx, p, groupByColumn)
}
}
// If child is already MaxOneRow, remove redundant wrapper
if m.Children()[0].MaxOneRow() {
innerPlan = m.Children()[0]
apply.SetChildren(outerPlan, innerPlan)
Expand Down Expand Up @@ -470,6 +514,133 @@ func (*DecorrelateSolver) Name() string {
return "decorrelate"
}

// extractJoinKeyFromCondition returns the inner-side column of a correlated
// equality condition, or nil when cond does not have that shape.
//
// cond is first passed to apply.DeCorColFromEqExpr. The result must be a
// two-argument EQ scalar function whose second argument is a plain column
// contained in schema; that column is returned. The second argument is taken
// as the inner side on the assumption that DeCorColFromEqExpr produces
// "outer_col = inner_col" — verify against its implementation.
func extractJoinKeyFromCondition(apply *logicalop.LogicalApply, cond expression.Expression, schema *expression.Schema) *expression.Column {
	// A nil result simply fails the type assertion, so no separate nil check
	// is needed.
	eqFunc, isFunc := apply.DeCorColFromEqExpr(cond).(*expression.ScalarFunction)
	if !isFunc || eqFunc.FuncName.L != ast.EQ {
		return nil
	}
	operands := eqFunc.GetArgs()
	if len(operands) != 2 {
		return nil
	}
	if col, isCol := operands[1].(*expression.Column); isCol && schema.Contains(col) {
		return col
	}
	return nil
}

// isJoinKeyUniqueKey checks if join key is unique key.
// Returns true if the join key forms a unique key constraint.
func isJoinKeyUniqueKey(apply *logicalop.LogicalApply, plan base.LogicalPlan) bool {
var hasMultiRowOperator func(base.LogicalPlan) bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs to guarantee that it covers all the cases that can generate more rows. If some cases are missing, it will produce the wrong answer. For example, please add some cases related to the unnest function: will it generate more rows? This needs to be considered more carefully.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there may be mis-deletions down the road or in certain cases. But the NoDecorrelate hint lets us sidestep the issue, even if we miss maintaining the list when new funcs are introduced.

hasMultiRowOperator = func(p base.LogicalPlan) bool {
	// An operator that can fan one input row out into several invalidates
	// any uniqueness guarantee taken from the table underneath it.
	if logicalop.CanGenerateMultipleRows(p) {
		return true
	}
	for _, child := range p.Children() {
		if hasMultiRowOperator(child) {
			return true
		}
	}
	return false
}
if hasMultiRowOperator(plan) {
	return false
}

// Locate the DataSource whose key information is consulted: the first one
// reached by a depth-first walk in child order.
var findDataSource func(base.LogicalPlan) *logicalop.DataSource
findDataSource = func(p base.LogicalPlan) *logicalop.DataSource {
	if ds, ok := p.(*logicalop.DataSource); ok {
		return ds
	}
	for _, child := range p.Children() {
		if ds := findDataSource(child); ds != nil {
			return ds
		}
	}
	return nil
}
ds := findDataSource(plan)
if ds == nil || len(ds.Schema().PKOrUK) == 0 {
	return false
}

// Collect the inner columns that are equated with a correlated outer column.
// The conditions may sit in Selection nodes or may already have been pushed
// into a DataSource.
//
// Only columns that belong to ds are kept. The walk below visits every
// Selection/DataSource in the subtree, including ones under operators that
// hasMultiRowOperator does not flag. The key match further down compares
// Column.ID, which (presumably a per-table column id — verify) can coincide
// between columns of different tables. Without this filter, a condition on
// another table could make ds's key look fully covered.
var innerJoinKeys []*expression.Column
collectJoinKeys := func(conds []expression.Expression, schema *expression.Schema) {
	for _, cond := range conds {
		innerCol := extractJoinKeyFromCondition(apply, cond, schema)
		if innerCol != nil && ds.Schema().Contains(innerCol) {
			innerJoinKeys = append(innerJoinKeys, innerCol)
		}
	}
}
var extractConditions func(base.LogicalPlan)
extractConditions = func(p base.LogicalPlan) {
	switch node := p.(type) {
	case *logicalop.LogicalSelection:
		collectJoinKeys(node.Conditions, node.Schema())
	case *logicalop.DataSource:
		collectJoinKeys(node.PushedDownConds, node.Schema())
		// Recursion stops at a DataSource.
		return
	}
	for _, child := range p.Children() {
		extractConditions(child)
	}
}
extractConditions(plan)
if len(innerJoinKeys) == 0 {
	return false
}

// isJoinKey reports whether keyCol is one of the collected join keys.
// Columns with ID 0 never match.
isJoinKey := func(keyCol *expression.Column) bool {
	if keyCol.ID == 0 {
		return false
	}
	for _, joinKey := range innerJoinKeys {
		if keyCol.ID == joinKey.ID {
			return true
		}
	}
	return false
}

// The join keys form a unique key when at least one non-empty PKOrUK entry
// has every one of its columns bound by a join key.
for _, keyInfo := range ds.Schema().PKOrUK {
	if len(keyInfo) == 0 {
		continue
	}
	covered := true
	for _, keyCol := range keyInfo {
		if !isJoinKey(keyCol) {
			covered = false
			break
		}
	}
	if covered {
		return true
	}
}

return false
}

// Return true if we should skip decorrelation for LeftOuterApply + Projection.
func skipDecorrelateProjectionForLeftOuterApply(apply *logicalop.LogicalApply, proj *logicalop.LogicalProjection) bool {
allConst := len(proj.Exprs) > 0
Expand Down
22 changes: 22 additions & 0 deletions pkg/planner/core/testdata/decorrelate_limit_suite_in.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[
{
"Name": "TestDecorrelateLimitOptimization",
"Cases": [
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary , ( select salary FROM employees e2 WHERE e2.id = e.id LIMIT 1 OFFSET 0 ) AS avg_dept_salary FROM employees e WHERE e.dept_id > 1",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary , ( select salary FROM employees e2 WHERE e2.id = e.id LIMIT 1 OFFSET 1 ) AS avg_dept_salary FROM employees e WHERE e.dept_id > 1",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, ( SELECT e2.salary FROM employees e2 WHERE e2.dept_id = e.dept_id LIMIT 1 OFFSET 0) AS avg_dept_salary FROM employees e WHERE e.dept_id = 1",
"EXPLAIN format = 'plan_tree' SELECT e.id, e.name, e.salary, (SELECT en.note FROM employees e2 JOIN employee_notes en ON en.employee_id = e2.id WHERE e2.id = e.id ORDER BY en.created_at DESC LIMIT 1) AS latest_note FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary,(SELECT en.note FROM employees e2 JOIN employee_notes en ON en.employee_id = e2.id LEFT JOIN employees e3 ON e3.id = e2.dept_id WHERE e2.id = e.id LIMIT 1) AS note_multi_join FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary,(SELECT e2.salary FROM employees e2 INNER JOIN employee_notes en ON en.employee_id = e2.id WHERE e2.id = e.id LIMIT 1) AS salary_inner_join FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT AVG(e2.salary) FROM employees e2 WHERE e2.id = e.id GROUP BY e2.dept_id HAVING AVG(e2.salary) > 1000 LIMIT 1) AS avg_salary_having FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT count(e2.dept_id) FROM employees e2 WHERE e2.id = e.id limit 1) AS distinct_dept_id FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary,(SELECT e2.salary FROM employees e2 WHERE e2.id = e.id AND e2.dept_id IN ( SELECT dept_id FROM employees e3 WHERE e3.id = e.id LIMIT 1 ) LIMIT 1) AS salary_nested FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary,(SELECT e2.salary FROM employees e2 WHERE e2.id = e.id AND EXISTS ( SELECT 1 FROM employee_notes en WHERE en.employee_id = e2.id ) LIMIT 1) AS salary_exists FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT e2.salary FROM employees e2 WHERE e2.id = e.id ORDER BY e2.dept_id, e2.salary DESC LIMIT 1) AS salary_order_multi FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT DISTINCT e2.dept_id FROM employees e2 WHERE e2.id = e.id LIMIT 1) AS distinct_dept_id FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT ROW_NUMBER() OVER (ORDER BY e2.salary DESC) FROM employees e2 WHERE e2.id = e.id LIMIT 1) AS row_num FROM employees e",
"EXPLAIN format = 'plan_tree' SELECT e.name, e.salary, (SELECT DISTINCT e2.dept_id FROM employees e2 WHERE e2.id = e.id LIMIT 1) AS distinct_dept_id FROM employees e"
]
}
]

Loading