summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorCarlo Cabanilla <carlo@datadoghq.com>2014-02-28 12:53:18 -0500
committerCarlo Cabanilla <carlo@datadoghq.com>2014-02-28 19:28:49 +0000
commita798c1e32b91325f4311461362b510e720b925e3 (patch)
treeb9472122c8cce1251013012b87086ab4e05581ad /kafka/consumer.py
parent4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
downloadkafka-python-a798c1e32b91325f4311461362b510e720b925e3.tar.gz
fix consumer retry logic (fixes #135)
Fixes bug in the following condition: * Starting buffer size is 1024, max buffer size is 2048, both set on an instance level * Fetched from p0 and p1 and received a response * p0 has more than 1024 bytes, consumer doubles buffer size to 2048 and marks p0 for retry * p1 has more than 1024 bytes, consumer tries to double buffer size, but sees that it's at the max and raises ConsumerFetchSizeTooSmall The fix changes the logic to the following: * Starting buffer size is 1024 set on a per-partition level, max buffer size is 2048 set on an instance level * Fetched from p0 and p1 and received a response * p0 has more than 1024 bytes, consumer doubles buffer size to 2048 for p0 and marks p0 for retry * p1 has more than 1024 bytes, consumer doubles buffer size to 2048 for p1 and marks p1 for retry * Consumer sees that there are partitions to retry, repeats parsing loop * p0 sent all the bytes this time, consumer yields these messages * p1 sent all the bytes this time, consumer yields these messages
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py26
1 file changed, 14 insertions, 12 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 28b53ec..98bfaaf 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -404,22 +404,24 @@ class SimpleConsumer(Consumer):
def _fetch(self):
# Create fetch request payloads for all the partitions
- requests = []
- partitions = self.fetch_offsets.keys()
+ partitions = dict((p, self.buffer_size)
+ for p in self.fetch_offsets.keys())
while partitions:
- for partition in partitions:
+ requests = []
+ for partition, buffer_size in partitions.iteritems():
requests.append(FetchRequest(self.topic, partition,
self.fetch_offsets[partition],
- self.buffer_size))
+ buffer_size))
# Send request
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
min_bytes=self.fetch_min_bytes)
- retry_partitions = set()
+ retry_partitions = {}
for resp in responses:
partition = resp.partition
+ buffer_size = partitions[partition]
try:
for message in resp.messages:
# Put the message in our queue
@@ -427,24 +429,24 @@ class SimpleConsumer(Consumer):
self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
- self.buffer_size == self.max_buffer_size):
+ buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
raise e
if self.max_buffer_size is None:
- self.buffer_size *= 2
+ buffer_size *= 2
else:
- self.buffer_size = max(self.buffer_size * 2,
- self.max_buffer_size)
+ buffer_size = max(buffer_size * 2,
+ self.max_buffer_size)
log.warn("Fetch size too small, increase to %d (2x) "
- "and retry", self.buffer_size)
- retry_partitions.add(partition)
+ "and retry", buffer_size)
+ retry_partitions[partition] = buffer_size
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
except StopIteration:
# Stop iterating through this partition
log.debug("Done iterating over partition %s" % partition)
- partitions = retry_partitions
+ partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""