Skip to content

Commit

Permalink
[FLINK-2787] [core] Prevent instantiation of RemoteEnvironment when r…
Browse files Browse the repository at this point in the history
…unning a program through the command line.

This also cleans up the use of context environments.
  • Loading branch information
StephanEwen committed Oct 1, 2015
1 parent c04a770 commit 891db5e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
Expand All @@ -31,7 +30,6 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
Expand Down Expand Up @@ -63,7 +61,7 @@ public class LocalExecutor extends PlanExecutor {
private LocalFlinkMiniCluster flink;

/** Custom user configuration for the execution */
private Configuration configuration;
private final Configuration configuration;

/** Config value for how many slots to provide in the local cluster */
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
Expand All @@ -78,11 +76,6 @@ public LocalExecutor() {
}

public LocalExecutor(Configuration conf) {
if (!ExecutionEnvironment.localExecutionIsAllowed()) {
throw new InvalidProgramException(
"The LocalEnvironment cannot be used when submitting a program through a client.");
}

this.configuration = conf != null ? conf : new Configuration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,13 @@ public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) thr
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
ContextEnvironment.enableLocalExecution(false);

// invoke here
try {
prog.invokeInteractiveModeForExecution();
}
finally {
ContextEnvironment.enableLocalExecution(true);
ContextEnvironment.unsetContext();
}

return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
Expand All @@ -310,14 +309,13 @@ public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
else if (prog.isUsingInteractiveMode()) {
LOG.info("Starting program in interactive mode");
ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
ContextEnvironment.enableLocalExecution(false);

// invoke here
try {
prog.invokeInteractiveModeForExecution();
}
finally {
ContextEnvironment.enableLocalExecution(true);
ContextEnvironment.unsetContext();
}

return new JobSubmissionResult(lastJobID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,17 @@ static void setAsContext(Client client, List<File> jarFilesToAttach,
initializeContextEnvironment(factory);
}

protected static void enableLocalExecution(boolean enabled) {
ExecutionEnvironment.enableLocalExecution(enabled);
static void unsetContext() {
resetContextEnvironment();
}

// --------------------------------------------------------------------------------------------


/**
* The factory that instantiates the environment to be used when running jobs that are
* submitted through a pre-configured client connection.
* This happens for example when a job is submitted from the command line.
*/
public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {

private final Client client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ public void startNewSession() {
}

public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
setAsContext();


// temporarily write syserr and sysout to a byte array.
PrintStream originalOut = System.out;
PrintStream originalErr = System.err;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
System.setOut(new PrintStream(baos));
ByteArrayOutputStream baes = new ByteArrayOutputStream();
System.setErr(new PrintStream(baes));

setAsContext();
try {
ContextEnvironment.enableLocalExecution(false);
prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
Expand All @@ -91,7 +91,7 @@ public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocation
}
}
finally {
ContextEnvironment.enableLocalExecution(true);
unsetAsContext();
System.setOut(originalOut);
System.setErr(originalErr);
System.err.println(baes);
Expand All @@ -115,6 +115,10 @@ public ExecutionEnvironment createExecutionEnvironment() {
};
initializeContextEnvironment(factory);
}

private void unsetAsContext() {
resetContextEnvironment();
}

// ------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ else if (isUsingInteractiveMode()) {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
ContextEnvironment.enableLocalExecution(false);
invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
Expand All @@ -276,7 +275,7 @@ else if (isUsingInteractiveMode()) {
}
}
finally {
ContextEnvironment.enableLocalExecution(true);
env.unsetAsContext();
}

if (env.previewPlan != null) {
Expand All @@ -292,12 +291,8 @@ else if (isUsingInteractiveMode()) {

PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
StringWriter string = new StringWriter(1024);
PrintWriter pw = null;
try {
pw = new PrintWriter(string);
try (PrintWriter pw = new PrintWriter(string)) {
jsonGen.dumpPactPlanAsJSON(previewPlan, pw);
} finally {
pw.close();
}
return string.toString();

Expand Down Expand Up @@ -455,9 +450,9 @@ private static void callMainMethod(Class<?> entryClass, String[] args) throws Pr
}

private static String getEntryPointClassNameFromJar(File jarFile) throws ProgramInvocationException {
JarFile jar = null;
Manifest manifest = null;
String className = null;
JarFile jar;
Manifest manifest;
String className;

// Open jar file
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getExecutionPlan() throws Exception {
}

@Override
public void startNewSession() throws Exception {
public void startNewSession() {
}

public void setAsContext() {
Expand All @@ -69,6 +69,11 @@ public ExecutionEnvironment createExecutionEnvironment() {
};
initializeContextEnvironment(factory);
}

public void unsetAsContext() {
resetContextEnvironment();
}


public void setPreview(String preview) {
this.preview = preview;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ public abstract class ExecutionEnvironment {
/** The default parallelism used by local environments */
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();

/** flag to disable local executor when using the ContextEnvironment */
private static boolean allowLocalExecution = true;

// --------------------------------------------------------------------------------------------

private final List<DataSink<?>> sinks = new ArrayList<DataSink<?>>();
Expand Down Expand Up @@ -1127,9 +1124,7 @@ public static LocalEnvironment createLocalEnvironment(int parallelism) {
* @return A local execution environment with the specified parallelism.
*/
public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
LocalEnvironment lee = new LocalEnvironment();
lee.setConfiguration(customConfiguration);
return lee;
return new LocalEnvironment(customConfiguration);
}

/**
Expand Down Expand Up @@ -1159,16 +1154,15 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port
*
* @param host The host name or address of the master (JobManager), where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param clientConfiguration Pass a custom configuration to the Client.
* @param clientConfiguration Configuration used by the client that connects to the cluster.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @return A remote environment that executes the program on a cluster.
*/
public static ExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfiguration, String... jarFiles) {
RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
rec.setClientConfiguration(clientConfiguration);
return rec;
public static ExecutionEnvironment createRemoteEnvironment(
String host, int port, Configuration clientConfiguration, String... jarFiles) {
return new RemoteEnvironment(host, port, clientConfiguration, jarFiles);
}

/**
Expand Down Expand Up @@ -1201,23 +1195,40 @@ public static void setDefaultLocalParallelism(int parallelism) {
}

// --------------------------------------------------------------------------------------------
// Methods to control the context and local environments for execution from packaged programs
// Methods to control the context environment and creation of explicit environments other
// than the context environment
// --------------------------------------------------------------------------------------------


/**
* Sets a context environment factory, that creates the context environment for running programs
* with pre-configured environments. Examples are running programs from the command line, and
* running programs in the Scala shell.
*
* <p>When the context environment factors is set, no other environments can be explicitly used.
*
* @param ctx The context environment factory.
*/
protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
contextEnvironmentFactory = ctx;
contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
}

protected static boolean isContextEnvironmentSet() {
return contextEnvironmentFactory != null;
}

protected static void enableLocalExecution(boolean enabled) {
allowLocalExecution = enabled;
}

public static boolean localExecutionIsAllowed() {
return allowLocalExecution;

/**
* Un-sets the context environment factory. After this method is called, the call to
* {@link #getExecutionEnvironment()} will again return a default local execution environment, and
* it is possible to explicitly instantiate the LocalEnvironment and the RemoteEnvironment.
*/
protected static void resetContextEnvironment() {
contextEnvironmentFactory = null;
}

/**
* Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment
* or a RemoteEnvironment.
*
* @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
* RemoteEnvironment, false otherwise.
*/
public static boolean areExplicitEnvironmentsAllowed() {
return contextEnvironmentFactory == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
* An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
* environment is instantiated.
*
* <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. Teh default
* parallelism can be set via {@link #setParallelism(int)}.</p>
* <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
* parallelism can be set via {@link #setParallelism(int)}.
*
* <p>Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()}
* and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
* default parallelism equal to the number of hardware contexts in the local machine.</p>
* default parallelism equal to the number of hardware contexts in the local machine.
*/
public class LocalEnvironment extends ExecutionEnvironment {

/** The user-defined configuration for the local execution */
private Configuration configuration;
private final Configuration configuration;

/** Create lazily upon first use */
private PlanExecutor executor;
Expand All @@ -53,20 +53,21 @@ public class LocalEnvironment extends ExecutionEnvironment {
* Creates a new local environment.
*/
public LocalEnvironment() {
if (!ExecutionEnvironment.localExecutionIsAllowed()) {
throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
}
this.configuration = new Configuration();
this(new Configuration());
}

/**
* Sets a configuration used to configure the local Flink executor.
* If {@code null} is passed, then the default configuration will be used.
* Creates a new local environment that configures its local executor with the given configuration.
*
* @param customConfiguration The configuration to be used for the local execution.
* @param config The configuration used to configure the local executor.
*/
public void setConfiguration(Configuration customConfiguration) {
this.configuration = customConfiguration != null ? customConfiguration : new Configuration();
public LocalEnvironment(Configuration config) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The LocalEnvironment cannot be instantiated when running in a pre-defined context " +
"(such as Command Line Client, Scala Shell, or TestEnvironment)");
}
this.configuration = config == null ? new Configuration() : config;
}

// --------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 891db5e

Please sign in to comment.