[FLINK-3713] [clients, runtime] Use user code class loader when disposing savepoint

Disposing savepoints via the JobManager fails for state handles or descriptors
that contain user classes (for example, custom folding state or RocksDB handles).

With this change, the user can provide the job JAR when disposing a savepoint in
order to use the user code class loader of that job. The JAR is optional, hence
not breaking the CLI API.
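The fix revolves around class loading: savepoint state handles must be deserialized with a class loader that also sees the job's JAR files. As a rough, JDK-only illustration of the idea (a plain `URLClassLoader`, not Flink's actual user code class loader construction):

```java
import java.net.URL;
import java.net.URLClassLoader;

public class UserCodeLoaderSketch {

    // Hypothetical stand-in for a user code class loader: resolves classes
    // from the given job JARs, delegating to the parent for everything else.
    static ClassLoader buildUserCodeClassLoader(URL[] jarUrls, ClassLoader parent) {
        return URLClassLoader.newInstance(jarUrls, parent);
    }

    public static void main(String[] args) throws Exception {
        // With no JARs attached, lookups simply delegate to the parent loader.
        ClassLoader cl = buildUserCodeClassLoader(
                new URL[0], UserCodeLoaderSketch.class.getClassLoader());
        System.out.println(cl.loadClass("java.lang.String").getName()); // prints java.lang.String
    }
}
```

With the JAR supplied at disposal time, the JobManager can build such a loader over the uploaded blobs instead of falling back to its own classpath.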

This closes apache#2083.
uce committed Jul 18, 2016
1 parent bd8d7f5 commit b67e508
Showing 31 changed files with 858 additions and 287 deletions.
49 changes: 26 additions & 23 deletions docs/apis/cli.md
@@ -31,9 +31,9 @@ and connects by default to the running Flink master (JobManager) that was
started from the same installation directory.

A prerequisite to using the command line interface is that the Flink
master (JobManager) has been started (via
`<flink-home>/bin/start-local.sh` or
`<flink-home>/bin/start-cluster.sh`) or that a YARN environment is
available.

The command line can be used to
@@ -116,11 +116,11 @@ The command line can be used to
- Stop a job (streaming jobs only):

./bin/flink stop <jobID>


The difference between cancelling and stopping a (streaming) job is the following:

On a cancel call, the operators in a job immediately receive a `cancel()` method call to cancel them as
soon as possible.
If operators are not stopping after the cancel call, Flink will start interrupting the thread periodically
until it stops.
@@ -142,22 +142,30 @@ This allows the job to finish processing all inflight data.

Returns the path of the created savepoint. You need this path to restore and dispose savepoints.

#### **Restore a savepoint**

{% highlight bash %}
./bin/flink run -s <savepointPath> ...
{% endhighlight %}

The run command has a savepoint flag for submitting a job that restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.

#### **Dispose a savepoint**

{% highlight bash %}
./bin/flink savepoint -d <savepointPath>
{% endhighlight %}

Disposes the savepoint at the given path. The savepoint path is returned by the savepoint trigger command.

If you use custom state instances (for example custom reducing state or RocksDB state), you have to specify the path to the program JAR with which the savepoint was triggered in order to dispose the savepoint with the user code class loader:

{% highlight bash %}
./bin/flink savepoint -d <savepointPath> -j <jarFile>
{% endhighlight %}

Otherwise, you will run into a `ClassNotFoundException`.
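The exception is ordinary Java class loading at work: the JobManager's own classpath does not contain the user's state classes, so deserializing the savepoint's state descriptors fails. A tiny illustration (the class name below is hypothetical):

```java
public class MissingClassDemo {
    public static void main(String[] args) {
        try {
            // com.example.MyFoldFunction stands in for a user class that only
            // exists in the job JAR, not on the JobManager's classpath.
            Class.forName("com.example.MyFoldFunction");
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException: " + e.getMessage());
        }
    }
}
```

Passing `-j <jarFile>` sidesteps this by letting the JobManager resolve such classes from the uploaded JAR.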

## Usage

The command line syntax is as follows:
@@ -301,19 +309,14 @@ guarantees for a stop request.
Action "savepoint" triggers savepoints for a running job or disposes existing ones.
Syntax: savepoint [OPTIONS] <Job ID>
"savepoint" action options:
-d,--dispose <arg> Path of savepoint to dispose.
-j,--jarfile <jarfile> Flink program JAR file.
-m,--jobmanager <host:port> Address of the JobManager (master) to which
to connect. Use this flag to connect to a
different JobManager than the one specified
in the configuration.
Options for yarn-cluster mode:
-yid,--yarnapplicationId <arg> Attach to running YARN session
~~~
@@ -44,6 +44,7 @@
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -52,6 +53,8 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -67,10 +70,9 @@
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -604,11 +606,9 @@ protected int savepoint(String[] args) {
SavepointOptions options;
try {
options = CliFrontendParser.parseSavepointCommand(args);
} catch (CliArgsException e) {
return handleArgException(e);
} catch (Throwable t) {
return handleError(t);
}

@@ -620,9 +620,8 @@ protected int savepoint(String[] args) {

if (options.isDispose()) {
// Discard
return disposeSavepoint(options);
} else {
// Trigger
String[] cleanedArgs = options.getArgs();
JobID jobId;
@@ -631,14 +630,12 @@
String jobIdString = cleanedArgs[0];
try {
jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
} catch (Exception e) {
return handleArgException(new IllegalArgumentException(
"Error: The value for the Job ID is not a valid ID."));
}
}
} else {
return handleArgException(new IllegalArgumentException(
"Error: The value for the Job ID is not a valid ID. " +
"Specify a Job ID to trigger a savepoint."));
}
@@ -693,35 +690,77 @@ else if (result instanceof TriggerSavepointFailure) {
* Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
* message to the job manager.
*/
private int disposeSavepoint(SavepointOptions options) {
try {
String savepointPath = options.getSavepointPath();
if (savepointPath == null) {
throw new IllegalArgumentException("Missing required argument: savepoint path. " +
"Usage: bin/flink savepoint -d <savepoint-path>");
}

String jarFile = options.getJarFilePath();

ActorGateway jobManager = getJobManagerGateway(options);

List<BlobKey> blobKeys = null;
if (jarFile != null) {
logAndSysout("Disposing savepoint '" + savepointPath + "' with JAR " + jarFile + ".");

List<File> libs = null;
try {
libs = PackagedProgram.extractContainedLibraries(new File(jarFile).toURI().toURL());
if (!libs.isEmpty()) {
List<Path> libPaths = new ArrayList<>(libs.size());
for (File f : libs) {
libPaths.add(new Path(f.toURI()));
}

logAndSysout("Uploading JAR files.");
LOG.debug("JAR files: " + libPaths);
blobKeys = BlobClient.uploadJarFiles(jobManager, clientTimeout, libPaths);
LOG.debug("Blob keys: " + blobKeys.toString());
}
} finally {
if (libs != null) {
PackagedProgram.deleteExtractedLibraries(libs);
}
}
} else {
logAndSysout("Disposing savepoint '" + savepointPath + "'.");
}

Object msg = new DisposeSavepoint(savepointPath, Option.apply(blobKeys));
Future<Object> response = jobManager.ask(msg, clientTimeout);

Object result;
try {
logAndSysout("Waiting for response...");
result = Await.result(response, clientTimeout);
} catch (Exception e) {
throw new Exception("Disposing the savepoint with path " + savepointPath + " failed.", e);
}

if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
logAndSysout("Savepoint '" + savepointPath + "' disposed.");
return 0;
} else if (result instanceof DisposeSavepointFailure) {
DisposeSavepointFailure failure = (DisposeSavepointFailure) result;

if (failure.cause() instanceof ClassNotFoundException) {
throw new ClassNotFoundException("Savepoint disposal failed, because of a " +
"missing class. This is most likely caused by a custom state " +
"instance, which cannot be disposed without the user code class " +
"loader. Please provide the program jar with which you have created " +
"the savepoint via -j <JAR> for disposal.",
failure.cause().getCause());
} else {
throw failure.cause();
}
} else {
throw new IllegalStateException("Unknown JobManager response of type " +
result.getClass());
}
} catch (Throwable t) {
return handleError(t);
}
}
@@ -73,7 +73,7 @@ public class CliFrontendParser {
"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");

static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
"Path of savepoint to dispose.");

// list specific options
static final Option RUNNING_OPTION = new Option("r", "running", false,
@@ -112,9 +112,6 @@ public class CliFrontendParser {

SAVEPOINT_PATH_OPTION.setRequired(false);
SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

SAVEPOINT_DISPOSE_OPTION.setRequired(false);
SAVEPOINT_DISPOSE_OPTION.setArgName("savepointPath");
}

private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
@@ -195,6 +192,7 @@ private static Options getStopOptions(Options options) {
private static Options getSavepointOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
options.addOption(JAR_OPTION);
return addCustomCliOptions(options, false);
}

@@ -234,6 +232,7 @@ private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
options = getJobManagerAddressOption(options);
options.addOption(SAVEPOINT_DISPOSE_OPTION);
options.addOption(JAR_OPTION);
return options;
}

@@ -19,6 +19,7 @@

import org.apache.commons.cli.CommandLine;

import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;

/**
@@ -29,12 +30,14 @@ public class SavepointOptions extends CommandLineOptions {
private final String[] args;
private boolean dispose;
private String disposeSavepointPath;
private String jarFile;

public SavepointOptions(CommandLine line) {
super(line);
args = line.getArgs();
dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
jarFile = line.getOptionValue(JAR_OPTION.getOpt());
}

public String[] getArgs() {
@@ -45,7 +48,11 @@ public boolean isDispose() {
return dispose;
}

public String getSavepointPath() {
return disposeSavepointPath;
}

public String getJarFilePath() {
return jarFile;
}
}
@@ -189,7 +189,7 @@ public PackagedProgram(File jarFile, List<URL> classpaths, String entryPointClas
}

// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
this.classpaths = classpaths;
this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());

@@ -642,7 +642,7 @@ private static Plan createPlanFromProgram(Program program, String[] options) thr
* @return The file names of the extracted temporary files.
* @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException {

Random rnd = new Random();

@@ -741,7 +741,7 @@ private static List<File> extractContainedLibaries(URL jarFile) throws ProgramIn
}
}

public static void deleteExtractedLibraries(List<File> tempLibraries) {
for (File f : tempLibraries) {
f.delete();
}
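`extractContainedLibraries` (made public here so the CLI can reuse it for savepoint disposal) unpacks JARs nested inside the program JAR before they are uploaded. A simplified, JDK-only sketch of the same technique — the `lib/` prefix convention and names here are illustrative, not Flink's exact implementation:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

public class NestedJarExtractor {

    // Copy every *.jar entry under lib/ out of the enclosing JAR into
    // temporary files, returning the extracted files.
    public static List<File> extractContainedLibraries(File jarFile) throws IOException {
        List<File> extracted = new ArrayList<>();
        try (JarFile jar = new JarFile(jarFile)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (name.startsWith("lib/") && name.endsWith(".jar")) {
                    File tmp = File.createTempFile("extracted-", ".jar");
                    tmp.deleteOnExit();
                    try (InputStream in = jar.getInputStream(entry)) {
                        Files.copy(in, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
                    }
                    extracted.add(tmp);
                }
            }
        }
        return extracted;
    }

    public static void main(String[] args) throws IOException {
        // Build a demo JAR containing one nested library to exercise the sketch.
        File outer = File.createTempFile("demo-", ".jar");
        outer.deleteOnExit();
        try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(outer))) {
            jos.putNextEntry(new JarEntry("lib/inner.jar"));
            jos.write(new byte[]{0});
            jos.closeEntry();
        }
        System.out.println(extractContainedLibraries(outer).size()); // prints 1
    }
}
```

In the commit, the extracted files are then uploaded via `BlobClient.uploadJarFiles` and the resulting blob keys are sent along with the `DisposeSavepoint` message.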