diff options
author | Carlo Cabanilla <carlo@datadoghq.com> | 2014-02-28 12:53:18 -0500 |
---|---|---|
committer | Carlo Cabanilla <carlo@datadoghq.com> | 2014-02-28 19:28:49 +0000 |
commit | a798c1e32b91325f4311461362b510e720b925e3 (patch) | |
tree | b9472122c8cce1251013012b87086ab4e05581ad /kafka/consumer.py | |
parent | 4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff) | |
download | kafka-python-a798c1e32b91325f4311461362b510e720b925e3.tar.gz |
fix consumer retry logic (fixes #135)
Fixes a bug under the following conditions:
* Starting buffer size is 1024, max buffer size is 2048, both set on an instance level
* Fetch from p0, p1 and received response
* p0 has more than 1024 bytes, consumer doubles buffer size to 2048 and marks p0 for retry
* p1 has more than 1024 bytes, consumer tries to double buffer size, but sees that it's at
the max and raises ConsumerFetchSizeTooSmall
The fix changes the logic to the following:
* Starting buffer size is 1024 set on a per-partition level, max buffer size is 2048 set on an instance level
* Fetch from p0, p1 and received response
* p0 has more than 1024 bytes, consumer doubles buffer size to 2048 for p0 and marks p0 for retry
* p1 has more than 1024 bytes, consumer doubles buffer size to 2048 for p1 and marks p1 for retry
* Consumer sees that there are partitions to retry, repeats parsing loop
* p0 sent all the bytes this time, consumer yields these messages
* p1 sent all the bytes this time, consumer yields these messages
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 28b53ec..98bfaaf 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -404,22 +404,24 @@ class SimpleConsumer(Consumer): def _fetch(self): # Create fetch request payloads for all the partitions - requests = [] - partitions = self.fetch_offsets.keys() + partitions = dict((p, self.buffer_size) + for p in self.fetch_offsets.keys()) while partitions: - for partition in partitions: + requests = [] + for partition, buffer_size in partitions.iteritems(): requests.append(FetchRequest(self.topic, partition, self.fetch_offsets[partition], - self.buffer_size)) + buffer_size)) # Send request responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), min_bytes=self.fetch_min_bytes) - retry_partitions = set() + retry_partitions = {} for resp in responses: partition = resp.partition + buffer_size = partitions[partition] try: for message in resp.messages: # Put the message in our queue @@ -427,24 +429,24 @@ class SimpleConsumer(Consumer): self.fetch_offsets[partition] = message.offset + 1 except ConsumerFetchSizeTooSmall, e: if (self.max_buffer_size is not None and - self.buffer_size == self.max_buffer_size): + buffer_size == self.max_buffer_size): log.error("Max fetch size %d too small", self.max_buffer_size) raise e if self.max_buffer_size is None: - self.buffer_size *= 2 + buffer_size *= 2 else: - self.buffer_size = max(self.buffer_size * 2, - self.max_buffer_size) + buffer_size = max(buffer_size * 2, + self.max_buffer_size) log.warn("Fetch size too small, increase to %d (2x) " - "and retry", self.buffer_size) - retry_partitions.add(partition) + "and retry", buffer_size) + retry_partitions[partition] = buffer_size except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e) except StopIteration: # Stop iterating through this partition log.debug("Done iterating over partition %s" % partition) - partitions = retry_partitions + partitions = retry_partitions def 
_mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): """ |