| author | Dana Powers <dana.powers@rd.io> | 2015-12-29 19:08:35 -0800 |
|---|---|---|
| committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 19:08:35 -0800 |
| commit | 3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9 (patch) | |
| tree | 2c38fd2c577442cb90c99ee2d49b5b0f68300303 /kafka/consumer | |
| parent | e5c7d81e7c35e6b013cece347ef42d9f21d03aa6 (diff) | |
| download | kafka-python-3afdd285a3c92a2c4add5b2b1bd94cfcec4fedd9.tar.gz | |
Switch configs from attributes to dict to make passing / inspecting easier
Diffstat (limited to 'kafka/consumer')
| -rw-r--r-- | kafka/consumer/fetcher.py | 50 |
|---|---|---|
| -rw-r--r-- | kafka/consumer/group.py | 105 |
2 files changed, 76 insertions, 79 deletions
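The refactor is mechanical but worth spelling out: each class now declares its defaults in a single `DEFAULT_CONFIG` dict, copies that dict in `__init__`, and overlays whichever keys the caller supplied. A distilled sketch of the pattern, runnable on its own (the class name and the two keys are illustrative stand-ins, not code from this commit):

```python
import copy


class ConfiguredComponent(object):
    # Stand-in for Fetcher / KafkaConsumer, which use the same pattern
    # with their full option sets.
    DEFAULT_CONFIG = {
        'fetch_min_bytes': 1024,
        'check_crcs': True,
    }

    def __init__(self, **configs):
        # Copy the class-level defaults, then overlay recognized keys.
        # Unrecognized keys are ignored rather than popped, so one dict
        # can be offered to several components.
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]


c = ConfiguredComponent(fetch_min_bytes=4096, group_id='not-my-key')
assert c.config == {'fetch_min_bytes': 4096, 'check_crcs': True}
```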
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ea9c8b9..39e1244 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import

 import collections
+import copy
 import logging

 import six
@@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError):


 class Fetcher(object):
-    _key_deserializer = None
-    _value_deserializer = None
-    _fetch_min_bytes = 1024
-    _fetch_max_wait_ms = 500
-    _max_partition_fetch_bytes = 1048576
-    _check_crcs = True
-    _retry_backoff_ms = 100
-
-    def __init__(self, client, subscriptions, **kwargs):
+    DEFAULT_CONFIG = {
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_min_bytes': 1024,
+        'fetch_max_wait_ms': 500,
+        'max_partition_fetch_bytes': 1048576,
+        'check_crcs': True,
+    }
+
+    def __init__(self, client, subscriptions, **configs):
         #metrics=None,
         #metric_group_prefix='consumer',
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
         self._client = client
         self._subscriptions = subscriptions
-        for config in ('key_deserializer', 'value_deserializer',
-                       'fetch_min_bytes', 'fetch_max_wait_ms',
-                       'max_partition_fetch_bytes', 'check_crcs',
-                       'retry_backoff_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
-
         self._records = collections.deque() # (offset, topic_partition, messages)
         self._unauthorized_topics = set()
         self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
@@ -204,7 +203,8 @@ class Fetcher(object):
                 " and hence cannot be ever returned."
                 " Increase the fetch size, or decrease the maximum message"
                 " size the broker will allow.",
-                copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+                copied_record_too_large_partitions,
+                self.config['max_partition_fetch_bytes'])

     def fetched_records(self):
         """Returns previously fetched records and updates consumed offsets
@@ -255,7 +255,7 @@
             for offset, size, msg in messages:
                 if msg.attributes:
                     raise Errors.KafkaError('Compressed messages not supported yet')
-                elif self._check_crcs and not msg.validate_crc():
+                elif self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)

                 key, value = self._deserialize(msg)
@@ -269,12 +269,12 @@
         return dict(drained)

     def _deserialize(self, msg):
-        if self._key_deserializer:
-            key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable
+        if self.config['key_deserializer']:
+            key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
         else:
             key = msg.key
-        if self._value_deserializer:
-            value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable
+        if self.config['value_deserializer']:
+            value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
         else:
             value = msg.value
         return key, value
@@ -376,7 +376,7 @@
                 partition_info = (
                     partition.partition,
                     fetched,
-                    self._max_partition_fetch_bytes
+                    self.config['max_partition_fetch_bytes']
                 )
                 fetchable[node_id][partition.topic].append(partition_info)
             else:
@@ -388,8 +388,8 @@
         for node_id, partition_data in six.iteritems(fetchable):
             requests[node_id] = FetchRequest(
                 -1, # replica_id
-                self._fetch_max_wait_ms,
-                self._fetch_min_bytes,
+                self.config['fetch_max_wait_ms'],
+                self.config['fetch_min_bytes'],
                 partition_data.items())
         return requests
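Because each component keeps only the keys named in its own `DEFAULT_CONFIG` and silently ignores the rest, `KafkaConsumer` (next file) can forward one flat dict via `**self.config` to the client, fetcher, and coordinator, and each picks out what it understands. A toy illustration of that filtering, with made-up per-component defaults rather than the real ones:

```python
import copy


def overlay(defaults, configs):
    """Copy defaults, then overlay recognized keys, as both classes
    in this commit do inside __init__."""
    config = copy.copy(defaults)
    for key in config:
        if key in configs:
            config[key] = configs[key]
    return config


FETCHER_DEFAULTS = {'fetch_min_bytes': 1024, 'check_crcs': True}
COORDINATOR_DEFAULTS = {'group_id': 'default-group'}

shared = {'fetch_min_bytes': 4096, 'group_id': 'my-group'}

assert overlay(FETCHER_DEFAULTS, shared) == \
    {'fetch_min_bytes': 4096, 'check_crcs': True}
assert overlay(COORDINATOR_DEFAULTS, shared) == {'group_id': 'my-group'}
# 'shared' itself is never mutated, so it can be handed to every component.
assert shared == {'fetch_min_bytes': 4096, 'group_id': 'my-group'}
```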
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 63a1b2e..b7093f3 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import

+import copy
 import logging
 import time
@@ -18,33 +19,36 @@ log = logging.getLogger(__name__)


 class KafkaConsumer(object):
     """Consumer for Kafka 0.9"""

-    _bootstrap_servers = 'localhost'
-    _client_id = 'kafka-python-' + __version__
-    _group_id = 'kafka-python-default-group'
-    _key_deserializer = None
-    _value_deserializer = None
-    _fetch_max_wait_ms = 500
-    _fetch_min_bytes = 1024
-    _max_partition_fetch_bytes = 1 * 1024 * 1024
-    _request_timeout_ms = 40 * 1000
-    _retry_backoff_ms = 100
-    _reconnect_backoff_ms = 50
-    _auto_offset_reset = 'latest'
-    _enable_auto_commit = True
-    _auto_commit_interval_ms = 5000
-    _check_crcs = True
-    _metadata_max_age_ms = 5 * 60 * 1000
-    _partition_assignment_strategy = (RoundRobinPartitionAssignor,)
-    _heartbeat_interval_ms = 3000
-    _session_timeout_ms = 30000
-    _send_buffer_bytes = 128 * 1024
-    _receive_buffer_bytes = 32 * 1024
-    _connections_max_idle_ms = 9 * 60 * 1000 # not implemented yet
-    #_metric_reporters = None
-    #_metrics_num_samples = 2
-    #_metrics_sample_window_ms = 30000
-
-    def __init__(self, *topics, **kwargs):
+    DEFAULT_CONFIG = {
+        'bootstrap_servers': 'localhost',
+        'client_id': 'kafka-python-' + __version__,
+        'group_id': 'kafka-python-default-group',
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_max_wait_ms': 500,
+        'fetch_min_bytes': 1024,
+        'max_partition_fetch_bytes': 1 * 1024 * 1024,
+        'request_timeout_ms': 40 * 1000,
+        'retry_backoff_ms': 100,
+        'reconnect_backoff_ms': 50,
+        'max_in_flight_requests_per_connection': 5,
+        'auto_offset_reset': 'latest',
+        'enable_auto_commit': True,
+        'auto_commit_interval_ms': 5000,
+        'check_crcs': True,
+        'metadata_max_age_ms': 5 * 60 * 1000,
+        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+        'heartbeat_interval_ms': 3000,
+        'session_timeout_ms': 30000,
+        'send_buffer_bytes': 128 * 1024,
+        'receive_buffer_bytes': 32 * 1024,
+        'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
+        #'metric_reporters': None,
+        #'metrics_num_samples': 2,
+        #'metrics_sample_window_ms': 30000,
+    }
+
+    def __init__(self, *topics, **configs):
         """A Kafka client that consumes records from a Kafka cluster.

         The consumer will transparently handle the failure of servers in the
@@ -79,8 +83,8 @@ class KafkaConsumer(object):
                 raw message value and returns a deserialized value.
             fetch_min_bytes (int): Minimum amount of data the server should
                 return for a fetch request, otherwise wait up to
-                fetch_wait_max_ms for more data to accumulate. Default: 1024.
-            fetch_wait_max_ms (int): The maximum amount of time in milliseconds
+                fetch_max_wait_ms for more data to accumulate. Default: 1024.
+            fetch_max_wait_ms (int): The maximum amount of time in milliseconds
                 the server will block before answering the fetch request if
                 there isn't sufficient data to immediately satisfy the
                 requirement given by fetch_min_bytes. Default: 500.
@@ -97,8 +101,11 @@ class KafkaConsumer(object):
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
             reconnect_backoff_ms (int): The amount of time in milliseconds to
-                wait before attempting to reconnect to a given host. Defaults
-                to 50.
+                wait before attempting to reconnect to a given host.
+                Default: 50.
+            max_in_flight_requests_per_connection (int): Requests are pipelined
+                to kafka brokers up to this number of maximum requests per
+                broker connection. Default: 5.
             auto_offset_reset (str): A policy for resetting offsets on
                 OffsetOutOfRange errors: 'earliest' will move to the oldest
                 available message, 'latest' will move to the most recent. Any
@@ -137,29 +144,19 @@ class KafkaConsumer(object):
         Configuration parameters are described in more detail at
         https://kafka.apache.org/090/configuration.html#newconsumerconfigs
         """
-        for config in ('bootstrap_servers', 'client_id', 'group_id',
-                       'key_deserializer', 'value_deserializer',
-                       'fetch_max_wait_ms', 'fetch_min_bytes',
-                       'max_partition_fetch_bytes', 'request_timeout_ms',
-                       'retry_backoff_ms', 'reconnect_backoff_ms',
-                       'auto_offset_reset', 'enable_auto_commit',
-                       'auto_commit_interval_ms', 'check_crcs',
-                       'metadata_max_age_ms', 'partition_assignment_strategy',
-                       'heartbeat_interval_ms', 'session_timeout_ms',
-                       'send_buffer_bytes', 'receive_buffer_bytes'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs[config])
-
-        self._client = KafkaClient(**kwargs)
-        self._subscription = SubscriptionState(self._auto_offset_reset)
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
+
+        self._client = KafkaClient(**self.config)
+        self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, **kwargs)
+            self._client, self._subscription, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._group_id, self._subscription,
-            enable_auto_commit=self._enable_auto_commit,
-            auto_commit_interval_ms=self._auto_commit_interval_ms,
-            assignors=self._partition_assignment_strategy,
-            **kwargs)
+            self._client, self.config['group_id'], self._subscription,
+            assignors=self.config['partition_assignment_strategy'],
+            **self.config)
         self._closed = False

         #self.metrics = None
@@ -213,11 +210,11 @@ class KafkaConsumer(object):
         #self.metrics.close()
         self._client.close()
         try:
-            self._key_deserializer.close()
+            self.config['key_deserializer'].close()
         except AttributeError:
             pass
         try:
-            self._value_deserializer.close()
+            self.config['value_deserializer'].close()
         except AttributeError:
             pass
         log.debug("The KafkaConsumer has closed.")
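The "inspecting easier" half of the commit message is the user-visible payoff: the merged settings now live in one plain dict instead of scattered underscore-prefixed attributes. A usage sketch against this revision (the import path follows the file in this diff; constructing the consumer also assumes a broker reachable at the given address):

```python
from kafka.consumer.group import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         fetch_min_bytes=4096)

# Every effective setting, defaults overlaid with our overrides, is
# visible in one place:
for key in sorted(consumer.config):
    print('%s = %r' % (key, consumer.config[key]))
```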