From 0d57c2718fcf3819f2c18911126f245e9e9ce3e0 Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Wed, 30 Apr 2014 17:39:39 -0700
Subject: [PATCH 1/2] Make BrokerResponseError a base class, make subclasses for each broker error

---
 kafka/client.py   |  36 +++++++--------
 kafka/common.py   | 114 ++++++++++++++++++++++++++++++++++++----------
 kafka/consumer.py |  16 +++----
 3 files changed, 113 insertions(+), 53 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index 65914a4b3..4870ab9c3 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,15 +1,18 @@
 import copy
 import logging
+import collections
+
+import kafka.common
 
-from collections import defaultdict
 from functools import partial
 from itertools import count
+from kafka.common import *
 
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
-                          BrokerResponseError, PartitionUnavailableError,
-                          LeaderUnavailableError,
-                          KafkaUnavailableError)
+                          PartitionUnavailableError,
+                          LeaderUnavailableError, KafkaUnavailableError,
+                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -39,6 +42,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()  # bootstrap with all metadata
 
+
     ##################
     #   Private API  #
     ##################
@@ -92,10 +96,9 @@ def _send_broker_unaware_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception, e:
+            except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
                             "trying next server: %s" % (request, host, port, e))
-                continue
 
         raise KafkaUnavailableError("All servers failed to process request")
 
@@ -123,7 +126,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
         # Group the requests by topic+partition
         original_keys = []
-        payloads_by_broker = defaultdict(list)
+        payloads_by_broker = collections.defaultdict(list)
 
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
@@ -157,11 +160,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                 continue
             try:
                 response = conn.recv(requestId)
-            except ConnectionError, e:
+            except ConnectionError as e:
                 log.warning("Could not receive response to request [%s] "
                             "from server %s: %s", request, conn, e)
                 failed = True
-            except ConnectionError, e:
+            except ConnectionError as e:
                 log.warning("Could not send request [%s] to server %s: %s",
                             request, conn, e)
                 failed = True
@@ -184,16 +187,11 @@ def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
     def _raise_on_response_error(self, resp):
-        if resp.error == ErrorMapping.NO_ERROR:
-            return
-
-        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
-                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+        try:
+            kafka.common.check_error(resp)
+        except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
             self.reset_topic_metadata(resp.topic)
-
-        raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d (%s)" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+            raise
 
     #################
     #   Public API  #
     #################
diff --git a/kafka/common.py b/kafka/common.py
index 830e34db9..d288b896f 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,29 +48,6 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic",
"partition"]) -ErrorStrings = { - -1 : 'UNKNOWN', - 0 : 'NO_ERROR', - 1 : 'OFFSET_OUT_OF_RANGE', - 2 : 'INVALID_MESSAGE', - 3 : 'UNKNOWN_TOPIC_OR_PARTITON', - 4 : 'INVALID_FETCH_SIZE', - 5 : 'LEADER_NOT_AVAILABLE', - 6 : 'NOT_LEADER_FOR_PARTITION', - 7 : 'REQUEST_TIMED_OUT', - 8 : 'BROKER_NOT_AVAILABLE', - 9 : 'REPLICA_NOT_AVAILABLE', - 10 : 'MESSAGE_SIZE_TOO_LARGE', - 11 : 'STALE_CONTROLLER_EPOCH', - 12 : 'OFFSET_METADATA_TOO_LARGE', -} - -class ErrorMapping(object): - pass - -for k, v in ErrorStrings.items(): - setattr(ErrorMapping, v, k) - ################# # Exceptions # ################# @@ -80,11 +57,76 @@ class KafkaError(RuntimeError): pass -class KafkaUnavailableError(KafkaError): +class BrokerResponseError(KafkaError): pass -class BrokerResponseError(KafkaError): +class UnknownError(BrokerResponseError): + errno = -1 + message = 'UNKNOWN' + + +class OffsetOutOfRangeError(BrokerResponseError): + errno = 1 + message = 'OFFSET_OUT_OF_RANGE' + + +class InvalidMessageError(BrokerResponseError): + errno = 2 + message = 'INVALID_MESSAGE' + + +class UnknownTopicOrPartitionError(BrokerResponseError): + errno = 3 + message = 'UNKNOWN_TOPIC_OR_PARTITON' + + +class InvalidFetchRequestError(BrokerResponseError): + errno = 4 + message = 'INVALID_FETCH_SIZE' + + +class LeaderNotAvailableError(BrokerResponseError): + errno = 5 + message = 'LEADER_NOT_AVAILABLE' + + +class NotLeaderForPartitionError(BrokerResponseError): + errno = 6 + message = 'NOT_LEADER_FOR_PARTITION' + + +class RequestTimedOutError(BrokerResponseError): + errno = 7 + message = 'REQUEST_TIMED_OUT' + + +class BrokerNotAvailableError(BrokerResponseError): + errno = 8 + message = 'BROKER_NOT_AVAILABLE' + + +class ReplicaNotAvailableError(BrokerResponseError): + errno = 9 + message = 'REPLICA_NOT_AVAILABLE' + + +class MessageSizeTooLargeError(BrokerResponseError): + errno = 10 + message = 'MESSAGE_SIZE_TOO_LARGE' + + +class StaleControllerEpochError(BrokerResponseError): + errno = 11 + message = 'STALE_CONTROLLER_EPOCH' + + +class OffsetMetadataTooLarge(BrokerResponseError): + errno = 12 + message = 'OFFSET_METADATA_TOO_LARGE' + + +class KafkaUnavailableError(KafkaError): pass @@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError): class ProtocolError(KafkaError): pass + +kafka_errors = { + -1 : UnknownError, + 1 : OffsetOutOfRangeError, + 2 : InvalidMessageError, + 3 : UnknownTopicOrPartitionError, + 4 : InvalidFetchRequestError, + 5 : LeaderNotAvailableError, + 6 : NotLeaderForPartitionError, + 7 : RequestTimedOutError, + 8 : BrokerNotAvailableError, + 9 : ReplicaNotAvailableError, + 10 : MessageSizeTooLargeError, + 11 : StaleControllerEpochError, + 12 : OffsetMetadataTooLarge, +} + +def check_error(response): + error = kafka_errors.get(response.error) + if error: + raise error(response) + diff --git a/kafka/consumer.py b/kafka/consumer.py index 98f18a09e..43b8797df 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -8,8 +8,9 @@ from multiprocessing import Process, Queue as MPQueue, Event, Value from Queue import Empty, Queue +import kafka from kafka.common import ( - ErrorMapping, FetchRequest, + FetchRequest, OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData @@ -100,14 +101,11 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.commit_timer.start() def get_or_init_offset_callback(resp): - if resp.error == ErrorMapping.NO_ERROR: + try: + kafka.common.check_error(resp) return resp.offset - elif resp.error == 
ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + except kafka.common.UnknownTopicOrPartitionError: return 0 - else: - raise ProtocolError("OffsetFetchRequest for topic=%s, " - "partition=%d failed with errorcode=%s" % ( - resp.topic, resp.partition, resp.error)) if auto_commit: for partition in partitions: @@ -432,7 +430,7 @@ def _fetch(self): # Put the message in our queue self.queue.put((partition, message)) self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall, e: + except ConsumerFetchSizeTooSmall as e: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): log.error("Max fetch size %d too small", @@ -446,7 +444,7 @@ def _fetch(self): log.warn("Fetch size too small, increase to %d (2x) " "and retry", self.buffer_size) retry_partitions.add(partition) - except ConsumerNoMoreData, e: + except ConsumerNoMoreData as e: log.debug("Iteration was ended by %r", e) except StopIteration: # Stop iterating through this partition From 636778a11fa638a1a58d35af123c62d7c3d2adc2 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Wed, 30 Apr 2014 18:09:45 -0700 Subject: [PATCH 2/2] Make commit() check for errors instead of simply assert no error --- kafka/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 43b8797df..085f5e808 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -152,7 +152,7 @@ def commit(self, partitions=None): resps = self.client.send_offset_commit_request(self.group, reqs) for resp in resps: - assert resp.error == 0 + kafka.common.check_error(resp) self.count_since_commit = 0
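
Note (reviewer illustration, not part of the patches): the point of the new hierarchy is that callers can branch on exception type instead of comparing integer error codes. A minimal sketch of what that looks like downstream, assuming `resp` is one of the response namedtuples returned by KafkaClient; the `handle_response` helper name is hypothetical:

    import logging

    import kafka.common

    log = logging.getLogger("kafka")

    def handle_response(resp):
        try:
            # raises the BrokerResponseError subclass mapped to resp.error
            kafka.common.check_error(resp)
        except kafka.common.UnknownTopicOrPartitionError:
            # a specific, recoverable case can now be caught by type
            log.warning("unknown topic/partition %s:%d, ignoring",
                        resp.topic, resp.partition)
        except kafka.common.BrokerResponseError as e:
            # everything else is still catchable via the shared base class;
            # errno is a class attribute on each subclass
            log.error("broker error %d on %s:%d",
                      e.errno, resp.topic, resp.partition)
            raise

This is also the behavior commit() now gets in PATCH 2/2: a real typed exception instead of a bare assert. One design consequence worth noting: check_error looks the code up in kafka_errors and raises only on a hit, so errno 0 (NO_ERROR) and any unmapped future codes fall through silently rather than raising UnknownError (which is mapped only to -1).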