Skip to content

Commit

Permalink
[ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
Browse files Browse the repository at this point in the history
### What is this PR for?

Trivial PR to check the existence of the folder, and throw a meaningful error when it doesn't exist.

### What type of PR is it?
[ Improvement ]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5669

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <[email protected]>

Closes apache#4320 from zjffdu/ZEPPELIN-5669 and squashes the following commits:

769d0aa [Jeff Zhang] [ZEPPELIN-5669] Check pyflink folder existence in yarn application mode
  • Loading branch information
zjffdu committed Apr 1, 2022
1 parent 4235da1 commit 148129e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,24 @@ public String getPyFlinkPythonPath(Properties properties) throws IOException {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
return getPyFlinkPythonPath(flinkHome + "/lib/python");
return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}

String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
return getPyFlinkPythonPath(flinkHome + "/opt/python");
return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}

private String getPyFlinkPythonPath(String pyFlinkFolder) {
private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
pyFlinkFolder.getAbsolutePath()));
}
List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,24 @@ public String getPyFlinkPythonPath(Properties properties) throws IOException {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
return getPyFlinkPythonPath(flinkHome + "/lib/python");
return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}

String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
return getPyFlinkPythonPath(flinkHome + "/opt/python");
return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}

private String getPyFlinkPythonPath(String pyFlinkFolder) {
private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
pyFlinkFolder.getAbsolutePath()));
}
List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,24 @@ public String getPyFlinkPythonPath(Properties properties) throws IOException {
if ("yarn-application".equalsIgnoreCase(mode)) {
// for yarn application mode, FLINK_HOME is container working directory
String flinkHome = new File(".").getAbsolutePath();
return getPyFlinkPythonPath(flinkHome + "/lib/python");
return getPyFlinkPythonPath(new File(flinkHome + "/lib/python"));
}

String flinkHome = System.getenv("FLINK_HOME");
if (StringUtils.isNotBlank(flinkHome)) {
return getPyFlinkPythonPath(flinkHome + "/opt/python");
return getPyFlinkPythonPath(new File(flinkHome + "/opt/python"));
} else {
throw new IOException("No FLINK_HOME is specified");
}
}

private String getPyFlinkPythonPath(String pyFlinkFolder) {
private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException {
LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) {
throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder",
pyFlinkFolder.getAbsolutePath()));
}
List<File> depFiles = Arrays.asList(pyFlinkFolder.listFiles());
StringBuilder builder = new StringBuilder();
for (File file : depFiles) {
LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
Expand Down

0 comments on commit 148129e

Please sign in to comment.