Skip to content

Commit

Permalink
[FLINK-4308] [web frontend] Add optional config parameter 'jobmanager…
Browse files Browse the repository at this point in the history
….web.uploaddir' to specifiy job jar location

This allows users for example to prepackage jars in docker images

This closes apache#2335
  • Loading branch information
Zhenzhong Xu authored and StephanEwen committed Aug 5, 2016
1 parent 0870007 commit 8d4a64d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
5 changes: 4 additions & 1 deletion docs/setup/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber
- `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 8081).

- `jobmanager.web.tmpdir`: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface
will copy its static files into the directory. Also uploaded job jars are stored in the directory. By default, the temporary directory is used.
will copy its static files into the directory. Also uploaded job jars are stored in the directory if not overridden. By default, the temporary directory is used.

- `jobmanager.web.upload.dir`: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
will be used under the directory specified by jobmanager.web.tmpdir.

- `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,13 @@ public final class ConfigConstants {
* The config parameter defining the flink web directory to be used by the webmonitor.
*/
public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir";


/**
* The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory
* will be used under the directory specified by JOB_MANAGER_WEB_TMPDIR_KEY.
*/
public static final String JOB_MANAGER_WEB_UPLOAD_DIR_KEY = "jobmanager.web.upload.dir";

/**
* The config parameter defining the number of archived jobs for the jobmanager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ public WebRuntimeMonitor(
final boolean webSubmitAllow = cfg.isProgramSubmitEnabled();
if (webSubmitAllow) {
// create storage for uploads
String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);
if (!uploadDir.mkdir() || !uploadDir.canWrite()) {
throw new IOException("Unable to create temporary directory to support jar uploads.");
this.uploadDir = getUploadDir(config);
// the upload directory should either 1. exist and writable or 2. can be created and writable
if (!(uploadDir.exists() && uploadDir.canWrite()) && !(uploadDir.mkdir() && uploadDir.canWrite())) {
throw new IOException(
String.format("Jar upload directory %s cannot be created or is not writable.",
uploadDir.getAbsolutePath()));
}
LOG.info("Using directory {} for web frontend JAR file uploads", uploadDir);
}
Expand Down Expand Up @@ -437,12 +439,23 @@ private void cleanup() {
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

private RuntimeMonitorHandler handler(RequestHandler handler) {
return new RuntimeMonitorHandler(handler, retriever, jobManagerAddressPromise.future(), timeout);
}

File getBaseDir(Configuration configuration) {
return new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir")));
return new File(getBaseDirStr(configuration));
}

private String getBaseDirStr(Configuration configuration) {
return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir"));
}

private File getUploadDir(Configuration configuration) {
File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY,
getBaseDirStr(configuration)));

boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY);
return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());
}
}

0 comments on commit 8d4a64d

Please sign in to comment.