Skip to content

Commit

Permalink
KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_prot…
Browse files Browse the repository at this point in the history
…ocols fails (apache#11664)

KRaft brokers always use the first controller listener, so if there is not also a colocated KRaft controller on the node be sure to only publish one controller listener in `controller.listener.names` even when the inter-controller listener name differs.  System tests were failing due to unnecessarily publishing a second entry in `controller.listener.names` for a broker-only config and not also publishing a mapping for it in `listener.security.protocol.map`.  Removing the unnecessary entry in `controller.listener.names` solves the problem.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
rondagostino authored Jan 10, 2022
1 parent aaa546d commit 1785e12
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ def set_protocol_and_port(self, node):
advertised_listeners = []
protocol_map = []

controller_listener_names = self.controller_listener_name_list()
controller_listener_names = self.controller_listener_name_list(node)

for port in self.port_mappings.values():
if port.open:
Expand Down Expand Up @@ -758,12 +758,17 @@ def start_cmd(self, node):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd

def controller_listener_name_list(self):
def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
return [broker_to_controller_listener_name] if (self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
# Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
# 1) the node is a controller node
# 2) the inter-controller listener name differs from the broker-to-controller listener name
return [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)] \
if (quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role and
self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name]

def start_node(self, node, timeout_sec=60):
if node not in self.nodes_to_start:
Expand All @@ -772,7 +777,7 @@ def start_node(self, node, timeout_sec=60):

self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
for controller_listener in self.controller_listener_name_list():
for controller_listener in self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller
Expand All @@ -793,7 +798,7 @@ def start_node(self, node, timeout_sec=60):
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list())
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match remote quorum if one exists
if self.remote_controller_quorum:
self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism
Expand Down

0 comments on commit 1785e12

Please sign in to comment.