diff options
author | Dana Powers <dana.powers@rd.io> | 2015-03-08 22:57:34 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-03-23 23:35:28 -0700 |
commit | 3b1ccc68b324cf05740b7b02f8568889c750e4bf (patch) | |
tree | 228d77bec6362ee506b3786b3817f06379aa3837 /kafka/consumer/kafka.py | |
parent | 92aa7e94288cbfc4aed0dfbd52021d21694bced4 (diff) | |
download | kafka-python-3b1ccc68b324cf05740b7b02f8568889c750e4bf.tar.gz |
Deprecate KafkaConsumer config metadata_broker_list in favor of bootstrap_servers
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 49ffa7b..f03d15e 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -25,7 +25,7 @@ OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "ta DEFAULT_CONSUMER_CONFIG = { 'client_id': __name__, 'group_id': None, - 'metadata_broker_list': None, + 'bootstrap_servers': [], 'socket_timeout_ms': 30 * 1000, 'fetch_message_max_bytes': 1024 * 1024, 'auto_offset_reset': 'largest', @@ -47,6 +47,9 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } +DEPRECATED_CONFIG_KEYS = { + 'metadata_broker_list': 'bootstrap_servers', +} class KafkaConsumer(object): """ @@ -56,7 +59,7 @@ class KafkaConsumer(object): # A very basic 'tail' consumer, with no stored offset management kafka = KafkaConsumer('topic1', - metadata_broker_list=['localhost:9092']) + bootstrap_servers=['localhost:9092']) for m in kafka: print m @@ -75,7 +78,7 @@ class KafkaConsumer(object): # more advanced consumer -- multiple topics w/ auto commit offset # management kafka = KafkaConsumer('topic1', 'topic2', - metadata_broker_list=['localhost:9092'], + bootstrap_servers=['localhost:9092'], group_id='my_consumer_group', auto_commit_enable=True, auto_commit_interval_ms=30 * 1000, @@ -120,7 +123,7 @@ class KafkaConsumer(object): fetch_min_bytes=1, fetch_wait_max_ms=100, refresh_leader_backoff_ms=200, - metadata_broker_list=None, + bootstrap_servers=[], socket_timeout_ms=30*1000, auto_offset_reset='largest', deserializer_class=lambda msg: msg, @@ -149,7 +152,7 @@ class KafkaConsumer(object): fetch_min_bytes=1, fetch_wait_max_ms=100, refresh_leader_backoff_ms=200, - metadata_broker_list=None, + bootstrap_servers=[], socket_timeout_ms=30*1000, auto_offset_reset='largest', deserializer_class=lambda msg: msg, @@ -161,6 +164,7 @@ class KafkaConsumer(object): Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi """ + configs = self._deprecate_configs(**configs) self._config = {} for key in DEFAULT_CONSUMER_CONFIG: self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key]) @@ -178,11 +182,11 @@ class KafkaConsumer(object): logger.info("Configuring consumer to auto-commit offsets") self._reset_auto_commit() - if self._config['metadata_broker_list'] is None: - raise KafkaConfigurationError('metadata_broker_list required to ' + if not self._config['bootstrap_servers']: + raise KafkaConfigurationError('bootstrap_servers required to ' 'configure KafkaConsumer') - self._client = KafkaClient(self._config['metadata_broker_list'], + self._client = KafkaClient(self._config['bootstrap_servers'], client_id=self._config['client_id'], timeout=(self._config['socket_timeout_ms'] / 1000.0)) @@ -751,3 +755,17 @@ class KafkaConsumer(object): return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition for topic_partition in self._topics]) + + # + # other private methods + # + + def _deprecate_configs(self, **configs): + for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS): + if old in configs: + logger.warning('Deprecated Kafka Consumer configuration: %s. ' + 'Please use %s instead.', old, new) + old_value = configs.pop(old) + if new not in configs: + configs[new] = old_value + return configs |