Skip to content

Commit

Permalink
Various docstring / pep8 / code hygiene cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwidman committed Nov 19, 2018
1 parent 8dab14b commit 24f4131
Showing 1 changed file with 86 additions and 71 deletions.
157 changes: 86 additions & 71 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__


log = logging.getLogger(__name__)


class KafkaAdmin(object):
"""An class for administering the kafka cluster.
"""A class for administering the Kafka cluster.
Warning:
This is an unstable interface that was recently added and is subject to
Expand All @@ -35,10 +37,9 @@ class KafkaAdmin(object):
The KafkaAdmin class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
kafka broker. Usage of optional fields from protocol versions that are not
Kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.
Use of this class requires a minimum broker version >= 0.10.0.0.
Keyword Arguments:
Expand Down Expand Up @@ -167,16 +168,16 @@ class KafkaAdmin(object):
'sasl_kerberos_service_name': 'kafka',

# metrics configs
'metric_reporters' : [],
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
}

def __init__(self, **configs):
log.debug("Starting Kafka administration interface")
log.debug("Starting KafkaAdmin interface.")
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
Expand All @@ -189,55 +190,59 @@ def __init__(self, **configs):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)

self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='admin',
**self.config)
self._client = KafkaClient(metrics=self._metrics,
metric_group_prefix='admin',
**self.config)

# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
self.config['api_version'] = self._client.config['api_version']

self._closed = False
self._refresh_controller_id()
log.debug('Kafka administration interface started')
log.debug("KafkaAdmin interface started.")

def close(self):
"""Close the administration connection to the kafka broker"""
"""Close the KafkaAdmin connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
log.info('Kafka administration interface already closed')
log.info("KafkaAdmin interface already closed.")
return

self._metrics.close()
self._client.close()
self._closed = True
log.debug('Kafka administration interface has closed')
log.debug("KafkaAdmin interface has closed.")

def _matching_api_version(self, operation):
"""Find matching api version, the lesser of either the latest api version the library supports, or
the max version supported by the broker
"""Find the latest version of the protocol operation supported by both
this library and the broker.
This resolves to the lesser of either the latest api version this
library supports, or the max version supported by the broker.
:param operation: An operation array from kafka.protocol
:return: The max matching version number between client and broker
:param operation: A list of protocol operation versions from kafka.protocol.
:return: The max matching version number between client and broker.
"""
version = min(len(operation) - 1,
self._client.get_api_versions()[operation[0].API_KEY][1])
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
# max library version is less than min broker version. Not sure any brokers
# actually set a min version greater than 0 right now, tho. But maybe in the future?
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
"No version of the '{}' kafka protocol is supported by both the client and broker."
"No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version

def _validate_timeout(self, timeout_ms):
"""Validate the timeout is set or use the configuration default
"""Validate the timeout is set or use the configuration default.
:param timeout_ms: The timeout provided by api call, in milliseconds
:return: The timeout to use for the operation
:param timeout_ms: The timeout provided by api call, in milliseconds.
:return: The timeout to use for the operation.
"""
return timeout_ms or self.config['request_timeout_ms']

def _refresh_controller_id(self):
"""Determine the kafka cluster controller."""
"""Determine the Kafka cluster controller."""
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
Expand Down Expand Up @@ -293,31 +298,34 @@ def _find_group_coordinator_id(self, group_id):
assert group_coordinator != -1
return group_coordinator

def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
:param node: The broker id to which to send the message
:param request: The message to send
:return: The kafka protocol response for the message
:exception: The exception if the message could not be sent
Will block until the message result is received.
:param node_id: The broker id to which to send the message.
:param request: The message to send.
:return: The Kafka protocol response for the message.
:exception: The exception if the message could not be sent.
"""
while not self._client.ready(node):
# connection to broker not ready, poll until it is or send will fail with NodeNotReadyError
while not self._client.ready(node_id):
# poll until the connection to broker is ready, otherwise send()
# will fail with NodeNotReadyError
self._client.poll()
future = self._client.send(node, request)
future = self._client.send(node_id, request)
self._client.poll(future=future)
if future.succeeded():
return future.value
else:
raise future.exception # pylint: disable-msg=raising-bad-type
raise future.exception # pylint: disable-msg=raising-bad-type

