From 6de6ed48532a31a7ba90338ce7e5c2bc61d0ae11 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 21 Jun 2018 14:57:45 +0200 Subject: [PATCH] [hotfix] Introduce builder for MiniClusterResourceConfiguration --- .../apache/flink/client/LocalExecutor.java | 3 +- .../connectors/fs/RollingSinkITCase.java | 9 +- .../fs/RollingSinkSecuredITCase.java | 11 +- .../kafka/KafkaShortRetentionTestBase.java | 10 +- .../connectors/kafka/KafkaTestBase.java | 10 +- .../TopSpeedWindowingExampleITCase.java | 10 +- .../hdfstests/DistributedCacheDfsTest.java | 9 +- .../apache/flink/ml/util/FlinkTestBase.scala | 10 +- .../gateway/local/LocalExecutorITCase.java | 10 +- .../jobmanager/JMXJobManagerMetricTest.java | 13 +- .../HAQueryableStateFsBackendITCase.java | 10 +- .../HAQueryableStateRocksDBBackendITCase.java | 10 +- .../NonHAQueryableStateFsBackendITCase.java | 10 +- ...nHAQueryableStateRocksDBBackendITCase.java | 10 +- .../runtime/webmonitor/WebFrontendITCase.java | 16 +- .../handlers/JarRunHandlerTest.java | 14 +- .../webmonitor/history/HistoryServerTest.java | 14 +- .../runtime/minicluster/MiniCluster.java | 6 +- .../minicluster/MiniClusterConfiguration.java | 11 +- .../minicluster/RpcServiceSharing.java | 28 ++++ .../minicluster/MiniClusterITCase.java | 4 +- .../flink/api/scala/ScalaShellITCase.scala | 6 +- .../scala/ScalaShellLocalStartupITCase.scala | 6 +- .../flink/test/util/AbstractTestBase.java | 9 +- .../flink/test/util/MiniClusterResource.java | 103 ++----------- .../MiniClusterResourceConfiguration.java | 138 ++++++++++++++++++ .../apache/flink/test/util/TestBaseUtils.java | 25 ++++ .../accumulators/AccumulatorErrorITCase.java | 9 +- .../accumulators/AccumulatorLiveITCase.java | 10 +- .../test/cancelling/CancelingTestBase.java | 10 +- ...actEventTimeWindowCheckpointingITCase.java | 10 +- ...EventTimeAllWindowCheckpointingITCase.java | 10 +- .../KeyedStateCheckpointingITCase.java | 10 +- .../test/checkpointing/RescalingITCase.java | 10 +- .../ResumeCheckpointManuallyITCase.java | 10 +- .../test/checkpointing/SavepointITCase.java | 51 +++---- .../StreamFaultToleranceTestBase.java | 10 +- .../WindowCheckpointingITCase.java | 10 +- .../ZooKeeperHighAvailabilityITCase.java | 10 +- .../utils/SavepointMigrationTestBase.java | 10 +- .../example/client/JobRetrievalITCase.java | 13 +- .../failing/JobSubmissionFailsITCase.java | 10 +- .../StreamingScalabilityAndLatency.java | 10 +- .../test/misc/AutoParallelismITCase.java | 15 +- .../test/misc/CustomSerializationITCase.java | 10 +- .../test/misc/MiscellaneousIssuesITCase.java | 9 +- ...ccessAfterNetworkBuffersFailureITCase.java | 10 +- .../operators/CustomDistributionITCase.java | 10 +- ...mpleRecoveryFailureRateStrategyITBase.java | 9 +- ...coveryFixedDelayRestartStrategyITBase.java | 10 +- .../test/runtime/IPv6HostnamesITCase.java | 10 +- .../flink/test/runtime/NettyEpollITCase.java | 11 +- .../runtime/NetworkStackThroughputITCase.java | 11 +- .../AbstractOperatorRestoreTestBase.java | 11 +- .../streaming/runtime/TimestampITCase.java | 10 +- .../org/apache/flink/yarn/YarnTestBase.java | 8 +- 56 files changed, 509 insertions(+), 343 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java create mode 100644 flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index f837c4f5be9fd..4e4993a88db11 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; import java.util.List; @@ -137,7 +138,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration) configuration.getInteger( ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)) - .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED) + .setRpcServiceSharing(RpcServiceSharing.SHARED) .setNumSlotsPerTaskManager( configuration.getInteger( TaskManagerOptions.NUM_TASK_SLOTS, 1)) diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 78b53d8de5278..93f6d5225927a 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -118,10 +119,10 @@ public static void setup() throws Exception { + "/"; miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new org.apache.flink.configuration.Configuration(), - 1, - 4)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); miniClusterResource.before(); } diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index b9564ee32d566..7296269694505 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.modules.HadoopModule; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SecureTestEnvironment; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestingSecurityContext; @@ -147,10 +148,12 @@ public static void setup() throws Exception { Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled(); - miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration( - configuration, - 1, - 4)); + miniClusterResource = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build()); miniClusterResource.before(); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index f48832553c34e..1e3e1fc418e29 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; @@ -70,10 +71,11 @@ public class KafkaShortRetentionTestBase implements Serializable { @ClassRule public static MiniClusterResource flink = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - NUM_TMS, - TM_SLOTS)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(TM_SLOTS) + .build()); @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 697e075353758..806c90c5e03a6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -81,10 +82,11 @@ public abstract class KafkaTestBase extends TestLogger { @ClassRule public static MiniClusterResource flink = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getFlinkConfiguration(), - NUM_TMS, - TM_SLOTS), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFlinkConfiguration()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(TM_SLOTS) + .build(), true); protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java index 320dd5faa8ebe..ab8d6b99b4e4f 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.test.examples.windowing; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing; import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.FileUtils; import org.apache.flink.util.TestLogger; @@ -42,10 +42,10 @@ public class TopSpeedWindowingExampleITCase extends TestLogger { @ClassRule public static MiniClusterResource miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 1, - 1)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); @Test public void testTopSpeedWindowingExampleITCase() throws Exception { diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java index 5d6a30f1fa04e..ed4ec947b6c61 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; @@ -69,10 +70,10 @@ public class DistributedCacheDfsTest { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new org.apache.flink.configuration.Configuration(), - 1, - 1)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); private static MiniDFSCluster hdfsCluster; private static Configuration conf = new Configuration(); diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala index 21651523a7bde..194b657ec6265 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala @@ -18,9 +18,7 @@ package org.apache.flink.ml.util -import org.apache.flink.configuration.Configuration -import org.apache.flink.test.util.MiniClusterResource -import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration +import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration} import org.scalatest.{BeforeAndAfter, Suite} /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests. @@ -57,8 +55,10 @@ trait FlinkTestBase extends BeforeAndAfter { before { val cl = new MiniClusterResource( - new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism) - ) + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(parallelism) + .build()) cl.before() diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 64b526de8310c..ad2d509506bce 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; import org.apache.flink.util.TestLogger; @@ -71,10 +72,11 @@ public class LocalExecutorITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfig(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); private static ClusterClient clusterClient; diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index c00b5d357cb29..90ba6d46f54b1 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.ClassRule; @@ -57,14 +59,15 @@ /** * Tests to verify JMX reporter functionality on the JobManager. */ -public class JMXJobManagerMetricTest { +public class JMXJobManagerMetricTest extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 1, - 1), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build(), true); private static Configuration getConfiguration() { diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index a47045f35a8c5..f977a8e9e9da8 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.curator.test.TestingServer; import org.junit.AfterClass; @@ -68,10 +69,11 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfig(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); miniClusterResource.before(); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index b1092c1416702..25f56d7ec6669 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.curator.test.TestingServer; import org.junit.AfterClass; @@ -68,10 +69,11 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfig(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); miniClusterResource.before(); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index eb300c12e4cab..0d61e0dfe3c83 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -52,10 +53,11 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfig(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); @Override diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index 3d6a3e3fcfdc0..002ff9fc3c9e8 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -52,10 +53,11 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfig(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig()) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); @Override diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 994966e8aa411..776a267833f54 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -80,10 +81,11 @@ public class WebFrontendITCase extends TestLogger { @ClassRule public static final MiniClusterResource CLUSTER = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - CLUSTER_CONFIGURATION, - NUM_TASK_MANAGERS, - NUM_SLOTS), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(CLUSTER_CONFIGURATION) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_SLOTS) + .build(), true ); @@ -153,7 +155,7 @@ public void testResponseHeaders() throws Exception { if (notFoundJobConnection.getResponseCode() >= 400) { // we don't set the content-encoding header Assert.assertNull(notFoundJobConnection.getContentEncoding()); - if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) { Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType()); } else { Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType()); @@ -281,7 +283,7 @@ public void testStop() throws Exception { final Deadline deadline = testTimeout.fromNow(); try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { - if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) { // stop the job client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); @@ -356,7 +358,7 @@ public void testStopYarn() throws Exception { HttpTestClient.SimpleHttpResponse response = client .getNextResponse(deadline.timeLeft()); - if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) { assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus()); } else { assertEquals(HttpResponseStatus.OK, response.getStatus()); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java index aefe4f1c1c781..d26620bab1b23 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.ExceptionUtils; import org.junit.ClassRule; @@ -62,12 +64,12 @@ public void testRunJar() throws Exception { config.setString(WebOptions.UPLOAD_DIR, uploadDir.toString()); MiniClusterResource clusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - 1, - 1 - ), - MiniClusterResource.MiniClusterType.NEW + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build(), + TestBaseUtils.CodebaseType.NEW ); clusterResource.before(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 9407af21623e8..5d29fbb68e2b0 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -75,12 +77,12 @@ public void setUp() throws Exception { clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - clusterConfig, - 1, - 1 - ), - MiniClusterResource.MiniClusterType.NEW + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(clusterConfig) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build(), + TestBaseUtils.CodebaseType.NEW ); cluster.before(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index aca4fdbea1fc4..b89617b40b8be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -230,7 +230,7 @@ public void start() throws Exception { final Configuration configuration = miniClusterConfiguration.getConfiguration(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); - final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED; + final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED; try { initializeIOFormatClasses(configuration); @@ -916,7 +916,7 @@ private void terminateMiniClusterServices() throws Exception { @Nonnull private CompletionStage terminateRpcServices() { final int numRpcServices; - if (miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED) { + if (miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED) { numRpcServices = 1; } else { numRpcServices = 1 + 2 + miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs @@ -927,7 +927,7 @@ private CompletionStage terminateRpcServices() { synchronized (lock) { rpcTerminationFutures.add(commonRpcService.stopService()); - if (miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) { + if (miniClusterConfiguration.getRpcServiceSharing() != RpcServiceSharing.SHARED) { rpcTerminationFutures.add(jobManagerRpcService.stopService()); rpcTerminationFutures.add(resourceManagerRpcService.stopService()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 0a0c692079443..f728f24111d28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -32,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration; -import static org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED; +import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED; /** * Configuration object for the {@link MiniCluster}. @@ -117,15 +117,6 @@ public String toString() { // Enums // ---------------------------------------------------------------------------------- - /** - * Enum which defines whether the mini cluster components use a shared RpcService - * or whether every component gets its own dedicated RpcService started. - */ - public enum RpcServiceSharing { - SHARED, // a single shared rpc service - DEDICATED // every component gets his own dedicated rpc service - } - // ---------------------------------------------------------------------------------- // Builder // ---------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java new file mode 100644 index 0000000000000..7b33869142235 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/RpcServiceSharing.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.minicluster; + +/** + * Enum which defines whether the mini cluster components use a shared RpcService + * or whether every component gets its own dedicated RpcService started. + */ +public enum RpcServiceSharing { + SHARED, // a single shared rpc service + DEDICATED // every component gets his own dedicated rpc service +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index a72e654568c26..30ac84f4b66e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -54,7 +54,7 @@ public static void setup() { @Test public void runJobWithSingleRpcService() throws Exception { MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() - .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED) + .setRpcServiceSharing(RpcServiceSharing.SHARED) .setConfiguration(configuration) .build(); @@ -71,7 +71,7 @@ public void runJobWithSingleRpcService() throws Exception { @Test public void runJobWithMultipleRpcServices() throws Exception { MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() - .setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED) + .setRpcServiceSharing(RpcServiceSharing.DEDICATED) .setConfiguration(configuration) .build(); diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index 12522a83ce8ee..c6b43c2920aca 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -24,7 +24,8 @@ import java.util.Objects import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions} import org.apache.flink.runtime.clusterframework.BootstrapTools import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster} -import org.apache.flink.test.util.MiniClusterResource +import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils} +import org.apache.flink.test.util.TestBaseUtils.CodebaseType import org.apache.flink.util.TestLogger import org.junit._ import org.junit.rules.TemporaryFolder @@ -321,8 +322,7 @@ object ScalaShellITCase { @BeforeClass def beforeAll(): Unit = { - val isNew = Objects.equals(MiniClusterResource.NEW_CODEBASE, - System.getProperty(MiniClusterResource.CODEBASE_KEY)) + val isNew = TestBaseUtils.isNewCodebase() if (isNew) { configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE) // set to different than default so not to interfere with ScalaShellLocalStartupITCase diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala index a971db82bd605..f13f57be0bcd9 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala @@ -19,11 +19,10 @@ package org.apache.flink.api.scala import java.io._ -import java.util.Objects import org.apache.flink.configuration.{Configuration, CoreOptions} import org.apache.flink.runtime.clusterframework.BootstrapTools -import org.apache.flink.test.util.MiniClusterResource +import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.util.TestLogger import org.junit.rules.TemporaryFolder import org.junit.{Assert, Rule, Test} @@ -87,8 +86,7 @@ class ScalaShellLocalStartupITCase extends TestLogger { System.setOut(new PrintStream(baos)) val configuration = new Configuration() - val mode = if (Objects.equals(MiniClusterResource.NEW_CODEBASE, - System.getProperty(MiniClusterResource.CODEBASE_KEY))) { + val mode = if (TestBaseUtils.isNewCodebase()) { CoreOptions.NEW_MODE } else { CoreOptions.LEGACY_MODE diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 65b351daa696e..0b7a3b3f93f29 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -18,7 +18,6 @@ package org.apache.flink.test.util; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; @@ -60,10 +59,10 @@ public abstract class AbstractTestBase extends TestBaseUtils { @ClassRule public static MiniClusterResource miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 1, - DEFAULT_PARALLELISM)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .build()); @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index cbe329c61e734..c412bbf899580 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -18,18 +18,10 @@ package org.apache.flink.test.util; -import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.MiniClusterClient; import org.apache.flink.client.program.StandaloneClusterClient; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.configuration.*; import org.apache.flink.runtime.minicluster.JobExecutorService; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -38,13 +30,11 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; - import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -56,15 +46,11 @@ public class MiniClusterResource extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class); - public static final String CODEBASE_KEY = "codebase"; - - public static final String NEW_CODEBASE = "new"; - private final TemporaryFolder temporaryFolder = new TemporaryFolder(); private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; - private final MiniClusterType miniClusterType; + private final TestBaseUtils.CodebaseType miniClusterType; private JobExecutorService jobExecutorService; @@ -85,30 +71,30 @@ public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterRes } public MiniClusterResource( - final MiniClusterResourceConfiguration miniClusterResourceConfiguration, - final MiniClusterType miniClusterType) { + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final TestBaseUtils.CodebaseType miniClusterType) { this(miniClusterResourceConfiguration, miniClusterType, false); } public MiniClusterResource( - final MiniClusterResourceConfiguration miniClusterResourceConfiguration, - final boolean enableClusterClient) { + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final boolean enableClusterClient) { this( miniClusterResourceConfiguration, - Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.NEW : MiniClusterType.LEGACY, + miniClusterResourceConfiguration.getCodebaseType(), enableClusterClient); } private MiniClusterResource( - final MiniClusterResourceConfiguration miniClusterResourceConfiguration, - final MiniClusterType miniClusterType, - final boolean enableClusterClient) { + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final TestBaseUtils.CodebaseType miniClusterType, + final boolean enableClusterClient) { this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration); this.miniClusterType = Preconditions.checkNotNull(miniClusterType); this.enableClusterClient = enableClusterClient; } - public MiniClusterType getMiniClusterType() { + public TestBaseUtils.CodebaseType getMiniClusterType() { return miniClusterType; } @@ -187,7 +173,7 @@ public void after() { } } - private void startJobExecutorService(MiniClusterType miniClusterType) throws Exception { + private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception { switch (miniClusterType) { case LEGACY: startLegacyMiniCluster(); @@ -196,7 +182,7 @@ private void startJobExecutorService(MiniClusterType miniClusterType) throws Exc startMiniCluster(); break; default: - throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.'); + throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.'); } } @@ -264,67 +250,4 @@ private void startMiniCluster() throws Exception { webUIPort = miniCluster.getRestAddress().getPort(); } - - /** - * Mini cluster resource configuration object. - */ - public static class MiniClusterResourceConfiguration { - private final Configuration configuration; - - private final int numberTaskManagers; - - private final int numberSlotsPerTaskManager; - - private final Time shutdownTimeout; - - public MiniClusterResourceConfiguration( - Configuration configuration, - int numberTaskManagers, - int numberSlotsPerTaskManager) { - this( - configuration, - numberTaskManagers, - numberSlotsPerTaskManager, - AkkaUtils.getTimeoutAsTime(configuration)); - } - - public MiniClusterResourceConfiguration( - Configuration configuration, - int numberTaskManagers, - int numberSlotsPerTaskManager, - Time shutdownTimeout) { - this.configuration = Preconditions.checkNotNull(configuration); - this.numberTaskManagers = numberTaskManagers; - this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; - this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout); - } - - public Configuration getConfiguration() { - return configuration; - } - - public int getNumberTaskManagers() { - return numberTaskManagers; - } - - public int getNumberSlotsPerTaskManager() { - return numberSlotsPerTaskManager; - } - - public Time getShutdownTimeout() { - return shutdownTimeout; - } - } - - // --------------------------------------------- - // Enum definitions - // --------------------------------------------- - - /** - * Type of the mini cluster to start. - */ - public enum MiniClusterType { - LEGACY, - NEW - } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java new file mode 100644 index 0000000000000..c9389201ed853 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.util.Preconditions; + +/** + * Mini cluster resource configuration object. + */ +public class MiniClusterResourceConfiguration { + + private final Configuration configuration; + + private final int numberTaskManagers; + + private final int numberSlotsPerTaskManager; + + private final Time shutdownTimeout; + + private final TestBaseUtils.CodebaseType codebaseType; + + private final RpcServiceSharing rpcServiceSharing; + + MiniClusterResourceConfiguration( + Configuration configuration, + int numberTaskManagers, + int numberSlotsPerTaskManager, + Time shutdownTimeout, + TestBaseUtils.CodebaseType codebaseType, + RpcServiceSharing rpcServiceSharing) { + this.configuration = Preconditions.checkNotNull(configuration); + this.numberTaskManagers = numberTaskManagers; + this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; + this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout); + this.codebaseType = Preconditions.checkNotNull(codebaseType); + this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); + } + + public Configuration getConfiguration() { + return configuration; + } + + public int getNumberTaskManagers() { + return numberTaskManagers; + } + + public int getNumberSlotsPerTaskManager() { + return numberSlotsPerTaskManager; + } + + public Time getShutdownTimeout() { + return shutdownTimeout; + } + + /** + * @deprecated Will be irrelevant once the legacy mode has been removed. + */ + @Deprecated + public TestBaseUtils.CodebaseType getCodebaseType() { + return codebaseType; + } + + public RpcServiceSharing getRpcServiceSharing() { + return rpcServiceSharing; + } + + /** + * Builder for {@link MiniClusterResourceConfiguration}. + */ + public static final class Builder { + + private Configuration configuration = new Configuration(); + private int numberTaskManagers = 1; + private int numberSlotsPerTaskManager = 1; + private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration); + private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType(); + + private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; + + public Builder setConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public Builder setNumberTaskManagers(int numberTaskManagers) { + this.numberTaskManagers = numberTaskManagers; + return this; + } + + public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) { + this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; + return this; + } + + public Builder setShutdownTimeout(Time shutdownTimeout) { + this.shutdownTimeout = shutdownTimeout; + return this; + } + + /** + * @deprecated Will be irrelevant once the legacy mode has been removed. + */ + @Deprecated + public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) { + this.codebaseType = codebaseType; + return this; + } + + public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) { + this.rpcServiceSharing = rpcServiceSharing; + return this; + } + + public MiniClusterResourceConfiguration build() { + return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing); + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 7e9b12ed080ea..10fe1f0850105 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -44,6 +44,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.File; @@ -103,6 +105,9 @@ public class TestBaseUtils extends TestLogger { public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L); + static final String NEW_CODEBASE = "new"; + static final String CODEBASE_KEY = "codebase"; + // ------------------------------------------------------------------------ protected static File logDir; @@ -673,6 +678,26 @@ public static String getFromHTTP(String url, Time timeout) throws Exception { throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable."); } + @Nonnull + public static CodebaseType getCodebaseType() { + return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY; + } + + public static boolean isNewCodebase() { + return CodebaseType.NEW == getCodebaseType(); + } + + /** + * Type of the mini cluster to start. + * + * @deprecated Will be irrelevant once the legacy mode has been removed. + */ + @Deprecated + public enum CodebaseType { + LEGACY, + NEW + } + /** * Comparator for comparable Tuples. * @param tuple type diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java index 3d90833cbd02f..d186e36c9ff6c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -50,10 +51,10 @@ public class AccumulatorErrorITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 3)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(3) + .build()); public static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 347c2e3f65787..5944d3acca526 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -84,10 +85,11 @@ public class AccumulatorLiveITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 1, - 1), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build(), true); private static Configuration getConfiguration() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index cac16f038dfed..871df7d46ebff 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -56,10 +57,11 @@ public abstract class CancelingTestBase extends TestLogger { @ClassRule public static final MiniClusterResource CLUSTER = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 4), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(4) + .build(), true); // -------------------------------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 61baefa05d8ad..a1e31e93ee836 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -45,6 +45,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -106,10 +107,11 @@ enum StateBackendEnum { protected final MiniClusterResource getMiniClusterResource() { return new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfigurationSafe(), - 2, - PARALLELISM / 2)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfigurationSafe()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(PARALLELISM / 2) + .build()); } private Configuration getConfigurationSafe() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 07167a9fd6e37..b82abb9f8c406 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -66,10 +67,11 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - PARALLELISM / 2)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(PARALLELISM / 2) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java index e5868c3023356..caa1791f70949 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.ClassRule; @@ -79,10 +80,11 @@ public class KeyedStateCheckpointingITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - NUM_TASK_MANAGERS, - NUM_TASK_SLOTS)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 55631a2f831d5..e7fcefd7e2c99 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -52,6 +52,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -127,10 +128,11 @@ public void setup() throws Exception { config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - numTaskManagers, - slotsPerTaskManager), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlots) + .build(), true); cluster.before(); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index aebaa63edab27..f4fe8e71c6fc2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.state.ManualWindowSpeedITCase; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.apache.curator.test.TestingServer; @@ -263,10 +264,11 @@ private void testExternalizedCheckpoints( } MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - NUM_TASK_MANAGERS, - SLOTS_PER_TASK_MANAGER), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TASK_MANAGER) + .build(), true); cluster.before(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 28051446ced0d..b46c485605b0c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -227,11 +228,11 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - numTaskManagers, - numSlotsPerTaskManager - ), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) + .build(), true); cluster.before(); ClusterClient client = cluster.getClusterClient(); @@ -296,11 +297,11 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception { // Start Flink MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - numTaskManagers, - numSlotsPerTaskManager - ), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) + .build(), true); LOG.info("Shutting down Flink cluster."); @@ -341,11 +342,11 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception { // create a new TestingCluster to make sure we start with completely // new resources cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - numTaskManagers, - numSlotsPerTaskManager - ), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) + .build(), true); LOG.info("Restarting Flink cluster."); cluster.before(); @@ -578,11 +579,11 @@ public Integer map(Integer value) throws Exception { config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - 1, - 2 * jobGraph.getMaximumParallelism() - ), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism()) + .build(), true); cluster.before(); ClusterClient client = cluster.getClusterClient(); @@ -708,11 +709,11 @@ private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManag MiniClusterResource get() { return new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - numTaskManagers, - numSlotsPerTaskManager - ), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) + .build(), true); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 5ac12146b71ab..0ed9d6b2195ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -19,9 +19,9 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.TestLogger; @@ -42,10 +42,10 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - NUM_TASK_MANAGERS, - NUM_TASK_SLOTS)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); /** * Implementations are expected to assemble the test topology in this function diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index bc73717dbeec9..1b93684254b0d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -74,10 +75,11 @@ public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - PARALLELISM / 2)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(PARALLELISM / 2) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index 156d4486c627b..6fb18e825945e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -109,10 +110,11 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); miniClusterResource.before(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index 84cb88adaaf22..0d43aa924be41 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.OptionalFailure; @@ -88,10 +89,11 @@ protected static String getResourceFilename(String filename) { protected SavepointMigrationTestBase() throws Exception { miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 1, - DEFAULT_PARALLELISM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) + .build(), true); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java index d8ed2ee1df115..93bec744cde09 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java @@ -30,6 +30,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -53,12 +55,11 @@ public class JobRetrievalITCase extends TestLogger { @ClassRule public static final MiniClusterResource CLUSTER = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 1, - 4 - ), - MiniClusterResource.MiniClusterType.NEW + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(4) + .build(), + TestBaseUtils.CodebaseType.NEW ); private RestClusterClient client; diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java index ecd16a1ff30b8..5ca9ba242792e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -51,10 +52,11 @@ public class JobSubmissionFailsITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - NUM_TM, - NUM_SLOTS / NUM_TM), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(NUM_TM) + .setNumberSlotsPerTaskManager(NUM_SLOTS / NUM_TM) + .build(), true); private static Configuration getConfiguration() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index a5b01bcd4f444..5aabd31c1f046 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import static org.junit.Assert.fail; @@ -55,10 +56,11 @@ public static void main(String[] args) throws Exception { config.setInteger("taskmanager.net.client.numThreads", 1); cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - config, - taskManagers, - slotsPerTaskManager)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(taskManagers) + .setNumberSlotsPerTaskManager(slotsPerTaskManager) + .build()); cluster.before(); runPartitioningProgram(parallelism); diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index 074b72131e5c6..189962c78c789 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -24,12 +24,11 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration; -import org.apache.flink.test.util.MiniClusterResource.MiniClusterType; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -56,10 +55,10 @@ public class AutoParallelismITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResourceConfiguration( - new Configuration(), - NUM_TM, - SLOTS_PER_TM)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUM_TM) + .setNumberSlotsPerTaskManager(SLOTS_PER_TM) + .build()); @Test public void testProgramWithAutoParallelism() throws Exception { @@ -81,7 +80,7 @@ public void testProgramWithAutoParallelism() throws Exception { assertEquals(PARALLELISM, resultCollection.size()); } catch (Exception ex) { - if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(MiniClusterType.LEGACY)) { + if (MINI_CLUSTER_RESOURCE.getMiniClusterType().equals(TestBaseUtils.CodebaseType.LEGACY)) { throw ex; } assertTrue( diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index ab937c922af19..ddc7dd8d14a1d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.types.Value; import org.apache.flink.util.TestLogger; @@ -50,10 +51,11 @@ public class CustomSerializationITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 1, - PARLLELISM)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARLLELISM) + .build()); public static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 6099934351eae..6b3371dd4ffe3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -55,10 +56,10 @@ public class MiscellaneousIssuesITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 2, - 3)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(3) + .build()); @Test public void testNullValues() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index e149f433d1b5a..c9050716a3a99 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.ClassRule; @@ -52,10 +53,11 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 8)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(8) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java index 74b8cf74bc926..a20a0d0c9221e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java @@ -27,11 +27,11 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.DataSetUtils; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.test.operators.util.CollectionDataSets; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -50,10 +50,10 @@ public class CustomDistributionITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - 1, - 8)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(8) + .build()); // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java index 424f5409f7a62..d3247f22c782a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.junit.ClassRule; @@ -31,10 +32,10 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCas @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 2)); + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java index 1f52d274478d3..0ccb3fe14f806 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.junit.ClassRule; @@ -31,10 +32,11 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecover @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 2)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); private static Configuration getConfiguration() { Configuration config = new Configuration(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index 4a7b3459049eb..527258193d161 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; @@ -60,10 +61,11 @@ public class IPv6HostnamesITCase extends TestLogger { @Rule public final MiniClusterResource miniClusterResource = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - 2, - 2)); + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); private Configuration getConfiguration() { final Inet6Address ipv6address = getLocalIPv6Address(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java index e8914eca1a7d2..555993541f7e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.AssumptionViolatedException; @@ -75,10 +75,11 @@ private MiniClusterResource trySetUpCluster() throws Exception { Configuration config = new Configuration(); config.setString(TRANSPORT_TYPE, "epoll"); MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResourceConfiguration( - config, - NUM_TASK_MANAGERS, - 1), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(config) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(1) + .build(), true); cluster.before(); return cluster; diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index b5c233afcbd83..c2748a506aad9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -235,11 +235,10 @@ public void testThroughput() throws Exception { final int numTaskManagers = parallelism / numSlotsPerTaskManager; final MiniClusterResource cluster = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - numTaskManagers, - numSlotsPerTaskManager - ), + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(numTaskManagers) + .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) + .build(), true ); cluster.before(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index b5c2aafb55fc0..60027c9641a27 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -35,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -50,7 +50,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -73,10 +72,10 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - new Configuration(), - NUM_TMS, - NUM_SLOTS_PER_TM), + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) + .build(), true); private final boolean allowNonRestoredState; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 3b46c8259e51e..21051c580c413 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -78,10 +79,11 @@ public class TimestampITCase extends TestLogger { @ClassRule public static final MiniClusterResource CLUSTER = new MiniClusterResource( - new MiniClusterResource.MiniClusterResourceConfiguration( - getConfiguration(), - NUM_TASK_MANAGERS, - NUM_TASK_SLOTS), + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build(), true); private static Configuration getConfiguration() { diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index bfb8c3de6f794..514a3d55fff2f 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -80,9 +80,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; -import static org.apache.flink.test.util.MiniClusterResource.CODEBASE_KEY; -import static org.apache.flink.test.util.MiniClusterResource.NEW_CODEBASE; - /** * This base class allows to use the MiniYARNCluster. * The cluster is re-used for all tests. @@ -220,7 +217,8 @@ public void checkClusterEmpty() throws IOException, YarnException { } flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); - isNewMode = Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)); + + isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()); } @Nullable @@ -536,7 +534,7 @@ private static void start(YarnConfiguration conf, String principal, String keyta FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); globalConfiguration.setString(CoreOptions.MODE, - Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE); + Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE); BootstrapTools.writeConfiguration( globalConfiguration,