diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-10 22:47:13 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-10 23:00:09 -0800 |
commit | 0adb71af3858a8f4d1cfb3fe072989499b3b3c4f (patch) | |
tree | d254f668fe4b1e064bb021aff097d406e7f9b949 /kafka/consumer | |
parent | 829b0379eb6f4df2e20fceb6673ad60e21b348f3 (diff) | |
download | kafka-python-0adb71af3858a8f4d1cfb3fe072989499b3b3c4f.tar.gz |
Fetcher logging should be debug or trace (left higher during testing)
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 20 | ||||
-rw-r--r-- | kafka/consumer/group.py | 1 |
2 files changed, 12 insertions, 9 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index eb85060..6446f4a 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -301,9 +301,9 @@ class Fetcher(six.Iterator): elif fetch_offset == position: next_offset = messages[-1][0] + 1 - log.debug("Returning fetched records at offset %d for assigned" - " partition %s and update position to %s", position, - tp, next_offset) + log.log(0, "Returning fetched records at offset %d for assigned" + " partition %s and update position to %s", position, + tp, next_offset) self._subscriptions.assignment[tp].position = next_offset for record in self._unpack_message_set(tp, messages): @@ -344,8 +344,8 @@ class Fetcher(six.Iterator): if not self._subscriptions.is_assigned(tp): # this can happen when a rebalance happened before # fetched records are returned - log.warning("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) + log.debug("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) continue # note that the consumed position should always be available @@ -354,18 +354,20 @@ class Fetcher(six.Iterator): if not self._subscriptions.is_fetchable(tp): # this can happen when a partition consumption paused before # fetched records are returned - log.warning("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) + log.debug("Not returning fetched records for assigned partition" + " %s since it is no longer fetchable", tp) elif fetch_offset == position: + log.log(0, "Returning fetched records at offset %d for assigned" + " partition %s", position, tp) for msg in self._unpack_message_set(tp, messages): self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request - log.warning("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) + log.debug("Ignoring fetched records for %s at offset %s", + tp, fetch_offset) # Send any additional FetchRequests that we can now # this will likely fetch each partition individually, rather than diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 704c994..d83c452 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -639,6 +639,7 @@ class KafkaConsumer(six.Iterator): for msg in self._fetcher: yield msg if time.time() > timeout_at: + log.debug("internal iterator timeout - breaking for poll") break def __iter__(self): # pylint: disable=non-iterator-returned |