summaryrefslogtreecommitdiff
path: root/kafka/consumer
diff options
context:
space:
mode:
author Dana Powers <dana.powers@rd.io> 2014-09-15 21:48:04 -0700
committer Dana Powers <dana.powers@rd.io> 2014-12-15 12:42:55 -0800
commit 08f6ad94556256d710a5d4b517986111de32ffa1 (patch)
tree eca1f10cd8195db62f9f11b56f3026a710307ee1 /kafka/consumer
parent 5b882981d17cfec06cf2f7b44ff34313e7f0180a (diff)
download kafka-python-08f6ad94556256d710a5d4b517986111de32ffa1.tar.gz
Reorder methods, add docstrings to public methods, section comments for private methods
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- kafka/consumer/new.py 462
1 files changed, 255 insertions, 207 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index f0a4424..abafae8 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -124,171 +124,6 @@ class KafkaConsumer(object):
self.set_topic_partitions(*topics)
self._msg_iter = None
- def _get_commit_offsets(self):
- logger.info("Consumer fetching stored offsets")
- for topic_partition in self._topics:
- (resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
- [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
- fail_on_error=False)
- try:
- check_error(resp)
- # API spec says server wont set an error here
- # but 0.8.1.1 does actually...
- except UnknownTopicOrPartitionError:
- pass
-
- # -1 offset signals no commit is currently stored
- if resp.offset == -1:
- self._offsets.commit[topic_partition] = None
-
- # Otherwise we committed the stored offset
- # and need to fetch the next one
- else:
- self._offsets.commit[topic_partition] = resp.offset
-
- def _reset_highwater_offsets(self):
- for topic_partition in self._topics:
- self._offsets.highwater[topic_partition] = None
-
- def _reset_task_done_offsets(self):
- for topic_partition in self._topics:
- self._offsets.task_done[topic_partition] = None
-
- def __repr__(self):
- return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
- for topic_partition in
- self._topics])
-
- def __iter__(self):
- return self
-
- def next(self):
- self._set_consumer_timeout_start()
- while True:
-
- # Fetch a new batch if needed
- if self._msg_iter is None:
- self._msg_iter = self.fetch_messages()
-
- # Check for auto-commit
- if self._should_auto_commit():
- self.commit()
-
- try:
- return self._msg_iter.next()
-
- # Handle batch completion
- except StopIteration:
- self._msg_iter = None
-
- self._check_consumer_timeout()
-
- def offsets(self, group=None):
- if not group:
- return {
- 'fetch': self.offsets('fetch'),
- 'commit': self.offsets('commit'),
- 'task_done': self.offsets('task_done'),
- 'highwater': self.offsets('highwater')
- }
- else:
- return dict(deepcopy(getattr(self._offsets, group)))
-
- def task_done(self, message):
- """
- Mark a fetched message as consumed.
- Offsets for messages marked as "task_done" will be stored back
- to the kafka cluster for this consumer group on commit()
- """
- topic_partition = (message.topic, message.partition)
- offset = message.offset
-
- # Warn on non-contiguous offsets
- prev_done = self._offsets.task_done[topic_partition]
- if prev_done is not None and offset != (prev_done + 1):
- logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
- offset, prev_done)
-
- # Warn on smaller offsets than previous commit
- # "commit" offsets are actually the offset of the next # message to fetch.
- # so task_done should be compared with (commit - 1)
- prev_done = (self._offsets.commit[topic_partition] - 1)
- if prev_done is not None and (offset <= prev_done):
- logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
- offset, prev_done)
-
- self._offsets.task_done[topic_partition] = offset
-
- def _should_auto_commit(self):
- if not self._config['auto_commit_enable']:
- return False
-
- if not self._next_commit:
- return False
-
- return (time.time() >= self._next_commit)
-
- def _set_next_auto_commit_time(self):
- self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
-
- def commit(self):
- """
- Store consumed message offsets (marked via task_done())
- to kafka cluster for this consumer_group.
-
- Note -- this functionality requires server version >=0.8.1.1
- see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
- """
- if not self._config['group_id']:
- logger.warning('Cannot commit without a group_id!')
- raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
-
- # API supports storing metadata with each commit
- # but for now it is unused
- metadata = ''
-
- offsets = self._offsets.task_done
- commits = []
- for topic_partition, task_done_offset in offsets.iteritems():
-
- # Skip if None
- if task_done_offset is None:
- continue
-
- # Commit offsets as the next offset to fetch
- # which is consistent with the Java Client
- # task_done is marked by messages consumed,
- # so add one to mark the next message for fetching
- commit_offset = (task_done_offset + 1)
-
- # Skip if no change from previous committed
- if commit_offset == self._offsets.commit[topic_partition]:
- continue
-
- commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
-
- if commits:
- logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(self._config['group_id'],
- commits,
- fail_on_error=False)
-
- for r in resps:
- check_error(r)
- topic_partition = (r.topic, r.partition)
- task_done = self._offsets.task_done[topic_partition]
- self._offsets.commit[topic_partition] = (task_done + 1)
-
- if self._config['auto_commit_enable']:
- self._set_next_auto_commit_time()
-
- return True
-
- else:
- logger.info('No new offsets found to commit in group %s', self._config['group_id'])
- return False
-
def configure(self, **configs):
"""
Configuration settings can be passed to constructor,
@@ -445,23 +280,56 @@ class KafkaConsumer(object):
self._reset_highwater_offsets()
self._reset_task_done_offsets()
- def _consume_topic_partition(self, topic, partition):
- if not isinstance(topic, six.string_types):
- raise KafkaConfigurationError('Unknown topic type (%s) '
- '-- expected string' % type(topic))
- if not isinstance(partition, int):
- raise KafkaConfigurationError('Unknown partition type (%s) '
- '-- expected int' % type(partition))
- if topic not in self._client.topic_partitions:
- raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
- if partition not in self._client.get_partition_ids_for_topic(topic):
- raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
- "in broker metadata" % (partition, topic))
- logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
- self._topics.append((topic, partition))
+ def next(self):
+ """
+ Return a single message from the message iterator
+ If consumer_timeout_ms is set, will raise ConsumerTimeout
+ if no message is available
+ Otherwise blocks indefinitely
+
+ Note that this is also the method called internally during iteration:
+ ```
+ for m in consumer:
+ pass
+ ```
+ """
+ self._set_consumer_timeout_start()
+ while True:
+
+ # Fetch a new batch if needed
+ if self._msg_iter is None:
+ self._msg_iter = self.fetch_messages()
+
+ # Check for auto-commit
+ if self._should_auto_commit():
+ self.commit()
+
+ try:
+ return self._msg_iter.next()
+
+ # Handle batch completion
+ except StopIteration:
+ self._msg_iter = None
+
+ self._check_consumer_timeout()
def fetch_messages(self):
+ """
+ Sends FetchRequests for all topic/partitions set for consumption
+ Returns a generator that yields KafkaMessage structs
+ after deserializing with the configured `deserializer_class`
+
+ Refreshes metadata on errors, and resets fetch offset on
+ OffsetOutOfRange, per the configured `auto_offset_reset` policy
+
+ Key configuration parameters:
+ `fetch_message_max_bytes`
+ `fetch_wait_max_ms`
+ `fetch_min_bytes`
+ `deserializer_class`
+ `auto_offset_reset`
+ """
max_bytes = self._config['fetch_message_max_bytes']
max_wait_time = self._config['fetch_wait_max_ms']
@@ -528,34 +396,6 @@ class KafkaConsumer(object):
# Then yield to user
yield msg
- def _reset_partition_offset(self, topic_partition):
- (topic, partition) = topic_partition
- LATEST = -1
- EARLIEST = -2
-
- request_time_ms = None
- if self._config['auto_offset_reset'] == 'largest':
- request_time_ms = LATEST
- elif self._config['auto_offset_reset'] == 'smallest':
- request_time_ms = EARLIEST
- else:
-
- # Let's raise an reasonable exception type if user calls
- # outside of an exception context
- if sys.exc_info() == (None, None, None):
- raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
- 'valid auto_offset_reset setting '
- '(largest|smallest)')
-
- # Otherwise we should re-raise the upstream exception
- # b/c it typically includes additional data about
- # the request that triggered it, and we do not want to drop that
- raise
-
- (offset, ) = self.get_partition_offsets(topic, partition,
- request_time_ms, max_num_offsets=1)
- return offset
-
def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
"""
Request available fetch offsets for a single topic/partition
@@ -588,6 +428,123 @@ class KafkaConsumer(object):
return resp.offsets
+ def offsets(self, group=None):
+ """
+ Returns a copy of internal offsets struct
+ optional param: group [fetch|commit|task_done|highwater]
+ if no group specified, returns all groups
+ """
+ if not group:
+ return {
+ 'fetch': self.offsets('fetch'),
+ 'commit': self.offsets('commit'),
+ 'task_done': self.offsets('task_done'),
+ 'highwater': self.offsets('highwater')
+ }
+ else:
+ return dict(deepcopy(getattr(self._offsets, group)))
+
+ def task_done(self, message):
+ """
+ Mark a fetched message as consumed.
+ Offsets for messages marked as "task_done" will be stored back
+ to the kafka cluster for this consumer group on commit()
+ """
+ topic_partition = (message.topic, message.partition)
+ offset = message.offset
+
+ # Warn on non-contiguous offsets
+ prev_done = self._offsets.task_done[topic_partition]
+ if prev_done is not None and offset != (prev_done + 1):
+ logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
+ offset, prev_done)
+
+ # Warn on smaller offsets than previous commit
+ # "commit" offsets are actually the offset of the next message to fetch.
+ # so task_done should be compared with (commit - 1)
+ prev_done = (self._offsets.commit[topic_partition] - 1)
+ if prev_done is not None and (offset <= prev_done):
+ logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
+ offset, prev_done)
+
+ self._offsets.task_done[topic_partition] = offset
+
+ def commit(self):
+ """
+ Store consumed message offsets (marked via task_done())
+ to kafka cluster for this consumer_group.
+
+ Note -- this functionality requires server version >=0.8.1.1
+ see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ """
+ if not self._config['group_id']:
+ logger.warning('Cannot commit without a group_id!')
+ raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
+
+ # API supports storing metadata with each commit
+ # but for now it is unused
+ metadata = ''
+
+ offsets = self._offsets.task_done
+ commits = []
+ for topic_partition, task_done_offset in offsets.iteritems():
+
+ # Skip if None
+ if task_done_offset is None:
+ continue
+
+ # Commit offsets as the next offset to fetch
+ # which is consistent with the Java Client
+ # task_done is marked by messages consumed,
+ # so add one to mark the next message for fetching
+ commit_offset = (task_done_offset + 1)
+
+ # Skip if no change from previous committed
+ if commit_offset == self._offsets.commit[topic_partition]:
+ continue
+
+ commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
+
+ if commits:
+ logger.info('committing consumer offsets to group %s', self._config['group_id'])
+ resps = self._client.send_offset_commit_request(self._config['group_id'],
+ commits,
+ fail_on_error=False)
+
+ for r in resps:
+ check_error(r)
+ topic_partition = (r.topic, r.partition)
+ task_done = self._offsets.task_done[topic_partition]
+ self._offsets.commit[topic_partition] = (task_done + 1)
+
+ if self._config['auto_commit_enable']:
+ self._set_next_auto_commit_time()
+
+ return True
+
+ else:
+ logger.info('No new offsets found to commit in group %s', self._config['group_id'])
+ return False
+
+ #
+ # Topic/partition management private methods
+ #
+ def _consume_topic_partition(self, topic, partition):
+ if not isinstance(topic, six.string_types):
+ raise KafkaConfigurationError('Unknown topic type (%s) '
+ '-- expected string' % type(topic))
+ if not isinstance(partition, int):
+ raise KafkaConfigurationError('Unknown partition type (%s) '
+ '-- expected int' % type(partition))
+
+ if topic not in self._client.topic_partitions:
+ raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+ if partition not in self._client.get_partition_ids_for_topic(topic):
+ raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+ "in broker metadata" % (partition, topic))
+ logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
+ self._topics.append((topic, partition))
+
def _refresh_metadata_on_error(self):
sleep_ms = self._config['refresh_leader_backoff_ms']
while True:
@@ -602,6 +559,71 @@ class KafkaConsumer(object):
logger.info("Topic metadata refreshed")
return
+ #
+ # Offset-management private methods
+ #
+ def _get_commit_offsets(self):
+ logger.info("Consumer fetching stored offsets")
+ for topic_partition in self._topics:
+ (resp,) = self._client.send_offset_fetch_request(
+ self._config['group_id'],
+ [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+ fail_on_error=False)
+ try:
+ check_error(resp)
+ # API spec says server wont set an error here
+ # but 0.8.1.1 does actually...
+ except UnknownTopicOrPartitionError:
+ pass
+
+ # -1 offset signals no commit is currently stored
+ if resp.offset == -1:
+ self._offsets.commit[topic_partition] = None
+
+ # Otherwise we committed the stored offset
+ # and need to fetch the next one
+ else:
+ self._offsets.commit[topic_partition] = resp.offset
+
+ def _reset_highwater_offsets(self):
+ for topic_partition in self._topics:
+ self._offsets.highwater[topic_partition] = None
+
+ def _reset_task_done_offsets(self):
+ for topic_partition in self._topics:
+ self._offsets.task_done[topic_partition] = None
+
+ def _reset_partition_offset(self, topic_partition):
+ (topic, partition) = topic_partition
+ LATEST = -1
+ EARLIEST = -2
+
+ request_time_ms = None
+ if self._config['auto_offset_reset'] == 'largest':
+ request_time_ms = LATEST
+ elif self._config['auto_offset_reset'] == 'smallest':
+ request_time_ms = EARLIEST
+ else:
+
+ # Let's raise a reasonable exception type if user calls
+ # outside of an exception context
+ if sys.exc_info() == (None, None, None):
+ raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+ 'valid auto_offset_reset setting '
+ '(largest|smallest)')
+
+ # Otherwise we should re-raise the upstream exception
+ # b/c it typically includes additional data about
+ # the request that triggered it, and we do not want to drop that
+ raise
+
+ (offset, ) = self.get_partition_offsets(topic, partition,
+ request_time_ms, max_num_offsets=1)
+ return offset
+
+ #
+ # Consumer Timeout private methods
+ #
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._config['consumer_timeout_ms'] >= 0:
@@ -610,3 +632,29 @@ class KafkaConsumer(object):
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
+
+ #
+ # Autocommit private methods
+ #
+ def _should_auto_commit(self):
+ if not self._config['auto_commit_enable']:
+ return False
+
+ if not self._next_commit:
+ return False
+
+ return (time.time() >= self._next_commit)
+
+ def _set_next_auto_commit_time(self):
+ self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
+
+ #
+ # python private methods
+ #
+ def __repr__(self):
+ return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
+ for topic_partition in
+ self._topics])
+
+ def __iter__(self):
+ return self