summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-08 22:57:34 -0700
committerDana Powers <dana.powers@rd.io>2015-03-23 23:35:28 -0700
commit3b1ccc68b324cf05740b7b02f8568889c750e4bf (patch)
tree228d77bec6362ee506b3786b3817f06379aa3837 /kafka/consumer/kafka.py
parent92aa7e94288cbfc4aed0dfbd52021d21694bced4 (diff)
downloadkafka-python-3b1ccc68b324cf05740b7b02f8568889c750e4bf.tar.gz
Deprecate KafkaConsumer config metadata_broker_list in favor of bootstrap_servers
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py34
1 files changed, 26 insertions, 8 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 49ffa7b..f03d15e 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -25,7 +25,7 @@ OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "ta
DEFAULT_CONSUMER_CONFIG = {
'client_id': __name__,
'group_id': None,
- 'metadata_broker_list': None,
+ 'bootstrap_servers': [],
'socket_timeout_ms': 30 * 1000,
'fetch_message_max_bytes': 1024 * 1024,
'auto_offset_reset': 'largest',
@@ -47,6 +47,9 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
+DEPRECATED_CONFIG_KEYS = {
+ 'metadata_broker_list': 'bootstrap_servers',
+}
class KafkaConsumer(object):
"""
@@ -56,7 +59,7 @@ class KafkaConsumer(object):
# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1',
- metadata_broker_list=['localhost:9092'])
+ bootstrap_servers=['localhost:9092'])
for m in kafka:
print m
@@ -75,7 +78,7 @@ class KafkaConsumer(object):
# more advanced consumer -- multiple topics w/ auto commit offset
# management
kafka = KafkaConsumer('topic1', 'topic2',
- metadata_broker_list=['localhost:9092'],
+ bootstrap_servers=['localhost:9092'],
group_id='my_consumer_group',
auto_commit_enable=True,
auto_commit_interval_ms=30 * 1000,
@@ -120,7 +123,7 @@ class KafkaConsumer(object):
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
+ bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
@@ -149,7 +152,7 @@ class KafkaConsumer(object):
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
- metadata_broker_list=None,
+ bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
@@ -161,6 +164,7 @@ class KafkaConsumer(object):
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
+ configs = self._deprecate_configs(**configs)
self._config = {}
for key in DEFAULT_CONSUMER_CONFIG:
self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
@@ -178,11 +182,11 @@ class KafkaConsumer(object):
logger.info("Configuring consumer to auto-commit offsets")
self._reset_auto_commit()
- if self._config['metadata_broker_list'] is None:
- raise KafkaConfigurationError('metadata_broker_list required to '
+ if not self._config['bootstrap_servers']:
+ raise KafkaConfigurationError('bootstrap_servers required to '
'configure KafkaConsumer')
- self._client = KafkaClient(self._config['metadata_broker_list'],
+ self._client = KafkaClient(self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0))
@@ -751,3 +755,17 @@ class KafkaConsumer(object):
return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
for topic_partition in
self._topics])
+
+ #
+ # other private methods
+ #
+
+ def _deprecate_configs(self, **configs):
+ for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
+ if old in configs:
+ logger.warning('Deprecated Kafka Consumer configuration: %s. '
+ 'Please use %s instead.', old, new)
+ old_value = configs.pop(old)
+ if new not in configs:
+ configs[new] = old_value
+ return configs