Skip to content

Commit

Permalink
Attempt to fix Travis build. Decrease complexity of service.py in favor of in-memory logging. Address code review concerns.
Browse files Browse the repository at this point in the history
  • Loading branch information
wizzat committed May 7, 2014
1 parent 99320fb commit efcf58b
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 66 deletions.
2 changes: 0 additions & 2 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

from functools import partial
from itertools import count
from kafka.common import *

from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
Expand Down
10 changes: 8 additions & 2 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,16 @@ class StaleControllerEpochError(BrokerResponseError):
message = 'STALE_CONTROLLER_EPOCH'


class OffsetMetadataTooLarge(BrokerResponseError):
class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'


class StaleLeaderEpochCodeError(BrokerResponseError):
    # Kafka broker error code 13; registered in the errno -> exception
    # mapping below (see "13 : StaleLeaderEpochCodeError").
    errno = 13
    message = 'STALE_LEADER_EPOCH_CODE'


class KafkaUnavailableError(KafkaError):
    # Client-side error (subclass of KafkaError, not BrokerResponseError, so it
    # has no broker errno). Presumably raised when no broker can be reached —
    # no raise site is visible in this excerpt; confirm against client.py.
    pass

Expand Down Expand Up @@ -178,7 +183,8 @@ class ProtocolError(KafkaError):
9 : ReplicaNotAvailableError,
10 : MessageSizeTooLargeError,
11 : StaleControllerEpochError,
12 : OffsetMetadataTooLarge,
12 : OffsetMetadataTooLargeError,
13 : StaleLeaderEpochCodeError,
}

def check_error(response):
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,12 @@ def _fetch(self):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall as e:
except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
raise e
raise
if self.max_buffer_size is None:
self.buffer_size *= 2
else:
Expand Down
4 changes: 2 additions & 2 deletions kafka/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys
import struct
import collections
import struct
import sys
from threading import Thread, Event

from kafka.common import BufferUnderflowError
Expand Down
4 changes: 0 additions & 4 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ def open(self):
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties
))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))

# Party!
self.out("Starting...")
Expand Down Expand Up @@ -180,8 +178,6 @@ def open(self):
self.child = SpawnedService(self.kafka_run_class_args(
"kafka.Kafka", properties
))
self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))

# Party!
self.out("Creating Zookeeper chroot node...")
Expand Down
78 changes: 25 additions & 53 deletions test/service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import re
import select
import subprocess
Expand Down Expand Up @@ -29,43 +30,15 @@ def __init__(self, args=[]):
threading.Thread.__init__(self)

self.args = args
self.captured_stdout = ""
self.captured_stderr = ""
self.stdout_file = None
self.stderr_file = None
self.capture_stdout = True
self.capture_stderr = True
self.show_stdout = True
self.show_stderr = True
self.captured_stdout = []
self.captured_stderr = []

self.should_die = threading.Event()

def configure_stdout(self, file=None, capture=True, show=False):
    """Configure handling of the child process's stdout.

    file: path to tee stdout into, or None for no file.
    capture: if True, buffer stdout lines in memory (self.captured_stdout).
    show: if True, echo stdout lines to this process's own stdout.
    """
    self.stdout_file = file
    self.capture_stdout = capture
    self.show_stdout = show

def configure_stderr(self, file=None, capture=False, show=False):
    """Configure handling of the child process's stderr.

    file: path to tee stderr into, or None for no file.
    capture: if True, buffer stderr lines in memory (self.captured_stderr).
             Note: defaults to False here, unlike configure_stdout.
    show: if True, echo stderr lines to this process's own stderr.
    """
    self.stderr_file = file
    self.capture_stderr = capture
    self.show_stderr = show

def run(self):
stdout_handle = None
stderr_handle = None
try:
if self.stdout_file:
stdout_handle = open(self.stdout_file, "w")
if self.stderr_file:
stderr_handle = open(self.stderr_file, "w")
self.run_with_handles(stdout_handle, stderr_handle)
finally:
if stdout_handle:
stdout_handle.close()
if stderr_handle:
stderr_handle.close()

def run_with_handles(self, stdout_handle, stderr_handle):
self.run_with_handles()

def run_with_handles(self):
self.child = subprocess.Popen(
self.args,
bufsize=1,
Expand All @@ -78,35 +51,32 @@ def run_with_handles(self, stdout_handle, stderr_handle):

if self.child.stdout in rds:
line = self.child.stdout.readline()
if stdout_handle:
stdout_handle.write(line)
stdout_handle.flush()
if self.capture_stdout:
self.captured_stdout += line
if self.show_stdout:
sys.stdout.write(line)
sys.stdout.flush()
self.captured_stdout.append(line)

if self.child.stderr in rds:
line = self.child.stderr.readline()
if stderr_handle:
stderr_handle.write(line)
stderr_handle.flush()
if self.capture_stderr:
self.captured_stderr += line
if self.show_stderr:
sys.stderr.write(line)
sys.stderr.flush()
self.captured_stderr.append(line)

if self.should_die.is_set():
self.child.terminate()
alive = False

if self.child.poll() is not None:
poll_results = self.child.poll()
if poll_results is not None:
if not alive:
break
else:
raise RuntimeError("Subprocess has died. Aborting.")
self.dump_logs()
raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))

def dump_logs(self):
    """Emit all captured child output at CRITICAL level for post-mortem debugging.

    self.captured_stderr and self.captured_stdout are lists of raw lines
    appended as the child process runs; each is logged with its trailing
    newline stripped, under a 'stderr'/'stdout' section header.
    """
    logging.critical('stderr')
    for line in self.captured_stderr:
        logging.critical(line.rstrip())

    logging.critical('stdout')
    for line in self.captured_stdout:
        logging.critical(line.rstrip())

def wait_for(self, pattern, timeout=10):
t1 = time.time()
Expand All @@ -117,11 +87,13 @@ def wait_for(self, pattern, timeout=10):
self.child.kill()
except:
logging.exception("Received exception when killing child process")
self.dump_logs()

raise RuntimeError("Waiting for %r timed out" % pattern)

if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
return
if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None:
if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
return
time.sleep(0.1)

Expand Down
2 changes: 1 addition & 1 deletion test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def setUpClass(cls):
cls.server = cls.server1 # Bootstrapping server

@classmethod
def tearDownClass(cls): # noqa
def tearDownClass(cls):
if not os.environ.get('KAFKA_VERSION'):
return

Expand Down

0 comments on commit efcf58b

Please sign in to comment.