Merge branch 'exception_refactor' into add_tests
wizzat committed May 1, 2014
2 parents f6f298f + 636778a commit 4d9236b
Showing 3 changed files with 114 additions and 54 deletions.
36 changes: 17 additions & 19 deletions kafka/client.py
@@ -1,15 +1,18 @@
 import copy
 import logging
+import collections
+
+import kafka.common
 
-from collections import defaultdict
 from functools import partial
 from itertools import count
+from kafka.common import *
 
-from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
+from kafka.common import (TopicAndPartition,
                           ConnectionError, FailedPayloadsError,
-                          BrokerResponseError, PartitionUnavailableError,
-                          LeaderUnavailableError,
-                          KafkaUnavailableError)
+                          PartitionUnavailableError,
+                          LeaderUnavailableError, KafkaUnavailableError,
+                          UnknownTopicOrPartitionError, NotLeaderForPartitionError)
 
 from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
@@ -39,6 +42,7 @@ def __init__(self, hosts, client_id=CLIENT_ID,
         self.topic_partitions = {}  # topic_id -> [0, 1, 2, ...]
         self.load_metadata_for_topics()  # bootstrap with all metadata
 
+
     ##################
     #   Private API  #
     ##################
@@ -92,10 +96,9 @@ def _send_broker_unaware_request(self, requestId, request):
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
-            except Exception, e:
+            except Exception as e:
                 log.warning("Could not send request [%r] to server %s:%i, "
                             "trying next server: %s" % (request, host, port, e))
-                continue
 
         raise KafkaUnavailableError("All servers failed to process request")

@@ -123,7 +126,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
 
         # Group the requests by topic+partition
         original_keys = []
-        payloads_by_broker = defaultdict(list)
+        payloads_by_broker = collections.defaultdict(list)
 
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
@@ -157,11 +160,11 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
                     continue
                 try:
                     response = conn.recv(requestId)
-                except ConnectionError, e:
+                except ConnectionError as e:
                     log.warning("Could not receive response to request [%s] "
                                 "from server %s: %s", request, conn, e)
                     failed = True
-            except ConnectionError, e:
+            except ConnectionError as e:
                 log.warning("Could not send request [%s] to server %s: %s",
                             request, conn, e)
                 failed = True
@@ -184,16 +187,11 @@ def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
     def _raise_on_response_error(self, resp):
-        if resp.error == ErrorMapping.NO_ERROR:
-            return
-
-        if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
-                          ErrorMapping.NOT_LEADER_FOR_PARTITION):
+        try:
+            kafka.common.check_error(resp)
+        except (UnknownTopicOrPartitionError, NotLeaderForPartitionError) as e:
             self.reset_topic_metadata(resp.topic)
-
-        raise BrokerResponseError(
-            "Request for %s failed with errorcode=%d (%s)" %
-            (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
+            raise
 
     #################
     #   Public API  #
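The client.py changes above replace numeric errorcode comparisons with typed exception handling built around kafka.common.check_error. A minimal sketch of the resulting calling pattern (FetchResponse here is a stand-in namedtuple and error code 5 an illustrative value, neither taken from this commit):

from collections import namedtuple

import kafka.common
from kafka.common import LeaderNotAvailableError, BrokerResponseError

# Stand-in for a decoded broker response carrying a wire error code.
FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error"])

resp = FetchResponse(topic="a-topic", partition=0, error=5)
try:
    kafka.common.check_error(resp)   # maps errno 5 to LeaderNotAvailableError
except LeaderNotAvailableError:
    pass                             # e.g. refresh metadata and retry
except BrokerResponseError:
    raise                            # any other broker error propagates

Catching the shared base class BrokerResponseError preserves the old catch-all behaviour of _raise_on_response_error, while callers can now special-case individual error codes.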
114 changes: 89 additions & 25 deletions kafka/common.py
@@ -48,29 +48,6 @@
 TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
 
 
-ErrorStrings = {
-    -1 : 'UNKNOWN',
-     0 : 'NO_ERROR',
-     1 : 'OFFSET_OUT_OF_RANGE',
-     2 : 'INVALID_MESSAGE',
-     3 : 'UNKNOWN_TOPIC_OR_PARTITON',
-     4 : 'INVALID_FETCH_SIZE',
-     5 : 'LEADER_NOT_AVAILABLE',
-     6 : 'NOT_LEADER_FOR_PARTITION',
-     7 : 'REQUEST_TIMED_OUT',
-     8 : 'BROKER_NOT_AVAILABLE',
-     9 : 'REPLICA_NOT_AVAILABLE',
-    10 : 'MESSAGE_SIZE_TOO_LARGE',
-    11 : 'STALE_CONTROLLER_EPOCH',
-    12 : 'OFFSET_METADATA_TOO_LARGE',
-}
-
-class ErrorMapping(object):
-    pass
-
-for k, v in ErrorStrings.items():
-    setattr(ErrorMapping, v, k)
-
 #################
 #   Exceptions  #
 #################
@@ -80,11 +57,76 @@ class KafkaError(RuntimeError):
     pass
 
 
-class KafkaUnavailableError(KafkaError):
+class BrokerResponseError(KafkaError):
     pass
 
 
-class BrokerResponseError(KafkaError):
+class UnknownError(BrokerResponseError):
+    errno = -1
+    message = 'UNKNOWN'
+
+
+class OffsetOutOfRangeError(BrokerResponseError):
+    errno = 1
+    message = 'OFFSET_OUT_OF_RANGE'
+
+
+class InvalidMessageError(BrokerResponseError):
+    errno = 2
+    message = 'INVALID_MESSAGE'
+
+
+class UnknownTopicOrPartitionError(BrokerResponseError):
+    errno = 3
+    message = 'UNKNOWN_TOPIC_OR_PARTITON'
+
+
+class InvalidFetchRequestError(BrokerResponseError):
+    errno = 4
+    message = 'INVALID_FETCH_SIZE'
+
+
+class LeaderNotAvailableError(BrokerResponseError):
+    errno = 5
+    message = 'LEADER_NOT_AVAILABLE'
+
+
+class NotLeaderForPartitionError(BrokerResponseError):
+    errno = 6
+    message = 'NOT_LEADER_FOR_PARTITION'
+
+
+class RequestTimedOutError(BrokerResponseError):
+    errno = 7
+    message = 'REQUEST_TIMED_OUT'
+
+
+class BrokerNotAvailableError(BrokerResponseError):
+    errno = 8
+    message = 'BROKER_NOT_AVAILABLE'
+
+
+class ReplicaNotAvailableError(BrokerResponseError):
+    errno = 9
+    message = 'REPLICA_NOT_AVAILABLE'
+
+
+class MessageSizeTooLargeError(BrokerResponseError):
+    errno = 10
+    message = 'MESSAGE_SIZE_TOO_LARGE'
+
+
+class StaleControllerEpochError(BrokerResponseError):
+    errno = 11
+    message = 'STALE_CONTROLLER_EPOCH'
+
+
+class OffsetMetadataTooLarge(BrokerResponseError):
+    errno = 12
+    message = 'OFFSET_METADATA_TOO_LARGE'
+
+
+class KafkaUnavailableError(KafkaError):
     pass
 
 
@@ -122,3 +164,25 @@ class ConsumerNoMoreData(KafkaError):
 
 class ProtocolError(KafkaError):
     pass
+
+kafka_errors = {
+    -1 : UnknownError,
+     1 : OffsetOutOfRangeError,
+     2 : InvalidMessageError,
+     3 : UnknownTopicOrPartitionError,
+     4 : InvalidFetchRequestError,
+     5 : LeaderNotAvailableError,
+     6 : NotLeaderForPartitionError,
+     7 : RequestTimedOutError,
+     8 : BrokerNotAvailableError,
+     9 : ReplicaNotAvailableError,
+    10 : MessageSizeTooLargeError,
+    11 : StaleControllerEpochError,
+    12 : OffsetMetadataTooLarge,
+}
+
+def check_error(response):
+    error = kafka_errors.get(response.error)
+    if error:
+        raise error(response)
+
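A quick illustration of how the new table behaves (Resp is a hypothetical namedtuple standing in for a broker response, not part of this commit): each exception class carries its wire-protocol errno and message, and a response with error == 0 passes check_error silently because 0 has no entry in kafka_errors.

from collections import namedtuple

from kafka.common import (check_error, kafka_errors,
                          UnknownTopicOrPartitionError)

Resp = namedtuple("Resp", ["topic", "partition", "error"])

assert kafka_errors[3] is UnknownTopicOrPartitionError
assert UnknownTopicOrPartitionError.errno == 3

check_error(Resp("a-topic", 0, 0))       # errno 0: returns without raising
try:
    check_error(Resp("a-topic", 0, 3))
except UnknownTopicOrPartitionError as e:
    failed_resp = e.args[0]              # the offending response rides along

Because check_error raises error(response), the whole response object is attached to the exception instead of just a numeric code.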
18 changes: 8 additions & 10 deletions kafka/consumer.py
@@ -8,8 +8,9 @@
 from multiprocessing import Process, Queue as MPQueue, Event, Value
 from Queue import Empty, Queue
 
+import kafka
 from kafka.common import (
-    ErrorMapping, FetchRequest,
+    FetchRequest,
     OffsetRequest, OffsetCommitRequest,
     OffsetFetchRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
@@ -100,14 +101,11 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
             self.commit_timer.start()
 
         def get_or_init_offset_callback(resp):
-            if resp.error == ErrorMapping.NO_ERROR:
+            try:
+                kafka.common.check_error(resp)
                 return resp.offset
-            elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
+            except kafka.common.UnknownTopicOrPartitionError:
                 return 0
-            else:
-                raise ProtocolError("OffsetFetchRequest for topic=%s, "
-                                    "partition=%d failed with errorcode=%s" % (
-                                    resp.topic, resp.partition, resp.error))
 
         if auto_commit:
             for partition in partitions:
@@ -154,7 +152,7 @@ def commit(self, partitions=None):
 
             resps = self.client.send_offset_commit_request(self.group, reqs)
             for resp in resps:
-                assert resp.error == 0
+                kafka.common.check_error(resp)
 
             self.count_since_commit = 0
@@ -432,7 +430,7 @@ def _fetch(self):
                         # Put the message in our queue
                         self.queue.put((partition, message))
                         self.fetch_offsets[partition] = message.offset + 1
-                except ConsumerFetchSizeTooSmall, e:
+                except ConsumerFetchSizeTooSmall as e:
                     if (self.max_buffer_size is not None and
                             self.buffer_size == self.max_buffer_size):
                         log.error("Max fetch size %d too small",
@@ -446,7 +444,7 @@ def _fetch(self):
                     log.warn("Fetch size too small, increase to %d (2x) "
                              "and retry", self.buffer_size)
                     retry_partitions.add(partition)
-                except ConsumerNoMoreData, e:
+                except ConsumerNoMoreData as e:
                     log.debug("Iteration was ended by %r", e)
                 except StopIteration:
                     # Stop iterating through this partition
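The offset-initialization callback above is the same check_error pattern applied in consumer code; a self-contained sketch (OffsetFetchResponse is a hypothetical namedtuple, and errno 3 is UNKNOWN_TOPIC_OR_PARTITON per kafka/common.py):

from collections import namedtuple

import kafka.common

OffsetFetchResponse = namedtuple("OffsetFetchResponse",
                                 ["topic", "partition", "offset", "error"])

def get_or_init_offset(resp):
    # Mirrors get_or_init_offset_callback: an unknown topic/partition
    # starts from offset 0 instead of branching on numeric error codes.
    try:
        kafka.common.check_error(resp)
        return resp.offset
    except kafka.common.UnknownTopicOrPartitionError:
        return 0

assert get_or_init_offset(OffsetFetchResponse("a-topic", 0, 42, 0)) == 42
assert get_or_init_offset(OffsetFetchResponse("a-topic", 0, 0, 3)) == 0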
