From 2c4cd56c7ec86de96e988358218f3047d26e1bd7 Mon Sep 17 00:00:00 2001 From: Shengkai <1059623455@qq.com> Date: Fri, 3 Jan 2025 19:31:01 +0800 Subject: [PATCH] use URI instead of Path --- .../table/gateway/api/SqlGatewayService.java | 6 +-- .../api/utils/MockedSqlGatewayService.java | 4 +- .../application/DeployScriptHandler.java | 8 +-- .../application/DeployScriptRequestBody.java | 14 ++--- .../service/SqlGatewayServiceImpl.java | 16 +++--- .../gateway/rest/DeployScriptITCase.java | 51 ++++++++++++++----- .../application/ScriptRunnerITCase.java | 4 +- .../sql_gateway_rest_api_v4.snapshot | 2 +- .../table/runtime/application/SqlDriver.java | 8 +-- 9 files changed, 68 insertions(+), 45 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java index 82f9d30a009b8..bb69fdc658db4 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java @@ -41,7 +41,7 @@ import javax.annotation.Nullable; -import java.nio.file.Path; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Set; @@ -357,14 +357,14 @@ OperationHandle refreshMaterializedTable( * Deploy the script in application mode. * * @param sessionHandle handle to identify the session. - * @param scriptPath path to the script. + * @param scriptUri URI of the script. * @param script the content of the script. * @param executionConfig to run the script. * @return the cluster description. */ ClusterID deployScript( SessionHandle sessionHandle, - @Nullable Path scriptPath, + @Nullable URI scriptUri, @Nullable String script, Configuration executionConfig) throws SqlGatewayException; diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java index 591d5088b5f94..d7b4577962926 100644 --- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java @@ -39,7 +39,7 @@ import javax.annotation.Nullable; -import java.nio.file.Path; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Set; @@ -205,7 +205,7 @@ public OperationHandle refreshMaterializedTable( @Override public ClusterID deployScript( SessionHandle sessionHandle, - @Nullable Path scriptPath, + @Nullable URI scriptUri, @Nullable String script, Configuration executionConfig) throws SqlGatewayException { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java index 9ce1f05d04fc6..120fb2787509b 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java @@ -33,7 +33,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.nio.file.Paths; +import java.net.URI; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -63,10 +63,10 @@ protected CompletableFuture handleRequest( service.deployScript( request.getPathParameter( SessionHandleIdPathParameter.class), - request.getRequestBody().getScriptPath() == null + request.getRequestBody().getScriptUri() == null ? null - : Paths.get( - request.getRequestBody().getScriptPath()), + : URI.create( + request.getRequestBody().getScriptUri()), request.getRequestBody().getScript(), Configuration.fromMap( request.getRequestBody().getExecutionConfig())) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java index 65051d023ec6b..bc30a5b3e102b 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java @@ -32,14 +32,14 @@ public class DeployScriptRequestBody implements RequestBody { private static final String FIELD_NAME_SCRIPT = "script"; - private static final String FIELD_NAME_SCRIPT_PATH = "scriptPath"; + private static final String FIELD_NAME_SCRIPT_URI = "scriptUri"; private static final String FIELD_NAME_EXECUTION_CONFIG = "executionConfig"; @JsonProperty(FIELD_NAME_SCRIPT) private final @Nullable String script; - @JsonProperty(FIELD_NAME_SCRIPT_PATH) - private final @Nullable String scriptPath; + @JsonProperty(FIELD_NAME_SCRIPT_URI) + private final @Nullable String scriptUri; @JsonProperty(FIELD_NAME_EXECUTION_CONFIG) private final Map executionConfig; @@ -47,11 +47,11 @@ public class DeployScriptRequestBody implements RequestBody { @JsonCreator public DeployScriptRequestBody( @JsonProperty(FIELD_NAME_SCRIPT) @Nullable String script, - @JsonProperty(FIELD_NAME_SCRIPT_PATH) @Nullable String scriptPath, + @JsonProperty(FIELD_NAME_SCRIPT_URI) @Nullable String scriptUri, @JsonProperty(FIELD_NAME_EXECUTION_CONFIG) @Nullable Map executionConfig) { this.script = script; - this.scriptPath = scriptPath; + this.scriptUri = scriptUri; this.executionConfig = executionConfig == null ? Collections.emptyMap() : executionConfig; } @@ -65,7 +65,7 @@ public String getScript() { } @Nullable - public String getScriptPath() { - return scriptPath; + public String getScriptUri() { + return scriptUri; } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java index 00ccd661ad5af..fb3a9246e9d00 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java @@ -53,7 +53,7 @@ import javax.annotation.Nullable; -import java.nio.file.Path; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -340,25 +340,25 @@ public OperationHandle refreshMaterializedTable( @Override public ClusterID deployScript( SessionHandle sessionHandle, - @Nullable Path scriptPath, + @Nullable URI scriptUri, @Nullable String script, Configuration executionConfig) throws SqlGatewayException { Session session = sessionManager.getSession(sessionHandle); - if (scriptPath == null && script == null) { - throw new IllegalArgumentException("Please specify script path or script."); + if (scriptUri == null && script == null) { + throw new IllegalArgumentException("Please specify script path or uri."); } - if (scriptPath != null && !StringUtils.isNullOrWhitespaceOnly(script)) { + if (scriptUri != null && !StringUtils.isNullOrWhitespaceOnly(script)) { throw new IllegalArgumentException( - "Please specify either the script path or the script itself, but not both."); + "Please specify either the script uri or the script itself, but not both."); } Configuration mergedConfig = Configuration.fromMap(session.getSessionConfig()); mergedConfig.addAll(executionConfig); List arguments = new ArrayList<>(); - if (scriptPath != null) { + if (scriptUri != null) { arguments.add("--" + SqlDriver.OPTION_SQL_FILE.getLongOpt()); - arguments.add(scriptPath.toString()); + arguments.add(scriptUri.toString()); } if (script != null) { arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt()); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java index 6e0f968e5269f..3ed3c80c16975 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java @@ -46,16 +46,22 @@ import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; import org.apache.flink.table.gateway.rest.util.TestingRestClient; +import org.apache.flink.table.gateway.service.utils.MockHttpServer; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; import org.apache.flink.table.runtime.application.SqlDriver; +import org.apache.flink.util.FileUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nullable; +import java.io.File; +import java.net.URL; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -80,6 +86,13 @@ public class DeployScriptITCase { private static TestingRestClient restClient; private static SessionHandle sessionHandle; + private static final String script = + "CREATE TEMPORARY TABLE sink(\n" + + " a INT\n" + + ") WITH (\n" + + " 'connector' = 'blackhole'\n" + + ");\n" + + "INSERT INTO sink VALUES (1);"; @BeforeAll static void beforeAll() throws Exception { @@ -102,24 +115,34 @@ static void beforeAll() throws Exception { } @Test - void testDeployScriptToYarnCluster() throws Exception { - verifyDeployScriptToCluster("yarn-application"); + void testDeployScriptToYarnCluster(@TempDir Path workDir) throws Exception { + verifyDeployScriptToCluster("yarn-application", script, null, script); + try (MockHttpServer server = MockHttpServer.startHttpServer()) { + File file = workDir.resolve("script.sql").toFile(); + assertThat(file.createNewFile()).isTrue(); + FileUtils.writeFileUtf8(file, script); + URL url = server.prepareResource("/download/script.sql", file); + verifyDeployScriptToCluster("yarn-application", null, url.toString(), script); + } } @Test - void testDeployScriptToKubernetesCluster() throws Exception { - verifyDeployScriptToCluster("kubernetes-application"); + void testDeployScriptToKubernetesCluster(@TempDir Path workDir) throws Exception { + verifyDeployScriptToCluster("kubernetes-application", script, null, script); + try (MockHttpServer server = MockHttpServer.startHttpServer()) { + File file = workDir.resolve("script.sql").toFile(); + assertThat(file.createNewFile()).isTrue(); + FileUtils.writeFileUtf8(file, script); + URL url = server.prepareResource("/download/script.sql", file); + verifyDeployScriptToCluster("kubernetes-application", null, url.toString(), script); + } } - private void verifyDeployScriptToCluster(String target) throws Exception { + private void verifyDeployScriptToCluster( + String target, @Nullable String script, @Nullable String scriptUri, String content) + throws Exception { TestApplicationClusterClientFactory.id = target; - String script = - "CREATE TEMPORARY TABLE sink(\n" - + " a INT\n" - + ") WITH (\n" - + " 'connector' = 'blackhole'\n" - + ");\n" - + "INSERT INTO sink VALUES (1);"; + assertThat( restClient .sendRequest( @@ -129,7 +152,7 @@ private void verifyDeployScriptToCluster(String target) throws Exception { new SessionMessageParameters(sessionHandle), new DeployScriptRequestBody( script, - null, + scriptUri, Collections.singletonMap( DeploymentOptions.TARGET.key(), target))) .get() @@ -139,7 +162,7 @@ private void verifyDeployScriptToCluster(String target) throws Exception { assertThat(TestApplicationClusterClientFactory.configuration.getString("key", "none")) .isEqualTo("value"); assertThat(config.getApplicationClassName()).isEqualTo(SqlDriver.class.getName()); - assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(script); + assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(content); } /** diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java index 5ddfeed506291..4eb91e0a44b17 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java @@ -141,7 +141,7 @@ void testRunScriptFromFile(@TempDir Path workDir) throws Exception { udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS); List arguments = - Arrays.asList("--scriptPath", createStatementFile(workDir, script).toString()); + Arrays.asList("--scriptUri", createStatementFile(workDir, script).toString()); runScriptInCluster(arguments); assertThat(TestValuesTableFactory.getResultsAsStrings("sink")) @@ -166,7 +166,7 @@ void testRunScriptFromRemoteFile(@TempDir Path workDir) throws Exception { try (MockHttpServer server = MockHttpServer.startHttpServer()) { URL url = server.prepareResource("/download/script.sql", file); - List arguments = Arrays.asList("--scriptPath", url.toString()); + List arguments = Arrays.asList("--scriptUri", url.toString()); runScriptInCluster(arguments); } } diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot index 6790948235bc6..8da82fac08010 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot +++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot @@ -361,7 +361,7 @@ "script" : { "type" : "string" }, - "scriptPath" : { + "scriptUri" : { "type" : "string" }, "executionConfig" : { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java index 1d983c3ae6bf2..5c22f6b9d732d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java @@ -62,9 +62,9 @@ public class SqlDriver { public static final Option OPTION_SQL_FILE = Option.builder() - .longOpt("scriptPath") + .longOpt("scriptUri") .numberOfArgs(1) - .desc("SQL script file path. It supports to fetch files from the DFS or HTTP.") + .desc("SQL script file URI. It supports to fetch files from the DFS or HTTP.") .build(); public static final Option OPTION_SQL_SCRIPT = @@ -162,11 +162,11 @@ public static String parseOptions(String[] args) { if (content == null) { return Preconditions.checkNotNull( line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()), - "Please use \"--script\" or \"--scriptPath\" to specify script either."); + "Please use \"--script\" or \"--scriptUri\" to specify script either."); } else { Preconditions.checkArgument( line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()) == null, - "Don't set \"--script\" or \"--scriptPath\" together."); + "Don't set \"--script\" or \"--scriptUri\" together."); return content; } } catch (ParseException | URISyntaxException e) {