Skip to content

Commit

Permalink
[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'I…
Browse files Browse the repository at this point in the history
…nstance' to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()
  • Loading branch information
StephanEwen committed Sep 2, 2016
1 parent aaa474a commit eac6088
Show file tree
Hide file tree
Showing 19 changed files with 319 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public String handleRequest(Map<String, String> pathParams, Map<String, String>
gen.writeStartObject();
gen.writeStringField("id", instance.getId().toString());
gen.writeStringField("path", instance.getActorGateway().path());
gen.writeNumberField("dataPort", instance.getInstanceConnectionInfo().dataPort());
gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public class PartialInputChannelDeploymentDescriptor {
private final ResultPartitionID partitionID;

/** The partition connection info. */
private final TaskManagerLocation partitionConnectionInfo;
private final TaskManagerLocation partitionTaskManagerLocation;

/** The partition connection index. */
private final int partitionConnectionIndex;

public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
TaskManagerLocation partitionConnectionInfo,
TaskManagerLocation partitionTaskManagerLocation,
int partitionConnectionIndex) {

this.resultId = checkNotNull(resultId);
this.partitionID = checkNotNull(partitionID);
this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo);
this.partitionTaskManagerLocation = checkNotNull(partitionTaskManagerLocation);
this.partitionConnectionIndex = partitionConnectionIndex;
}

Expand All @@ -66,23 +66,20 @@ public PartialInputChannelDeploymentDescriptor(
*
* @see InputChannelDeploymentDescriptor
*/
public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(
Execution consumerExecution) {
public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(Execution consumerExecution) {
checkNotNull(consumerExecution, "consumerExecution");

checkNotNull(consumerExecution, "Consumer execution null");

TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();

checkNotNull(consumerConnectionInfo, "Consumer connection info null");
TaskManagerLocation consumerLocation = consumerExecution.getAssignedResourceLocation();
checkNotNull(consumerLocation, "Consumer connection info null");

final ResultPartitionLocation partitionLocation;

if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
if (consumerLocation.equals(partitionTaskManagerLocation)) {
partitionLocation = ResultPartitionLocation.createLocal();
}
else {
partitionLocation = ResultPartitionLocation.createRemote(
new ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
new ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex));
}

return new InputChannelDeploymentDescriptor(partitionID, partitionLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,11 @@ public class Instance implements SlotOwner {
private final ActorGateway actorGateway;

/** The instance connection information for the data transfer. */
private final TaskManagerLocation connectionInfo;
private final TaskManagerLocation location;

/** A description of the resources of the task manager */
private final HardwareDescription resources;

/** The ID identifies the resource the task manager runs on */
private final ResourceID resourceId;

/** The ID identifying the taskManager. */
private final InstanceID instanceId;

Expand Down Expand Up @@ -90,22 +87,19 @@ public class Instance implements SlotOwner {
* Constructs an instance reflecting a registered TaskManager.
*
* @param actorGateway The actor gateway to communicate with the remote instance
* @param connectionInfo The remote connection where the task manager receives requests.
* @param resourceId The resource id which denotes the resource the task manager uses.
* @param location The remote connection where the task manager receives requests.
* @param id The id under which the taskManager is registered.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this taskManager.
*/
public Instance(
ActorGateway actorGateway,
TaskManagerLocation connectionInfo,
ResourceID resourceId,
TaskManagerLocation location,
InstanceID id,
HardwareDescription resources,
int numberOfSlots) {
this.actorGateway = actorGateway;
this.connectionInfo = connectionInfo;
this.resourceId = resourceId;
this.location = location;
this.instanceId = id;
this.resources = resources;
this.numberOfSlots = numberOfSlots;
Expand All @@ -120,8 +114,8 @@ public Instance(
// Properties
// --------------------------------------------------------------------------------------------

public ResourceID getResourceId() {
return resourceId;
public ResourceID getTaskManagerID() {
return location.getResourceID();
}

public InstanceID getId() {
Expand Down Expand Up @@ -246,7 +240,7 @@ public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException {
return null;
}
else {
SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway);
SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway);
allocatedSlots.add(slot);
return slot;
}
Expand Down Expand Up @@ -284,7 +278,7 @@ public SharedSlot allocateSharedSlot(JobID jobID, SlotSharingGroupAssignment sha
}
else {
SharedSlot slot = new SharedSlot(
jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment);
jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment);
allocatedSlots.add(slot);
return slot;
}
Expand Down Expand Up @@ -355,8 +349,8 @@ public ActorGateway getActorGateway() {
return actorGateway;
}

public TaskManagerLocation getInstanceConnectionInfo() {
return connectionInfo;
public TaskManagerLocation getTaskManagerLocation() {
return location;
}

public int getNumberOfAvailableSlots() {
Expand Down Expand Up @@ -405,7 +399,7 @@ public void removeSlotListener() {

@Override
public String toString() {
return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(),
numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,20 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
* for the job execution.
*
* @param taskManager ActorRef to the TaskManager which wants to be registered
* @param resourceID The resource id of the TaskManager
* @param connectionInfo ConnectionInfo of the TaskManager
* @param taskManagerLocation Location info of the TaskManager
* @param resources Hardware description of the TaskManager
* @param numberOfSlots Number of available slots on the TaskManager
* @param leaderSessionID The current leader session ID of the JobManager
* @return The assigned InstanceID of the registered task manager
*/
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
TaskManagerLocation connectionInfo,
TaskManagerLocation taskManagerLocation,
HardwareDescription resources,
int numberOfSlots,
UUID leaderSessionID){
synchronized(this.lock){
UUID leaderSessionID) {

synchronized (this.lock) {
if (this.isShutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
Expand All @@ -174,20 +173,19 @@ public InstanceID registerTaskManager(

InstanceID instanceID = new InstanceID();

Instance host = new Instance(actorGateway, connectionInfo, resourceID, instanceID,
resources, numberOfSlots);
Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots);

registeredHostsById.put(instanceID, host);
registeredHostsByConnection.put(taskManager, host);
registeredHostsByResource.put(resourceID, host);
registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);

totalNumberOfAliveTaskSlots += numberOfSlots;

if (LOG.isInfoEnabled()) {
LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
"Current number of registered hosts is %d. " +
"Current number of alive task slots is %d.",
connectionInfo.getHostname(),
taskManagerLocation.getHostname(),
taskManager.path(),
instanceID,
registeredHostsById.size(),
Expand Down Expand Up @@ -217,7 +215,7 @@ public void unregisterTaskManager(ActorRef instanceID, boolean terminated){

registeredHostsByConnection.remove(host);
registeredHostsById.remove(instance.getId());
registeredHostsByResource.remove(instance.getResourceId());
registeredHostsByResource.remove(instance.getTaskManagerID());

if (terminated) {
deadHosts.add(instance.getActorGateway().actor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,

// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
}

if (slot != null) {
Expand Down Expand Up @@ -425,7 +425,7 @@ protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,

// if the instance has further available slots, re-add it to the set of available resources.
if (instanceToUse.hasResourcesAvailable()) {
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
}

if (sharedSlot != null) {
Expand Down Expand Up @@ -469,7 +469,7 @@ private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requ
while (this.newlyAvailableInstances.size() > 0) {
Instance queuedInstance = this.newlyAvailableInstances.poll();
if (queuedInstance != null) {
this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance);
this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
}
}

Expand Down Expand Up @@ -583,7 +583,7 @@ private void handleNewSlot() {
}
}
else {
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
}
}
}
Expand Down Expand Up @@ -649,7 +649,7 @@ public void newInstanceAvailable(Instance instance) {
instance.setSlotAvailabilityListener(this);

// store the instance in the by-host-lookup
String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
String instanceHostName = instance.getTaskManagerLocation().getHostname();
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
if (instanceSet == null) {
instanceSet = new HashSet<Instance>();
Expand All @@ -658,7 +658,7 @@ public void newInstanceAvailable(Instance instance) {
instanceSet.add(instance);

// add it to the available resources and let potential waiters know
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);

// add all slots as available
for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
Expand Down Expand Up @@ -693,9 +693,9 @@ private void removeInstance(Instance instance) {
}

allInstances.remove(instance);
instancesWithAvailableResources.remove(instance.getResourceId());
instancesWithAvailableResources.remove(instance.getTaskManagerID());

String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
String instanceHostName = instance.getTaskManagerLocation().getHostname();
Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
if (instanceSet != null) {
instanceSet.remove(instance);
Expand Down Expand Up @@ -795,7 +795,7 @@ private void processNewlyAvailableInstances() {

while ((instance = newlyAvailableInstances.poll()) != null) {
if (instance.hasResourcesAvailable()) {
instancesWithAvailableResources.put(instance.getResourceId(), instance);
instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ class JobManager(
currentResourceManager = Option(msg.resourceManager())

val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
instance => instance.getResourceId).toList.asJava
instance => instance.getTaskManagerID).toList.asJava

// confirm registration and send known task managers with their resource ids
sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))
Expand Down Expand Up @@ -425,7 +425,6 @@ class JobManager(
try {
val instanceID = instanceManager.registerTaskManager(
taskManager,
resourceId,
connectionInfo,
hardwareInformation,
numberOfSlots,
Expand Down
Loading

0 comments on commit eac6088

Please sign in to comment.