summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-01 11:05:36 -0800
committerDana Powers <dana.powers@rd.io>2015-03-01 11:05:36 -0800
commit0f04eba0977c77fd9edd0dc336d4934cd9de1348 (patch)
treee24c2d9db2f5c2735b94b3ccf14ba0c32b60783d /kafka/consumer/kafka.py
parent9f7d61258bc878cb70fa2f46d542e9aeb96ea462 (diff)
downloadkafka-python-0f04eba0977c77fd9edd0dc336d4934cd9de1348.tar.gz
Remove KafkaConsumer.BYTES_CONFIGURATION_KEYS
- rely on KafkaClient to handle client_id string/bytes encoding - accept either string or bytes for group_id - convert group_id string to utf-8 bytes internally
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py13
1 files changed, 2 insertions, 11 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index cd3cc4a..0093381 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
-BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
-
class KafkaConsumer(object):
"""
@@ -171,13 +169,6 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
- # Handle str/bytes conversions
- for config_key in BYTES_CONFIGURATION_KEYS:
- if isinstance(self._config[config_key], six.string_types):
- logger.warning("Converting configuration key '%s' to bytes" %
- config_key)
- self._config[config_key] = self._config[config_key].encode('utf-8')
-
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
@@ -554,7 +545,7 @@ class KafkaConsumer(object):
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(self._config['group_id'],
+ resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)
@@ -618,7 +609,7 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
+ kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try: