summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-08 22:27:31 -0700
committerDana Powers <dana.powers@rd.io>2015-06-08 22:27:31 -0700
commit25d5a523570cc3e286439e6296755e8746fa3982 (patch)
tree86c311e982a4fccb4d725bd2419f1f3b87827015 /kafka/consumer/kafka.py
parentb998fc7376272fc16ea4c3242d4f009f234ef85b (diff)
downloadkafka-python-25d5a523570cc3e286439e6296755e8746fa3982.tar.gz
KafkaConsumer style nits
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py74
1 file changed, 48 insertions, 26 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index b141a98..11c4221 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -120,7 +120,10 @@ class KafkaConsumer(object):
if self._config['auto_commit_enable']:
if not self._config['group_id']:
- raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
+ raise KafkaConfigurationError(
+ 'KafkaConsumer configured to auto-commit '
+ 'without required consumer group (group_id)'
+ )
# Check auto-commit configuration
if self._config['auto_commit_enable']:
@@ -128,12 +131,15 @@ class KafkaConsumer(object):
self._reset_auto_commit()
if not self._config['bootstrap_servers']:
- raise KafkaConfigurationError('bootstrap_servers required to '
- 'configure KafkaConsumer')
-
- self._client = KafkaClient(self._config['bootstrap_servers'],
- client_id=self._config['client_id'],
- timeout=(self._config['socket_timeout_ms'] / 1000.0))
+ raise KafkaConfigurationError(
+ 'bootstrap_servers required to configure KafkaConsumer'
+ )
+
+ self._client = KafkaClient(
+ self._config['bootstrap_servers'],
+ client_id=self._config['client_id'],
+ timeout=(self._config['socket_timeout_ms'] / 1000.0)
+ )
def set_topic_partitions(self, *topics):
"""
@@ -163,12 +169,12 @@ class KafkaConsumer(object):
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
- # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+ # Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
# using tuples --
- kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+ kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
# using dict --
- kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
+ kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
"""
self._topics = []
@@ -216,8 +222,10 @@ class KafkaConsumer(object):
for partition in value:
self._consume_topic_partition(topic, partition)
else:
- raise KafkaConfigurationError('Unknown topic type (dict key must be '
- 'int or list/tuple of ints)')
+ raise KafkaConfigurationError(
+ 'Unknown topic type '
+ '(dict key must be int or list/tuple of ints)'
+ )
# (topic, partition): offset
elif isinstance(key, tuple):
@@ -316,7 +324,9 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('No topics or partitions configured')
if not self._offsets.fetch:
- raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
+ raise KafkaConfigurationError(
+ 'No fetch offsets found when calling fetch_messages'
+ )
fetches = [FetchRequest(topic, partition,
self._offsets.fetch[(topic, partition)],
@@ -383,7 +393,8 @@ class KafkaConsumer(object):
logger.debug('message offset less than fetched offset '
'skipping: %s', msg)
continue
- # Only increment fetch offset if we safely got the message and deserialized
+ # Only increment fetch offset
+ # if we safely got the message and deserialized
self._offsets.fetch[(topic, partition)] = offset + 1
# Then yield to user
@@ -396,10 +407,12 @@ class KafkaConsumer(object):
topic (str): topic for offset request
partition (int): partition for offset request
request_time_ms (int): Used to ask for all messages before a
- certain time (ms). There are two special values. Specify -1 to receive the latest
- offset (i.e. the offset of the next coming message) and -2 to receive the earliest
- available offset. Note that because offsets are pulled in descending order, asking for
- the earliest offset will always return you a single element.
+ certain time (ms). There are two special values.
+ Specify -1 to receive the latest offset (i.e. the offset of the
+ next coming message) and -2 to receive the earliest available
+ offset. Note that because offsets are pulled in descending
+ order, asking for the earliest offset will always return you a
+ single element.
max_num_offsets (int): Maximum offsets to include in the OffsetResponse
Returns:
@@ -499,7 +512,10 @@ class KafkaConsumer(object):
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
- raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
+ raise KafkaConfigurationError(
+ 'Attempted to commit offsets '
+ 'without a configured consumer group (group_id)'
+ )
# API supports storing metadata with each commit
# but for now it is unused
@@ -523,13 +539,17 @@ class KafkaConsumer(object):
if commit_offset == self._offsets.commit[topic_partition]:
continue
- commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
+ commits.append(
+ OffsetCommitRequest(topic_partition[0], topic_partition[1],
+ commit_offset, metadata)
+ )
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
- commits,
- fail_on_error=False)
+ resps = self._client.send_offset_commit_request(
+ kafka_bytestring(self._config['group_id']), commits,
+ fail_on_error=False
+ )
for r in resps:
check_error(r)
@@ -726,9 +746,11 @@ class KafkaConsumer(object):
#
def __repr__(self):
- return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
- for topic_partition in
- self._topics])
+ return '<{0} topics=({1})>'.format(
+ self.__class__.__name__,
+ '|'.join(["%s-%d" % topic_partition
+ for topic_partition in self._topics])
+ )
#
# other private methods