def _send_request_to_controller(self, request):
"""Send a kafka protocol message to the cluster controller.
"""Send a Kafka protocol message to the cluster controller.
Will block until the message result is received.
:param request: The message to send
:return: The kafka protocol response for the message
:param request: The message to send.
:return: The Kafka protocol response for the message.
"""
tries = 2 # in case our cached self._controller_id is outdated
while tries:
Expand Down Expand Up @@ -357,11 +365,12 @@ def _convert_new_topic_request(new_topic):
def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
"""Create new topics in the cluster.
:param new_topics: Array of NewTopic objects
:param timeout_ms: Milliseconds to wait for new topics to be created before broker returns
:param new_topics: A list of NewTopic objects.
:param timeout_ms: Milliseconds to wait for new topics to be created
before the broker returns.
:param validate_only: If True, don't actually create new topics.
Not supported by all versions. Default: False
:return: Appropriate version of CreateTopicResponse class
:return: Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
Expand All @@ -371,40 +380,44 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms
)
elif version <= 2:
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms,
validate_only = validate_only
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms,
validate_only=validate_only
)
else:
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
# TODO convert structs to a more pythonic interface
# TODO raise exceptions if errors
return self._send_request_to_controller(request)

def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster
"""Delete topics from the cluster.
:param topics: Array of topic name strings
:param timeout_ms: Milliseconds to wait for topics to be deleted before broker returns
:return: Appropriate version of DeleteTopicsResponse class
:param topics: A list of topic name strings.
:param timeout_ms: Milliseconds to wait for topics to be deleted
before the broker returns.
:return: Appropriate version of DeleteTopicsResponse class.
"""
version = self._matching_api_version(DeleteTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 1:
request = DeleteTopicsRequest[version](
topics = topics,
timeout = timeout_ms
topics=topics,
timeout=timeout_ms
)
response = self._send_request_to_controller(request)
else:
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
return self._send_request_to_controller(request)
return response

# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
Expand Down Expand Up @@ -435,14 +448,15 @@ def _convert_describe_config_resource_request(config_resource):
)

def describe_configs(self, config_resources, include_synonyms=False):
"""Fetch configuration parameters for one or more kafka resources.
"""Fetch configuration parameters for one or more Kafka resources.
:param config_resources: An array of ConfigResource objects.
Any keys in ConfigResource.configs dict will be used to filter the result. The configs dict should be None
to get all values. An empty dict will get zero values (as per kafka protocol).
:param include_synonyms: If True, return synonyms in response. Not
:param config_resources: An list of ConfigResource objects.
Any keys in ConfigResource.configs dict will be used to filter the
result. Setting the configs dict to None will get all values. An
empty dict will get zero values (as per Kafka protocol).
:param include_synonyms: If True, return synonyms in response. Not
supported by all versions. Default: False.
:return: Appropriate version of DescribeConfigsResponse class
:return: Appropriate version of DescribeConfigsResponse class.
"""
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
Expand All @@ -451,12 +465,12 @@ def describe_configs(self, config_resources, include_synonyms=False):
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
elif version <= 1:
elif version == 1:
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
include_synonyms = include_synonyms
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
include_synonyms=include_synonyms
)
else:
raise NotImplementedError(
Expand All @@ -475,21 +489,21 @@ def _convert_alter_config_resource_request(config_resource):
)

def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more kafka resources.
"""Alter configuration parameters of one or more Kafka resources.
Warning:
This is currently broken for BROKER resources because those must be
sent to that specific broker, versus this always picks the
least-loaded node. See the comment in the source code for details.
We would happily accept a PR fixing this.
:param config_resources: An array of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class
:param config_resources: A list of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
if version == 0:
request = AlterConfigsRequest[version](
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
raise NotImplementedError(
Expand Down Expand Up @@ -522,19 +536,20 @@ def _convert_create_partitions_request(topic_name, new_partitions):
def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False):
"""Create additional partitions for an existing topic.
:param topic_partitions: A map of topic name strings to NewPartition objects
:param timeout_ms: Milliseconds to wait for new partitions to be created before broker returns
:param topic_partitions: A map of topic name strings to NewPartition objects.
:param timeout_ms: Milliseconds to wait for new partitions to be
created before the broker returns.
:param validate_only: If True, don't actually create new partitions.
Default: False
:return: Appropriate version of CreatePartitionsResponse class
:return: Appropriate version of CreatePartitionsResponse class.
"""
version = self._matching_api_version(CreatePartitionsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
request = CreatePartitionsRequest[version](
topic_partitions = [self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout = timeout_ms,
validate_only = validate_only
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout=timeout_ms,
validate_only=validate_only
)
else:
raise NotImplementedError(
Expand Down

0 comments on commit 24f4131

Please sign in to comment.