author    Dana Powers <dana.powers@rd.io>  2016-01-02 13:18:10 -0800
committer Dana Powers <dana.powers@rd.io>  2016-01-02 13:18:10 -0800
commit    24a4c2a7c5a1265899316aca86a1149496d6564e (patch)
tree      a917f505c3bd05284c2ac7aefbf99da04cf77503
parent    976970f89acfdb3582feed613722158004b0ff3e (diff)
Improve iterator interface
- Support single message consumption via next(consumer) in py2/py3
- Batch message methods (Fetcher.fetched_records / KafkaConsumer.poll)
  are incompatible with iterators -- message generator state keeps
  messages internally after they are popped from _records, but before
  subscription_state is updated.
-rw-r--r--  kafka/consumer/fetcher.py | 28
-rw-r--r--  kafka/consumer/group.py   | 28
2 files changed, 44 insertions(+), 12 deletions(-)
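
To make the incompatibility described in the commit message concrete, here is a minimal, self-contained sketch (illustrative, not part of the patch) of how generator state can hold a message that has already been popped from the shared buffer while the position bookkeeping a batch API depends on has not yet run:

import collections

records = collections.deque(['msg-1', 'msg-2'])  # stands in for Fetcher._records
consumed = []  # stands in for subscription_state position updates

def message_generator():
    while records:
        msg = records.popleft()  # message leaves the shared buffer here...
        yield msg
        consumed.append(msg)     # ...but bookkeeping only runs on resume

gen = message_generator()
print(next(gen))  # 'msg-1' is popped and handed to the caller
print(records)    # deque(['msg-2']): msg-1 is gone from the buffer
print(consumed)   # []: the consumed position was never updated

A batch call that drains records at this point would never account for msg-1, which is why the patch makes the two interfaces mutually exclusive via assertions.
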
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5e15424..ddf9d6f 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -28,7 +28,7 @@ class RecordTooLargeError(Errors.KafkaError):
pass
-class Fetcher(object):
+class Fetcher(six.Iterator):
DEFAULT_CONFIG = {
'key_deserializer': None,
'value_deserializer': None,
@@ -79,6 +79,7 @@ class Fetcher(object):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._iterator = None
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
@@ -253,7 +254,7 @@ class Fetcher(object):
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets.
- NOTE: returning empty records guarantees the consumed position are NOT updated.
+ Incompatible with iterator interface - use one or the other, not both.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -263,10 +264,13 @@ class Fetcher(object):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
+ AssertionError: if used with iterator (incompatible)
Returns:
dict: {TopicPartition: deque([messages])}
"""
+ assert self._iterator is None, (
+ 'fetched_records is incompatible with message iterator')
if self._subscriptions.needs_partition_assignment:
return {}
@@ -324,7 +328,7 @@ class Fetcher(object):
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
- def __iter__(self):
+ def _message_generator(self):
"""Iterate over fetched_records"""
if self._subscriptions.needs_partition_assignment:
raise StopIteration('Subscription needs partition assignment')
@@ -342,7 +346,7 @@ class Fetcher(object):
# this can happen when a rebalance happened before
# fetched records are returned
log.warning("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
+ " since it is no longer assigned", tp)
continue
# note that the consumed position should always be available
@@ -352,7 +356,7 @@ class Fetcher(object):
# this can happen when a partition consumption paused before
# fetched records are returned
log.warning("Not returning fetched records for assigned partition"
- " %s since it is no longer fetchable", tp)
+ " %s since it is no longer fetchable", tp)
# we also need to reset the fetch positions to pretend we did
# not fetch this partition in the previous request at all
@@ -366,13 +370,25 @@ class Fetcher(object):
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.warning("Ignoring fetched records for %s at offset %s",
- tp, fetch_offset)
+ tp, fetch_offset)
# Send any additional FetchRequests that we can now
# this will likely fetch each partition individually, rather than
# fetch multiple partitions in bulk when they are on the same broker
self.init_fetches()
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if not self._iterator:
+ self._iterator = self._message_generator()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
+
def _deserialize(self, msg):
if self.config['key_deserializer']:
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
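
The wrapper pattern applied to Fetcher above is applied identically to KafkaConsumer below. Distilled into a standalone sketch (the LazyIterator name is illustrative, not from the patch), it amounts to:

import six

class LazyIterator(six.Iterator):
    """six.Iterator maps py2's next() onto __next__, so one
    definition serves both Python versions."""

    def __init__(self, items):
        self._items = list(items)
        self._iterator = None

    def _message_generator(self):
        for item in self._items:
            yield item

    def __iter__(self):
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None  # drop the generator so iteration can restart
            raise

Keeping the generator in self._iterator, rather than returning a fresh one from __iter__, is what lets next(obj) work directly on the object while preserving generator state between calls.
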
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 5278214..cea2e1c 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -4,6 +4,8 @@ import copy
import logging
import time
+import six
+
from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
@@ -15,7 +17,7 @@ from kafka.version import __version__
log = logging.getLogger(__name__)
-class KafkaConsumer(object):
+class KafkaConsumer(six.Iterator):
"""Consumer for Kafka 0.9"""
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
@@ -160,6 +162,7 @@ class KafkaConsumer(object):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
+ self._iterator = None
#self.metrics = None
if topics:
@@ -324,16 +327,16 @@ class KafkaConsumer(object):
return self._client.cluster.partitions_for_topic(topic)
def poll(self, timeout_ms=0):
- """
- Fetch data for the topics or partitions specified using one of the
- subscribe/assign APIs. It is an error to not have subscribed to any
- topics or partitions before polling for data.
+ """Fetch data from assigned topics / partitions.
+ Records are fetched and returned in batches by topic-partition.
On each poll, consumer will try to use the last consumed offset as the
starting offset and fetch sequentially. The last consumed offset can be
manually set through seek(partition, offset) or automatically set as
the last committed offset for the subscribed list of partitions.
+ Incompatible with iterator interface -- use one or the other, not both.
+
Arguments:
timeout_ms (int, optional): milliseconds to spend waiting in poll if
data is not available. If 0, returns immediately with any
@@ -344,6 +347,7 @@ class KafkaConsumer(object):
subscribed list of topics and partitions
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
+ assert self._iterator is None, 'Incompatible with iterator interface'
# poll for new data until the timeout expires
start = time.time()
@@ -564,7 +568,7 @@ class KafkaConsumer(object):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
- def __iter__(self):
+ def _message_generator(self):
while True:
self._coordinator.ensure_coordinator_known()
@@ -585,3 +589,15 @@ class KafkaConsumer(object):
yield msg
if time.time() > timeout:
break
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if not self._iterator:
+ self._iterator = self._message_generator()
+ try:
+ return next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ raise
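
Taken together, the intended usage after this change looks roughly like the following; the broker address and topic name are placeholders, not taken from the patch:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

# Single-message consumption, identical in py2 and py3:
message = next(consumer)

# Ordinary iteration also works:
for message in consumer:
    print(message.topic, message.partition, message.offset)

# Mixing styles is now rejected: once iteration has started,
# consumer.poll(timeout_ms=500) trips the new AssertionError.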