Skip to content

Commit f58d5fc

Browse files
authored
[FLINK-36720][table-planner] Add CompiledPlan annotations to BatchExecWindowTableFunction
1 parent 0580924 commit f58d5fc

File tree

14 files changed

+2623
-15
lines changed

14 files changed

+2623
-15
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.planner.plan.nodes.exec.batch;
2020

21+
import org.apache.flink.FlinkVersion;
2122
import org.apache.flink.api.dag.Transformation;
2223
import org.apache.flink.configuration.ReadableConfig;
2324
import org.apache.flink.table.api.TableException;
@@ -27,13 +28,24 @@
2728
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
2829
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
2930
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
31+
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
3032
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
3133
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecWindowTableFunction;
3234
import org.apache.flink.table.types.logical.RowType;
3335

36+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
37+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
38+
3439
import java.util.Collections;
40+
import java.util.List;
3541

3642
/** Batch {@link ExecNode} for window table-valued function. */
43+
@ExecNodeMetadata(
44+
name = "batch-exec-window-table-function",
45+
version = 1,
46+
producedTransformations = CommonExecWindowTableFunction.WINDOW_TRANSFORMATION,
47+
minPlanVersion = FlinkVersion.v2_0,
48+
minStateVersion = FlinkVersion.v2_0)
3749
public class BatchExecWindowTableFunction extends CommonExecWindowTableFunction
3850
implements BatchExecNode<RowData> {
3951

@@ -53,6 +65,25 @@ public BatchExecWindowTableFunction(
5365
description);
5466
}
5567

