Skip to content

Commit

Permalink
[FLINK-30678][tests] Use random port
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Jan 19, 2023
1 parent 4fdb5c4 commit e705090
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -88,8 +86,6 @@ class ClientTest {

private Plan plan;

private NetUtils.Port port;

private Configuration config;

private static final String TEST_EXECUTOR_NAME = "test_executor";
Expand All @@ -108,20 +104,11 @@ void setUp() {

config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
port = NetUtils.getAvailablePort();
config.setInteger(JobManagerOptions.PORT, port.getPort());

config.set(
AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
}

@AfterEach
void tearDown() throws Exception {
if (port != null) {
port.close();
}
}

private Configuration fromPackagedProgram(
final PackagedProgram program, final int parallelism, final boolean detached) {
final Configuration configuration = new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
Expand Down Expand Up @@ -255,13 +254,10 @@ public void testRequestUnavailableHost() throws Exception {

Client<KvStateInternalRequest, KvStateResponse> client = null;

try (NetUtils.Port port = NetUtils.getAvailablePort()) {
try {
client = new Client<>("Test Client", 1, serializer, stats);

int availablePort = port.getPort();

InetSocketAddress serverAddress =
new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);

KvStateInternalRequest request =
new KvStateInternalRequest(new KvStateID(), new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
Expand All @@ -48,11 +47,11 @@ public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
// Expected number of arenas and threads
int numberOfSlots = 2;
NettyConnectionManager connectionManager;
try (NetUtils.Port port = NetUtils.getAvailablePort()) {
{
NettyConfig config =
new NettyConfig(
InetAddress.getLocalHost(),
port.getPort(),
0,
1024,
numberOfSlots,
new Configuration());
Expand Down Expand Up @@ -117,11 +116,9 @@ public void testManualConfiguration() throws Exception {
flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);

NettyConnectionManager connectionManager;
try (NetUtils.Port port = NetUtils.getAvailablePort()) {

{
NettyConfig config =
new NettyConfig(
InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig);
new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1337, flinkConfig);

connectionManager = createNettyConnectionManager(config);
connectionManager.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -285,20 +284,16 @@ public void testAcknowledgeAllRecordsProcessed() throws Exception {
private NettyPartitionRequestClient createPartitionRequestClient(
Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled)
throws Exception {
try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
int port = availablePort.getPort();
ConnectionID connectionID =
new ConnectionID(
ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
NettyConfig config =
new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
NettyClient nettyClient = new NettyClient(config);
PartitionRequestClientFactory partitionRequestClientFactory =
new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);

return new NettyPartitionRequestClient(
tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
}
ConnectionID connectionID =
new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
NettyConfig config =
new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1, new Configuration());
NettyClient nettyClient = new NettyClient(config);
PartitionRequestClientFactory partitionRequestClientFactory =
new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);

return new NettyPartitionRequestClient(
tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
Expand Down Expand Up @@ -2244,31 +2243,26 @@ public void testDisconnectFromJobMasterWhenNewLeader() throws Exception {

@Test(timeout = 10000L)
public void testLogNotFoundHandling() throws Throwable {
try (NetUtils.Port port = NetUtils.getAvailablePort()) {
int dataPort = port.getPort();

configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");

try (TaskSubmissionTestEnvironment env =
new Builder(jobId)
.setConfiguration(configuration)
.setLocalCommunication(false)
.build(EXECUTOR_RESOURCE.getExecutor())) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
try {
CompletableFuture<TransientBlobKey> logFuture =
tmGateway.requestFileUploadByType(FileType.LOG, timeout);
logFuture.get();
} catch (Exception e) {
assertThat(
e.getMessage(),
containsString("The file LOG does not exist on the TaskExecutor."));
}
configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 0);
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");

try (TaskSubmissionTestEnvironment env =
new Builder(jobId)
.setConfiguration(configuration)
.setLocalCommunication(false)
.build(EXECUTOR_RESOURCE.getExecutor())) {
TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
try {
CompletableFuture<TransientBlobKey> logFuture =
tmGateway.requestFileUploadByType(FileType.LOG, timeout);
logFuture.get();
} catch (Exception e) {
assertThat(
e.getMessage(),
containsString("The file LOG does not exist on the TaskExecutor."));
}
}
}
Expand Down

0 comments on commit e705090

Please sign in to comment.