author    Dana Powers <dana.powers@gmail.com>  2016-09-16 09:31:19 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-09-19 21:07:14 -0700
commit    aed27505015fdb3ca69f9565d4b1e3a159f1ae89
tree      4a69cb765abcc2cf21d29359b8af2a87557f04a9
parent    6e3eddc267e6d0e2c3d64d821ff03de9a2c52f53
Greedy unpack messagesets; simplify fetch and consumer iterators -- just call poll() w/ 1 max record
-rw-r--r--  kafka/consumer/fetcher.py  | 136
-rw-r--r--  kafka/consumer/group.py    | 117
2 files changed, 42 insertions(+), 211 deletions(-)
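
For context, a minimal usage sketch of the API after this change: poll() now accepts an optional max_records argument that caps how many buffered records are returned per call, defaulting to the max_poll_records configuration. The topic name and broker address below are placeholders, not part of the commit.

from kafka import KafkaConsumer

# Placeholder topic and broker; any reachable cluster behaves the same way.
consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

# Drain at most 100 buffered records per call; without max_records the
# max_poll_records config value is used instead.
batch = consumer.poll(timeout_ms=1000, max_records=100)
for tp, messages in batch.items():
    for message in messages:
        print(tp, message.offset, message.value)
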
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 21c0b30..a237c0c 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -94,7 +94,6 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
- self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -106,16 +105,6 @@ class Fetcher(six.Iterator):
Returns:
List of Futures: each future resolves to a FetchResponse
"""
- # We need to be careful when creating fetch records during iteration
- # so we verify that there are no records in the deque, or in an
- # iterator
- if self._records or self._iterator:
- log.debug('Skipping send_fetches because there are unconsumed'
- ' records internally')
- return []
- return self._send_fetches()
-
- def _send_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -293,10 +282,12 @@ class Fetcher(six.Iterator):
copied_record_too_large_partitions,
self.config['max_partition_fetch_bytes'])
- def fetched_records(self):
+ def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
- Incompatible with iterator interface - use one or the other, not both.
+ Arguments:
+ max_records (int): Maximum number of records returned. Defaults
+ to max_poll_records configuration.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -306,22 +297,21 @@ class Fetcher(six.Iterator):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
- AssertionError: if used with iterator (incompatible)
Returns:
dict: {TopicPartition: [messages]}
"""
- assert self._iterator is None, (
- 'fetched_records is incompatible with message iterator')
+ if max_records is None:
+ max_records = self.config['max_poll_records']
+
if self._subscriptions.needs_partition_assignment:
return {}
- drained = collections.defaultdict(list)
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
- max_records = self.config['max_poll_records']
+ drained = collections.defaultdict(list)
while self._records and max_records > 0:
part = self._records.popleft()
max_records -= self._append(drained, part, max_records)
@@ -349,13 +339,15 @@ class Fetcher(six.Iterator):
elif fetch_offset == position:
part_records = part.take(max_records)
- next_offset = part_records[-1][0] + 1
+ if not part_records:
+ return 0
+ next_offset = part_records[-1].offset + 1
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s and update position to %s", position,
tp, next_offset)
- for record in self._unpack_message_set(tp, part_records):
+ for record in part_records:
# Fetched compressed messages may include additional records
if record.offset < fetch_offset:
log.debug("Skipping message offset: %s (expecting %s)",
@@ -372,6 +364,7 @@ class Fetcher(six.Iterator):
log.debug("Ignoring fetched records for %s at offset %s since"
" the current position is %d", tp, part.fetch_offset,
position)
+
part.discard()
return 0
@@ -444,98 +437,17 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
- def _message_generator(self):
- """Iterate over fetched_records"""
- if self._subscriptions.needs_partition_assignment:
- raise StopIteration('Subscription needs partition assignment')
-
- while self._records:
-
- # Check on each iteration since this is a generator
- self._raise_if_offset_out_of_range()
- self._raise_if_unauthorized_topics()
- self._raise_if_record_too_large()
-
- # Send additional FetchRequests when the internal queue is low
- # this should enable moderate pipelining
- if len(self._records) <= self.config['iterator_refetch_records']:
- self._send_fetches()
-
- part = self._records.popleft()
- tp = part.topic_partition
-
- if not self._subscriptions.is_assigned(tp):
- # this can happen when a rebalance happened before
- # fetched records are returned
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
- continue
-
- # note that the consumed position should always be available
- # as long as the partition is still assigned
- position = self._subscriptions.assignment[tp].position
- if not self._subscriptions.is_fetchable(tp):
- # this can happen when a partition consumption paused before
- # fetched records are returned
- log.debug("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
-
- elif part.fetch_offset == position:
- log.log(0, "Returning fetched records at offset %d for assigned"
- " partition %s", position, tp)
-
- # We can ignore any prior signal to drop pending message sets
- # because we are starting from a fresh one where fetch_offset == position
- # i.e., the user seek()'d to this position
- self._subscriptions.assignment[tp].drop_pending_message_set = False
-
- for msg in self._unpack_message_set(tp, part.messages):
-
- # Because we are in a generator, it is possible for
- # subscription state to change between yield calls
- # so we need to re-check on each loop
- # this should catch assignment changes, pauses
- # and resets via seek_to_beginning / seek_to_end
- if not self._subscriptions.is_fetchable(tp):
- log.debug("Not returning fetched records for partition %s"
- " since it is no longer fetchable", tp)
- break
-
- # If there is a seek during message iteration,
- # we should stop unpacking this message set and
- # wait for a new fetch response that aligns with the
- # new seek position
- elif self._subscriptions.assignment[tp].drop_pending_message_set:
- log.debug("Skipping remainder of message set for partition %s", tp)
- self._subscriptions.assignment[tp].drop_pending_message_set = False
- break
-
- # Compressed messagesets may include earlier messages
- elif msg.offset < self._subscriptions.assignment[tp].position:
- log.debug("Skipping message offset: %s (expecting %s)",
- msg.offset,
- self._subscriptions.assignment[tp].position)
- continue
-
- self._subscriptions.assignment[tp].position = msg.offset + 1
- yield msg
- else:
- # these records aren't next in line based on the last consumed
- # position, ignore them they must be from an obsolete request
- log.debug("Ignoring fetched records for %s at offset %s",
- tp, part.fetch_offset)
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
+ ret = self.fetched_records(max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret.keys()) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
def _deserialize(self, msg):
if self.config['key_deserializer']:
@@ -717,7 +629,8 @@ class Fetcher(six.Iterator):
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
- self._records.append(self.PartitionRecords(fetch_offset, tp, messages))
+ unpacked = list(self._unpack_message_set(tp, messages))
+ self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
last_offset, _, _ = messages[-1]
self._sensors.records_fetch_lag.record(highwater - last_offset)
num_bytes = sum(msg[1] for msg in messages)
@@ -757,8 +670,7 @@ class Fetcher(six.Iterator):
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
- class PartitionRecords(object):
-
+ class PartitionRecords(six.Iterator):
def __init__(self, fetch_offset, tp, messages):
self.fetch_offset = fetch_offset
self.topic_partition = tp
@@ -780,7 +692,7 @@ class Fetcher(six.Iterator):
self.messages = self.messages[n:]
if self.messages:
- self.fetch_offset = self.messages[0][0]
+ self.fetch_offset = self.messages[0].offset
return res
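
As a toy illustration of the drain logic that fetched_records() now relies on (the Record tuple and the simplified class below are for demonstration only, not the library's actual PartitionRecords): because message sets are unpacked greedily when the fetch response arrives, draining is just a slice of already-decoded records plus an update of the partition's fetch offset.

import collections

Record = collections.namedtuple('Record', ['offset', 'value'])

class PartitionRecords(object):
    # Simplified stand-in: messages are already unpacked/decompressed.
    def __init__(self, fetch_offset, messages):
        self.fetch_offset = fetch_offset
        self.messages = messages

    def take(self, n):
        # Return up to n records and advance fetch_offset to the next
        # undrained record, mirroring the take() shown in the diff above.
        res = self.messages[:n]
        self.messages = self.messages[n:]
        if self.messages:
            self.fetch_offset = self.messages[0].offset
        return res

part = PartitionRecords(0, [Record(i, 'msg-%d' % i) for i in range(5)])
print([r.offset for r in part.take(2)])   # [0, 1]
print(part.fetch_offset)                  # 2
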
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 08f50e9..83a3531 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -128,7 +128,7 @@ class KafkaConsumer(six.Iterator):
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
- iterator). Default -1 (block forever).
+ iterator). Default block forever [float('inf')].
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
caused some messages to be corrupted via double-compression.
By default, the fetcher will return these messages as a compressed
@@ -226,7 +226,7 @@ class KafkaConsumer(six.Iterator):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
- 'consumer_timeout_ms': -1,
+ 'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
@@ -298,8 +298,6 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
- self._iterator = None
- self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -486,7 +484,7 @@ class KafkaConsumer(six.Iterator):
"""
return self._client.cluster.partitions_for_topic(topic)
- def poll(self, timeout_ms=0):
+ def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -508,13 +506,14 @@ class KafkaConsumer(six.Iterator):
subscribed list of topics and partitions
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
- assert self._iterator is None, 'Incompatible with iterator interface'
+ if max_records is None:
+ max_records = self.config['max_poll_records']
# poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining)
+ records = self._poll_once(remaining, max_records)
if records:
# before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
@@ -529,7 +528,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
- def _poll_once(self, timeout_ms):
+ def _poll_once(self, timeout_ms, max_records):
"""
Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -548,23 +547,22 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
-
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
- # init any new fetches (won't resend pending fetches)
- records = self._fetcher.fetched_records()
-
# if data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
+ records = self._fetcher.fetched_records(max_records)
if records:
return records
+ # send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
+
self._client.poll(timeout_ms=timeout_ms, sleep=True)
- return self._fetcher.fetched_records()
+ return self._fetcher.fetched_records(max_records)
def position(self, partition):
"""Get the offset of the next record that will be fetched
@@ -835,96 +833,17 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
- def _message_generator(self):
- assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
- while time.time() < self._consumer_timeout:
-
- if self._use_consumer_group():
- self._coordinator.ensure_coordinator_known()
- self._coordinator.ensure_active_group()
-
- # 0.8.2 brokers support kafka-backed offset storage via group coordinator
- elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
- self._coordinator.ensure_coordinator_known()
-
- # fetch offsets for any subscribed partitions that we arent tracking yet
- if not self._subscription.has_all_fetch_positions():
- partitions = self._subscription.missing_fetch_positions()
- self._update_fetch_positions(partitions)
-
- poll_ms = 1000 * (self._consumer_timeout - time.time())
- if not self._fetcher.in_flight_fetches():
- poll_ms = 0
- self._client.poll(timeout_ms=poll_ms, sleep=True)
-
- # We need to make sure we at least keep up with scheduled tasks,
- # like heartbeats, auto-commits, and metadata refreshes
- timeout_at = self._next_timeout()
-
- # Because the consumer client poll does not sleep unless blocking on
- # network IO, we need to explicitly sleep when we know we are idle
- # because we haven't been assigned any partitions to fetch / consume
- if self._use_consumer_group() and not self.assignment():
- sleep_time = max(timeout_at - time.time(), 0)
- if sleep_time > 0 and not self._client.in_flight_request_count():
- log.debug('No partitions assigned; sleeping for %s', sleep_time)
- time.sleep(sleep_time)
- continue
-
- # Short-circuit the fetch iterator if we are already timed out
- # to avoid any unintentional interaction with fetcher setup
- if time.time() > timeout_at:
- continue
-
- for msg in self._fetcher:
- yield msg
- if time.time() > timeout_at:
- log.debug("internal iterator timeout - breaking for poll")
- break
-
- # an else block on a for loop only executes if there was no break
- # so this should only be called on a StopIteration from the fetcher
- # and we assume that it is safe to send_fetches when fetcher is done
- # i.e., there are no more records stored internally
- else:
- self._fetcher.send_fetches()
-
- def _next_timeout(self):
- timeout = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
-
- # Although the delayed_tasks timeout above should cover processing
- # HeartbeatRequests, it is still possible that HeartbeatResponses
- # are left unprocessed during a long _fetcher iteration without
- # an intermediate poll(). And because tasks are responsible for
- # rescheduling themselves, an unprocessed response will prevent
- # the next heartbeat from being sent. This check should help
- # avoid that.
- if self._use_consumer_group():
- heartbeat = time.time() + self._coordinator.heartbeat.ttl()
- timeout = min(timeout, heartbeat)
- return timeout
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
- if not self._iterator:
- self._iterator = self._message_generator()
-
- self._set_consumer_timeout()
- try:
- return next(self._iterator)
- except StopIteration:
- self._iterator = None
- raise
-
- def _set_consumer_timeout(self):
- # consumer_timeout_ms can be used to stop iteration early
- if self.config['consumer_timeout_ms'] >= 0:
- self._consumer_timeout = time.time() + (
- self.config['consumer_timeout_ms'] / 1000.0)
+ ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
+ if not ret:
+ raise StopIteration
+ assert len(ret.keys()) == 1
+ (messages,) = ret.values()
+ assert len(messages) == 1
+ return messages[0]
# old KafkaConsumer methods are deprecated
def configure(self, **configs):
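
End-to-end, the consumer-side effect of this patch is that iteration is just poll(timeout_ms=consumer_timeout_ms, max_records=1): if no record arrives within consumer_timeout_ms (now float('inf') by default), poll() returns nothing and the iterator raises StopIteration. A minimal sketch, again with a placeholder topic and broker:

from kafka import KafkaConsumer

# A finite consumer_timeout_ms ends iteration once the stream has been idle
# for that long; the new default blocks forever.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=5000)

for message in consumer:
    print(message.offset, message.value)
print('no record within 5 seconds; iterator exhausted')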