68+
@JsonCreator
69+
public BatchExecWindowTableFunction(
70+
@JsonProperty(FIELD_NAME_ID) int id,
71+
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
72+
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
73+
@JsonProperty(FIELD_NAME_WINDOWING) TimeAttributeWindowingStrategy windowingStrategy,
74+
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
75+
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
76+
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
77+
super(
78+
id,
79+
context,
80+
persistedConfig,
81+
windowingStrategy,
82+
inputProperties,
83+
outputType,
84+
description);
85+
}
86+
5687
@Override
5788
protected Transformation<RowData> translateToPlanInternal(
5889
PlannerBase planner, ExecNodeConfig config) {

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
4545
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
4646
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
47+
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction;
4748
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
4849
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
4950
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -180,6 +181,7 @@ private ExecNodeMetadataUtil() {
180181
add(BatchExecExpand.class);
181182
add(BatchExecSortAggregate.class);
182183
add(BatchExecSortLimit.class);
184+
add(BatchExecWindowTableFunction.class);
183185
add(BatchExecMatch.class);
184186
}
185187
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.nodes.exec.batch;
20+
21+
import org.apache.flink.table.planner.plan.nodes.exec.common.WindowTableFunctionTestPrograms;
22+
import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
23+
import org.apache.flink.table.test.program.TableTestProgram;
24+
25+
import java.util.Arrays;
26+
import java.util.List;
27+
28+
/** Batch Compiled Plan tests for {@link BatchExecWindowTableFunction}. */
29+
public class WindowTableFunctionEventTimeBatchRestoreTest extends BatchRestoreTestBase {
30+
31+
public WindowTableFunctionEventTimeBatchRestoreTest() {
32+
super(BatchExecWindowTableFunction.class);
33+
}
34+
35+
@Override
36+
public List<TableTestProgram> programs() {
37+
return Arrays.asList(
38+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_HOP_TVF,
39+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_HOP_TVF_AGG,
40+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_CUMULATE_TVF,
41+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_CUMULATE_TVF_AGG,
42+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF,
43+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG,
44+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET,
45+
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET);
46+
}
47+
}
Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,22 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.planner.plan.nodes.exec.stream;
19+
package org.apache.flink.table.planner.plan.nodes.exec.common;
2020

2121
import org.apache.flink.table.api.config.TableConfigOptions;
22+
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecWindowTableFunction;
23+
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
2224
import org.apache.flink.table.test.program.SinkTestStep;
2325
import org.apache.flink.table.test.program.SourceTestStep;
2426
import org.apache.flink.table.test.program.TableTestProgram;
2527
import org.apache.flink.types.Row;
2628

2729
import java.math.BigDecimal;
2830

29-
/** {@link TableTestProgram} definitions for testing {@link StreamExecWindowJoin}. */
31+
/**
32+
* {@link TableTestProgram} definitions for testing {@link BatchExecWindowTableFunction} and {@link
33+
* StreamExecWindowTableFunction}.
34+
*/
3035
public class WindowTableFunctionTestPrograms {
3136

3237
static final Row[] BEFORE_DATA = {
@@ -110,7 +115,7 @@ public class WindowTableFunctionTestPrograms {
110115
+ " %s\n"
111116
+ " GROUP BY window_start, window_end";
112117

113-
static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF =
118+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF =
114119
TableTestProgram.of(
115120
"window-table-function-tumble-tvf",
116121
"validates window table function using tumble tvf windows")
@@ -133,7 +138,7 @@ public class WindowTableFunctionTestPrograms {
133138
.runSql(String.format(QUERY_TVF, String.format(TUMBLE_TVF, "bid_time")))
134139
.build();
135140

136-
static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET =
141+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET =
137142
TableTestProgram.of(
138143
"window-table-function-tumble-tvf-positive-offset",
139144
"validates window table function using tumble tvf windows with positive offset")
@@ -158,7 +163,7 @@ public class WindowTableFunctionTestPrograms {
158163
QUERY_TVF, String.format(TUMBLE_TVF_OFFSET, "bid_time", "6")))
159164
.build();
160165

161-
static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET =
166+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET =
162167
TableTestProgram.of(
163168
"window-table-function-tumble-tvf-negative-offset",
164169
"validates window table function using tumble tvf windows with negative offset")
@@ -183,7 +188,7 @@ public class WindowTableFunctionTestPrograms {
183188
QUERY_TVF, String.format(TUMBLE_TVF_OFFSET, "bid_time", "-6")))
184189
.build();
185190

186-
static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG =
191+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG =
187192
TableTestProgram.of(
188193
"window-table-function-tumble-tvf-agg",
189194
"validates window table function using tumble tvf windows with aggregation")
@@ -200,7 +205,7 @@ public class WindowTableFunctionTestPrograms {
200205
.runSql(String.format(QUERY_TVF_AGG, String.format(TUMBLE_TVF, "bid_time")))
201206
.build();
202207

203-
static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG_PROC_TIME =
208+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG_PROC_TIME =
204209
TableTestProgram.of(
205210
"window-table-function-tumble-tvf-agg-proc-time",
206211
"validates window table function using tumble tvf windows with aggregation and processing time")
@@ -217,7 +222,7 @@ public class WindowTableFunctionTestPrograms {
217222
String.format(TUMBLE_TVF, "proc_time")))
218223
.build();
219224

220-
static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF =
225+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF =
221226
TableTestProgram.of(
222227
"window-table-function-hop-tvf",
223228
"validates window table function using hop tvf windows")
@@ -248,7 +253,7 @@ public class WindowTableFunctionTestPrograms {
248253
.runSql(String.format(QUERY_TVF, String.format(HOP_TVF, "bid_time")))
249254
.build();
250255

251-
static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF_AGG =
256+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF_AGG =
252257
TableTestProgram.of(
253258
"window-table-function-hop-tvf-agg",
254259
"validates window table function using hop tvf windows with aggregation")
@@ -268,7 +273,7 @@ public class WindowTableFunctionTestPrograms {
268273
.runSql(String.format(QUERY_TVF_AGG, String.format(HOP_TVF, "bid_time")))
269274
.build();
270275

271-
static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF_AGG_PROC_TIME =
276+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_HOP_TVF_AGG_PROC_TIME =
272277
TableTestProgram.of(
273278
"window-table-function-hop-tvf-agg-proc-time",
274279
"validates window table function using hop tvf windows with aggregation and processing time")
@@ -284,7 +289,7 @@ public class WindowTableFunctionTestPrograms {
284289
QUERY_TVF_AGG_PROC_TIME, String.format(HOP_TVF, "proc_time")))
285290
.build();
286291

287-
static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF =
292+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF =
288293
TableTestProgram.of(
289294
"window-table-function-cumulate-tvf",
290295
"validates window table function using cumulate tvf windows")
@@ -310,7 +315,7 @@ public class WindowTableFunctionTestPrograms {
310315
.runSql(String.format(QUERY_TVF, String.format(CUMULATE_TVF, "bid_time")))
311316
.build();
312317

313-
static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF_AGG =
318+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF_AGG =
314319
TableTestProgram.of(
315320
"window-table-function-cumulate-tvf-agg",
316321
"validates window table function using cumulate tvf windows with aggregation")
@@ -329,7 +334,7 @@ public class WindowTableFunctionTestPrograms {
329334
.runSql(String.format(QUERY_TVF_AGG, String.format(CUMULATE_TVF, "bid_time")))
330335
.build();
331336

332-
static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF_AGG_PROC_TIME =
337+
public static final TableTestProgram WINDOW_TABLE_FUNCTION_CUMULATE_TVF_AGG_PROC_TIME =
333338
TableTestProgram.of(
334339
"window-table-function-cumulate-tvf-agg-proc-time",
335340
"validates window table function using cumulate tvf windows with aggregation")

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowTableFunctionEventTimeRestoreTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
package org.apache.flink.table.planner.plan.nodes.exec.stream;
2020

21+
import org.apache.flink.table.planner.plan.nodes.exec.common.WindowTableFunctionTestPrograms;
2122
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
2223
import org.apache.flink.table.test.program.TableTestProgram;
2324

2425
import java.util.Arrays;
2526
import java.util.List;
2627

27-
/** Restore tests for {@link StreamExecWindowTableFunction}. */
28+
/** Restore tests for {@link StreamExecWindowTableFunction} which use event time. */
2829
public class WindowTableFunctionEventTimeRestoreTest extends RestoreTestBase {
2930

3031
public WindowTableFunctionEventTimeRestoreTest() {

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowTableFunctionProcTimeRestoreTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
package org.apache.flink.table.planner.plan.nodes.exec.stream;
2020

21+
import org.apache.flink.table.planner.plan.nodes.exec.common.WindowTableFunctionTestPrograms;
2122
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
2223
import org.apache.flink.table.test.program.TableTestProgram;
2324

2425
import java.util.Arrays;
2526
import java.util.List;
2627

27-
/** Restore tests for {@link StreamExecWindowTableFunction}. */
28+
/** Restore tests for {@link StreamExecWindowTableFunction} which use processing time. */
2829
public class WindowTableFunctionProcTimeRestoreTest extends RestoreTestBase {
2930

3031
public WindowTableFunctionProcTimeRestoreTest() {

0 commit comments

Comments
 (0)