
Commit

Merge branch 'master' into develop
mrtheb committed Feb 1, 2014
2 parents 8bcf0f0 + 4abf7ee commit 72fdf39
Showing 4 changed files with 168 additions and 128 deletions.
26 changes: 13 additions & 13 deletions README.md
@@ -17,7 +17,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`

# Status

The current version of this package is **0.9.0** and is compatible with
Kafka brokers running version **0.8.1**.

# Usage
@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost", 9092)

# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# To send messages asynchronously
producer = SimpleProducer(kafka, "my-topic", async=True)
producer.send_messages("async message")
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
producer = SimpleProducer(kafka, "my-topic", async=False,
producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)

response = producer.send_messages("async message")
response = producer.send_messages("my-topic", "async message")

if response:
print(response[0].error)
@@ -62,7 +62,7 @@ if response:
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
producer = SimpleProducer(kafka, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)

@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost", 9092)

# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
producer.send("key1", "some message")
producer.send("key2", "this methode")
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this methode")

producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
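
Taken together, these README hunks capture the user-facing change in this merge: the topic is no longer bound to a producer at construction time but is passed to each `send_messages` / `send` call. A minimal sketch of the new calling convention, assuming the `KafkaClient` import used elsewhere in the README:

```python
from kafka.client import KafkaClient  # assumed import, per the rest of the README
from kafka.producer import SimpleProducer, KeyedProducer

kafka = KafkaClient("localhost", 9092)

# Topic now goes with each send call rather than the constructor
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", "some message")

# Same pattern for the keyed producer (HashedPartitioner by default)
keyed = KeyedProducer(kafka)
keyed.send("my-topic", "key1", "some message")
```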

## Multiprocess consumer
109 changes: 70 additions & 39 deletions kafka/consumer.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import

from collections import defaultdict
from itertools import izip_longest, repeat
import logging
import time
@@ -235,6 +234,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)

if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.queue = Queue()

super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)

def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ def seek(self, offset, whence):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)

# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
self.queue = Queue()

def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
@@ -316,33 +318,69 @@ def get_messages(self, count=1, block=True, timeout=0.1):
it will block forever.
"""
messages = []
if timeout:
if timeout is not None:
max_time = time.time() + timeout

new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
message = self.get_message(block, timeout)
if message:
messages.append(message)
result = self._get_message(block, timeout, get_partition_info=True,
update_offset=False)
if result:
partition, message = result
if self.partition_info:
messages.append(result)
else:
messages.append(message)
new_offsets[partition] = message.offset + 1
count -= 1
else:
# Ran out of messages for the last request.
if not block:
# If we're not blocking, break.
break
if timeout:
if timeout is not None:
# If we're blocking and have a timeout, reduce it to the
# appropriate value
timeout = max_time - time.time()

# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()
return messages

def get_message(self, block=True, timeout=0.1):
def get_message(self, block=True, timeout=0.1, get_partition_info=None):
return self._get_message(block, timeout, get_partition_info)

def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
update_offset=True):
"""
If no messages can be fetched, returns None.
If get_partition_info is None, it defaults to self.partition_info
If get_partition_info is True, returns (partition, message)
If get_partition_info is False, returns message
"""
if self.queue.empty():
# We're out of messages, go grab some more.
with FetchContext(self, block, timeout):
self._fetch()
try:
return self.queue.get_nowait()
partition, message = self.queue.get_nowait()

if update_offset:
# Update partition offset
self.offsets[partition] = message.offset + 1

# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()

if get_partition_info is None:
get_partition_info = self.partition_info
if get_partition_info:
return partition, message
else:
return message
except Empty:
return None
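
The new `_get_message` docstring spells out the `get_partition_info` contract: `None` falls back to `self.partition_info`, `True` yields `(partition, message)`, and `False` yields just the message. A hypothetical caller-side sketch, with made-up group and topic names:

```python
from kafka.client import KafkaClient   # assumed import, as in the README
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

msg = consumer.get_message(block=False)   # message only, or None if nothing is available
result = consumer.get_message(block=False, get_partition_info=True)
if result is not None:
    partition, message = result           # (partition, message), per the docstring
    print(partition, message.offset)
```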

@@ -367,11 +405,11 @@ def __iter__(self):
def _fetch(self):
# Create fetch request payloads for all the partitions
requests = []
partitions = self.offsets.keys()
partitions = self.fetch_offsets.keys()
while partitions:
for partition in partitions:
requests.append(FetchRequest(self.topic, partition,
self.offsets[partition],
self.fetch_offsets[partition],
self.buffer_size))
# Send request
responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ def _fetch(self):
partition = resp.partition
try:
for message in resp.messages:
# Update partition offset
self.offsets[partition] = message.offset + 1

# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()

# Put the message in our queue
if self.partition_info:
self.queue.put((partition, message))
else:
self.queue.put(message)
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ def __iter__(self):
break

# Count, check and commit messages if necessary
self.offsets[partition] = message.offset
self.offsets[partition] = message.offset + 1
self.start.clear()
yield message

self.count_since_commit += 1
self._auto_commit()
yield message

self.start.clear()

@@ -613,9 +641,10 @@ def get_messages(self, count=1, block=True, timeout=10):
self.size.value = count
self.pause.clear()

if timeout:
if timeout is not None:
max_time = time.time() + timeout

new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ def get_messages(self, count=1, block=True, timeout=10):
break

messages.append(message)

# Count, check and commit messages if necessary
self.offsets[partition] = message.offset
self.count_since_commit += 1
self._auto_commit()
new_offsets[partition] = message.offset + 1
count -= 1
timeout = max_time - time.time()
if timeout is not None:
timeout = max_time - time.time()

self.size.value = 0
self.start.clear()
self.pause.set()

# Update and commit offsets if necessary
self.offsets.update(new_offsets)
self.count_since_commit += len(messages)
self._auto_commit()

return messages
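
Both `SimpleConsumer.get_messages` and the multiprocess consumer's `get_messages` now follow the same pattern in this diff: offsets are collected into a local `new_offsets` dict while messages are gathered, then `self.offsets` is updated and auto-commit is checked once per batch instead of once per message. A hypothetical end-to-end sketch using `SimpleConsumer` (group and topic names are made up):

```python
from kafka.client import KafkaClient   # assumed import, as in the README
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(kafka, "my-group", "my-topic")

# Offsets for the whole batch are applied, and auto-commit checked,
# after the loop inside get_messages rather than per message
batch = consumer.get_messages(count=10, block=True, timeout=1)
for message in batch:
    print(message.offset)
```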