Skip to content

Commit

Permalink
[FLINK-10411] Make ClusterEntrypoint more compositional
Browse files Browse the repository at this point in the history
Introduce a ClusterComponent which encapsulates the logic for starting the cluster
components, Dispatcher, RestServerEndpoint and the ResourceManager. The individual
components are created by using a factory instance. The ClusterEntrypoint is now
only responsible for managing the required services needed by the ClusterComponent.
This design should make the testing of these components easier, improve reusability
and reduce code duplication.
  • Loading branch information
tillrohrmann committed Sep 28, 2018
1 parent e663990 commit 1fcbc9f
Show file tree
Hide file tree
Showing 27 changed files with 1,387 additions and 795 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.container.entrypoint;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nonnull;

import java.util.Objects;

/**
 * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
 * on the class path.
 *
 * <p>The job's entry point class is looked up by name through the class loader
 * that loaded this retriever, wrapped into a {@link PackagedProgram} together
 * with the program arguments, and then translated into a {@link JobGraph}.
 */
public class ClassPathJobGraphRetriever implements JobGraphRetriever {

	/** Fully qualified name of the job's entry point class. */
	@Nonnull
	private final String jobClassName;

	/** Savepoint restore settings which are set on every retrieved {@link JobGraph}. */
	@Nonnull
	private final SavepointRestoreSettings savepointRestoreSettings;

	/** Arguments handed to the {@link PackagedProgram} created for the job class. */
	@Nonnull
	private final String[] programArguments;

	/**
	 * Creates a retriever for the given job class.
	 *
	 * @param jobClassName name of the class to load from the class path
	 * @param savepointRestoreSettings settings to set on the created {@link JobGraph}
	 * @param programArguments arguments passed to the {@link PackagedProgram}
	 * @throws NullPointerException if any argument is {@code null}
	 */
	public ClassPathJobGraphRetriever(
			@Nonnull String jobClassName,
			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
			@Nonnull String[] programArguments) {
		// Fail fast with a named argument instead of surfacing an anonymous
		// NullPointerException later, when the job graph is actually retrieved.
		this.jobClassName = Objects.requireNonNull(jobClassName, "jobClassName");
		this.savepointRestoreSettings = Objects.requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
		this.programArguments = Objects.requireNonNull(programArguments, "programArguments");
	}

	/**
	 * Creates the {@link JobGraph} for the configured job class.
	 *
	 * <p>The default parallelism is read from {@link CoreOptions#DEFAULT_PARALLELISM}
	 * of the given configuration. The resulting job graph has queued scheduling
	 * enabled and carries the savepoint restore settings of this retriever.
	 *
	 * @param configuration configuration used to create the job graph
	 * @return the job graph created from the job class
	 * @throws FlinkException if the job class cannot be loaded or the job graph
	 *     cannot be created from it
	 */
	@Override
	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
		final PackagedProgram packagedProgram = createPackagedProgram();
		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
		try {
			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
			jobGraph.setAllowQueuedScheduling(true);
			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

			return jobGraph;
		} catch (Exception e) {
			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
		}
	}

	/**
	 * Loads the job class via the class loader of this retriever's class and
	 * wraps it, together with the program arguments, into a {@link PackagedProgram}.
	 *
	 * @return the packaged program for the job class
	 * @throws FlinkException if the class cannot be found or the program cannot
	 *     be instantiated
	 */
	private PackagedProgram createPackagedProgram() throws FlinkException {
		try {
			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
			return new PackagedProgram(mainClass, programArguments);
		} catch (ClassNotFoundException | ProgramInvocationException e) {
			throw new FlinkException("Could not load the provided entrypoint class.", e);
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,20 @@

package org.apache.flink.container.entrypoint;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterComponent;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.JobClusterComponent;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -79,62 +61,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
}

@Override
protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
final PackagedProgram packagedProgram = createPackagedProgram();
final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
try {
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
jobGraph.setAllowQueuedScheduling(true);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

return jobGraph;
} catch (Exception e) {
throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
}
}

private PackagedProgram createPackagedProgram() throws FlinkException {
try {
final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
return new PackagedProgram(mainClass, programArguments);
} catch (ClassNotFoundException | ProgramInvocationException e) {
throw new FlinkException("Could not load the provided entrypoint class.", e);
}
}

@Override
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
}

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());

return new StandaloneResourceManager(
rpcService,
ResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
metricRegistry,
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler);
protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
return new JobClusterComponent(
StandaloneResourceManagerFactory.INSTANCE,
new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
}

public static void main(String[] args) {
Expand Down Expand Up @@ -166,5 +96,4 @@ public static void main(String[] args) {

entrypoint.startCluster();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link StandaloneJobClusterEntryPoint}.
* Tests for the {@link ClassPathJobGraphRetriever}.
*/
public class StandaloneJobClusterEntryPointTest extends TestLogger {
public class ClassPathJobGraphRetrieverTest extends TestLogger {

public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};

@Test
public void testJobGraphRetrieval() throws FlinkException {
final Configuration configuration = new Configuration();
final int parallelism = 42;
final Configuration configuration = new Configuration();
configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
configuration,

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
SavepointRestoreSettings.none(),
PROGRAM_ARGUMENTS);

final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
Expand All @@ -59,13 +59,13 @@ public void testJobGraphRetrieval() throws FlinkException {
public void testSavepointRestoreSettings() throws FlinkException {
final Configuration configuration = new Configuration();
final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
configuration,

final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
TestJob.class.getCanonicalName(),
savepointRestoreSettings,
PROGRAM_ARGUMENTS);

final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);

assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

/**
* Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
* Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
*/
public class TestJob {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,35 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.ClusterComponent;
import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
import org.apache.flink.runtime.entrypoint.JobClusterComponent;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.CompletableFuture;

/**
* Entry point for Mesos per-job clusters.
*/
public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {

public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";

// ------------------------------------------------------------------------
// Command-line options
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -113,58 +94,6 @@ protected void initializeServices(Configuration config) throws Exception {
taskManagerContainerSpec = MesosEntrypointUtils.createContainerSpec(config, dynamicProperties);
}

@Override
protected ResourceManager<?> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
highAvailabilityServices,
rpcService.getScheduledExecutor());

return new MesosResourceManager(
rpcService,
ResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
metricRegistry,
rmRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
configuration,
mesosServices,
schedulerConfiguration,
taskManagerParameters,
taskManagerContainerSpec,
webInterfaceUrl);
}

@Override
protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
File fp = new File(jobGraphFile);

try (FileInputStream input = new FileInputStream(fp);
ObjectInputStream obInput = new ObjectInputStream(input)) {

return (JobGraph) obInput.readObject();
} catch (FileNotFoundException e) {
throw new FlinkException("Could not find the JobGraph file.", e);
} catch (ClassNotFoundException | IOException e) {
throw new FlinkException("Could not load the JobGraph from file.", e);
}
}

@Override
protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
Expand All @@ -179,7 +108,15 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
}

@Override
protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
return new JobClusterComponent(
new MesosResourceManagerFactory(
mesosServices,
schedulerConfiguration,
taskManagerParameters,
taskManagerContainerSpec),
FileJobGraphRetriever.createFrom(configuration));
}

public static void main(String[] args) {
// startup checks and logging
Expand Down
Loading

0 comments on commit 1fcbc9f

Please sign in to comment.