summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-17 13:49:23 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:56 -0800
commitb80e83b7335a92fcbfcf25e38d51f24fc00c20ea (patch)
tree18a57ce552645e7d72c2a43050e73d2c7f2679c5 /kafka/consumer/kafka.py
parent206560a74b56e7a2dcc7f358f24b5769a22769b5 (diff)
downloadkafka-python-b80e83b7335a92fcbfcf25e38d51f24fc00c20ea.tar.gz
Fix task_done checks when no previous commit exists; add test
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py11
1 files changed, 5 insertions, 6 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 43e8c55..705c70d 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -485,12 +485,11 @@ class KafkaConsumer(object):
offset, prev_done)
# Warn on smaller offsets than previous commit
- # "commit" offsets are actually the offset of the next # message to fetch.
- # so task_done should be compared with (commit - 1)
- prev_done = (self._offsets.commit[topic_partition] - 1)
- if prev_done is not None and (offset <= prev_done):
- logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
- offset, prev_done)
+ # "commit" offsets are actually the offset of the next message to fetch.
+ prev_commit = self._offsets.commit[topic_partition]
+ if prev_commit is not None and ((offset + 1) <= prev_commit):
+ logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
+ offset, prev_commit)
self._offsets.task_done[topic_partition] = offset