Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--	kafka/consumer/group.py	134
1 file changed, 29 insertions(+), 105 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d4e0ff3..efadde1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
 import copy
 import logging
 import socket
+import sys
 import time
 
 from kafka.vendor import six
@@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator):
             rebalances. Default: 3000
         session_timeout_ms (int): The timeout used to detect failures when
             using Kafka's group management facilities. Default: 30000
+        max_poll_records (int): The maximum number of records returned in a single call to poll(). Default: sys.maxsize
         receive_buffer_bytes (int): The size of the TCP receive buffer
             (SO_RCVBUF) to use when reading data. Default: None (relies on
             system defaults). The java client defaults to 32768.
@@ -126,7 +128,7 @@ class KafkaConsumer(six.Iterator):
             [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
-            iterator). Default -1 (block forever).
+            iterator). Default: block forever [float('inf')].
         skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
             caused some messages to be corrupted via double-compression.
             By default, the fetcher will return these messages as a compressed
@@ -220,10 +222,11 @@ class KafkaConsumer(six.Iterator):
         'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
         'heartbeat_interval_ms': 3000,
         'session_timeout_ms': 30000,
+        'max_poll_records': sys.maxsize,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
-        'consumer_timeout_ms': -1,
+        'consumer_timeout_ms': float('inf'),
         'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
@@ -295,8 +298,6 @@ class KafkaConsumer(six.Iterator):
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
-        self._iterator = None
-        self._consumer_timeout = float('inf')
 
         if topics:
             self._subscription.subscribe(topics=topics)
@@ -483,7 +484,7 @@ class KafkaConsumer(six.Iterator):
         """
         return self._client.cluster.partitions_for_topic(topic)
 
-    def poll(self, timeout_ms=0):
+    def poll(self, timeout_ms=0, max_records=None):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -505,19 +506,15 @@ class KafkaConsumer(six.Iterator):
             subscribed list of topics and partitions
         """
        assert timeout_ms >= 0, 'Timeout must not be negative'
-        assert self._iterator is None, 'Incompatible with iterator interface'
+        if max_records is None:
+            max_records = self.config['max_poll_records']
 
         # poll for new data until the timeout expires
         start = time.time()
         remaining = timeout_ms
         while True:
-            records = self._poll_once(remaining)
+            records = self._poll_once(remaining, max_records)
             if records:
-                # before returning the fetched records, we can send off the
-                # next round of fetches and avoid blocking while waiting for their
-                # responses, to enable pipelining while the user is handling the
-                # fetched records.
-                self._fetcher.init_fetches()
                 return records
 
             elapsed_ms = (time.time() - start) * 1000
             remaining = timeout_ms - elapsed_ms
@@ -526,7 +523,7 @@ class KafkaConsumer(six.Iterator):
             if remaining <= 0:
                 return {}
 
-    def _poll_once(self, timeout_ms):
+    def _poll_once(self, timeout_ms, max_records):
         """
         Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
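Usage note: the new max_records parameter bounds how many records a single poll() call may return, with the max_poll_records config as the default. Below is a minimal sketch of how a caller might use it; the broker address, topic, and group id are hypothetical:

from kafka import KafkaConsumer

# 'localhost:9092', 'example-topic', and 'example-group' are hypothetical.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    max_poll_records=500,  # default cap applied when poll() gets no max_records
)

while True:
    # Block for up to 1000ms and return at most 100 records in total,
    # grouped by TopicPartition; an empty dict means the timeout expired.
    batch = consumer.poll(timeout_ms=1000, max_records=100)
    for tp, messages in batch.items():
        for message in messages:
            print(tp.topic, tp.partition, message.offset, message.value)

Bounding the batch size keeps each iteration's processing time predictable, which matters here because heartbeats, auto-commits, and offset updates are all driven from within poll() (see the _poll_once docstring above).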
@@ -545,23 +542,29 @@ class KafkaConsumer(six.Iterator):
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
             self._coordinator.ensure_coordinator_known()
 
-
         # fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
         if not self._subscription.has_all_fetch_positions():
             self._update_fetch_positions(self._subscription.missing_fetch_positions())
 
-        # init any new fetches (won't resend pending fetches)
-        records = self._fetcher.fetched_records()
-
         # if data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
+        records, partial = self._fetcher.fetched_records(max_records)
         if records:
+            # before returning the fetched records, we can send off the
+            # next round of fetches and avoid blocking while waiting for their
+            # responses, to enable pipelining while the user is handling the
+            # fetched records.
+            if not partial:
+                self._fetcher.send_fetches()
             return records
 
-        self._fetcher.init_fetches()
+        # send any new fetches (won't resend pending fetches)
+        self._fetcher.send_fetches()
+
         self._client.poll(timeout_ms=timeout_ms, sleep=True)
-        return self._fetcher.fetched_records()
+        records, _ = self._fetcher.fetched_records(max_records)
+        return records
 
     def position(self, partition):
         """Get the offset of the next record that will be fetched
@@ -832,96 +835,17 @@ class KafkaConsumer(six.Iterator):
         # then do any offset lookups in case some positions are not known
         self._fetcher.update_fetch_positions(partitions)
 
-    def _message_generator(self):
-        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
-        while time.time() < self._consumer_timeout:
-
-            if self._use_consumer_group():
-                self._coordinator.ensure_coordinator_known()
-                self._coordinator.ensure_active_group()
-
-            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
-
-            # fetch offsets for any subscribed partitions that we aren't tracking yet
-            if not self._subscription.has_all_fetch_positions():
-                partitions = self._subscription.missing_fetch_positions()
-                self._update_fetch_positions(partitions)
-
-            poll_ms = 1000 * (self._consumer_timeout - time.time())
-            if not self._fetcher.in_flight_fetches():
-                poll_ms = 0
-            self._client.poll(timeout_ms=poll_ms, sleep=True)
-
-            # We need to make sure we at least keep up with scheduled tasks,
-            # like heartbeats, auto-commits, and metadata refreshes
-            timeout_at = self._next_timeout()
-
-            # Because the consumer client poll does not sleep unless blocking on
-            # network IO, we need to explicitly sleep when we know we are idle
-            # because we haven't been assigned any partitions to fetch / consume
-            if self._use_consumer_group() and not self.assignment():
-                sleep_time = max(timeout_at - time.time(), 0)
-                if sleep_time > 0 and not self._client.in_flight_request_count():
-                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
-                    time.sleep(sleep_time)
-                    continue
-
-            # Short-circuit the fetch iterator if we are already timed out
-            # to avoid any unintentional interaction with fetcher setup
-            if time.time() > timeout_at:
-                continue
-
-            for msg in self._fetcher:
-                yield msg
-                if time.time() > timeout_at:
-                    log.debug("internal iterator timeout - breaking for poll")
-                    break
-
-            # an else block on a for loop only executes if there was no break,
-            # so this should only be called on a StopIteration from the fetcher,
-            # and we assume that it is safe to init_fetches when the fetcher is done,
-            # i.e., there are no more records stored internally
-            else:
-                self._fetcher.init_fetches()
-
-    def _next_timeout(self):
-        timeout = min(self._consumer_timeout,
-                      self._client._delayed_tasks.next_at() + time.time(),
-                      self._client.cluster.ttl() / 1000.0 + time.time())
-
-        # Although the delayed_tasks timeout above should cover processing
-        # HeartbeatRequests, it is still possible that HeartbeatResponses
-        # are left unprocessed during a long _fetcher iteration without
-        # an intermediate poll(). And because tasks are responsible for
-        # rescheduling themselves, an unprocessed response will prevent
-        # the next heartbeat from being sent. This check should help
-        # avoid that.
-        if self._use_consumer_group():
-            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
-            timeout = min(timeout, heartbeat)
-        return timeout
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
 
     def __next__(self):
-        if not self._iterator:
-            self._iterator = self._message_generator()
-
-        self._set_consumer_timeout()
-        try:
-            return next(self._iterator)
-        except StopIteration:
-            self._iterator = None
-            raise
-
-    def _set_consumer_timeout(self):
-        # consumer_timeout_ms can be used to stop iteration early
-        if self.config['consumer_timeout_ms'] >= 0:
-            self._consumer_timeout = time.time() + (
-                self.config['consumer_timeout_ms'] / 1000.0)
+        ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
+        if not ret:
+            raise StopIteration
+        assert len(ret) == 1
+        (messages,) = ret.values()
+        assert len(messages) == 1
+        return messages[0]
 
     # old KafkaConsumer methods are deprecated
     def configure(self, **configs):
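With _message_generator() and its bookkeeping removed, iteration becomes a thin wrapper over poll(): each __next__ asks for at most one record and uses consumer_timeout_ms as its blocking bound, raising StopIteration when nothing arrives in time. A minimal sketch of the resulting semantics; the topic and broker address below are hypothetical:

from kafka import KafkaConsumer

# 'example-topic' and 'localhost:9092' are hypothetical.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    consumer_timeout_ms=5000,  # stop iterating after 5s without data
)

# Each step is equivalent to poll(timeout_ms=5000, max_records=1); when that
# returns an empty dict, __next__ raises StopIteration and the loop ends.
for message in consumer:
    print(message.offset, message.value)

print('no record within 5 seconds; iteration ended')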