Skip to content

Commit 0f1367a

Browse files
committed
[FLINK-36755][k8s] Relax the limitation that kubernetes application require a main jar
1 parent ced02ce commit 0f1367a

File tree

7 files changed

+44
-14
lines changed

7 files changed

+44
-14
lines changed

flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,9 @@ public Result fetchArtifacts(String[] uris) {
115115
* @return result with the fetched artifacts
116116
* @throws Exception
117117
*/
118-
public Result fetchArtifacts(String jobUri, @Nullable List<String> additionalUris)
118+
public Result fetchArtifacts(@Nullable String jobUri, @Nullable List<String> additionalUris)
119119
throws Exception {
120-
checkArgument(jobUri != null && !jobUri.trim().isEmpty(), "The jobUri is required.");
121-
122-
File jobJar = fetchArtifact(jobUri);
120+
File jobJar = jobUri == null ? null : fetchArtifact(jobUri);
123121
List<File> additionalArtifacts =
124122
additionalUris == null
125123
? Collections.emptyList()

flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,6 @@ void testHttpDisabledError() {
198198
@Test
199199
void testMissingRequiredFetchArgs() {
200200
ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
201-
assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null, null))
202-
.isInstanceOf(IllegalArgumentException.class)
203-
.hasMessage("The jobUri is required.");
204201

205202
assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null))
206203
.isInstanceOf(IllegalArgumentException.class)

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ public ClusterClientProvider<String> deployApplicationCluster(
220220
|| PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
221221
final List<URI> pipelineJars =
222222
KubernetesUtils.checkJarFileForApplicationMode(flinkConfig);
223-
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
223+
Preconditions.checkArgument(
224+
pipelineJars.size() <= 1, "Should only have at most one jar.");
224225
}
225226

226227
try {

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.kubernetes.entrypoint;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.client.cli.ArtifactFetchOptions;
2324
import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
2425
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
@@ -101,7 +102,8 @@ public static void main(final String[] args) {
101102
ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
102103
}
103104

104-
private static PackagedProgram getPackagedProgram(final Configuration configuration)
105+
@VisibleForTesting
106+
static PackagedProgram getPackagedProgram(final Configuration configuration)
105107
throws FlinkException {
106108

107109
final ApplicationConfiguration applicationConfiguration =
@@ -146,14 +148,15 @@ private static ArtifactFetchManager.Result fetchArtifacts(Configuration configur
146148
String targetDir = generateJarDir(configuration);
147149
ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir);
148150

149-
List<String> uris = configuration.get(PipelineOptions.JARS);
150-
checkArgument(uris.size() == 1, "Should only have one jar");
151+
List<String> uris =
152+
configuration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
153+
checkArgument(uris.size() <= 1, "Should only have at most one jar");
151154
List<String> additionalUris =
152155
configuration
153156
.getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
154157
.orElse(Collections.emptyList());
155158

156-
return fetchMgr.fetchArtifacts(uris.get(0), additionalUris);
159+
return fetchMgr.fetchArtifacts(uris.size() == 1 ? uris.get(0) : null, additionalUris);
157160
} catch (Exception e) {
158161
throw new RuntimeException(e);
159162
}

flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ public static List<String> getStartCommandWithBashWrapper(String command) {
400400
}
401401

402402
public static List<URI> checkJarFileForApplicationMode(Configuration configuration) {
403-
return configuration.get(PipelineOptions.JARS).stream()
403+
return configuration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList())
404+
.stream()
404405
.map(FunctionUtils.uncheckedFunction(PackagedProgramUtils::resolveURI))
405406
.collect(Collectors.toList());
406407
}

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,21 @@ void testDeployApplicationCluster() {
157157
checkUpdatedConfigAndResourceSetting();
158158
}
159159

160+
@Test
161+
void testDeployApplicationClusterWithoutJar() {
162+
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
163+
try {
164+
descriptor.deployApplicationCluster(clusterSpecification, appConfig);
165+
} catch (Exception ignored) {
166+
}
167+
168+
mockExpectedServiceFromServerSide(loadBalancerSvc);
169+
final ClusterClient<String> clusterClient =
170+
descriptor.retrieve(CLUSTER_ID).getClusterClient();
171+
checkClusterClient(clusterClient);
172+
checkUpdatedConfigAndResourceSetting();
173+
}
174+
160175
@Test
161176
void testDeployApplicationClusterWithClusterAlreadyExists() {
162177
flinkConfig.set(
@@ -202,7 +217,7 @@ void testDeployApplicationClusterWithMultipleJarsSet() {
202217
cause ->
203218
assertThat(cause)
204219
.isInstanceOf(IllegalArgumentException.class)
205-
.hasMessageContaining("Should only have one jar"));
220+
.hasMessageContaining("Should only have at most one jar"));
206221
}
207222

208223
@Test

flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.flink.kubernetes.entrypoint;
2020

2121
import org.apache.flink.client.cli.ArtifactFetchOptions;
22+
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
23+
import org.apache.flink.client.program.PackagedProgram;
2224
import org.apache.flink.configuration.Configuration;
2325
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
2426

@@ -32,6 +34,8 @@
3234
import java.io.File;
3335
import java.nio.file.Path;
3436

37+
import static org.assertj.core.api.Assertions.assertThat;
38+
3539
/** Tests for {@link KubernetesApplicationClusterEntrypointTest}. */
3640
public class KubernetesApplicationClusterEntrypointTest {
3741
private static final Logger LOG =
@@ -58,4 +62,15 @@ public void testGenerateJarDir() {
5862
new String[] {tempDir.toString(), TEST_NAMESPACE, TEST_CLUSTER_ID});
5963
Assertions.assertEquals(expectedDir, baseDir);
6064
}
65+
66+
@Test
67+
void testGetPackagedProgram() throws Exception {
68+
Configuration config = new Configuration();
69+
new ApplicationConfiguration(
70+
new String[0], KubernetesApplicationClusterEntrypoint.class.getName())
71+
.applyToConfiguration(config);
72+
PackagedProgram packagedProgram =
73+
KubernetesApplicationClusterEntrypoint.getPackagedProgram(config);
74+
assertThat(packagedProgram.getJobJarAndDependencies()).isNullOrEmpty();
75+
}
6176
}

0 commit comments

Comments
 (0)