Skip to content

Commit

Permalink
[FLINK-28353][k8s] Exclude unschedulable nodes when generating the no…
Browse files Browse the repository at this point in the history
…de port

This closes apache#20134.
  • Loading branch information
suxinglee authored and wangyang0918 committed Jul 26, 2022
1 parent 0d46660 commit 48acc1c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ private Optional<Endpoint> getLoadBalancerRestEndpoint(
// only consider IPs with the configured address type.
address =
internalClient.nodes().list().getItems().stream()
.filter(
node ->
node.getSpec().getUnschedulable() == null
|| !node.getSpec().getUnschedulable())
.flatMap(node -> node.getStatus().getAddresses().stream())
.filter(
nodeAddress ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public Optional<Endpoint> getRestEndpoint(
// only consider IPs with the configured address type.
address =
internalClient.nodes().list().getItems().stream()
.filter(
node ->
node.getSpec().getUnschedulable() == null
|| !node.getSpec().getUnschedulable())
.flatMap(node -> node.getStatus().getAddresses().stream())
.filter(
nodeAddress ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.fabric8.kubernetes.api.model.NodeAddressBuilder;
import io.fabric8.kubernetes.api.model.NodeBuilder;
import io.fabric8.kubernetes.api.model.NodeListBuilder;
import io.fabric8.kubernetes.api.model.NodeSpecBuilder;
import io.fabric8.kubernetes.api.model.NodeStatusBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
Expand All @@ -43,6 +44,7 @@
import io.fabric8.mockwebserver.dsl.MockServerExpectation;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Nullable;

Expand All @@ -66,9 +68,17 @@ protected void mockExpectedNodesFromServerSide(List<String> addresses) {
for (String address : addresses) {
final String[] parts = address.split(":");
Preconditions.checkState(
parts.length == 2, "Address should be in format \"<type>:<ip>\".");
parts.length == 3,
"Address should be in format \"<type>:<ip>:<unschedulable>\".");
nodes.add(
new NodeBuilder()
.withSpec(
new NodeSpecBuilder()
.withUnschedulable(
StringUtils.isBlank(parts[2])
? null
: Boolean.parseBoolean(parts[2]))
.build())
.withStatus(
new NodeStatusBuilder()
.withAddresses(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,15 @@ private void testNodePortService(KubernetesConfigOptions.NodePortAddressType add
flinkConfig.set(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE, addressType);
final List<String> internalAddresses =
Arrays.asList("InternalIP:10.0.0.1", "InternalIP:10.0.0.2", "InternalIP:10.0.0.3");
Arrays.asList(
"InternalIP:10.0.0.1:true",
"InternalIP:10.0.0.2:false",
"InternalIP:10.0.0.3: ");
final List<String> externalAddresses =
Arrays.asList("ExternalIP:7.7.7.7", "ExternalIP:8.8.8.8", "ExternalIP:9.9.9.9");
Arrays.asList(
"ExternalIP:7.7.7.7:true",
"ExternalIP:8.8.8.8:false",
"ExternalIP:9.9.9.9: ");
final List<String> addresses = new ArrayList<>();
addresses.addAll(internalAddresses);
addresses.addAll(externalAddresses);
Expand All @@ -339,19 +345,22 @@ private void testNodePortService(KubernetesConfigOptions.NodePortAddressType add
case InternalIP:
expectedIps =
internalAddresses.stream()
.filter(s -> !"true".equals(s.split(":")[2]))
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
case ExternalIP:
expectedIps =
externalAddresses.stream()
.filter(s -> !"true".equals(s.split(":")[2]))
.map(s -> s.split(":")[1])
.collect(Collectors.toList());
break;
default:
throw new IllegalArgumentException(
String.format("Unexpected address type %s.", addressType));
}
assertThat(expectedIps.size()).isEqualTo(2);
assertThat(resultEndpoint.get().getAddress()).isIn(expectedIps);
assertThat(resultEndpoint.get().getPort()).isEqualTo(NODE_PORT);
}
Expand Down

0 comments on commit 48acc1c

Please sign in to comment.