author    Dana Powers <dana.powers@rd.io>  2014-09-15 21:16:49 -0700
committer Dana Powers <dana.powers@rd.io>  2014-12-15 12:42:55 -0800
commit    5b882981d17cfec06cf2f7b44ff34313e7f0180a (patch)
tree      91953a551f9c21250e44e765216c0f4016073141 /kafka/consumer
parent    54ecfeaec05d4a4c85a37310885430b771c8bc57 (diff)
download  kafka-python-5b882981d17cfec06cf2f7b44ff34313e7f0180a.tar.gz
Use 4-space indents
Diffstat (limited to 'kafka/consumer')
-rw-r--r--  kafka/consumer/new.py  1035
1 file changed, 517 insertions, 518 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 04696af..f0a4424 100644
The change re-indents the file to 4-space indents throughout; the logic is
unchanged, so both sides of the diff carry the same code. The resulting
KafkaConsumer class:
class KafkaConsumer(object):
- """
- A simpler kafka consumer
-
- ```
- # A very basic 'tail' consumer, with no stored offset management
- kafka = KafkaConsumer('topic1')
- for m in kafka:
- print m
-
- # Alternate interface: next()
- while True:
- print kafka.next()
-
- # Alternate interface: batch iteration
- while True:
- for m in kafka.fetch_messages():
- print m
- print "Done with batch - let's do another!"
- ```
-
- ```
- # more advanced consumer -- multiple topics w/ auto commit offset management
- kafka = KafkaConsumer('topic1', 'topic2',
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in kafka:
- process_message(m)
- kafka.task_done(m)
-
- # Alternate interface: next()
- while True:
- m = kafka.next()
- process_message(m)
- kafka.task_done(m)
-
- # Batch process interface does not auto_commit!
- while True:
- for m in kafka.fetch_messages():
- process_message(m)
- kafka.task_done(m)
- kafka.commit()
- ```
-
- messages (m) are namedtuples with attributes:
- m.topic: topic name (str)
- m.partition: partition number (int)
- m.offset: message offset on topic-partition log (int)
- m.key: key (bytes - can be None)
- m.value: message (output of deserializer_class - default is event object)
-
- Configuration settings can be passed to constructor,
- otherwise defaults will be used:
- client_id='kafka.consumer.XXX',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=Event.from_bytes,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
-
-
- Configuration parameters are described in more detail at
- http://kafka.apache.org/documentation.html#highlevelconsumerapi
- """
-

    def __init__(self, *topics, **configs):
        self.configure(**configs)
        self.set_topic_partitions(*topics)
        self._msg_iter = None

    def _get_commit_offsets(self):
        logger.info("Consumer fetching stored offsets")
        for topic_partition in self._topics:
            (resp,) = self._client.send_offset_fetch_request(
                self._config['group_id'],
                [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
                fail_on_error=False)
            try:
                check_error(resp)
            # API spec says server won't set an error here
            # but 0.8.1.1 does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored
            if resp.offset == -1:
                self._offsets.commit[topic_partition] = None

            # Otherwise we committed the stored offset
            # and need to fetch the next one
            else:
                self._offsets.commit[topic_partition] = resp.offset

    def _reset_highwater_offsets(self):
        for topic_partition in self._topics:
            self._offsets.highwater[topic_partition] = None

    def _reset_task_done_offsets(self):
        for topic_partition in self._topics:
            self._offsets.task_done[topic_partition] = None

    def __repr__(self):
        return '<KafkaConsumer topics=(%s)>' % ', '.join(
            ["%s-%d" % topic_partition for topic_partition in self._topics])

    def __iter__(self):
        return self

    def next(self):
        self._set_consumer_timeout_start()
        while True:

            # Fetch a new batch if needed
            if self._msg_iter is None:
                self._msg_iter = self.fetch_messages()

            # Check for auto-commit
            if self._should_auto_commit():
                self.commit()

            try:
                return self._msg_iter.next()

            # Handle batch completion
            except StopIteration:
                self._msg_iter = None

            self._check_consumer_timeout()

    def offsets(self, group=None):
        if not group:
            return {
                'fetch': self.offsets('fetch'),
                'commit': self.offsets('commit'),
                'task_done': self.offsets('task_done'),
                'highwater': self.offsets('highwater')
            }
        else:
            return dict(deepcopy(getattr(self._offsets, group)))
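
    # Shape of the returned mapping -- keys are (topic, partition) tuples;
    # the offset values below are illustrative only:
    #
    #     kafka.offsets('fetch')  # {('topic1', 0): 123, ('topic1', 1): 456}
    #     kafka.offsets()         # {'fetch': {...}, 'commit': {...},
    #                             #  'task_done': {...}, 'highwater': {...}}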

    def task_done(self, message):
        """
        Mark a fetched message as consumed.
        Offsets for messages marked as "task_done" will be stored back
        to the kafka cluster for this consumer group on commit()
        """
        topic_partition = (message.topic, message.partition)
        offset = message.offset

        # Warn on non-contiguous offsets
        prev_done = self._offsets.task_done[topic_partition]
        if prev_done is not None and offset != (prev_done + 1):
            logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
                           offset, prev_done)

        # Warn on smaller offsets than previous commit
        # "commit" offsets are actually the offset of the next message to fetch,
        # so task_done should be compared with (commit - 1)
        prev_commit = self._offsets.commit[topic_partition]
        if prev_commit is not None and offset <= (prev_commit - 1):
            logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
                           offset, prev_commit - 1)

        self._offsets.task_done[topic_partition] = offset
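
    # Illustration of the bookkeeping above, with assumed offsets: if the
    # last message marked task_done on ('topic1', 0) had offset 5, commit()
    # will store offset 6 for the group -- the next message to fetch --
    # matching the Java client convention noted in commit() below.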

    def _should_auto_commit(self):
        if not self._config['auto_commit_enable']:
            return False

        if not self._next_commit:
            return False

        return (time.time() >= self._next_commit)

    def _set_next_auto_commit_time(self):
        self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)

    def commit(self):
        """
        Store consumed message offsets (marked via task_done())
        to kafka cluster for this consumer_group.

        Note -- this functionality requires server version >=0.8.1.1
        see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
        """
        if not self._config['group_id']:
            logger.warning('Cannot commit without a group_id!')
            raise KafkaConfigurationError('Attempted to commit offsets without a '
                                          'configured consumer group (group_id)')

        # API supports storing metadata with each commit
        # but for now it is unused
        metadata = ''

        offsets = self._offsets.task_done
        commits = []
        for topic_partition, task_done_offset in offsets.iteritems():

            # Skip if None
            if task_done_offset is None:
                continue

            # Commit offsets as the next offset to fetch
            # which is consistent with the Java Client
            # task_done is marked by messages consumed,
            # so add one to mark the next message for fetching
            commit_offset = (task_done_offset + 1)

            # Skip if no change from previous committed
            if commit_offset == self._offsets.commit[topic_partition]:
                continue

            commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1],
                                               commit_offset, metadata))

        if commits:
            logger.info('committing consumer offsets to group %s', self._config['group_id'])
            resps = self._client.send_offset_commit_request(self._config['group_id'],
                                                            commits,
                                                            fail_on_error=False)

            for r in resps:
                check_error(r)
                topic_partition = (r.topic, r.partition)
                task_done = self._offsets.task_done[topic_partition]
                self._offsets.commit[topic_partition] = (task_done + 1)

            if self._config['auto_commit_enable']:
                self._set_next_auto_commit_time()

            return True

        else:
            logger.info('No new offsets found to commit in group %s', self._config['group_id'])
            return False

    def configure(self, **configs):
        """
        Configuration settings can be passed to constructor,
        otherwise defaults will be used:
            client_id='kafka.consumer.XXX',
            group_id=None,
            fetch_message_max_bytes=1024*1024,
            fetch_min_bytes=1,
            fetch_wait_max_ms=100,
            refresh_leader_backoff_ms=200,
            metadata_broker_list=None,
            socket_timeout_ms=30*1000,
            auto_offset_reset='largest',
            deserializer_class=Event.from_bytes,
            auto_commit_enable=False,
            auto_commit_interval_ms=60 * 1000,
            consumer_timeout_ms=-1

        Configuration parameters are described in more detail at
        http://kafka.apache.org/documentation.html#highlevelconsumerapi
        """
        self._config = {}
        for key in DEFAULT_CONSUMER_CONFIG:
            self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])

        if configs:
            raise KafkaConfigurationError('Unknown configuration key(s): ' +
                                          str(list(configs.keys())))

        if self._config['auto_commit_enable']:
            if not self._config['group_id']:
                raise KafkaConfigurationError('KafkaConsumer configured to auto-commit '
                                              'without required consumer group (group_id)')

        # Check auto-commit configuration
        if self._config['auto_commit_enable']:
            logger.info("Configuring consumer to auto-commit offsets")
            self._set_next_auto_commit_time()

        if self._config['metadata_broker_list'] is None:
            raise KafkaConfigurationError('metadata_broker_list required to '
                                          'configure KafkaConsumer')

        self._client = KafkaClient(self._config['metadata_broker_list'],
                                   client_id=self._config['client_id'],
                                   timeout=(self._config['socket_timeout_ms'] / 1000.0))
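
    # Minimal configure() sketch -- the broker address is an assumption;
    # metadata_broker_list is required, and since configure() builds a new
    # KafkaClient, topics are (re)set afterwards:
    #
    #     kafka.configure(metadata_broker_list=['localhost:9092'],
    #                     group_id='my_consumer_group')
    #     kafka.set_topic_partitions('topic1')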

    def set_topic_partitions(self, *topics):
        """
        Set the topic/partitions to consume
        Optionally specify offsets to start from

        Accepts types:
            str: topic name (will consume all available partitions)
            tuple: (topic, partition)
            dict: { topic: partition }
                  { topic: [partition list] }
                  { topic: (partition tuple,) }

        Optionally, offsets can be specified directly:
            tuple: (topic, partition, offset)
            dict: { (topic, partition): offset, ... }

        Ex:
            kafka = KafkaConsumer()

            # Consume topic1-all; topic2-partition2; topic3-partition0
            kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

            # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
            # using tuples --
            kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

            # using dict --
            kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
        """
        self._topics = []
        self._client.load_metadata_for_topics()

        # Setup offsets
        self._offsets = OffsetsStruct(fetch=dict(),
                                      commit=dict(),
                                      highwater=dict(),
                                      task_done=dict())

        # Handle different topic types
        for arg in topics:

            # Topic name str -- all partitions
            if isinstance(arg, six.string_types):
                topic = arg
                for partition in self._client.get_partition_ids_for_topic(arg):
                    self._consume_topic_partition(topic, partition)

            # (topic, partition [, offset]) tuple
            elif isinstance(arg, tuple):
                (topic, partition) = arg[0:2]
                if len(arg) == 3:
                    offset = arg[2]
                    self._offsets.fetch[(topic, partition)] = offset
                self._consume_topic_partition(topic, partition)

            # { topic: partitions, ... } dict
            elif isinstance(arg, dict):
                for key, value in arg.iteritems():

                    # key can be string (a topic)
                    if isinstance(key, six.string_types):

                        # topic: partition
                        if isinstance(value, int):
                            self._consume_topic_partition(key, value)

                        # topic: [ partition1, partition2, ... ]
                        elif isinstance(value, (list, tuple)):
                            for partition in value:
                                self._consume_topic_partition(key, partition)
                        else:
                            raise KafkaConfigurationError('Unknown topic type (dict value must be '
                                                          'int or list/tuple of ints)')

                    # (topic, partition): offset
                    elif isinstance(key, tuple):
                        self._consume_topic_partition(*key)
                        self._offsets.fetch[key] = value

            else:
                raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))

        # If we have a consumer group, try to fetch stored offsets
        if self._config['group_id']:
            self._get_commit_offsets()

        # Update missing fetch/commit offsets
        for topic_partition in self._topics:

            # Commit offsets default is None
            if topic_partition not in self._offsets.commit:
                self._offsets.commit[topic_partition] = None

            # Skip if we already have a fetch offset from user args
            if topic_partition not in self._offsets.fetch:

                # Fetch offsets default is (1) commit
                if self._offsets.commit[topic_partition] is not None:
                    self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]

                # or (2) auto reset
                else:
                    self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)

        # highwater marks (received from server on fetch response)
        # and task_done (set locally by user)
        # should always get initialized to None
        self._reset_highwater_offsets()
        self._reset_task_done_offsets()

    def _consume_topic_partition(self, topic, partition):
        if not isinstance(topic, six.string_types):
            raise KafkaConfigurationError('Unknown topic type (%s) '
                                          '-- expected string' % type(topic))
        if not isinstance(partition, int):
            raise KafkaConfigurationError('Unknown partition type (%s) '
                                          '-- expected int' % type(partition))

        if topic not in self._client.topic_partitions:
            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
        if partition not in self._client.get_partition_ids_for_topic(topic):
            raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
                                               "in broker metadata" % (partition, topic))
        logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
        self._topics.append((topic, partition))

    def fetch_messages(self):

        max_bytes = self._config['fetch_message_max_bytes']
        max_wait_time = self._config['fetch_wait_max_ms']
        min_bytes = self._config['fetch_min_bytes']

        fetches = []
        offsets = self._offsets.fetch
        for topic_partition, offset in offsets.iteritems():
            fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))

        # client.send_fetch_request will collect topic/partition requests by leader
        # and send each group as a single FetchRequest to the correct broker
        try:
            responses = self._client.send_fetch_request(fetches,
                                                        max_wait_time=max_wait_time,
                                                        min_bytes=min_bytes,
                                                        fail_on_error=False)
        except FailedPayloadsError:
            logger.warning('FailedPayloadsError attempting to fetch data from kafka')
            self._refresh_metadata_on_error()
            return

        for resp in responses:
            topic_partition = (resp.topic, resp.partition)
            try:
                check_error(resp)
            except OffsetOutOfRangeError:
                logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
                               '(Highwatermark: %d)',
                               resp.topic, resp.partition,
                               offsets[topic_partition], resp.highwaterMark)
                # Reset offset
                self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
                continue

            except NotLeaderForPartitionError:
                logger.warning("NotLeaderForPartitionError for %s - %d. "
                               "Metadata may be out of date",
                               resp.topic, resp.partition)
                self._refresh_metadata_on_error()
                continue

            except RequestTimedOutError:
                logger.warning("RequestTimedOutError for %s - %d",
                               resp.topic, resp.partition)
                continue

            # Track server highwater mark
            self._offsets.highwater[topic_partition] = resp.highwaterMark

            # Yield each message
            # Kafka-python could raise an exception during iteration
            # we are not catching -- user will need to address
            for (offset, message) in resp.messages:
                # deserializer_class could raise an exception here
                msg = KafkaMessage(resp.topic,
                                   resp.partition,
                                   offset, message.key,
                                   self._config['deserializer_class'](message.value))

                # Only increment fetch offset if we safely got the message and deserialized
                self._offsets.fetch[topic_partition] = offset + 1

                # Then yield to user
                yield msg
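
    # Note: fetch_messages() is a generator and self._offsets.fetch advances
    # only as messages are yielded, so an abandoned batch resumes from the
    # next unread offset on a later call. Sketch:
    #
    #     for m in kafka.fetch_messages():
    #         process_message(m)
    #         kafka.task_done(m)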

    def _reset_partition_offset(self, topic_partition):
        (topic, partition) = topic_partition
        LATEST = -1
        EARLIEST = -2

        request_time_ms = None
        if self._config['auto_offset_reset'] == 'largest':
            request_time_ms = LATEST
        elif self._config['auto_offset_reset'] == 'smallest':
            request_time_ms = EARLIEST
        else:

            # Let's raise a reasonable exception type if user calls
            # outside of an exception context
            if sys.exc_info() == (None, None, None):
                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
                                            'valid auto_offset_reset setting '
                                            '(largest|smallest)')

            # Otherwise we should re-raise the upstream exception
            # b/c it typically includes additional data about
            # the request that triggered it, and we do not want to drop that
            raise

        (offset, ) = self.get_partition_offsets(topic, partition,
                                                request_time_ms, max_num_offsets=1)
        return offset

    def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
        """
        Request available fetch offsets for a single topic/partition

        @param topic (str)
        @param partition (int)
        @param request_time_ms (int) -- Used to ask for all messages before a
                                        certain time (ms). There are two special
                                        values. Specify -1 to receive the latest
                                        offset (i.e. the offset of the next coming
                                        message) and -2 to receive the earliest
                                        available offset. Note that because offsets
                                        are pulled in descending order, asking for
                                        the earliest offset will always return you
                                        a single element.
        @param max_num_offsets (int)

        @return offsets (list)
        """
        reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]

        (resp,) = self._client.send_offset_request(reqs)
        check_error(resp)

        # Just for sanity..
        # probably unnecessary
        assert resp.topic == topic
        assert resp.partition == partition

        return resp.offsets
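
    # Sketch using the special request times documented above, for an
    # assumed topic1-partition0:
    #
    #     (earliest,) = kafka.get_partition_offsets('topic1', 0, -2, 1)
    #     (latest,) = kafka.get_partition_offsets('topic1', 0, -1, 1)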

    def _refresh_metadata_on_error(self):
        sleep_ms = self._config['refresh_leader_backoff_ms']
        while True:
            logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
            time.sleep(sleep_ms / 1000.0)
            try:
                self._client.load_metadata_for_topics()
            except KafkaUnavailableError:
                logger.warning("Unable to refresh topic metadata... cluster unavailable")
                self._check_consumer_timeout()
            else:
                logger.info("Topic metadata refreshed")
                return

    def _set_consumer_timeout_start(self):
        self._consumer_timeout = False
        if self._config['consumer_timeout_ms'] >= 0:
            self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)

    def _check_consumer_timeout(self):
        if self._consumer_timeout and time.time() > self._consumer_timeout:
            raise ConsumerTimeout('Consumer timed out after %d ms' %
                                  self._config['consumer_timeout_ms'])
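
Putting the API together, a minimal end-to-end sketch (the broker address,
topic name, and identity deserializer are assumptions for illustration;
KafkaConsumer and ConsumerTimeout are referenced as names bound in
kafka/consumer/new.py as of this commit):

```
# Tail 'topic1' with manual offset management, giving up after 5s idle
from kafka.consumer.new import KafkaConsumer, ConsumerTimeout

kafka = KafkaConsumer('topic1',
                      metadata_broker_list=['localhost:9092'],
                      group_id='my_consumer_group',
                      deserializer_class=lambda msg: msg,
                      consumer_timeout_ms=5000)

try:
    for m in kafka:
        print '%s:%d:%d: key=%s value=%s' % (m.topic, m.partition, m.offset,
                                             m.key, m.value)
        kafka.task_done(m)
except ConsumerTimeout:
    # No message arrived within consumer_timeout_ms
    pass

# Store task_done offsets (+1) for the group; a later consumer with the
# same group_id resumes from the first unprocessed message
kafka.commit()
```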