From efcf58b84214aeda6cf79319f182407cde7833a6 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 6 May 2014 21:24:15 -0700 Subject: [PATCH] Attempt to fix travis build. Decrease complexity of service.py in favor of in memory logging. Address code review concerns --- kafka/client.py | 2 - kafka/common.py | 10 +++- kafka/consumer.py | 4 +- kafka/util.py | 4 +- test/fixtures.py | 4 -- test/service.py | 78 ++++++++++--------------------- test/test_consumer_integration.py | 2 +- 7 files changed, 38 insertions(+), 66 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 4870ab9c3..d0e07d072 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,8 +6,6 @@ from functools import partial from itertools import count -from kafka.common import * - from kafka.common import (TopicAndPartition, ConnectionError, FailedPayloadsError, PartitionUnavailableError, diff --git a/kafka/common.py b/kafka/common.py index d288b896f..d5155321e 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -121,11 +121,16 @@ class StaleControllerEpochError(BrokerResponseError): message = 'STALE_CONTROLLER_EPOCH' -class OffsetMetadataTooLarge(BrokerResponseError): +class OffsetMetadataTooLargeError(BrokerResponseError): errno = 12 message = 'OFFSET_METADATA_TOO_LARGE' +class StaleLeaderEpochCodeError(BrokerResponseError): + errno = 13 + message = 'STALE_LEADER_EPOCH_CODE' + + class KafkaUnavailableError(KafkaError): pass @@ -178,7 +183,8 @@ class ProtocolError(KafkaError): 9 : ReplicaNotAvailableError, 10 : MessageSizeTooLargeError, 11 : StaleControllerEpochError, - 12 : OffsetMetadataTooLarge, + 12 : OffsetMetadataTooLargeError, + 13 : StaleLeaderEpochCodeError, } def check_error(response): diff --git a/kafka/consumer.py b/kafka/consumer.py index 085f5e808..ef8fbdaac 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -430,12 +430,12 @@ def _fetch(self): # Put the message in our queue self.queue.put((partition, message)) self.fetch_offsets[partition] = message.offset + 1 - except ConsumerFetchSizeTooSmall as e: + except ConsumerFetchSizeTooSmall: if (self.max_buffer_size is not None and self.buffer_size == self.max_buffer_size): log.error("Max fetch size %d too small", self.max_buffer_size) - raise e + raise if self.max_buffer_size is None: self.buffer_size *= 2 else: diff --git a/kafka/util.py b/kafka/util.py index 0577a88dd..a9182346b 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,6 @@ -import sys -import struct import collections +import struct +import sys from threading import Thread, Event from kafka.common import BufferUnderflowError diff --git a/test/fixtures.py b/test/fixtures.py index df6faec28..df8cd42b3 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -98,8 +98,6 @@ def open(self): "org.apache.zookeeper.server.quorum.QuorumPeerMain", properties )) - self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt")) - self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! self.out("Starting...") @@ -180,8 +178,6 @@ def open(self): self.child = SpawnedService(self.kafka_run_class_args( "kafka.Kafka", properties )) - self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt")) - self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! 
self.out("Creating Zookeeper chroot node...") diff --git a/test/service.py b/test/service.py index 78a5f2453..8872c8244 100644 --- a/test/service.py +++ b/test/service.py @@ -1,3 +1,4 @@ +import logging import re import select import subprocess @@ -29,43 +30,15 @@ def __init__(self, args=[]): threading.Thread.__init__(self) self.args = args - self.captured_stdout = "" - self.captured_stderr = "" - self.stdout_file = None - self.stderr_file = None - self.capture_stdout = True - self.capture_stderr = True - self.show_stdout = True - self.show_stderr = True + self.captured_stdout = [] + self.captured_stderr = [] self.should_die = threading.Event() - def configure_stdout(self, file=None, capture=True, show=False): - self.stdout_file = file - self.capture_stdout = capture - self.show_stdout = show - - def configure_stderr(self, file=None, capture=False, show=False): - self.stderr_file = file - self.capture_stderr = capture - self.show_stderr = show - def run(self): - stdout_handle = None - stderr_handle = None - try: - if self.stdout_file: - stdout_handle = open(self.stdout_file, "w") - if self.stderr_file: - stderr_handle = open(self.stderr_file, "w") - self.run_with_handles(stdout_handle, stderr_handle) - finally: - if stdout_handle: - stdout_handle.close() - if stderr_handle: - stderr_handle.close() - - def run_with_handles(self, stdout_handle, stderr_handle): + self.run_with_handles() + + def run_with_handles(self): self.child = subprocess.Popen( self.args, bufsize=1, @@ -78,35 +51,32 @@ def run_with_handles(self, stdout_handle, stderr_handle): if self.child.stdout in rds: line = self.child.stdout.readline() - if stdout_handle: - stdout_handle.write(line) - stdout_handle.flush() - if self.capture_stdout: - self.captured_stdout += line - if self.show_stdout: - sys.stdout.write(line) - sys.stdout.flush() + self.captured_stdout.append(line) if self.child.stderr in rds: line = self.child.stderr.readline() - if stderr_handle: - stderr_handle.write(line) - stderr_handle.flush() - if self.capture_stderr: - self.captured_stderr += line - if self.show_stderr: - sys.stderr.write(line) - sys.stderr.flush() + self.captured_stderr.append(line) if self.should_die.is_set(): self.child.terminate() alive = False - if self.child.poll() is not None: + poll_results = self.child.poll() + if poll_results is not None: if not alive: break else: - raise RuntimeError("Subprocess has died. Aborting.") + self.dump_logs() + raise RuntimeError("Subprocess has died. Aborting. 
(args=%s)" % ' '.join(str(x) for x in self.args)) + + def dump_logs(self): + logging.critical('stderr') + for line in self.captured_stderr: + logging.critical(line.rstrip()) + + logging.critical('stdout') + for line in self.captured_stdout: + logging.critical(line.rstrip()) def wait_for(self, pattern, timeout=10): t1 = time.time() @@ -117,11 +87,13 @@ def wait_for(self, pattern, timeout=10): self.child.kill() except: logging.exception("Received exception when killing child process") + self.dump_logs() + raise RuntimeError("Waiting for %r timed out" % pattern) - if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None: + if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: return - if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None: + if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: return time.sleep(0.1) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9300021a7..da2faf6a0 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -20,7 +20,7 @@ def setUpClass(cls): cls.server = cls.server1 # Bootstrapping server @classmethod - def tearDownClass(cls): # noqa + def tearDownClass(cls): if not os.environ.get('KAFKA_VERSION'): return