Skip to content

Commit

Permalink
[FLINK-9890][Distributed Coordination] Remove obsolete class Resource…
Browse files Browse the repository at this point in the history
…ManagerConfiguration

The configuration values are effectively not used. This commit removes the class
and all its usages.

This closes apache#6368.
  • Loading branch information
GJL authored and tillrohrmann committed Jul 22, 2018
1 parent c6b39e4 commit 690ab2c
Show file tree
Hide file tree
Showing 27 changed files with 15 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
Expand Down Expand Up @@ -109,7 +108,6 @@ protected ResourceManager<?> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
Expand All @@ -120,7 +118,6 @@ protected ResourceManager<?> createResourceManager(
rpcService,
ResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand Down Expand Up @@ -125,7 +124,6 @@ protected ResourceManager<?> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
Expand All @@ -136,7 +134,6 @@ protected ResourceManager<?> createResourceManager(
rpcService,
ResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
rmConfiguration,
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand Down Expand Up @@ -115,7 +114,6 @@ protected ResourceManager<?> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
rmServicesConfiguration,
Expand All @@ -126,7 +124,6 @@ protected ResourceManager<?> createResourceManager(
rpcService,
ResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
rmConfiguration,
highAvailabilityServices,
heartbeatServices,
rmRuntimeServices.getSlotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand Down Expand Up @@ -144,7 +143,6 @@ public MesosResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
Expand All @@ -162,7 +160,6 @@ public MesosResourceManager(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
Expand Down Expand Up @@ -161,7 +160,6 @@ public TestingMesosResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
Expand All @@ -179,7 +177,6 @@ public TestingMesosResourceManager(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
Expand Down Expand Up @@ -239,7 +236,6 @@ static class Context implements AutoCloseable {
MockMesosServices mesosServices;

// RM
ResourceManagerConfiguration rmConfiguration;
ResourceID rmResourceID;
static final String RM_ADDRESS = "resourceManager";
TestingMesosResourceManager resourceManager;
Expand Down Expand Up @@ -284,16 +280,12 @@ static class Context implements AutoCloseable {
Option.<String>empty(), Collections.<String>emptyList());

// resource manager
rmConfiguration = new ResourceManagerConfiguration(
Time.seconds(5L),
Time.seconds(5L));
rmResourceID = ResourceID.generate();
resourceManager =
new TestingMesosResourceManager(
rpcService,
RM_ADDRESS,
rmResourceID,
rmConfiguration,
rmServices.highAvailabilityServices,
rmServices.heartbeatServices,
rmServices.slotManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
Expand Down Expand Up @@ -58,7 +57,6 @@ protected ResourceManager<?> createResourceManager(
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
resourceManagerRuntimeServicesConfiguration,
Expand All @@ -69,7 +67,6 @@ protected ResourceManager<?> createResourceManager(
rpcService,
FlinkResourceManager.RESOURCE_MANAGER_NAME,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,17 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {

private static final long serialVersionUID = 5577641250204140415L;

private final long heartbeatInterval;

private final ResourceManagerId resourceManagerId;

private final ResourceID resourceManagerResourceId;

public JobMasterRegistrationSuccess(
final long heartbeatInterval,
final ResourceManagerId resourceManagerId,
final ResourceID resourceManagerResourceId) {
this.heartbeatInterval = heartbeatInterval;
this.resourceManagerId = checkNotNull(resourceManagerId);
this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId);
}

/**
* Gets the interval in which the ResourceManager will heartbeat the JobMaster.
*
* @return the interval in which the ResourceManager will heartbeat the JobMaster
*/
public long getHeartbeatInterval() {
return heartbeatInterval;
}

public ResourceManagerId getResourceManagerId() {
return resourceManagerId;
}
Expand All @@ -66,8 +53,7 @@ public ResourceID getResourceManagerResourceId() {
@Override
public String toString() {
return "JobMasterRegistrationSuccess{" +
"heartbeatInterval=" + heartbeatInterval +
", resourceManagerLeaderId=" + resourceManagerId +
"resourceManagerId=" + resourceManagerId +
", resourceManagerResourceId=" + resourceManagerResourceId +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
/** Unique id of the resource manager. */
private final ResourceID resourceId;

/** Configuration of the resource manager. */
private final ResourceManagerConfiguration resourceManagerConfiguration;

/** All currently registered JobMasterGateways scoped by JobID. */
private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;

Expand Down Expand Up @@ -146,7 +143,6 @@ public ResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
Expand All @@ -158,7 +154,6 @@ public ResourceManager(
super(rpcService, resourceManagerEndpointId);

this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.slotManager = checkNotNull(slotManager);
this.metricRegistry = checkNotNull(metricRegistry);
Expand Down Expand Up @@ -668,7 +663,6 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
});

return new JobMasterRegistrationSuccess(
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
getFencingToken(),
resourceId);
}
Expand Down Expand Up @@ -726,7 +720,6 @@ public void requestHeartbeat(ResourceID resourceID, Void payload) {
return new TaskExecutorRegistrationSuccess(
registration.getInstanceID(),
resourceId,
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
clusterInformation);
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public ResourceManagerRunner(
Preconditions.checkNotNull(heartbeatServices);
Preconditions.checkNotNull(metricRegistry);

final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);

final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
Expand All @@ -78,7 +76,6 @@ public ResourceManagerRunner(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public StandaloneResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
Expand All @@ -56,7 +55,6 @@ public StandaloneResourceManager(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
Expand Down
Loading

0 comments on commit 690ab2c

Please sign in to comment.