Skip to content

Commit

Permalink
[FLINK-4490] [distributed coordination] (part 1) Change InstanceConne…
Browse files Browse the repository at this point in the history
…ctionInfo to TaskManagerLocation

This adds the ResourceId to the TaskManagerLocation
  • Loading branch information
StephanEwen committed Sep 2, 2016
1 parent e227b10 commit 34cda87
Show file tree
Hide file tree
Showing 31 changed files with 276 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
package org.apache.flink.mesos.runtime.clusterframework

import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}

/** An extension of the TaskManager that listens for additional Mesos-related
* messages.
*/
class MesosTaskManager(
config: TaskManagerConfiguration,
resourceID: ResourceID,
connectionInfo: InstanceConnectionInfo,
taskManagerLocation: TaskManagerLocation,
memoryManager: MemoryManager,
ioManager: IOManager,
network: NetworkEnvironment,
Expand All @@ -41,7 +40,7 @@ class MesosTaskManager(
extends TaskManager(
config,
resourceID,
connectionInfo,
taskManagerLocation,
memoryManager,
ioManager,
network,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.util.ExceptionUtils;

Expand Down Expand Up @@ -66,7 +66,7 @@ public String handleRequest(ExecutionGraph graph, Map<String, String> params) th
break;
}

InstanceConnectionInfo location = task.getCurrentAssignedResourceLocation();
TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
String locationString = location != null ?
location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand Down Expand Up @@ -61,7 +61,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
final ExecutionState status = vertex.getExecutionState();

InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();

long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand All @@ -51,7 +51,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();

for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();

List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand All @@ -45,7 +45,7 @@ public String handleRequest(Execution execAttempt, Map<String, String> params) t
final ExecutionState status = execAttempt.getState();
final long now = System.currentTimeMillis();

InstanceConnectionInfo location = execAttempt.getAssignedResourceLocation();
TaskManagerLocation location = execAttempt.getAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();

long startTime = execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand Down Expand Up @@ -52,7 +52,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
int num = 0;
for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {

InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();

gen.writeStartObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;

import java.io.StringWriter;
Expand Down Expand Up @@ -70,7 +70,7 @@ public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> pa
gen.writeStartObject();
gen.writeNumberField("subtask", num++);

InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
gen.writeStringField("host", locationString);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand All @@ -44,15 +44,15 @@ public class PartialInputChannelDeploymentDescriptor {
private final ResultPartitionID partitionID;

/** The partition connection info. */
private final InstanceConnectionInfo partitionConnectionInfo;
private final TaskManagerLocation partitionConnectionInfo;

/** The partition connection index. */
private final int partitionConnectionIndex;

public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
InstanceConnectionInfo partitionConnectionInfo,
TaskManagerLocation partitionConnectionInfo,
int partitionConnectionIndex) {

this.resultId = checkNotNull(resultId);
Expand All @@ -71,7 +71,7 @@ public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(

checkNotNull(consumerExecution, "Consumer execution null");

InstanceConnectionInfo consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();
TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation();

checkNotNull(consumerConnectionInfo, "Consumer connection info null");

Expand Down Expand Up @@ -107,7 +107,7 @@ public static PartialInputChannelDeploymentDescriptor fromEdge(
final IntermediateResult result = partition.getIntermediateResult();

final IntermediateDataSetID resultId = result.getId();
final InstanceConnectionInfo partitionConnectionInfo = producer.getAssignedResourceLocation();
final TaskManagerLocation partitionConnectionInfo = producer.getAssignedResourceLocation();
final int partitionConnectionIndex = result.getConnectionIndex();

return new PartialInputChannelDeploymentDescriptor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
Expand Down Expand Up @@ -133,7 +133,7 @@ public class Execution {

private volatile Throwable failureCause; // once assigned, never changes

private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution

private ChainedStateHandle<StreamStateHandle> chainedStateHandle;

Expand All @@ -147,7 +147,7 @@ public class Execution {

/* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten
* by partial accumulators on a late heartbeat*/
private final SerializableObject accumulatorLock = new SerializableObject();
private final Object accumulatorLock = new Object();

/* Continuously updated map of user-defined accumulators */
private volatile Map<String, Accumulator<?, ?>> userAccumulators;
Expand Down Expand Up @@ -202,7 +202,7 @@ public SimpleSlot getAssignedResource() {
return assignedResource;
}

public InstanceConnectionInfo getAssignedResourceLocation() {
public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -228,7 +228,7 @@ public SimpleSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
}

public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
public TaskManagerLocation getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +47,7 @@ public class Instance {
private final ActorGateway actorGateway;

/** The instance connection information for the data transfer. */
private final InstanceConnectionInfo connectionInfo;
private final TaskManagerLocation connectionInfo;

/** A description of the resources of the task manager */
private final HardwareDescription resources;
Expand Down Expand Up @@ -92,7 +93,7 @@ public class Instance {
*/
public Instance(
ActorGateway actorGateway,
InstanceConnectionInfo connectionInfo,
TaskManagerLocation connectionInfo,
ResourceID resourceId,
InstanceID id,
HardwareDescription resources,
Expand Down Expand Up @@ -350,7 +351,7 @@ public ActorGateway getActorGateway() {
return actorGateway;
}

public InstanceConnectionInfo getInstanceConnectionInfo() {
public TaskManagerLocation getInstanceConnectionInfo() {
return connectionInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import akka.actor.ActorRef;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
InstanceConnectionInfo connectionInfo,
TaskManagerLocation connectionInfo,
HardwareDescription resources,
int numberOfSlots,
UUID leaderSessionID){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.io.Serializable;
import java.net.InetSocketAddress;
Expand All @@ -43,7 +43,7 @@ public class ConnectionID implements Serializable {

private final int connectionIndex;

public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
}

Expand Down
Loading

0 comments on commit 34cda87

Please sign in to comment.