Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--  kafka/consumer/group.py  134
1 file changed, 29 insertions(+), 105 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d4e0ff3..efadde1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
import copy
import logging
import socket
+import sys
import time
from kafka.vendor import six
@@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group management facilities. Default: 30000
+ max_poll_records (int): The maximum number of records returned in a single call to poll(). Default: sys.maxsize
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -126,7 +128,7 @@ class KafkaConsumer(six.Iterator):
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
- iterator). Default -1 (block forever).
+ iterator). Default: block forever [float('inf')].
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
caused some messages to be corrupted via double-compression.
By default, the fetcher will return these messages as a compressed
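A sketch of what the new sentinel means for iteration, assuming the same hypothetical broker and topic as above:

    from kafka import KafkaConsumer

    # consumer_timeout_ms=5000 ends iteration after 5s without data;
    # leaving it at the float('inf') default blocks forever.
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=5000)

    for message in consumer:          # raises StopIteration on timeout
        print(message.offset, message.value)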
@@ -220,10 +222,11 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
+ 'max_poll_records': sys.maxsize,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
- 'consumer_timeout_ms': -1,
+ 'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
@@ -295,8 +298,6 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
- self._iterator = None
- self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -483,7 +484,7 @@ class KafkaConsumer(six.Iterator):
"""
return self._client.cluster.partitions_for_topic(topic)
- def poll(self, timeout_ms=0):
+ def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -505,19 +506,15 @@ class KafkaConsumer(six.Iterator):
subscribed list of topics and partitions
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
- assert self._iterator is None, 'Incompatible with iterator interface'
+ if max_records is None:
+ max_records = self.config['max_poll_records']
# poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining)
+ records = self._poll_once(remaining, max_records)
if records:
- # before returning the fetched records, we can send off the
- # next round of fetches and avoid block waiting for their
- # responses to enable pipelining while the user is handling the
- # fetched records.
- self._fetcher.init_fetches()
return records
elapsed_ms = (time.time() - start) * 1000
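The loop above returns as soon as any records arrive, or an empty dict once the timeout lapses. A usage sketch of the new per-call cap; process() is a hypothetical per-record handler:

    # Continuing with a consumer like the one sketched earlier.
    records = consumer.poll(timeout_ms=500, max_records=100)
    # records maps TopicPartition -> [ConsumerRecord], capped at 100
    # messages in total; {} means the timeout expired with no data.
    for tp, messages in records.items():
        for message in messages:
            process(message)  # hypothetical handler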
@@ -526,7 +523,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
- def _poll_once(self, timeout_ms):
+ def _poll_once(self, timeout_ms, max_records):
"""
Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -545,23 +542,29 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
-
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
- # init any new fetches (won't resend pending fetches)
- records = self._fetcher.fetched_records()
-
# if data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
+ records, partial = self._fetcher.fetched_records(max_records)
if records:
+ # before returning the fetched records, we can send off the
+ # next round of fetches and avoid blocking while waiting for
+ # their responses, enabling pipelining while the user is
+ # handling the fetched records.
+ if not partial:
+ self._fetcher.send_fetches()
return records
- self._fetcher.init_fetches()
+ # send any new fetches (won't resend pending fetches)
+ self._fetcher.send_fetches()
+
self._client.poll(timeout_ms=timeout_ms, sleep=True)
- return self._fetcher.fetched_records()
+ records, _ = self._fetcher.fetched_records(max_records)
+ return records
def position(self, partition):
"""Get the offset of the next record that will be fetched
@@ -832,96 +835,17 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
- def _message_generator(self):
- assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
- while time.time() < self._consumer_timeout:
-
- if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
-
- # fetch offsets for any subscribed partitions that we arent tracking yet
- if not self._subscription.has_all_fetch_positions():
- partitions = self._subscription.missing_fetch_positions()
- self._update_fetch_positions(partitions)
-
- poll_ms = 1000 * (self._consumer_timeout - time.time())
- if not self._fetcher.in_flight_fetches():
- poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
-
- # We need to make sure we at least keep up with scheduled tasks,
- # like heartbeats, auto-commits, and metadata refreshes
- timeout_at = self._next_timeout()
-
- # Because the consumer client poll does not sleep unless blocking on
- # network IO, we need to explicitly sleep when we know we are idle
- # because we haven't been assigned any partitions to fetch / consume
- if self._use_consumer_group() and not self.assignment():
- sleep_time = max(timeout_at - time.time(), 0)
- if sleep_time > 0 and not self._client.in_flight_request_count():
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
- # Short-circuit the fetch iterator if we are already timed out
- # to avoid any unintentional interaction with fetcher setup
- if time.time() > timeout_at:
- continue
-
- for msg in self._fetcher:
- yield msg
- if time.time() > timeout_at:
- log.debug("internal iterator timeout - breaking for poll")
- break
-
- # an else block on a for loop only executes if there was no break
- # so this should only be called on a StopIteration from the fetcher
- # and we assume that it is safe to init_fetches when fetcher is done
- # i.e., there are no more records stored internally
- else:
- self._fetcher.init_fetches()
-
- def _next_timeout(self):
- timeout = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
-
- # Although the delayed_tasks timeout above should cover processing
- # HeartbeatRequests, it is still possible that HeartbeatResponses
- # are left unprocessed during a long _fetcher iteration without
- # an intermediate poll(). And because tasks are responsible for
- # rescheduling themselves, an unprocessed response will prevent
- # the next heartbeat from being sent. This check should help
- # avoid that.
- if self._use_consumer_group():
- heartbeat = time.time() + self._coordinator.heartbeat.ttl()
- timeout = min(timeout, heartbeat)
- return timeout
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
-
- self._set_consumer_timeout()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
-
- def _set_consumer_timeout(self):
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
+ ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
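With __next__ now delegating to poll(max_records=1), iteration and explicit polling share one code path. The two loops below behave alike (modulo batching and timeout handling); handle() is a hypothetical callback:

    for message in consumer:
        handle(message)                   # one record at a time

    while True:
        batch = consumer.poll(timeout_ms=1000)
        for tp, messages in batch.items():
            for message in messages:
                handle(message)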