Skip to content

Commit

Permalink
[FLINK-6064][flip6] fix BlobServer connection in TaskExecutor
Browse files Browse the repository at this point in the history
The hostname used for the BlobServer was set to the Akka RPC address, which is
not a valid hostname for connecting to the BlobServer. Instead, this adds the
hostname to the RpcGateway / AkkaInvocationHandler so that this information is
available to the TaskExecutor.

This closes apache#3551.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Mar 16, 2017
1 parent 264f6df commit 97f1788
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ public interface RpcGateway {
/**
* Returns the fully qualified address under which the associated rpc endpoint is reachable.
*
* @return Fully qualified address under which the associated rpc endpoint is reachable
* @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
*/
String getAddress();

/**
* Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
*
* @return Fully qualified hostname under which the associated rpc endpoint is reachable
*/
String getHostname();
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,17 @@
class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway {
private static final Logger LOG = LoggerFactory.getLogger(AkkaInvocationHandler.class);

/**
* The Akka (RPC) address of {@link #rpcEndpoint} including host and port of the ActorSystem in
* which the actor is running.
*/
private final String address;

/**
* Hostname of the host, {@link #rpcEndpoint} is running on.
*/
private final String hostname;

private final ActorRef rpcEndpoint;

// whether the actor ref is local and thus no message serialization is needed
Expand All @@ -74,12 +83,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea

AkkaInvocationHandler(
String address,
String hostname,
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
Future<Void> terminationFuture) {

this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
this.timeout = Preconditions.checkNotNull(timeout);
Expand Down Expand Up @@ -313,6 +324,11 @@ public String getAddress() {
return address;
}

/**
* Returns the hostname this invocation handler was created with, i.e. the hostname of the
* host the rpc endpoint is running on (as opposed to the full Akka RPC address returned by
* {@code getAddress()}).
*
* @return the hostname given to the constructor; never {@code null} because the constructor
*         rejects a null hostname
*/
@Override
public String getHostname() {
return hostname;
}

@Override
public Future<Void> getTerminationFuture() {
return terminationFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;

import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand All @@ -39,16 +38,16 @@
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.Option;
import scala.concurrent.duration.FiniteDuration;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -143,9 +142,17 @@ public C checkedApply(Object obj) throws Exception {
ActorRef actorRef = actorIdentity.getRef();

final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}

InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
address,
hostname,
actorRef,
timeout,
maximumFramesize,
Expand Down Expand Up @@ -187,9 +194,17 @@ public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpo
LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
final String hostname;
Option<String> host = actorRef.path().address().host();
if (host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}

InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
address,
hostname,
actorRef,
timeout,
maximumFramesize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,17 +788,17 @@ private void closeJobManagerConnection(JobID jobId) {
private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
Preconditions.checkNotNull(jobManagerLeaderId);
Preconditions.checkNotNull(jobMasterGateway);
Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob port is out of range.");
Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range.");

TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);

CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);

InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), blobPort);

final LibraryCacheManager libraryCacheManager;
try {
final BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration(), haServices);
final BlobCache blobCache = new BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), haServices);
libraryCacheManager = new BlobLibraryCacheManager(
blobCache,
taskManagerConfiguration.getCleanupInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public String getAddress() {
return address;
}

/**
* Returns the same string as {@code getAddress()}; this gateway keeps no separate hostname.
*
* NOTE(review): the returned value is therefore only a usable hostname if the configured
* address is itself a plain hostname - confirm against the callers of this gateway.
*
* @return the value of the {@code address} field
*/
@Override
public String getHostname() {
return address;
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ public String getAddress() {
return address;
}

/**
* Returns the {@code address} field as the hostname, i.e. the same value as
* {@code getAddress()}.
*
* @return the value of the {@code address} field
*/
// this is not a real hostname but the address above is also not a real akka RPC address
// and we keep it that way until actually needed by a test case
@Override
public String getHostname() {
return address;
}

@Override
public void start() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void testSlotAllocation() throws Exception {

when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
when(jmGateway.getAddress()).thenReturn(jmAddress);
when(jmGateway.getHostname()).thenReturn(jmAddress);


rpcService.registerGateway(rmAddress, resourceManager.getSelf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public void testJobLeaderDetection() throws Exception {
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);

rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
Expand Down Expand Up @@ -551,7 +551,7 @@ public void testSlotAcceptance() throws Exception {
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);

when(jobMasterGateway.offerSlots(
any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
Expand Down Expand Up @@ -754,7 +754,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);


rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
Expand Down

0 comments on commit 97f1788

Please sign in to comment.