From 0f04eba0977c77fd9edd0dc336d4934cd9de1348 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 1 Mar 2015 11:05:36 -0800 Subject: Remove KafkaConsumer.BYTES_CONFIGURATION_KEYS - rely on KafkaClient to handle client_id string/bytes encoding - accept either string or bytes for group_id - convert group_id string to utf-8 bytes internally --- kafka/consumer/kafka.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) (limited to 'kafka') diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index cd3cc4a..0093381 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } -BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id') - class KafkaConsumer(object): """ @@ -171,13 +169,6 @@ class KafkaConsumer(object): raise KafkaConfigurationError('Unknown configuration key(s): ' + str(list(configs.keys()))) - # Handle str/bytes conversions - for config_key in BYTES_CONFIGURATION_KEYS: - if isinstance(self._config[config_key], six.string_types): - logger.warning("Converting configuration key '%s' to bytes" % - config_key) - self._config[config_key] = self._config[config_key].encode('utf-8') - if self._config['auto_commit_enable']: if not self._config['group_id']: raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') @@ -554,7 +545,7 @@ class KafkaConsumer(object): if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request(self._config['group_id'], + resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']), commits, fail_on_error=False) @@ -618,7 +609,7 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], + kafka_bytestring(self._config['group_id']), [OffsetFetchRequest(topic_partition[0], topic_partition[1])], fail_on_error=False) try: -- cgit v1.2.1