author     Dana Powers <dana.powers@rd.io>    2015-12-29 19:08:35 -0800
committer  Dana Powers <dana.powers@rd.io>    2015-12-29 19:08:35 -0800
commit     3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9 (patch)
tree       2c38fd2c577442cb90c99ee2d49b5b0f68300303 /kafka/consumer
parent     e5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (diff)
download   kafka-python-3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9.tar.gz
Switch configs from attributes to dict to make passing / inspecting easier
Diffstat (limited to 'kafka/consumer')
-rw-r--r--  kafka/consumer/fetcher.py  |  50
-rw-r--r--  kafka/consumer/group.py    | 105
2 files changed, 76 insertions(+), 79 deletions(-)
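
The pattern introduced here is the same in both files: each component carries a class-level DEFAULT_CONFIG dict, copies it in __init__, and overlays only the keyword arguments it recognizes. A minimal standalone sketch of that pattern (the class name and option values below are illustrative, not part of kafka-python):

import copy

class ConfiguredComponent(object):
    # illustrative defaults; each real component declares its own set
    DEFAULT_CONFIG = {
        'fetch_min_bytes': 1024,
        'check_crcs': True,
    }

    def __init__(self, **configs):
        # copy so the class-level defaults are never mutated, then overlay
        # only the keys this component actually declares
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

c = ConfiguredComponent(fetch_min_bytes=4096, session_timeout_ms=30000)
assert c.config == {'fetch_min_bytes': 4096, 'check_crcs': True}

Unrecognized keys (session_timeout_ms above) are simply left for other components to consume, which is what makes one flat config dict shareable across the consumer internals.
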
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ea9c8b9..39e1244 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import collections
+import copy
import logging
import six
@@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError):
class Fetcher(object):
- _key_deserializer = None
- _value_deserializer = None
- _fetch_min_bytes = 1024
- _fetch_max_wait_ms = 500
- _max_partition_fetch_bytes = 1048576
- _check_crcs = True
- _retry_backoff_ms = 100
-
- def __init__(self, client, subscriptions, **kwargs):
+ DEFAULT_CONFIG = {
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_min_bytes': 1024,
+ 'fetch_max_wait_ms': 500,
+ 'max_partition_fetch_bytes': 1048576,
+ 'check_crcs': True,
+ }
+
+ def __init__(self, client, subscriptions, **configs):
#metrics=None,
#metric_group_prefix='consumer',
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
self._client = client
self._subscriptions = subscriptions
- for config in ('key_deserializer', 'value_deserializer',
- 'fetch_min_bytes', 'fetch_max_wait_ms',
- 'max_partition_fetch_bytes', 'check_crcs',
- 'retry_backoff_ms'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs.pop(config))
-
self._records = collections.deque() # (offset, topic_partition, messages)
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
@@ -204,7 +203,8 @@ class Fetcher(object):
" and hence cannot be ever returned."
" Increase the fetch size, or decrease the maximum message"
" size the broker will allow.",
- copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+ copied_record_too_large_partitions,
+ self.config['max_partition_fetch_bytes'])
def fetched_records(self):
"""Returns previously fetched records and updates consumed offsets
@@ -255,7 +255,7 @@ class Fetcher(object):
for offset, size, msg in messages:
if msg.attributes:
raise Errors.KafkaError('Compressed messages not supported yet')
- elif self._check_crcs and not msg.validate_crc():
+ elif self.config['check_crcs'] and not msg.validate_crc():
raise Errors.InvalidMessageError(msg)
key, value = self._deserialize(msg)
@@ -269,12 +269,12 @@ class Fetcher(object):
return dict(drained)
def _deserialize(self, msg):
- if self._key_deserializer:
- key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable
+ if self.config['key_deserializer']:
+ key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
else:
key = msg.key
- if self._value_deserializer:
- value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable
+ if self.config['value_deserializer']:
+ value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
else:
value = msg.value
return key, value
@@ -376,7 +376,7 @@ class Fetcher(object):
partition_info = (
partition.partition,
fetched,
- self._max_partition_fetch_bytes
+ self.config['max_partition_fetch_bytes']
)
fetchable[node_id][partition.topic].append(partition_info)
else:
@@ -388,8 +388,8 @@ class Fetcher(object):
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest(
-1, # replica_id
- self._fetch_max_wait_ms,
- self._fetch_min_bytes,
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
partition_data.items())
return requests
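
With Fetcher reading every setting from self.config, the "inspecting easier" half of the commit message follows: the effective configuration is one plain dict rather than a set of underscore-prefixed attributes. A hedged sketch of what such an inspection could look like (the helper below is illustrative, not part of the library; fetcher.config stands for any component's config dict):

import logging

log = logging.getLogger(__name__)

def log_effective_config(name, config):
    # one pass over a single dict replaces probing scattered attributes
    for key in sorted(config):
        log.info('%s config: %s = %r', name, key, config[key])

# usage, assuming a constructed instance:
# log_effective_config('Fetcher', fetcher.config)
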
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 63a1b2e..b7093f3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import copy
import logging
import time
@@ -18,33 +19,36 @@ log = logging.getLogger(__name__)
class KafkaConsumer(object):
"""Consumer for Kafka 0.9"""
- _bootstrap_servers = 'localhost'
- _client_id = 'kafka-python-' + __version__
- _group_id = 'kafka-python-default-group'
- _key_deserializer = None
- _value_deserializer = None
- _fetch_max_wait_ms = 500
- _fetch_min_bytes = 1024
- _max_partition_fetch_bytes = 1 * 1024 * 1024
- _request_timeout_ms = 40 * 1000
- _retry_backoff_ms = 100
- _reconnect_backoff_ms = 50
- _auto_offset_reset = 'latest'
- _enable_auto_commit = True
- _auto_commit_interval_ms = 5000
- _check_crcs = True
- _metadata_max_age_ms = 5 * 60 * 1000
- _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
- _heartbeat_interval_ms = 3000
- _session_timeout_ms = 30000
- _send_buffer_bytes = 128 * 1024
- _receive_buffer_bytes = 32 * 1024
- _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet
- #_metric_reporters = None
- #_metrics_num_samples = 2
- #_metrics_sample_window_ms = 30000
-
- def __init__(self, *topics, **kwargs):
+ DEFAULT_CONFIG = {
+ 'bootstrap_servers': 'localhost',
+ 'client_id': 'kafka-python-' + __version__,
+ 'group_id': 'kafka-python-default-group',
+ 'key_deserializer': None,
+ 'value_deserializer': None,
+ 'fetch_max_wait_ms': 500,
+ 'fetch_min_bytes': 1024,
+ 'max_partition_fetch_bytes': 1 * 1024 * 1024,
+ 'request_timeout_ms': 40 * 1000,
+ 'retry_backoff_ms': 100,
+ 'reconnect_backoff_ms': 50,
+ 'max_in_flight_requests_per_connection': 5,
+ 'auto_offset_reset': 'latest',
+ 'enable_auto_commit': True,
+ 'auto_commit_interval_ms': 5000,
+ 'check_crcs': True,
+ 'metadata_max_age_ms': 5 * 60 * 1000,
+ 'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+ 'heartbeat_interval_ms': 3000,
+ 'session_timeout_ms': 30000,
+ 'send_buffer_bytes': 128 * 1024,
+ 'receive_buffer_bytes': 32 * 1024,
+ 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+ #'metric_reporters': None,
+ #'metrics_num_samples': 2,
+ #'metrics_sample_window_ms': 30000,
+ }
+
+ def __init__(self, *topics, **configs):
"""A Kafka client that consumes records from a Kafka cluster.
The consumer will transparently handle the failure of servers in the
@@ -79,8 +83,8 @@ class KafkaConsumer(object):
raw message value and returns a deserialized value.
fetch_min_bytes (int): Minimum amount of data the server should
return for a fetch request, otherwise wait up to
- fetch_wait_max_ms for more data to accumulate. Default: 1024.
- fetch_wait_max_ms (int): The maximum amount of time in milliseconds
+ fetch_max_wait_ms for more data to accumulate. Default: 1024.
+ fetch_max_wait_ms (int): The maximum amount of time in milliseconds
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
@@ -97,8 +101,11 @@ class KafkaConsumer(object):
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
reconnect_backoff_ms (int): The amount of time in milliseconds to
- wait before attempting to reconnect to a given host. Defaults
- to 50.
+ wait before attempting to reconnect to a given host.
+ Default: 50.
+ max_in_flight_requests_per_connection (int): Requests are pipelined
+ to kafka brokers up to this number of maximum requests per
+ broker connection. Default: 5.
auto_offset_reset (str): A policy for resetting offsets on
OffsetOutOfRange errors: 'earliest' will move to the oldest
available message, 'latest' will move to the most recent. Any
@@ -137,29 +144,19 @@ class KafkaConsumer(object):
Configuration parameters are described in more detail at
https://kafka.apache.org/090/configuration.html#newconsumerconfigs
"""
- for config in ('bootstrap_servers', 'client_id', 'group_id',
- 'key_deserializer', 'value_deserializer',
- 'fetch_max_wait_ms', 'fetch_min_bytes',
- 'max_partition_fetch_bytes', 'request_timeout_ms',
- 'retry_backoff_ms', 'reconnect_backoff_ms',
- 'auto_offset_reset', 'enable_auto_commit',
- 'auto_commit_interval_ms', 'check_crcs',
- 'metadata_max_age_ms', 'partition_assignment_strategy',
- 'heartbeat_interval_ms', 'session_timeout_ms',
- 'send_buffer_bytes', 'receive_buffer_bytes'):
- if config in kwargs:
- setattr(self, '_' + config, kwargs[config])
-
- self._client = KafkaClient(**kwargs)
- self._subscription = SubscriptionState(self._auto_offset_reset)
+ self.config = copy.copy(self.DEFAULT_CONFIG)
+ for key in self.config:
+ if key in configs:
+ self.config[key] = configs[key]
+
+ self._client = KafkaClient(**self.config)
+ self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
- self._client, self._subscription, **kwargs)
+ self._client, self._subscription, **self.config)
self._coordinator = ConsumerCoordinator(
- self._client, self._group_id, self._subscription,
- enable_auto_commit=self._enable_auto_commit,
- auto_commit_interval_ms=self._auto_commit_interval_ms,
- assignors=self._partition_assignment_strategy,
- **kwargs)
+ self._client, self.config['group_id'], self._subscription,
+ assignors=self.config['partition_assignment_strategy'],
+ **self.config)
self._closed = False
#self.metrics = None
@@ -213,11 +210,11 @@ class KafkaConsumer(object):
#self.metrics.close()
self._client.close()
try:
- self._key_deserializer.close()
+ self.config['key_deserializer'].close()
except AttributeError:
pass
try:
- self._value_deserializer.close()
+ self.config['value_deserializer'].close()
except AttributeError:
pass
log.debug("The KafkaConsumer has closed.")
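
The "passing easier" half comes from each component copying only the keys it declares in its own DEFAULT_CONFIG: KafkaConsumer can forward its entire config dict with **self.config and let Fetcher and ConsumerCoordinator each pick out what they need. A rough sketch of that flow using stripped-down stand-in classes (not the real kafka-python ones):

import copy

class _Component(object):
    DEFAULT_CONFIG = {}

    def __init__(self, **configs):
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

class FakeFetcher(_Component):
    DEFAULT_CONFIG = {'fetch_min_bytes': 1024, 'check_crcs': True}

class FakeCoordinator(_Component):
    DEFAULT_CONFIG = {'session_timeout_ms': 30000, 'enable_auto_commit': True}

# one flat dict configures everything; each component ignores keys it
# does not declare
consumer_config = {'fetch_min_bytes': 1, 'session_timeout_ms': 6000,
                   'enable_auto_commit': False}
fetcher = FakeFetcher(**consumer_config)
coordinator = FakeCoordinator(**consumer_config)
assert fetcher.config == {'fetch_min_bytes': 1, 'check_crcs': True}
assert coordinator.config == {'session_timeout_ms': 6000,
                              'enable_auto_commit': False}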