diff options
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 74 |
1 file changed, 48 insertions, 26 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index b141a98..11c4221 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -120,7 +120,10 @@ class KafkaConsumer(object): if self._config['auto_commit_enable']: if not self._config['group_id']: - raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') + raise KafkaConfigurationError( + 'KafkaConsumer configured to auto-commit ' + 'without required consumer group (group_id)' + ) # Check auto-commit configuration if self._config['auto_commit_enable']: @@ -128,12 +131,15 @@ class KafkaConsumer(object): self._reset_auto_commit() if not self._config['bootstrap_servers']: - raise KafkaConfigurationError('bootstrap_servers required to ' - 'configure KafkaConsumer') - - self._client = KafkaClient(self._config['bootstrap_servers'], - client_id=self._config['client_id'], - timeout=(self._config['socket_timeout_ms'] / 1000.0)) + raise KafkaConfigurationError( + 'bootstrap_servers required to configure KafkaConsumer' + ) + + self._client = KafkaClient( + self._config['bootstrap_servers'], + client_id=self._config['client_id'], + timeout=(self._config['socket_timeout_ms'] / 1000.0) + ) def set_topic_partitions(self, *topics): """ @@ -163,12 +169,12 @@ class KafkaConsumer(object): # Consume topic1-all; topic2-partition2; topic3-partition0 kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45 # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45)) # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) + kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 }) """ self._topics = [] @@ -216,8 +222,10 @@ class KafkaConsumer(object): for partition in 
value: self._consume_topic_partition(topic, partition) else: - raise KafkaConfigurationError('Unknown topic type (dict key must be ' - 'int or list/tuple of ints)') + raise KafkaConfigurationError( + 'Unknown topic type ' + '(dict key must be int or list/tuple of ints)' + ) # (topic, partition): offset elif isinstance(key, tuple): @@ -316,7 +324,9 @@ class KafkaConsumer(object): raise KafkaConfigurationError('No topics or partitions configured') if not self._offsets.fetch: - raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') + raise KafkaConfigurationError( + 'No fetch offsets found when calling fetch_messages' + ) fetches = [FetchRequest(topic, partition, self._offsets.fetch[(topic, partition)], @@ -383,7 +393,8 @@ class KafkaConsumer(object): logger.debug('message offset less than fetched offset ' 'skipping: %s', msg) continue - # Only increment fetch offset if we safely got the message and deserialized + # Only increment fetch offset + # if we safely got the message and deserialized self._offsets.fetch[(topic, partition)] = offset + 1 # Then yield to user @@ -396,10 +407,12 @@ class KafkaConsumer(object): topic (str): topic for offset request partition (int): partition for offset request request_time_ms (int): Used to ask for all messages before a - certain time (ms). There are two special values. Specify -1 to receive the latest - offset (i.e. the offset of the next coming message) and -2 to receive the earliest - available offset. Note that because offsets are pulled in descending order, asking for - the earliest offset will always return you a single element. + certain time (ms). There are two special values. + Specify -1 to receive the latest offset (i.e. the offset of the + next coming message) and -2 to receive the earliest available + offset. Note that because offsets are pulled in descending + order, asking for the earliest offset will always return you a + single element. 
max_num_offsets (int): Maximum offsets to include in the OffsetResponse Returns: @@ -499,7 +512,10 @@ class KafkaConsumer(object): """ if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') - raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)') + raise KafkaConfigurationError( + 'Attempted to commit offsets ' + 'without a configured consumer group (group_id)' + ) # API supports storing metadata with each commit # but for now it is unused @@ -523,13 +539,17 @@ class KafkaConsumer(object): if commit_offset == self._offsets.commit[topic_partition]: continue - commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata)) + commits.append( + OffsetCommitRequest(topic_partition[0], topic_partition[1], + commit_offset, metadata) + ) if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']), - commits, - fail_on_error=False) + resps = self._client.send_offset_commit_request( + kafka_bytestring(self._config['group_id']), commits, + fail_on_error=False + ) for r in resps: check_error(r) @@ -726,9 +746,11 @@ class KafkaConsumer(object): # def __repr__(self): - return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition - for topic_partition in - self._topics]) + return '<{0} topics=({1})>'.format( + self.__class__.__name__, + '|'.join(["%s-%d" % topic_partition + for topic_partition in self._topics]) + ) # # other private methods |