author     Dana Powers <dana.powers@rd.io>  2014-09-15 21:48:04 -0700
committer  Dana Powers <dana.powers@rd.io>  2014-12-15 12:42:55 -0800
commit     08f6ad94556256d710a5d4b517986111de32ffa1 (patch)
tree       eca1f10cd8195db62f9f11b56f3026a710307ee1 /kafka/consumer
parent     5b882981d17cfec06cf2f7b44ff34313e7f0180a (diff)
download   kafka-python-08f6ad94556256d710a5d4b517986111de32ffa1.tar.gz
Reorder methods, add docstrings to public methods, section comments for private methods
Diffstat (limited to 'kafka/consumer')
-rw-r--r--   kafka/consumer/new.py   462
1 file changed, 255 insertions, 207 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index f0a4424..abafae8 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -124,171 +124,6 @@ class KafkaConsumer(object):
         self.set_topic_partitions(*topics)
         self._msg_iter = None
 
-    def _get_commit_offsets(self):
-        logger.info("Consumer fetching stored offsets")
-        for topic_partition in self._topics:
-            (resp,) = self._client.send_offset_fetch_request(
-                self._config['group_id'],
-                [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
-                fail_on_error=False)
-            try:
-                check_error(resp)
-            # API spec says server wont set an error here
-            # but 0.8.1.1 does actually...
-            except UnknownTopicOrPartitionError:
-                pass
-
-            # -1 offset signals no commit is currently stored
-            if resp.offset == -1:
-                self._offsets.commit[topic_partition] = None
-
-            # Otherwise we committed the stored offset
-            # and need to fetch the next one
-            else:
-                self._offsets.commit[topic_partition] = resp.offset
-
-    def _reset_highwater_offsets(self):
-        for topic_partition in self._topics:
-            self._offsets.highwater[topic_partition] = None
-
-    def _reset_task_done_offsets(self):
-        for topic_partition in self._topics:
-            self._offsets.task_done[topic_partition] = None
-
-    def __repr__(self):
-        return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
-                                                          for topic_partition in
-                                                          self._topics])
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        self._set_consumer_timeout_start()
-        while True:
-
-            # Fetch a new batch if needed
-            if self._msg_iter is None:
-                self._msg_iter = self.fetch_messages()
-
-            # Check for auto-commit
-            if self._should_auto_commit():
-                self.commit()
-
-            try:
-                return self._msg_iter.next()
-
-            # Handle batch completion
-            except StopIteration:
-                self._msg_iter = None
-
-            self._check_consumer_timeout()
-
-    def offsets(self, group=None):
-        if not group:
-            return {
-                'fetch': self.offsets('fetch'),
-                'commit': self.offsets('commit'),
-                'task_done': self.offsets('task_done'),
-                'highwater': self.offsets('highwater')
-            }
-        else:
-            return dict(deepcopy(getattr(self._offsets, group)))
-
-    def task_done(self, message):
-        """
-        Mark a fetched message as consumed.
-        Offsets for messages marked as "task_done" will be stored back
-        to the kafka cluster for this consumer group on commit()
-        """
-        topic_partition = (message.topic, message.partition)
-        offset = message.offset
-
-        # Warn on non-contiguous offsets
-        prev_done = self._offsets.task_done[topic_partition]
-        if prev_done is not None and offset != (prev_done + 1):
-            logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
-                           offset, prev_done)
-
-        # Warn on smaller offsets than previous commit
-        # "commit" offsets are actually the offset of the next
-        # message to fetch.
-        # so task_done should be compared with (commit - 1)
-        prev_done = (self._offsets.commit[topic_partition] - 1)
-        if prev_done is not None and (offset <= prev_done):
-            logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
-                           offset, prev_done)
-
-        self._offsets.task_done[topic_partition] = offset
-
-    def _should_auto_commit(self):
-        if not self._config['auto_commit_enable']:
-            return False
-
-        if not self._next_commit:
-            return False
-
-        return (time.time() >= self._next_commit)
-
-    def _set_next_auto_commit_time(self):
-        self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
-
-    def commit(self):
-        """
-        Store consumed message offsets (marked via task_done())
-        to kafka cluster for this consumer_group.
-
-        Note -- this functionality requires server version >=0.8.1.1
-        see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-        """
-        if not self._config['group_id']:
-            logger.warning('Cannot commit without a group_id!')
-            raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
-
-        # API supports storing metadata with each commit
-        # but for now it is unused
-        metadata = ''
-
-        offsets = self._offsets.task_done
-        commits = []
-        for topic_partition, task_done_offset in offsets.iteritems():
-
-            # Skip if None
-            if task_done_offset is None:
-                continue
-
-            # Commit offsets as the next offset to fetch
-            # which is consistent with the Java Client
-            # task_done is marked by messages consumed,
-            # so add one to mark the next message for fetching
-            commit_offset = (task_done_offset + 1)
-
-            # Skip if no change from previous committed
-            if commit_offset == self._offsets.commit[topic_partition]:
-                continue
-
-            commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
-
-        if commits:
-            logger.info('committing consumer offsets to group %s', self._config['group_id'])
-            resps = self._client.send_offset_commit_request(self._config['group_id'],
-                                                            commits,
-                                                            fail_on_error=False)
-
-            for r in resps:
-                check_error(r)
-                topic_partition = (r.topic, r.partition)
-                task_done = self._offsets.task_done[topic_partition]
-                self._offsets.commit[topic_partition] = (task_done + 1)
-
-            if self._config['auto_commit_enable']:
-                self._set_next_auto_commit_time()
-
-            return True
-
-        else:
-            logger.info('No new offsets found to commit in group %s', self._config['group_id'])
-            return False
-
     def configure(self, **configs):
         """
         Configuration settings can be passed to constructor,
@@ -445,23 +280,56 @@ class KafkaConsumer(object):
         self._reset_highwater_offsets()
         self._reset_task_done_offsets()
 
-    def _consume_topic_partition(self, topic, partition):
-        if not isinstance(topic, six.string_types):
-            raise KafkaConfigurationError('Unknown topic type (%s) '
-                                          '-- expected string' % type(topic))
-        if not isinstance(partition, int):
-            raise KafkaConfigurationError('Unknown partition type (%s) '
-                                          '-- expected int' % type(partition))
-        if topic not in self._client.topic_partitions:
-            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
-        if partition not in self._client.get_partition_ids_for_topic(topic):
-            raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
-                                               "in broker metadata" % (partition, topic))
-        logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
-        self._topics.append((topic, partition))
+    def next(self):
+        """
+        Return a single message from the message iterator
+        If consumer_timeout_ms is set, will raise ConsumerTimeout
+        if no message is available
+        Otherwise blocks indefinitely
+
+        Note that this is also the method called internally during iteration:
+        ```
+        for m in consumer:
+            pass
+        ```
+        """
+        self._set_consumer_timeout_start()
+        while True:
+
+            # Fetch a new batch if needed
+            if self._msg_iter is None:
+                self._msg_iter = self.fetch_messages()
+
+            # Check for auto-commit
+            if self._should_auto_commit():
+                self.commit()
+
+            try:
+                return self._msg_iter.next()
+
+            # Handle batch completion
+            except StopIteration:
+                self._msg_iter = None
+
+            self._check_consumer_timeout()
 
     def fetch_messages(self):
+        """
+        Sends FetchRequests for all topic/partitions set for consumption
+
+        Returns a generator that yields KafkaMessage structs
+        after deserializing with the configured `deserializer_class`
+
+        Refreshes metadata on errors, and resets fetch offset on
+        OffsetOutOfRange, per the configured `auto_offset_reset` policy
+
+        Key configuration parameters:
+        `fetch_message_max_bytes`
+        `fetch_max_wait_ms`
+        `fetch_min_bytes`
+        `deserializer_class`
+        `auto_offset_reset`
+        """
         max_bytes = self._config['fetch_message_max_bytes']
         max_wait_time = self._config['fetch_wait_max_ms']
@@ -528,34 +396,6 @@ class KafkaConsumer(object):
             # Then yield to user
             yield msg
 
-    def _reset_partition_offset(self, topic_partition):
-        (topic, partition) = topic_partition
-        LATEST = -1
-        EARLIEST = -2
-
-        request_time_ms = None
-        if self._config['auto_offset_reset'] == 'largest':
-            request_time_ms = LATEST
-        elif self._config['auto_offset_reset'] == 'smallest':
-            request_time_ms = EARLIEST
-        else:
-
-            # Let's raise an reasonable exception type if user calls
-            # outside of an exception context
-            if sys.exc_info() == (None, None, None):
-                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
-                                            'valid auto_offset_reset setting '
-                                            '(largest|smallest)')
-
-            # Otherwise we should re-raise the upstream exception
-            # b/c it typically includes additional data about
-            # the request that triggered it, and we do not want to drop that
-            raise
-
-        (offset, ) = self.get_partition_offsets(topic, partition,
-                                                request_time_ms, max_num_offsets=1)
-        return offset
-
     def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
         """
         Request available fetch offsets for a single topic/partition
@@ -588,6 +428,123 @@ class KafkaConsumer(object):
 
         return resp.offsets
 
+    def offsets(self, group=None):
+        """
+        Returns a copy of internal offsets struct
+        optional param: group [fetch|commit|task_done|highwater]
+        if no group specified, returns all groups
+        """
+        if not group:
+            return {
+                'fetch': self.offsets('fetch'),
+                'commit': self.offsets('commit'),
+                'task_done': self.offsets('task_done'),
+                'highwater': self.offsets('highwater')
+            }
+        else:
+            return dict(deepcopy(getattr(self._offsets, group)))
+
+    def task_done(self, message):
+        """
+        Mark a fetched message as consumed.
+        Offsets for messages marked as "task_done" will be stored back
+        to the kafka cluster for this consumer group on commit()
+        """
+        topic_partition = (message.topic, message.partition)
+        offset = message.offset
+
+        # Warn on non-contiguous offsets
+        prev_done = self._offsets.task_done[topic_partition]
+        if prev_done is not None and offset != (prev_done + 1):
+            logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
+                           offset, prev_done)
+
+        # Warn on smaller offsets than previous commit
+        # "commit" offsets are actually the offset of the next
+        # message to fetch.
+        # so task_done should be compared with (commit - 1)
+        prev_done = (self._offsets.commit[topic_partition] - 1)
+        if prev_done is not None and (offset <= prev_done):
+            logger.warning('Marking task_done on a previously committed offset?: %d <= %d',
+                           offset, prev_done)
+
+        self._offsets.task_done[topic_partition] = offset
+
+    def commit(self):
+        """
+        Store consumed message offsets (marked via task_done())
+        to kafka cluster for this consumer_group.
+
+        Note -- this functionality requires server version >=0.8.1.1
+        see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+        """
+        if not self._config['group_id']:
+            logger.warning('Cannot commit without a group_id!')
+            raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
+
+        # API supports storing metadata with each commit
+        # but for now it is unused
+        metadata = ''
+
+        offsets = self._offsets.task_done
+        commits = []
+        for topic_partition, task_done_offset in offsets.iteritems():
+
+            # Skip if None
+            if task_done_offset is None:
+                continue
+
+            # Commit offsets as the next offset to fetch
+            # which is consistent with the Java Client
+            # task_done is marked by messages consumed,
+            # so add one to mark the next message for fetching
+            commit_offset = (task_done_offset + 1)
+
+            # Skip if no change from previous committed
+            if commit_offset == self._offsets.commit[topic_partition]:
+                continue
+
+            commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
+
+        if commits:
+            logger.info('committing consumer offsets to group %s', self._config['group_id'])
+            resps = self._client.send_offset_commit_request(self._config['group_id'],
+                                                            commits,
+                                                            fail_on_error=False)
+
+            for r in resps:
+                check_error(r)
+                topic_partition = (r.topic, r.partition)
+                task_done = self._offsets.task_done[topic_partition]
+                self._offsets.commit[topic_partition] = (task_done + 1)
+
+            if self._config['auto_commit_enable']:
+                self._set_next_auto_commit_time()
+
+            return True
+
+        else:
+            logger.info('No new offsets found to commit in group %s', self._config['group_id'])
+            return False
+
+    #
+    # Topic/partition management private methods
+    #
+    def _consume_topic_partition(self, topic, partition):
+        if not isinstance(topic, six.string_types):
+            raise KafkaConfigurationError('Unknown topic type (%s) '
+                                          '-- expected string' % type(topic))
+        if not isinstance(partition, int):
+            raise KafkaConfigurationError('Unknown partition type (%s) '
+                                          '-- expected int' % type(partition))
+
+        if topic not in self._client.topic_partitions:
+            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+        if partition not in self._client.get_partition_ids_for_topic(topic):
+            raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+                                               "in broker metadata" % (partition, topic))
+        logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
+        self._topics.append((topic, partition))
+
     def _refresh_metadata_on_error(self):
         sleep_ms = self._config['refresh_leader_backoff_ms']
         while True:
@@ -602,6 +559,71 @@ class KafkaConsumer(object):
             logger.info("Topic metadata refreshed")
             return
 
+    #
+    # Offset-managment private methods
+    #
+    def _get_commit_offsets(self):
+        logger.info("Consumer fetching stored offsets")
+        for topic_partition in self._topics:
+            (resp,) = self._client.send_offset_fetch_request(
+                self._config['group_id'],
+                [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+                fail_on_error=False)
+            try:
+                check_error(resp)
+            # API spec says server wont set an error here
+            # but 0.8.1.1 does actually...
+            except UnknownTopicOrPartitionError:
+                pass
+
+            # -1 offset signals no commit is currently stored
+            if resp.offset == -1:
+                self._offsets.commit[topic_partition] = None
+
+            # Otherwise we committed the stored offset
+            # and need to fetch the next one
+            else:
+                self._offsets.commit[topic_partition] = resp.offset
+
+    def _reset_highwater_offsets(self):
+        for topic_partition in self._topics:
+            self._offsets.highwater[topic_partition] = None
+
+    def _reset_task_done_offsets(self):
+        for topic_partition in self._topics:
+            self._offsets.task_done[topic_partition] = None
+
+    def _reset_partition_offset(self, topic_partition):
+        (topic, partition) = topic_partition
+        LATEST = -1
+        EARLIEST = -2
+
+        request_time_ms = None
+        if self._config['auto_offset_reset'] == 'largest':
+            request_time_ms = LATEST
+        elif self._config['auto_offset_reset'] == 'smallest':
+            request_time_ms = EARLIEST
+        else:
+
+            # Let's raise an reasonable exception type if user calls
+            # outside of an exception context
+            if sys.exc_info() == (None, None, None):
+                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+                                            'valid auto_offset_reset setting '
+                                            '(largest|smallest)')
+
+            # Otherwise we should re-raise the upstream exception
+            # b/c it typically includes additional data about
+            # the request that triggered it, and we do not want to drop that
+            raise
+
+        (offset, ) = self.get_partition_offsets(topic, partition,
+                                                request_time_ms, max_num_offsets=1)
+        return offset
+
+    #
+    # Consumer Timeout private methods
+    #
     def _set_consumer_timeout_start(self):
         self._consumer_timeout = False
         if self._config['consumer_timeout_ms'] >= 0:
@@ -610,3 +632,29 @@
     def _check_consumer_timeout(self):
         if self._consumer_timeout and time.time() > self._consumer_timeout:
             raise ConsumerTimeout('Consumer timed out after %d ms' %
+                                  self._config['consumer_timeout_ms'])
+
+    #
+    # Autocommit private methods
+    #
+    def _should_auto_commit(self):
+        if not self._config['auto_commit_enable']:
+            return False
+
+        if not self._next_commit:
+            return False
+
+        return (time.time() >= self._next_commit)
+
+    def _set_next_auto_commit_time(self):
+        self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
+
+    #
+    # python private methods
+    #
+    def __repr__(self):
+        return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
+                                                          for topic_partition in
+                                                          self._topics])
+
+    def __iter__(self):
+        return self
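For context, the public API that gains docstrings in this commit (iteration via next(), task_done(), commit(), offsets()) is typically driven as below. This is a minimal usage sketch, not part of the commit: the broker address, topic and group names, the handle() function, and the import paths (KafkaConsumer from kafka/consumer/new.py, ConsumerTimeout from kafka.common) are illustrative assumptions for this era of kafka-python, not confirmed by the diff.

```python
# Usage sketch for the KafkaConsumer API documented in this commit.
# Assumptions (illustrative only): broker at localhost:9092, topic
# 'my-topic', group 'my-group', and these import paths.
from kafka.common import ConsumerTimeout
from kafka.consumer.new import KafkaConsumer


def handle(message):
    # Application-defined processing; a KafkaMessage carries
    # topic, partition, offset, key, and value fields.
    pass


consumer = KafkaConsumer('my-topic',                       # topics are positional args
                         group_id='my-group',              # required by commit()
                         metadata_broker_list=['localhost:9092'],
                         auto_commit_enable=False,         # commit explicitly below
                         consumer_timeout_ms=5000)         # next() raises after 5s idle

try:
    # Iterating the consumer calls next(), which pulls
    # batches internally via fetch_messages()
    for message in consumer:
        handle(message)
        consumer.task_done(message)    # mark offset for the next commit()
except ConsumerTimeout:
    pass                               # no message within consumer_timeout_ms

consumer.commit()                      # store task_done offsets (broker >= 0.8.1.1)
committed = consumer.offsets('commit') # inspect stored-offset bookkeeping
```

With auto_commit_enable=True, next() invokes commit() itself once the auto_commit_interval_ms deadline passes (see _should_auto_commit above), so the explicit commit() call becomes unnecessary.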