Skip to content

Commit

Permalink
use URI instead of Path
Browse files Browse the repository at this point in the history
  • Loading branch information
fsk119 committed Jan 3, 2025
1 parent a39bd7d commit 2c4cd56
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

import javax.annotation.Nullable;

import java.nio.file.Path;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -357,14 +357,14 @@ OperationHandle refreshMaterializedTable(
* Deploy the script in application mode.
*
* @param sessionHandle handle to identify the session.
* @param scriptPath path to the script.
* @param scriptUri URI of the script.
* @param script the content of the script.
* @param executionConfig to run the script.
* @return the cluster description.
*/
<ClusterID> ClusterID deployScript(
SessionHandle sessionHandle,
@Nullable Path scriptPath,
@Nullable URI scriptUri,
@Nullable String script,
Configuration executionConfig)
throws SqlGatewayException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

import javax.annotation.Nullable;

import java.nio.file.Path;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -205,7 +205,7 @@ public OperationHandle refreshMaterializedTable(
@Override
public <ClusterID> ClusterID deployScript(
SessionHandle sessionHandle,
@Nullable Path scriptPath,
@Nullable URI scriptUri,
@Nullable String script,
Configuration executionConfig)
throws SqlGatewayException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.nio.file.Paths;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -63,10 +63,10 @@ protected CompletableFuture<DeployScriptResponseBody> handleRequest(
service.deployScript(
request.getPathParameter(
SessionHandleIdPathParameter.class),
request.getRequestBody().getScriptPath() == null
request.getRequestBody().getScriptUri() == null
? null
: Paths.get(
request.getRequestBody().getScriptPath()),
: URI.create(
request.getRequestBody().getScriptUri()),
request.getRequestBody().getScript(),
Configuration.fromMap(
request.getRequestBody().getExecutionConfig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,26 @@
public class DeployScriptRequestBody implements RequestBody {

private static final String FIELD_NAME_SCRIPT = "script";
private static final String FIELD_NAME_SCRIPT_PATH = "scriptPath";
private static final String FIELD_NAME_SCRIPT_URI = "scriptUri";
private static final String FIELD_NAME_EXECUTION_CONFIG = "executionConfig";

@JsonProperty(FIELD_NAME_SCRIPT)
private final @Nullable String script;

@JsonProperty(FIELD_NAME_SCRIPT_PATH)
private final @Nullable String scriptPath;
@JsonProperty(FIELD_NAME_SCRIPT_URI)
private final @Nullable String scriptUri;

@JsonProperty(FIELD_NAME_EXECUTION_CONFIG)
private final Map<String, String> executionConfig;

@JsonCreator
public DeployScriptRequestBody(
@JsonProperty(FIELD_NAME_SCRIPT) @Nullable String script,
@JsonProperty(FIELD_NAME_SCRIPT_PATH) @Nullable String scriptPath,
@JsonProperty(FIELD_NAME_SCRIPT_URI) @Nullable String scriptUri,
@JsonProperty(FIELD_NAME_EXECUTION_CONFIG) @Nullable
Map<String, String> executionConfig) {
this.script = script;
this.scriptPath = scriptPath;
this.scriptUri = scriptUri;
this.executionConfig = executionConfig == null ? Collections.emptyMap() : executionConfig;
}

Expand All @@ -65,7 +65,7 @@ public String getScript() {
}

@Nullable
public String getScriptPath() {
return scriptPath;
public String getScriptUri() {
return scriptUri;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

import javax.annotation.Nullable;

import java.nio.file.Path;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -340,25 +340,25 @@ public OperationHandle refreshMaterializedTable(
@Override
public <ClusterID> ClusterID deployScript(
SessionHandle sessionHandle,
@Nullable Path scriptPath,
@Nullable URI scriptUri,
@Nullable String script,
Configuration executionConfig)
throws SqlGatewayException {
Session session = sessionManager.getSession(sessionHandle);
if (scriptPath == null && script == null) {
throw new IllegalArgumentException("Please specify script path or script.");
if (scriptUri == null && script == null) {
throw new IllegalArgumentException("Please specify script path or uri.");
}
if (scriptPath != null && !StringUtils.isNullOrWhitespaceOnly(script)) {
if (scriptUri != null && !StringUtils.isNullOrWhitespaceOnly(script)) {
throw new IllegalArgumentException(
"Please specify either the script path or the script itself, but not both.");
"Please specify either the script uri or the script itself, but not both.");
}
Configuration mergedConfig = Configuration.fromMap(session.getSessionConfig());
mergedConfig.addAll(executionConfig);

List<String> arguments = new ArrayList<>();
if (scriptPath != null) {
if (scriptUri != null) {
arguments.add("--" + SqlDriver.OPTION_SQL_FILE.getLongOpt());
arguments.add(scriptPath.toString());
arguments.add(scriptUri.toString());
}
if (script != null) {
arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,22 @@
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.gateway.service.utils.MockHttpServer;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.runtime.application.SqlDriver;
import org.apache.flink.util.FileUtils;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

import javax.annotation.Nullable;

import java.io.File;
import java.net.URL;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand All @@ -80,6 +86,13 @@ public class DeployScriptITCase {

private static TestingRestClient restClient;
private static SessionHandle sessionHandle;
private static final String script =
"CREATE TEMPORARY TABLE sink(\n"
+ " a INT\n"
+ ") WITH (\n"
+ " 'connector' = 'blackhole'\n"
+ ");\n"
+ "INSERT INTO sink VALUES (1);";

@BeforeAll
static void beforeAll() throws Exception {
Expand All @@ -102,24 +115,34 @@ static void beforeAll() throws Exception {
}

@Test
void testDeployScriptToYarnCluster() throws Exception {
verifyDeployScriptToCluster("yarn-application");
void testDeployScriptToYarnCluster(@TempDir Path workDir) throws Exception {
verifyDeployScriptToCluster("yarn-application", script, null, script);
try (MockHttpServer server = MockHttpServer.startHttpServer()) {
File file = workDir.resolve("script.sql").toFile();
assertThat(file.createNewFile()).isTrue();
FileUtils.writeFileUtf8(file, script);
URL url = server.prepareResource("/download/script.sql", file);
verifyDeployScriptToCluster("yarn-application", null, url.toString(), script);
}
}

@Test
void testDeployScriptToKubernetesCluster() throws Exception {
verifyDeployScriptToCluster("kubernetes-application");
void testDeployScriptToKubernetesCluster(@TempDir Path workDir) throws Exception {
verifyDeployScriptToCluster("kubernetes-application", script, null, script);
try (MockHttpServer server = MockHttpServer.startHttpServer()) {
File file = workDir.resolve("script.sql").toFile();
assertThat(file.createNewFile()).isTrue();
FileUtils.writeFileUtf8(file, script);
URL url = server.prepareResource("/download/script.sql", file);
verifyDeployScriptToCluster("kubernetes-application", null, url.toString(), script);
}
}

private void verifyDeployScriptToCluster(String target) throws Exception {
private void verifyDeployScriptToCluster(
String target, @Nullable String script, @Nullable String scriptUri, String content)
throws Exception {
TestApplicationClusterClientFactory.id = target;
String script =
"CREATE TEMPORARY TABLE sink(\n"
+ " a INT\n"
+ ") WITH (\n"
+ " 'connector' = 'blackhole'\n"
+ ");\n"
+ "INSERT INTO sink VALUES (1);";

assertThat(
restClient
.sendRequest(
Expand All @@ -129,7 +152,7 @@ private void verifyDeployScriptToCluster(String target) throws Exception {
new SessionMessageParameters(sessionHandle),
new DeployScriptRequestBody(
script,
null,
scriptUri,
Collections.singletonMap(
DeploymentOptions.TARGET.key(), target)))
.get()
Expand All @@ -139,7 +162,7 @@ private void verifyDeployScriptToCluster(String target) throws Exception {
assertThat(TestApplicationClusterClientFactory.configuration.getString("key", "none"))
.isEqualTo("value");
assertThat(config.getApplicationClassName()).isEqualTo(SqlDriver.class.getName());
assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(script);
assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(content);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void testRunScriptFromFile(@TempDir Path workDir) throws Exception {
udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS);

List<String> arguments =
Arrays.asList("--scriptPath", createStatementFile(workDir, script).toString());
Arrays.asList("--scriptUri", createStatementFile(workDir, script).toString());
runScriptInCluster(arguments);

assertThat(TestValuesTableFactory.getResultsAsStrings("sink"))
Expand All @@ -166,7 +166,7 @@ void testRunScriptFromRemoteFile(@TempDir Path workDir) throws Exception {

try (MockHttpServer server = MockHttpServer.startHttpServer()) {
URL url = server.prepareResource("/download/script.sql", file);
List<String> arguments = Arrays.asList("--scriptPath", url.toString());
List<String> arguments = Arrays.asList("--scriptUri", url.toString());
runScriptInCluster(arguments);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@
"script" : {
"type" : "string"
},
"scriptPath" : {
"scriptUri" : {
"type" : "string"
},
"executionConfig" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public class SqlDriver {

public static final Option OPTION_SQL_FILE =
Option.builder()
.longOpt("scriptPath")
.longOpt("scriptUri")
.numberOfArgs(1)
.desc("SQL script file path. It supports to fetch files from the DFS or HTTP.")
.desc("SQL script file URI. It supports to fetch files from the DFS or HTTP.")
.build();

public static final Option OPTION_SQL_SCRIPT =
Expand Down Expand Up @@ -162,11 +162,11 @@ public static String parseOptions(String[] args) {
if (content == null) {
return Preconditions.checkNotNull(
line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()),
"Please use \"--script\" or \"--scriptPath\" to specify script either.");
"Please use \"--script\" or \"--scriptUri\" to specify script either.");
} else {
Preconditions.checkArgument(
line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()) == null,
"Don't set \"--script\" or \"--scriptPath\" together.");
"Don't set \"--script\" or \"--scriptUri\" together.");
return content;
}
} catch (ParseException | URISyntaxException e) {
Expand Down

0 comments on commit 2c4cd56

Please sign in to comment.