[FLINK-25550][k8s] Migrate tests to JUnit 5
reswqa authored Apr 27, 2022
1 parent 5f8fb30 commit 6fc5f78
Showing 58 changed files with 1,940 additions and 2,152 deletions.
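The recurring pattern across these 58 files is the same three-part migration: org.junit.Test becomes org.junit.jupiter.api.Test, test classes and methods drop the public modifier, and org.junit.Assert/Hamcrest assertions are rewritten with AssertJ's assertThat. A minimal sketch of the target style, with class and test names that are illustrative and not part of this diff:

```java
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Illustrative only: the JUnit 5 + AssertJ style this commit migrates to. */
class ExampleMigrationStyleTest {

    @Test
    void testCollectionAndTypeAssertions() {
        // JUnit 4 equivalent: assertEquals(2, items.size());
        List<String> items = Arrays.asList("a", "b");
        assertThat(items).hasSize(2);

        // JUnit 4 equivalent: assertTrue(value instanceof String);
        Object value = "flink";
        assertThat(value).isInstanceOf(String.class);
    }
}
```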
KubernetesClusterClientFactoryTest.java
@@ -25,15 +25,15 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for the {@link KubernetesClusterClientFactory} discovery. */
public class KubernetesClusterClientFactoryTest {
class KubernetesClusterClientFactoryTest {

@Test
public void testKubernetesClusterClientFactoryDiscoveryWithSessionExecutor() {
void testKubernetesClusterClientFactoryDiscoveryWithSessionExecutor() {
testKubernetesClusterClientFactoryDiscoveryHelper(KubernetesSessionClusterExecutor.NAME);
}

@@ -45,6 +45,6 @@ private void testKubernetesClusterClientFactoryDiscoveryHelper(final String targ
final ClusterClientFactory<String> factory =
serviceLoader.getClusterClientFactory(configuration);

assertTrue(factory instanceof KubernetesClusterClientFactory);
assertThat(factory).isInstanceOf(KubernetesClusterClientFactory.class);
}
}
KubernetesClusterDescriptorTest.java
@@ -40,21 +40,17 @@
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.apache.flink.kubernetes.utils.Constants.ENV_FLINK_POD_IP_ADDRESS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link KubernetesClusterDescriptor}. */
public class KubernetesClusterDescriptorTest extends KubernetesClientTestBase {
class KubernetesClusterDescriptorTest extends KubernetesClientTestBase {
private static final String MOCK_SERVICE_HOST_NAME = "mock-host-name-of-service";
private static final String MOCK_SERVICE_IP = "192.168.0.1";

@@ -77,7 +73,7 @@ protected void onSetup() throws Exception {
}

@Test
public void testDeploySessionCluster() throws Exception {
void testDeploySessionCluster() throws Exception {
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
final ClusterClient<String> clusterClient = deploySessionCluster().getClusterClient();
checkClusterClient(clusterClient);
@@ -86,7 +82,7 @@ public void testDeploySessionCluster() throws Exception {
}

@Test
public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentException {
void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentException {
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
flinkConfig.setString(
HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.toString());
@@ -105,33 +101,30 @@ public void testDeployHighAvailabilitySessionCluster() throws ClusterDeploymentException {
.getSpec()
.getContainers()
.get(0);
assertTrue(
"Environment " + ENV_FLINK_POD_IP_ADDRESS + " should be set.",
jmContainer.getEnv().stream()
.map(EnvVar::getName)
.collect(Collectors.toList())
.contains(ENV_FLINK_POD_IP_ADDRESS));
assertThat(jmContainer.getEnv().stream().map(EnvVar::getName))
.withFailMessage("Environment " + ENV_FLINK_POD_IP_ADDRESS + " should be set.")
.contains(ENV_FLINK_POD_IP_ADDRESS);

clusterClient.close();
}

@Test
public void testKillCluster() throws Exception {
void testKillCluster() throws Exception {
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
deploySessionCluster();

assertEquals(2, kubeClient.services().list().getItems().size());
assertThat(kubeClient.services().list().getItems()).hasSize(2);

descriptor.killCluster(CLUSTER_ID);

// Mock kubernetes server do not delete the accompanying resources by gc.
assertTrue(kubeClient.apps().deployments().list().getItems().isEmpty());
assertEquals(2, kubeClient.services().list().getItems().size());
assertEquals(1, kubeClient.configMaps().list().getItems().size());
assertThat(kubeClient.apps().deployments().list().getItems()).isEmpty();
assertThat(kubeClient.services().list().getItems()).hasSize(2);
assertThat(kubeClient.configMaps().list().getItems()).hasSize(1);
}

@Test
public void testDeployApplicationCluster() {
void testDeployApplicationCluster() {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
@@ -148,53 +141,70 @@ public void testDeployApplicationCluster() {
}

@Test
public void testDeployApplicationClusterWithNonLocalSchema() {
void testDeployApplicationClusterWithNonLocalSchema() {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("file:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
assertThrows(
"Only \"local\" is supported as schema for application mode.",
IllegalArgumentException.class,
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig));
assertThatThrownBy(
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig))
.satisfies(
cause ->
assertThat(cause)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Only \"local\" is supported as schema for application mode."));
}

@Test
public void testDeployApplicationClusterWithClusterAlreadyExists() {
void testDeployApplicationClusterWithClusterAlreadyExists() {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
mockExpectedServiceFromServerSide(loadBalancerSvc);
assertThrows(
"The Flink cluster " + CLUSTER_ID + " already exists.",
ClusterDeploymentException.class,
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig));
assertThatThrownBy(
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig))
.satisfies(
cause ->
assertThat(cause)
.isInstanceOf(ClusterDeploymentException.class)
.hasMessageContaining(
"The Flink cluster "
+ CLUSTER_ID
+ " already exists."));
}

@Test
public void testDeployApplicationClusterWithDeploymentTargetNotCorrectlySet() {
void testDeployApplicationClusterWithDeploymentTargetNotCorrectlySet() {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
assertThrows(
"Expected deployment.target=kubernetes-application",
ClusterDeploymentException.class,
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig));
assertThatThrownBy(
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig))
.satisfies(
cause ->
assertThat(cause)
.isInstanceOf(ClusterDeploymentException.class)
.hasMessageContaining(
"Expected deployment.target=kubernetes-application"));
}

@Test
public void testDeployApplicationClusterWithMultipleJarsSet() {
void testDeployApplicationClusterWithMultipleJarsSet() {
flinkConfig.set(
PipelineOptions.JARS,
Arrays.asList("local:///path/of/user.jar", "local:///user2.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
assertThrows(
"Should only have one jar",
IllegalArgumentException.class,
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig));
assertThatThrownBy(
() -> descriptor.deployApplicationCluster(clusterSpecification, appConfig))
.satisfies(
cause ->
assertThat(cause)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Should only have one jar"));
}

@Test
public void testDeployApplicationClusterWithClusterIP() throws Exception {
void testDeployApplicationClusterWithClusterIP() throws Exception {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar"));
flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
@@ -209,9 +219,8 @@ public void testDeployApplicationClusterWithClusterIP() throws Exception {

final String address = CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX + "." + NAMESPACE;
final int port = flinkConfig.get(RestOptions.PORT);
assertThat(
clusterClient.getWebInterfaceURL(),
is(String.format("http://%s:%d", address, port)));
assertThat(clusterClient.getWebInterfaceURL())
.isEqualTo(String.format("http://%s:%d", address, port));
}

private ClusterClientProvider<String> deploySessionCluster() throws ClusterDeploymentException {
@@ -220,44 +229,42 @@ private ClusterClientProvider<String> deploySessionCluster() throws ClusterDeploymentException {
}

private void checkClusterClient(ClusterClient<String> clusterClient) {
assertEquals(CLUSTER_ID, clusterClient.getClusterId());
assertThat(clusterClient.getClusterId()).isEqualTo(CLUSTER_ID);
// Both HA and non-HA mode, the web interface should always be the Kubernetes exposed
// service address.
assertEquals(
String.format("http://%s:%d", MOCK_SERVICE_IP, REST_PORT),
clusterClient.getWebInterfaceURL());
assertThat(clusterClient.getWebInterfaceURL())
.isEqualTo(String.format("http://%s:%d", MOCK_SERVICE_IP, REST_PORT));
}

private void checkUpdatedConfigAndResourceSetting() {
// Check updated flink config options
assertEquals(
String.valueOf(Constants.BLOB_SERVER_PORT),
flinkConfig.getString(BlobServerOptions.PORT));
assertEquals(
String.valueOf(Constants.TASK_MANAGER_RPC_PORT),
flinkConfig.getString(TaskManagerOptions.RPC_PORT));
assertEquals(
InternalServiceDecorator.getNamespacedInternalServiceName(CLUSTER_ID, NAMESPACE),
flinkConfig.getString(JobManagerOptions.ADDRESS));
assertThat(flinkConfig.getString(BlobServerOptions.PORT))
.isEqualTo(String.valueOf(Constants.BLOB_SERVER_PORT));
assertThat(flinkConfig.getString(TaskManagerOptions.RPC_PORT))
.isEqualTo(String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
assertThat(flinkConfig.getString(JobManagerOptions.ADDRESS))
.isEqualTo(
InternalServiceDecorator.getNamespacedInternalServiceName(
CLUSTER_ID, NAMESPACE));

final Deployment jmDeployment = kubeClient.apps().deployments().list().getItems().get(0);

final Container jmContainer =
jmDeployment.getSpec().getTemplate().getSpec().getContainers().get(0);

assertEquals(
String.valueOf(clusterSpecification.getMasterMemoryMB()),
jmContainer
.getResources()
.getRequests()
.get(Constants.RESOURCE_NAME_MEMORY)
.getAmount());
assertEquals(
String.valueOf(clusterSpecification.getMasterMemoryMB()),
jmContainer
.getResources()
.getLimits()
.get(Constants.RESOURCE_NAME_MEMORY)
.getAmount());
assertThat(
jmContainer
.getResources()
.getRequests()
.get(Constants.RESOURCE_NAME_MEMORY)
.getAmount())
.isEqualTo(String.valueOf(clusterSpecification.getMasterMemoryMB()));
assertThat(
jmContainer
.getResources()
.getLimits()
.get(Constants.RESOURCE_NAME_MEMORY)
.getAmount())
.isEqualTo(String.valueOf(clusterSpecification.getMasterMemoryMB()));
}
}
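The exception tests above pass the thrown cause through satisfies(...) before asserting on it; AssertJ's assertThatThrownBy also allows chaining isInstanceOf and hasMessageContaining directly on the returned assert, which is an equivalent, flatter style. A minimal sketch with a stand-in lambda rather than the descriptor call from the diff:

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ThrownByChainingExample {

    void assertRejectsNonLocalJarScheme() {
        // Stand-in for descriptor.deployApplicationCluster(clusterSpecification, appConfig).
        assertThatThrownBy(
                        () -> {
                            throw new IllegalArgumentException(
                                    "Only \"local\" is supported as schema for application mode.");
                        })
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessageContaining(
                        "Only \"local\" is supported as schema for application mode.");
    }
}
```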
KubernetesResource.java → KubernetesExtension.java
@@ -22,18 +22,21 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.util.StringUtils;

import org.junit.Assume;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.rules.ExternalResource;

import static org.assertj.core.api.Assumptions.assumeThat;

/**
* {@link ExternalResource} which has a configured real Kubernetes cluster and client. We assume
* that one already has a running Kubernetes cluster. And all the ITCases assume that the
* environment ITCASE_KUBECONFIG is set with a valid kube config file. In the E2E tests, we will use
* a minikube for the testing.
*/
public class KubernetesResource extends ExternalResource {
public class KubernetesExtension implements BeforeAllCallback, AfterAllCallback {

private static final String CLUSTER_ID = "flink-itcase-cluster";
private static final int KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES = 100;
@@ -44,16 +47,15 @@ public class KubernetesResource extends ExternalResource {

public static void checkEnv() {
final String kubeConfigEnv = System.getenv("ITCASE_KUBECONFIG");
Assume.assumeTrue(
"ITCASE_KUBECONFIG environment is not set.",
!StringUtils.isNullOrWhitespaceOnly(kubeConfigEnv));
assumeThat(kubeConfigEnv)
.withFailMessage("ITCASE_KUBECONFIG environment is not set.")
.isNotBlank();
kubeConfigFile = kubeConfigEnv;
}

@Override
public void before() {
public void beforeAll(ExtensionContext extensionContext) throws Exception {
checkEnv();

configuration = new Configuration();
configuration.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, kubeConfigFile);
configuration.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
@@ -65,8 +67,10 @@ public void before() {
}

@Override
public void after() {
flinkKubeClient.close();
public void afterAll(ExtensionContext extensionContext) throws Exception {
if (flinkKubeClient != null) {
flinkKubeClient.close();
}
}

public Configuration getConfiguration() {
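With the JUnit 4 ExternalResource rule rewritten as KubernetesExtension implementing BeforeAllCallback and AfterAllCallback, an ITCase can register it on a static field so beforeAll/afterAll wrap the whole class and the assumption on ITCASE_KUBECONFIG aborts it when no real cluster is configured. A minimal sketch, assuming a no-arg constructor, the getConfiguration() accessor shown above, and a test class in the same package as the extension; the test class and method names are hypothetical:

```java
import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import static org.assertj.core.api.Assertions.assertThat;

class ExampleKubernetesITCase {

    // Registered on a static field so beforeAll()/afterAll() run once around all tests
    // in this class; the assumeThat(...) in beforeAll skips the class when
    // ITCASE_KUBECONFIG is not set.
    @RegisterExtension
    static final KubernetesExtension KUBERNETES_EXTENSION = new KubernetesExtension();

    @Test
    void testClusterConfigurationIsAvailable() {
        Configuration configuration = KUBERNETES_EXTENSION.getConfiguration();
        assertThat(configuration).isNotNull();
    }
}
```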
KubernetesPodTemplateTestUtils.java
@@ -31,10 +31,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Utilities for the Kubernetes pod template tests. The create methods are to provide the init
@@ -53,7 +50,7 @@ public static File getPodTemplateFile() {
KubernetesPodTemplateTestUtils.class
.getClassLoader()
.getResource(TESTING_TEMPLATE_FILE_NAME);
assertThat(podTemplateUrl, not(nullValue()));
assertThat(podTemplateUrl).isNotNull();
return new File(podTemplateUrl.getPath());
}

@@ -120,7 +117,7 @@ public static Container getContainerWithName(PodSpec podSpec, String containerName) {
podSpec.getContainers().stream()
.filter(e -> e.getName().equals(containerName))
.collect(Collectors.toList());
assertThat(containers.size(), is(1));
assertThat(containers).hasSize(1);
return containers.get(0);
}
}
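The pod-template helper keeps its signature after the migration: getContainerWithName(PodSpec, String) still returns a single fabric8 Container, now checked with AssertJ's hasSize(1). A small hypothetical caller, assuming it sits in the same package as KubernetesPodTemplateTestUtils, with container names chosen only for illustration and a pod spec built via fabric8's PodSpecBuilder:

```java
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodSpecBuilder;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class GetContainerWithNameUsageExample {

    @Test
    void testGetContainerWithName() {
        // Hypothetical pod spec with two containers; the names are illustrative.
        PodSpec podSpec =
                new PodSpecBuilder()
                        .addNewContainer().withName("flink-main-container").endContainer()
                        .addNewContainer().withName("log-sidecar").endContainer()
                        .build();

        Container container =
                KubernetesPodTemplateTestUtils.getContainerWithName(
                        podSpec, "flink-main-container");

        assertThat(container.getName()).isEqualTo("flink-main-container");
    }
}
```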