author     Dana Powers <dana.powers@gmail.com>    2015-06-09 17:58:52 -0700
committer  Dana Powers <dana.powers@gmail.com>    2015-06-09 17:58:52 -0700
commit     f035de49da2b4d7205e746c99e729c0810285e89 (patch)
tree       888f51785e79fe30789398e29ae92f3707346571
parent     0cec44a2459b77e3fab2d1eb21abc684296c0f30 (diff)
parent     25d5a523570cc3e286439e6296755e8746fa3982 (diff)
download   kafka-python-f035de49da2b4d7205e746c99e729c0810285e89.tar.gz
Merge pull request #398 from dpkp/kafka_consumer_failed_payloads
Kafka consumer failed payloads
-rw-r--r--   kafka/consumer/kafka.py   98
1 file changed, 61 insertions(+), 37 deletions(-)
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 7ba83cb..11c4221 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -120,7 +120,10 @@ class KafkaConsumer(object):
 
         if self._config['auto_commit_enable']:
             if not self._config['group_id']:
-                raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
+                raise KafkaConfigurationError(
+                    'KafkaConsumer configured to auto-commit '
+                    'without required consumer group (group_id)'
+                )
 
         # Check auto-commit configuration
         if self._config['auto_commit_enable']:
@@ -128,12 +131,15 @@ class KafkaConsumer(object):
             self._reset_auto_commit()
 
         if not self._config['bootstrap_servers']:
-            raise KafkaConfigurationError('bootstrap_servers required to '
-                                          'configure KafkaConsumer')
-
-        self._client = KafkaClient(self._config['bootstrap_servers'],
-                                   client_id=self._config['client_id'],
-                                   timeout=(self._config['socket_timeout_ms'] / 1000.0))
+            raise KafkaConfigurationError(
+                'bootstrap_servers required to configure KafkaConsumer'
+            )
+
+        self._client = KafkaClient(
+            self._config['bootstrap_servers'],
+            client_id=self._config['client_id'],
+            timeout=(self._config['socket_timeout_ms'] / 1000.0)
+        )
 
     def set_topic_partitions(self, *topics):
         """
@@ -163,12 +169,12 @@ class KafkaConsumer(object):
             # Consume topic1-all; topic2-partition2; topic3-partition0
             kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
 
-            # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+            # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
             # using tuples --
-            kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+            kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
 
             # using dict --
-            kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
+            kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
 
         """
         self._topics = []
@@ -216,8 +222,10 @@ class KafkaConsumer(object):
                         for partition in value:
                             self._consume_topic_partition(topic, partition)
                     else:
-                        raise KafkaConfigurationError('Unknown topic type (dict key must be '
-                                                      'int or list/tuple of ints)')
+                        raise KafkaConfigurationError(
+                            'Unknown topic type '
+                            '(dict key must be int or list/tuple of ints)'
+                        )
 
                 # (topic, partition): offset
                 elif isinstance(key, tuple):
@@ -316,26 +324,30 @@ class KafkaConsumer(object):
             raise KafkaConfigurationError('No topics or partitions configured')
 
         if not self._offsets.fetch:
-            raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
+            raise KafkaConfigurationError(
+                'No fetch offsets found when calling fetch_messages'
+            )
 
         fetches = [FetchRequest(topic, partition,
                                 self._offsets.fetch[(topic, partition)],
                                 max_bytes)
                    for (topic, partition) in self._topics]
 
-        # client.send_fetch_request will collect topic/partition requests by leader
-        # and send each group as a single FetchRequest to the correct broker
-        try:
-            responses = self._client.send_fetch_request(fetches,
-                                                        max_wait_time=max_wait_time,
-                                                        min_bytes=min_bytes,
-                                                        fail_on_error=False)
-        except FailedPayloadsError:
-            logger.warning('FailedPayloadsError attempting to fetch data from kafka')
-            self._refresh_metadata_on_error()
-            return
+        # send_fetch_request will batch topic/partition requests by leader
+        responses = self._client.send_fetch_request(
+            fetches,
+            max_wait_time=max_wait_time,
+            min_bytes=min_bytes,
+            fail_on_error=False
+        )
 
         for resp in responses:
+
+            if isinstance(resp, FailedPayloadsError):
+                logger.warning('FailedPayloadsError attempting to fetch data')
+                self._refresh_metadata_on_error()
+                continue
+
             topic = kafka_bytestring(resp.topic)
             partition = resp.partition
             try:
@@ -381,7 +393,8 @@ class KafkaConsumer(object):
                     logger.debug('message offset less than fetched offset '
                                  'skipping: %s', msg)
                     continue
-                # Only increment fetch offset if we safely got the message and deserialized
+                # Only increment fetch offset
+                # if we safely got the message and deserialized
                 self._offsets.fetch[(topic, partition)] = offset + 1
 
                 # Then yield to user
@@ -394,10 +407,12 @@ class KafkaConsumer(object):
             topic (str): topic for offset request
             partition (int): partition for offset request
             request_time_ms (int): Used to ask for all messages before a
-                certain time (ms). There are two special values. Specify -1 to receive the latest
-                offset (i.e. the offset of the next coming message) and -2 to receive the earliest
-                available offset. Note that because offsets are pulled in descending order, asking for
-                the earliest offset will always return you a single element.
+                certain time (ms). There are two special values.
+                Specify -1 to receive the latest offset (i.e. the offset of the
+                next coming message) and -2 to receive the earliest available
+                offset. Note that because offsets are pulled in descending
+                order, asking for the earliest offset will always return you a
+                single element.
             max_num_offsets (int): Maximum offsets to include in the OffsetResponse
 
         Returns:
@@ -497,7 +512,10 @@ class KafkaConsumer(object):
         """
         if not self._config['group_id']:
             logger.warning('Cannot commit without a group_id!')
-            raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
+            raise KafkaConfigurationError(
+                'Attempted to commit offsets '
+                'without a configured consumer group (group_id)'
+            )
 
         # API supports storing metadata with each commit
         # but for now it is unused
@@ -521,13 +539,17 @@ class KafkaConsumer(object):
                 if commit_offset == self._offsets.commit[topic_partition]:
                     continue
 
-            commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
+            commits.append(
+                OffsetCommitRequest(topic_partition[0], topic_partition[1],
+                                    commit_offset, metadata)
+            )
 
         if commits:
             logger.info('committing consumer offsets to group %s', self._config['group_id'])
-            resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
-                                                            commits,
-                                                            fail_on_error=False)
+            resps = self._client.send_offset_commit_request(
+                kafka_bytestring(self._config['group_id']), commits,
+                fail_on_error=False
+            )
 
             for r in resps:
                 check_error(r)
@@ -724,9 +746,11 @@ class KafkaConsumer(object):
     #
 
     def __repr__(self):
-        return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
-                                                          for topic_partition in
-                                                          self._topics])
+        return '<{0} topics=({1})>'.format(
+            self.__class__.__name__,
+            '|'.join(["%s-%d" % topic_partition
+                      for topic_partition in self._topics])
+        )
 
     #
     # other private methods
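
The substance of this patch is the error handling in fetch_messages(): previously a FailedPayloadsError raised out of send_fetch_request() aborted the entire fetch, even if only one broker's payloads failed. With fail_on_error=False the failures now come back interleaved with the successful FetchResponses, so only the affected topic-partitions are skipped after a metadata refresh. Below is a minimal sketch of the new control flow, using local stand-ins for kafka-python's FetchResponse and FailedPayloadsError rather than a live client (the response data is hypothetical):

from collections import namedtuple

# Stand-in for kafka-python's FetchResponse struct (simplified for this sketch)
FetchResponse = namedtuple('FetchResponse',
                           'topic partition error highwaterMark messages')

class FailedPayloadsError(Exception):
    """Stand-in for kafka.common.FailedPayloadsError"""

# What send_fetch_request(..., fail_on_error=False) can now hand back:
# successful responses interleaved with error instances (hypothetical data)
responses = [
    FetchResponse(b'topic1', 0, 0, 10, []),
    FailedPayloadsError('fetch of topic1-1 failed'),  # e.g. leader moved
    FetchResponse(b'topic2', 1, 0, 7, []),
]

for resp in responses:
    # As in the patched loop: a failed payload triggers a metadata refresh
    # and a `continue` rather than aborting the whole fetch
    if isinstance(resp, FailedPayloadsError):
        print('failed payload; would refresh metadata and continue')
        continue
    print('processing %s-%d' % (resp.topic.decode('utf-8'), resp.partition))

Because the fetch offsets of the failed partitions are never advanced, the next fetch_messages() call should retry them, while healthy partitions keep flowing.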