Skip to content

Commit 5739b16

Browse files
authored
[FLINK-34600][table] Migrate PushLimitIntoLegacyTableSourceScanRule to java
1 parent b48ccce commit 5739b16

File tree

2 files changed

+153
-126
lines changed

2 files changed

+153
-126
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.logical;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.api.TableException;
23+
import org.apache.flink.table.legacy.sources.LimitableTableSource;
24+
import org.apache.flink.table.legacy.sources.TableSource;
25+
import org.apache.flink.table.plan.stats.TableStats;
26+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalLegacyTableSourceScan;
27+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
28+
import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase;
29+
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
30+
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
31+
32+
import org.apache.calcite.plan.RelOptRuleCall;
33+
import org.apache.calcite.plan.RelRule;
34+
import org.apache.calcite.rel.core.Sort;
35+
import org.apache.calcite.rex.RexLiteral;
36+
import org.apache.calcite.tools.RelBuilder;
37+
import org.immutables.value.Value;
38+
39+
import java.util.Collections;
40+
41+
/**
42+
* Planner rule that tries to push limit into a {@link LimitableTableSource}. The original limit
43+
* will still be retained.
44+
*
45+
* <p>The reasons why the limit still be retained: 1.If the source is required to return the exact
46+
* number of limit number, the implementation of the source is highly required. The source is
47+
* required to accurately control the record number of split, and the parallelism setting also need
48+
* to be adjusted accordingly. 2.When remove the limit, maybe filter will be pushed down to the
49+
* source after limit pushed down. The source need know it should do limit first and do the filter
50+
* later, it is hard to implement. 3.We can support limit with offset, we can push down offset +
51+
* fetch to table source.
52+
*/
53+
@Internal
54+
@Value.Enclosing
55+
public class PushLimitIntoLegacyTableSourceScanRule
56+
extends RelRule<
57+
PushLimitIntoLegacyTableSourceScanRule
58+
.PushLimitIntoLegacyTableSourceScanRuleConfig> {
59+
60+
public static final PushLimitIntoLegacyTableSourceScanRule INSTANCE =
61+
PushLimitIntoLegacyTableSourceScanRuleConfig.DEFAULT.toRule();
62+
63+
protected PushLimitIntoLegacyTableSourceScanRule(
64+
PushLimitIntoLegacyTableSourceScanRuleConfig config) {
65+
super(config);
66+
}
67+
68+
public boolean matches(RelOptRuleCall call) {
69+
Sort sort = call.rel(0);
70+
final boolean onlyLimit =
71+
sort.getCollation().getFieldCollations().isEmpty() && sort.fetch != null;
72+
if (onlyLimit) {
73+
LegacyTableSourceTable table =
74+
call.rel(1).getTable().unwrap(LegacyTableSourceTable.class);
75+
if (table != null) {
76+
TableSource tableSource = table.tableSource();
77+
return tableSource instanceof LimitableTableSource
78+
&& !((LimitableTableSource) tableSource).isLimitPushedDown();
79+
}
80+
}
81+
return false;
82+
}
83+
84+
public void onMatch(RelOptRuleCall call) {
85+
Sort sort = call.rel(0);
86+
FlinkLogicalLegacyTableSourceScan scan = call.rel(1);
87+
LegacyTableSourceTable tableSourceTable =
88+
scan.getTable().unwrap(LegacyTableSourceTable.class);
89+
int offset = (sort.offset == null) ? 0 : RexLiteral.intValue(sort.offset);
90+
int limit = offset + RexLiteral.intValue(sort.fetch);
91+
RelBuilder relBuilder = call.builder();
92+
LegacyTableSourceTable newRelOptTable = applyLimit(limit, tableSourceTable);
93+
FlinkLogicalLegacyTableSourceScan newScan = scan.copy(scan.getTraitSet(), newRelOptTable);
94+
95+
TableSource newTableSource =
96+
newRelOptTable.unwrap(LegacyTableSourceTable.class).tableSource();
97+
TableSource oldTableSource =
98+
tableSourceTable.unwrap(LegacyTableSourceTable.class).tableSource();
99+
100+
if (((LimitableTableSource) newTableSource).isLimitPushedDown()
101+
&& newTableSource.explainSource().equals(oldTableSource.explainSource())) {
102+
throw new TableException(
103+
"Failed to push limit into table source! "
104+
+ "table source with pushdown capability must override and change "
105+
+ "explainSource() API to explain the pushdown applied!");
106+
}
107+
108+
call.transformTo(sort.copy(sort.getTraitSet(), Collections.singletonList(newScan)));
109+
}
110+
111+
private LegacyTableSourceTable applyLimit(long limit, FlinkPreparingTableBase relOptTable) {
112+
LegacyTableSourceTable tableSourceTable = relOptTable.unwrap(LegacyTableSourceTable.class);
113+
LimitableTableSource limitedSource = (LimitableTableSource) tableSourceTable.tableSource();
114+
TableSource newTableSource = limitedSource.applyLimit(limit);
115+
116+
FlinkStatistic statistic = relOptTable.getStatistic();
117+
long newRowCount =
118+
(statistic.getRowCount() != null)
119+
? Math.min(limit, statistic.getRowCount().longValue())
120+
: limit;
121+
// Update TableStats after limit push down
122+
TableStats newTableStats = new TableStats(newRowCount);
123+
FlinkStatistic newStatistic =
124+
FlinkStatistic.builder().statistic(statistic).tableStats(newTableStats).build();
125+
return tableSourceTable.copy(newTableSource, newStatistic);
126+
}
127+
128+
/** Configuration for {@link PushLimitIntoLegacyTableSourceScanRule}. */
129+
@Value.Immutable(singleton = false)
130+
public interface PushLimitIntoLegacyTableSourceScanRuleConfig extends RelRule.Config {
131+
PushLimitIntoLegacyTableSourceScanRule.PushLimitIntoLegacyTableSourceScanRuleConfig
132+
DEFAULT =
133+
ImmutablePushLimitIntoLegacyTableSourceScanRule
134+
.PushLimitIntoLegacyTableSourceScanRuleConfig.builder()
135+
.operandSupplier(
136+
b0 ->
137+
b0.operand(FlinkLogicalSort.class)
138+
.oneInput(
139+
b1 ->
140+
b1.operand(
141+
FlinkLogicalLegacyTableSourceScan
142+
.class)
143+
.noInputs()))
144+
.description("PushLimitIntoLegacyTableSourceScanRule")
145+
.build()
146+
.as(PushLimitIntoLegacyTableSourceScanRuleConfig.class);
147+
148+
@Override
149+
default PushLimitIntoLegacyTableSourceScanRule toRule() {
150+
return new PushLimitIntoLegacyTableSourceScanRule(this);
151+
}
152+
}
153+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushLimitIntoLegacyTableSourceScanRule.scala

Lines changed: 0 additions & 126 deletions
This file was deleted.

0 commit comments

Comments
 (0)