Skip to content

Commit 5f1b49f

Browse files
authored
[FLINK-34599][table] Migrate BatchPhysicalConstantTableFunctionScanRule to Java
Signed-off-by: Sergey Nuyanzin <snuyanzin@gmail.com>
1 parent 5739b16 commit 5f1b49f

File tree

3 files changed

+123
-83
lines changed

3 files changed

+123
-83
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.rules.physical.batch;
20+
21+
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
22+
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
23+
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate;
24+
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalValues;
25+
26+
import com.google.common.collect.ImmutableList;
27+
import org.apache.calcite.plan.RelOptCluster;
28+
import org.apache.calcite.plan.RelOptRuleCall;
29+
import org.apache.calcite.plan.RelRule;
30+
import org.apache.calcite.plan.RelTraitSet;
31+
import org.apache.calcite.rel.core.JoinRelType;
32+
import org.apache.calcite.rex.RexUtil;
33+
import org.immutables.value.Value;
34+
35+
import scala.Option;
36+
37+
/**
38+
* Converts {@link FlinkLogicalTableFunctionScan} with constant RexCall to
39+
*
40+
* <pre>
41+
* {@link BatchPhysicalCorrelate}
42+
* / \
43+
* empty {@link BatchPhysicalValuesRule}} {@link FlinkLogicalTableFunctionScan}.
44+
* </pre>
45+
*
46+
* <p>Add the rule to support select from a UDF directly, such as the following SQL: {@code SELECT *
47+
* FROM LATERAL TABLE(func()) as T(c)}
48+
*
49+
* <p>Note: {@link BatchPhysicalCorrelateRule} is responsible for converting a reasonable physical
50+
* plan for the normal correlate query, such as the following SQL: example1: {@code SELECT * FROM T,
51+
* LATERAL TABLE(func()) as T(c) example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)}
52+
*/
53+
@Value.Enclosing
54+
public class BatchPhysicalConstantTableFunctionScanRule
55+
extends RelRule<
56+
BatchPhysicalConstantTableFunctionScanRule
57+
.BatchPhysicalConstantTableFunctionScanRuleConfig> {
58+
59+
public static final BatchPhysicalConstantTableFunctionScanRule INSTANCE =
60+
BatchPhysicalConstantTableFunctionScanRuleConfig.DEFAULT.toRule();
61+
62+
protected BatchPhysicalConstantTableFunctionScanRule(
63+
BatchPhysicalConstantTableFunctionScanRuleConfig config) {
64+
super(config);
65+
}
66+
67+
public boolean matches(RelOptRuleCall call) {
68+
FlinkLogicalTableFunctionScan scan = call.rel(0);
69+
return RexUtil.isConstant(scan.getCall()) && scan.getInputs().isEmpty();
70+
}
71+
72+
public void onMatch(RelOptRuleCall call) {
73+
FlinkLogicalTableFunctionScan scan = call.rel(0);
74+
75+
// create correlate left
76+
RelOptCluster cluster = scan.getCluster();
77+
RelTraitSet traitSet =
78+
call.getPlanner().emptyTraitSet().replace(FlinkConventions.BATCH_PHYSICAL());
79+
BatchPhysicalValues values =
80+
new BatchPhysicalValues(
81+
cluster,
82+
traitSet,
83+
ImmutableList.of(ImmutableList.of()),
84+
cluster.getTypeFactory()
85+
.createStructType(ImmutableList.of(), ImmutableList.of()));
86+
87+
BatchPhysicalCorrelate correlate =
88+
new BatchPhysicalCorrelate(
89+
cluster,
90+
traitSet,
91+
values,
92+
scan,
93+
Option.empty(),
94+
scan.getRowType(),
95+
JoinRelType.INNER);
96+
call.transformTo(correlate);
97+
}
98+
99+
/** Configuration for {@link BatchPhysicalConstantTableFunctionScanRule}. */
100+
@Value.Immutable(singleton = false)
101+
public interface BatchPhysicalConstantTableFunctionScanRuleConfig extends RelRule.Config {
102+
BatchPhysicalConstantTableFunctionScanRule.BatchPhysicalConstantTableFunctionScanRuleConfig
103+
DEFAULT =
104+
ImmutableBatchPhysicalConstantTableFunctionScanRule
105+
.BatchPhysicalConstantTableFunctionScanRuleConfig.builder()
106+
.build()
107+
.withOperandSupplier(
108+
b0 ->
109+
b0.operand(FlinkLogicalTableFunctionScan.class)
110+
.anyInputs())
111+
.withDescription("BatchPhysicalConstantTableFunctionScanRule")
112+
.as(
113+
BatchPhysicalConstantTableFunctionScanRule
114+
.BatchPhysicalConstantTableFunctionScanRuleConfig
115+
.class);
116+
117+
@Override
118+
default BatchPhysicalConstantTableFunctionScanRule toRule() {
119+
return new BatchPhysicalConstantTableFunctionScanRule(this);
120+
}
121+
}
122+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala

Lines changed: 0 additions & 82 deletions
This file was deleted.

tools/maven/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ under the License.
5656

5757
<!-- Have to use guava directly -->
5858
<suppress
59-
files="OverConvertRule.java|InConverter.java|SymbolUtil.java|RexNodeJsonDeserializer.java|RexNodeJsonSerializer.java|RexNodeJsonSerdeTest.java|FlinkAggregateProjectMergeRule.java|StreamPhysicalConstantTableFunctionScanRule.java"
59+
files="OverConvertRule.java|InConverter.java|SymbolUtil.java|RexNodeJsonDeserializer.java|RexNodeJsonSerializer.java|RexNodeJsonSerdeTest.java|FlinkAggregateProjectMergeRule.java|BatchPhysicalConstantTableFunctionScanRule.java|StreamPhysicalConstantTableFunctionScanRule.java"
6060
checks="IllegalImport"/>
6161
<!-- Classes copied from AWS -->
6262
<suppress

0 commit comments

Comments
 (0)