summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 22:47:13 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 23:00:09 -0800
commit0adb71af3858a8f4d1cfb3fe072989499b3b3c4f (patch)
treed254f668fe4b1e064bb021aff097d406e7f9b949 /kafka/consumer
parent829b0379eb6f4df2e20fceb6673ad60e21b348f3 (diff)
downloadkafka-python-0adb71af3858a8f4d1cfb3fe072989499b3b3c4f.tar.gz
Fetcher logging should be debug or trace (left higher during testing)
Diffstat (limited to 'kafka/consumer')
-rw-r--r--kafka/consumer/fetcher.py20
-rw-r--r--kafka/consumer/group.py1
2 files changed, 12 insertions, 9 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index eb85060..6446f4a 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -301,9 +301,9 @@ class Fetcher(six.Iterator):
elif fetch_offset == position:
next_offset = messages[-1][0] + 1
- log.debug("Returning fetched records at offset %d for assigned"
- " partition %s and update position to %s", position,
- tp, next_offset)
+ log.log(0, "Returning fetched records at offset %d for assigned"
+ " partition %s and update position to %s", position,
+ tp, next_offset)
self._subscriptions.assignment[tp].position = next_offset
for record in self._unpack_message_set(tp, messages):
@@ -344,8 +344,8 @@ class Fetcher(six.Iterator):
if not self._subscriptions.is_assigned(tp):
# this can happen when a rebalance happened before
# fetched records are returned
- log.warning("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
continue
# note that the consumed position should always be available
@@ -354,18 +354,20 @@ class Fetcher(six.Iterator):
if not self._subscriptions.is_fetchable(tp):
# this can happen when a partition consumption paused before
# fetched records are returned
- log.warning("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
+ log.debug("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
elif fetch_offset == position:
+ log.log(0, "Returning fetched records at offset %d for assigned"
+ " partition %s", position, tp)
for msg in self._unpack_message_set(tp, messages):
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
- log.warning("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
+ log.debug("Ignoring fetched records for %s at offset %s",
+ tp, fetch_offset)
# Send any additional FetchRequests that we can now
# this will likely fetch each partition individually, rather than
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 704c994..d83c452 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -639,6 +639,7 @@ class KafkaConsumer(six.Iterator):
for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
+ log.debug("internal iterator timeout - breaking for poll")
break
def __iter__(self): # pylint: disable=non-iterator-returned