diff options
-rw-r--r-- | kafka/consumer/fetcher.py | 28 | ||||
-rw-r--r-- | kafka/consumer/group.py | 28 |
2 files changed, 44 insertions, 12 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5e15424..ddf9d6f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -28,7 +28,7 @@ class RecordTooLargeError(Errors.KafkaError): pass -class Fetcher(object): +class Fetcher(six.Iterator): DEFAULT_CONFIG = { 'key_deserializer': None, 'value_deserializer': None, @@ -79,6 +79,7 @@ class Fetcher(object): self._unauthorized_topics = set() self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} + self._iterator = None #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) @@ -253,7 +254,7 @@ class Fetcher(object): def fetched_records(self): """Returns previously fetched records and updates consumed offsets. - NOTE: returning empty records guarantees the consumed position are NOT updated. + Incompatible with iterator interface - use one or the other, not both. Raises: OffsetOutOfRangeError: if no subscription offset_reset_strategy @@ -263,10 +264,13 @@ class Fetcher(object): configured max_partition_fetch_bytes TopicAuthorizationError: if consumer is not authorized to fetch messages from the topic + AssertionError: if used with iterator (incompatible) Returns: dict: {TopicPartition: deque([messages])} """ + assert self._iterator is None, ( + 'fetched_records is incompatible with message iterator') if self._subscriptions.needs_partition_assignment: return {} @@ -324,7 +328,7 @@ class Fetcher(object): key, value = self._deserialize(msg) yield ConsumerRecord(tp.topic, tp.partition, offset, key, value) - def __iter__(self): + def _message_generator(self): """Iterate over fetched_records""" if self._subscriptions.needs_partition_assignment: raise StopIteration('Subscription needs partition assignment') @@ -342,7 +346,7 @@ class Fetcher(object): # this can happen when a rebalance happened before # fetched records are returned log.warning("Not returning fetched records for partition %s" - " since it is no longer assigned", tp) + " since it is no longer assigned", tp) continue # note that the consumed position should always be available @@ -352,7 +356,7 @@ class Fetcher(object): # this can happen when a partition consumption paused before # fetched records are returned log.warning("Not returning fetched records for assigned partition" - " %s since it is no longer fetchable", tp) + " %s since it is no longer fetchable", tp) # we also need to reset the fetch positions to pretend we did # not fetch this partition in the previous request at all @@ -366,13 +370,25 @@ class Fetcher(object): # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request log.warning("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) + tp, fetch_offset) # Send any additional FetchRequests that we can now # this will likely fetch each partition individually, rather than # fetch multiple partitions in bulk when they are on the same broker self.init_fetches() + def __iter__(self): + return self + + def __next__(self): + if not self._iterator: + self._iterator = self._message_generator() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise + def _deserialize(self, msg): if self.config['key_deserializer']: key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 5278214..cea2e1c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -4,6 +4,8 @@ import copy import logging import time +import six + from kafka.client_async import KafkaClient from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState @@ -15,7 +17,7 @@ from kafka.version import __version__ log = logging.getLogger(__name__) -class KafkaConsumer(object): +class KafkaConsumer(six.Iterator): """Consumer for Kafka 0.9""" DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', @@ -160,6 +162,7 @@ class KafkaConsumer(object): assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False + self._iterator = None #self.metrics = None if topics: @@ -324,16 +327,16 @@ class KafkaConsumer(object): return self._client.cluster.partitions_for_topic(topic) def poll(self, timeout_ms=0): - """ - Fetch data for the topics or partitions specified using one of the - subscribe/assign APIs. It is an error to not have subscribed to any - topics or partitions before polling for data. + """Fetch data from assigned topics / partitions. + Records are fetched and returned in batches by topic-partition. On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(partition, offset) or automatically set as the last committed offset for the subscribed list of partitions. + Incompatible with iterator interface -- use one or the other, not both. + Arguments: timeout_ms (int, optional): milliseconds to spend waiting in poll if data is not available. If 0, returns immediately with any @@ -344,6 +347,7 @@ class KafkaConsumer(object): subscribed list of topics and partitions """ assert timeout_ms >= 0, 'Timeout must not be negative' + assert self._iterator is None, 'Incompatible with iterator interface' # poll for new data until the timeout expires start = time.time() @@ -564,7 +568,7 @@ class KafkaConsumer(object): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) - def __iter__(self): + def _message_generator(self): while True: self._coordinator.ensure_coordinator_known() @@ -585,3 +589,15 @@ class KafkaConsumer(object): yield msg if time.time() > timeout: break + + def __iter__(self): + return self + + def __next__(self): + if not self._iterator: + self._iterator = self._message_generator() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise |