summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/kafka.py13
1 files changed, 2 insertions, 11 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index cd3cc4a..0093381 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
-BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
-
class KafkaConsumer(object):
"""
@@ -171,13 +169,6 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
- # Handle str/bytes conversions
- for config_key in BYTES_CONFIGURATION_KEYS:
- if isinstance(self._config[config_key], six.string_types):
- logger.warning("Converting configuration key '%s' to bytes" %
- config_key)
- self._config[config_key] = self._config[config_key].encode('utf-8')
-
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
@@ -554,7 +545,7 @@ class KafkaConsumer(object):
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
- resps = self._client.send_offset_commit_request(self._config['group_id'],
+ resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)
@@ -618,7 +609,7 @@ class KafkaConsumer(object):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
- self._config['group_id'],
+ kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try: