diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-01 11:05:36 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-01 11:05:36 -0800 |
commit | 0f04eba0977c77fd9edd0dc336d4934cd9de1348 (patch) | |
tree | e24c2d9db2f5c2735b94b3ccf14ba0c32b60783d /kafka/consumer/kafka.py | |
parent | 9f7d61258bc878cb70fa2f46d542e9aeb96ea462 (diff) | |
download | kafka-python-0f04eba0977c77fd9edd0dc336d4934cd9de1348.tar.gz |
Remove KafkaConsumer.BYTES_CONFIGURATION_KEYS
- rely on KafkaClient to handle client_id string/bytes encoding
- accept either string or bytes for group_id
- convert group_id string to utf-8 bytes internally
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 13 |
1 files changed, 2 insertions, 11 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index cd3cc4a..0093381 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -47,8 +47,6 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } -BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id') - class KafkaConsumer(object): """ @@ -171,13 +169,6 @@ class KafkaConsumer(object): raise KafkaConfigurationError('Unknown configuration key(s): ' + str(list(configs.keys()))) - # Handle str/bytes conversions - for config_key in BYTES_CONFIGURATION_KEYS: - if isinstance(self._config[config_key], six.string_types): - logger.warning("Converting configuration key '%s' to bytes" % - config_key) - self._config[config_key] = self._config[config_key].encode('utf-8') - if self._config['auto_commit_enable']: if not self._config['group_id']: raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') @@ -554,7 +545,7 @@ class KafkaConsumer(object): if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request(self._config['group_id'], + resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']), commits, fail_on_error=False) @@ -618,7 +609,7 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], + kafka_bytestring(self._config['group_id']), [OffsetFetchRequest(topic_partition[0], topic_partition[1])], fail_on_error=False) try: |