Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--  kafka/consumer/group.py  105
1 file changed, 51 insertions, 54 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 63a1b2e..b7093f3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import copy
 import logging
 import time
 
@@ -18,33 +19,36 @@ log = logging.getLogger(__name__)
 
 class KafkaConsumer(object):
     """Consumer for Kafka 0.9"""
-    _bootstrap_servers = 'localhost'
-    _client_id = 'kafka-python-' + __version__
-    _group_id = 'kafka-python-default-group'
-    _key_deserializer = None
-    _value_deserializer = None
-    _fetch_max_wait_ms = 500
-    _fetch_min_bytes = 1024
-    _max_partition_fetch_bytes = 1 * 1024 * 1024
-    _request_timeout_ms = 40 * 1000
-    _retry_backoff_ms = 100
-    _reconnect_backoff_ms = 50
-    _auto_offset_reset = 'latest'
-    _enable_auto_commit = True
-    _auto_commit_interval_ms = 5000
-    _check_crcs = True
-    _metadata_max_age_ms = 5 * 60 * 1000
-    _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
-    _heartbeat_interval_ms = 3000
-    _session_timeout_ms = 30000
-    _send_buffer_bytes = 128 * 1024
-    _receive_buffer_bytes = 32 * 1024
-    _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet
-    #_metric_reporters = None
-    #_metrics_num_samples = 2
-    #_metrics_sample_window_ms = 30000
-
-    def __init__(self, *topics, **kwargs):
+    DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-python-' + __version__,
+        'group_id': 'kafka-python-default-group',
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_max_wait_ms': 500,
+        'fetch_min_bytes': 1024,
+        'max_partition_fetch_bytes': 1 * 1024 * 1024,
+        'request_timeout_ms': 40 * 1000,
+        'retry_backoff_ms': 100,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'auto_offset_reset': 'latest',
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'check_crcs': True,
+        'metadata_max_age_ms': 5 * 60 * 1000,
+        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+        'send_buffer_bytes': 128 * 1024,
+        'receive_buffer_bytes': 32 * 1024,
+        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+        #'metric_reporters': None,
+        #'metrics_num_samples': 2,
+        #'metrics_sample_window_ms': 30000,
+    }
+
+    def __init__(self, *topics, **configs):
         """A Kafka client that consumes records from a Kafka cluster.
 
         The consumer will transparently handle the failure of servers in the
@@ -79,8 +83,8 @@ class KafkaConsumer(object):
                 raw message value and returns a deserialized value.
             fetch_min_bytes (int): Minimum amount of data the server should
                 return for a fetch request, otherwise wait up to
-                fetch_wait_max_ms for more data to accumulate. Default: 1024.
-            fetch_wait_max_ms (int): The maximum amount of time in milliseconds
+                fetch_max_wait_ms for more data to accumulate. Default: 1024.
+            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
                 the server will block before answering the fetch request if
                 there isn't sufficient data to immediately satisfy the
                 requirement given by fetch_min_bytes. Default: 500.
@@ -97,8 +101,11 @@ class KafkaConsumer(object):
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
             reconnect_backoff_ms (int): The amount of time in milliseconds to
-                wait before attempting to reconnect to a given host. Defaults
-                to 50.
+                wait before attempting to reconnect to a given host.
+                Default: 50.
+            max_in_flight_requests_per_connection (int): Requests are pipelined
+                to kafka brokers up to this number of maximum requests per
+                broker connection. Default: 5.
             auto_offset_reset (str): A policy for resetting offsets on
                 OffsetOutOfRange errors: 'earliest' will move to the oldest
                 available message, 'latest' will move to the most recent. Any
@@ -137,29 +144,19 @@ class KafkaConsumer(object):
         Configuration parameters are described in more detail at
         https://kafka.apache.org/090/configuration.html#newconsumerconfigs
         """
-        for config in ('bootstrap_servers', 'client_id', 'group_id',
-                       'key_deserializer', 'value_deserializer',
-                       'fetch_max_wait_ms', 'fetch_min_bytes',
-                       'max_partition_fetch_bytes', 'request_timeout_ms',
-                       'retry_backoff_ms', 'reconnect_backoff_ms',
-                       'auto_offset_reset', 'enable_auto_commit',
-                       'auto_commit_interval_ms', 'check_crcs',
-                       'metadata_max_age_ms', 'partition_assignment_strategy',
-                       'heartbeat_interval_ms', 'session_timeout_ms',
-                       'send_buffer_bytes', 'receive_buffer_bytes'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs[config])
-
-        self._client = KafkaClient(**kwargs)
-        self._subscription = SubscriptionState(self._auto_offset_reset)
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
+        self._client = KafkaClient(**self.config)
+        self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, **kwargs)
+            self._client, self._subscription, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._group_id, self._subscription,
-            enable_auto_commit=self._enable_auto_commit,
-            auto_commit_interval_ms=self._auto_commit_interval_ms,
-            assignors=self._partition_assignment_strategy,
-            **kwargs)
+            self._client, self.config['group_id'], self._subscription,
+            assignors=self.config['partition_assignment_strategy'],
+            **self.config)
         self._closed = False
 
         #self.metrics = None
@@ -213,11 +210,11 @@ class KafkaConsumer(object):
         #self.metrics.close()
         self._client.close()
         try:
-            self._key_deserializer.close()
+            self.config['key_deserializer'].close()
         except AttributeError:
             pass
         try:
-            self._value_deserializer.close()
+            self.config['value_deserializer'].close()
         except AttributeError:
             pass
         log.debug("The KafkaConsumer has closed.")
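Note: the heart of this change is the copy-and-override merge in __init__, which replaces the per-attribute setattr loop with a single class-level DEFAULT_CONFIG dict. A minimal standalone sketch of the same pattern (the Example class and its keys here are illustrative, not part of kafka-python):

import copy

class Example(object):
    # Class-level defaults; each instance works from its own merged copy.
    DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',
        'request_timeout_ms': 40 * 1000,
    }

    def __init__(self, **configs):
        # Shallow-copy the defaults, then override only the known keys.
        # Unrecognized kwargs are silently dropped, mirroring the
        # "for key in self.config" loop in the diff above.
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

c = Example(request_timeout_ms=10000, not_a_real_option=True)
assert c.config['request_timeout_ms'] == 10000
assert 'not_a_real_option' not in c.config

One design consequence worth noting: the merged dict is then forwarded wholesale as **self.config to KafkaClient, Fetcher, and ConsumerCoordinator, so each of those constructors must tolerate keys it does not use. And because copy.copy is shallow, any mutable default value would be shared across instances; the defaults in this diff are all immutable (the partition_assignment_strategy default is a tuple), so that is safe here.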
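The close() path keeps its duck-typed cleanup, now reading the deserializers out of the config dict: calling .close() and swallowing AttributeError covers both None (the default) and deserializers that do not implement close(). A sketch of that idiom, using a hypothetical helper name:

def _close_quietly(deserializer):
    # None and objects without close() raise AttributeError, which is
    # ignored; anything with a real close() method gets cleaned up.
    try:
        deserializer.close()
    except AttributeError:
        pass

_close_quietly(None)  # no-op, as in the default config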