 kafka/consumer/fetcher.py | 99 +++++++++++++++++++++++++++++++++++++++------
 kafka/consumer/group.py   | 95 ++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 180 insertions(+), 14 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5109523..d09f9da 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -94,6 +94,7 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -375,6 +376,90 @@ class Fetcher(six.Iterator):
part.discard()
return 0
+ def _message_generator(self):
+ """Iterate over fetched_records"""
+ if self._subscriptions.needs_partition_assignment:
+ raise StopIteration('Subscription needs partition assignment')
+
+ while self._records:
+
+ # Check on each iteration since this is a generator
+ self._raise_if_offset_out_of_range()
+ self._raise_if_unauthorized_topics()
+ self._raise_if_record_too_large()
+
+ # Send additional FetchRequests when the internal queue is low
+ # this should enable moderate pipelining
+ if len(self._records) <= self.config['iterator_refetch_records']:
+ self.send_fetches()
+
+ part = self._records.popleft()
+
+ tp = part.topic_partition
+ fetch_offset = part.fetch_offset
+ if not self._subscriptions.is_assigned(tp):
+ # this can happen when a rebalance happened before
+ # fetched records are returned
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ continue
+
+ # note that the position should always be available
+ # as long as the partition is still assigned
+ position = self._subscriptions.assignment[tp].position
+ if not self._subscriptions.is_fetchable(tp):
+ # this can happen when a partition is paused before
+ # fetched records are returned
+ log.debug("Not returning fetched records for assigned partition"
+ " %s since it is no longer fetchable", tp)
+
+ elif fetch_offset == position:
+ log.log(0, "Returning fetched records at offset %d for assigned"
+ " partition %s", position, tp)
+
+ # We can ignore any prior signal to drop pending message sets
+ # because we are starting from a fresh one where fetch_offset == position
+ # i.e., the user seek()'d to this position
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
+
+ for msg in part.messages:
+
+ # Because we are in a generator, it is possible for
+ # subscription state to change between yield calls
+ # so we need to re-check on each loop
+ # this should catch assignment changes, pauses
+ # and resets via seek_to_beginning / seek_to_end
+ if not self._subscriptions.is_fetchable(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer fetchable", tp)
+ break
+
+ # If there is a seek during message iteration,
+ # we should stop unpacking this message set and
+ # wait for a new fetch response that aligns with the
+ # new seek position
+ elif self._subscriptions.assignment[tp].drop_pending_message_set:
+ log.debug("Skipping remainder of message set for partition %s", tp)
+ self._subscriptions.assignment[tp].drop_pending_message_set = False
+ break
+
+ # Compressed messagesets may include earlier messages
+ elif msg.offset < self._subscriptions.assignment[tp].position:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ msg.offset,
+ self._subscriptions.assignment[tp].position)
+ continue
+
+ self._subscriptions.assignment[tp].position = msg.offset + 1
+ yield msg
+
+ else:
+ # these records aren't next in line based on the last consumed
+ # position; ignore them, they must be from an obsolete request
+ log.debug("Ignoring fetched records for %s at offset %s since"
+ " the current position is %d", tp, part.fetch_offset,
+ position)
+
def _unpack_message_set(self, tp, messages):
try:
for offset, size, msg in messages:
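
The generator added above re-checks subscription state between yields, skips
offsets that were already consumed (compressed message sets can contain earlier
offsets), and advances the position one message at a time. A minimal standalone
sketch of that pattern, using toy classes rather than kafka-python's internals:

    import collections

    Message = collections.namedtuple('Message', ['offset', 'value'])

    class ToyPartitionState(object):
        def __init__(self, start_offset):
            self.position = start_offset   # next offset we expect to hand out
            self.paused = False            # may be flipped between yields

        def iter_messages(self, messages):
            for msg in messages:
                if self.paused:
                    # state changed between yields -- stop handing out records
                    break
                if msg.offset < self.position:
                    # compressed batches may include earlier offsets; skip them
                    continue
                self.position = msg.offset + 1
                yield msg

    state = ToyPartitionState(start_offset=5)
    batch = [Message(offset=o, value=b'x') for o in range(3, 8)]
    print([m.offset for m in state.iter_messages(batch)])   # [5, 6, 7]
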
@@ -448,13 +533,13 @@ class Fetcher(six.Iterator):
return self
def __next__(self):
- ret, _ = self.fetched_records(max_records=1)
- if not ret:
- raise StopIteration
- assert len(ret) == 1
- (messages,) = ret.values()
- assert len(messages) == 1
- return messages[0]
+ if not self._iterator:
+ self._iterator = self._message_generator()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
def _deserialize(self, msg):
if self.config['key_deserializer']:
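
Both __next__ implementations in this change use the same technique: cache a
generator on the instance, create it lazily, and drop it once it is exhausted so
that a later call can start a fresh one over newly fetched data. A minimal
sketch of that pattern with a generic queue-draining iterator (not the Fetcher
itself):

    import collections

    class ResumableIterator(object):
        """Iterator that can be resumed after new items arrive."""

        def __init__(self):
            self._queue = collections.deque()
            self._iterator = None

        def add(self, *items):
            self._queue.extend(items)

        def _message_generator(self):
            while self._queue:
                yield self._queue.popleft()

        def __iter__(self):
            return self

        def __next__(self):
            if not self._iterator:
                self._iterator = self._message_generator()
            try:
                return next(self._iterator)
            except StopIteration:
                # drop the exhausted generator so iteration can resume later
                self._iterator = None
                raise

    it = ResumableIterator()
    it.add(1, 2)
    print(list(it))   # [1, 2] -- current generator is exhausted and discarded
    it.add(3)
    print(list(it))   # [3]    -- a fresh generator picks up the new items
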
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index efadde1..3ab68a7 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -298,6 +298,8 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
+ self._iterator = None
+ self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -835,17 +837,96 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
+ def _message_generator(self):
+ assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
+ while time.time() < self._consumer_timeout:
+
+ if self._use_consumer_group():
+ self._coordinator.ensure_coordinator_known()
+ self._coordinator.ensure_active_group()
+
+ # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+ elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+ self._coordinator.ensure_coordinator_known()
+
+ # fetch offsets for any subscribed partitions that we aren't tracking yet
+ if not self._subscription.has_all_fetch_positions():
+ partitions = self._subscription.missing_fetch_positions()
+ self._update_fetch_positions(partitions)
+
+ poll_ms = 1000 * (self._consumer_timeout - time.time())
+ if not self._fetcher.in_flight_fetches():
+ poll_ms = 0
+ self._client.poll(timeout_ms=poll_ms, sleep=True)
+
+ # We need to make sure we at least keep up with scheduled tasks,
+ # like heartbeats, auto-commits, and metadata refreshes
+ timeout_at = self._next_timeout()
+
+ # Because the consumer client poll does not sleep unless blocking on
+ # network IO, we need to explicitly sleep when we know we are idle
+ # because we haven't been assigned any partitions to fetch / consume
+ if self._use_consumer_group() and not self.assignment():
+ sleep_time = max(timeout_at - time.time(), 0)
+ if sleep_time > 0 and not self._client.in_flight_request_count():
+ log.debug('No partitions assigned; sleeping for %s', sleep_time)
+ time.sleep(sleep_time)
+ continue
+
+ # Short-circuit the fetch iterator if we are already timed out
+ # to avoid any unintentional interaction with fetcher setup
+ if time.time() > timeout_at:
+ continue
+
+ for msg in self._fetcher:
+ yield msg
+ if time.time() > timeout_at:
+ log.debug("internal iterator timeout - breaking for poll")
+ break
+
+ # an else block on a for loop only executes if there was no break
+ # so this should only be called on a StopIteration from the fetcher
+ # and we assume that it is safe to send_fetches when the fetcher is done
+ # i.e., there are no more records stored internally
+ else:
+ self._fetcher.send_fetches()
+
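
The for/else above leans on the Python detail called out in the comment: the
else clause of a for loop runs only when the loop completed without hitting
break. A tiny illustration, unrelated to Kafka:

    def drain(items, stop_at):
        for item in items:
            if item == stop_at:
                print('broke out at', item)
                break
        else:
            # runs only if the loop finished without a break
            print('loop completed without break')

    drain([1, 2, 3], stop_at=2)    # broke out at 2
    drain([1, 2, 3], stop_at=99)   # loop completed without break
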
+ def _next_timeout(self):
+ timeout = min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
+ # Although the delayed_tasks timeout above should cover processing
+ # HeartbeatRequests, it is still possible that HeartbeatResponses
+ # are left unprocessed during a long _fetcher iteration without
+ # an intermediate poll(). And because tasks are responsible for
+ # rescheduling themselves, an unprocessed response will prevent
+ # the next heartbeat from being sent. This check should help
+ # avoid that.
+ if self._use_consumer_group():
+ heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+ timeout = min(timeout, heartbeat)
+ return timeout
+
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
- if not ret:
- raise StopIteration
- assert len(ret) == 1
- (messages,) = ret.values()
- assert len(messages) == 1
- return messages[0]
+ if not self._iterator:
+ self._iterator = self._message_generator()
+
+ self._set_consumer_timeout()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
+ def _set_consumer_timeout(self):
+ # consumer_timeout_ms can be used to stop iteration early
+ if self.config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (
+ self.config['consumer_timeout_ms'] / 1000.0)
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
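
With this change, consumer_timeout_ms bounds how long iteration waits for a new
record before stopping, and iteration can be resumed later because the cached
generator is dropped on StopIteration. A usage sketch (topic name, bootstrap
server, and group id are placeholders for a real deployment):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             consumer_timeout_ms=1000)

    # The for loop ends if no record arrives within 1 second of the last one;
    # per the _set_consumer_timeout check above, a negative consumer_timeout_ms
    # would let iteration block indefinitely.
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)
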