diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 50 |
1 files changed, 25 insertions, 25 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index ea9c8b9..39e1244 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import collections +import copy import logging import six @@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError): class Fetcher(object): - _key_deserializer = None - _value_deserializer = None - _fetch_min_bytes = 1024 - _fetch_max_wait_ms = 500 - _max_partition_fetch_bytes = 1048576 - _check_crcs = True - _retry_backoff_ms = 100 - - def __init__(self, client, subscriptions, **kwargs): + DEFAULT_CONFIG = { + 'key_deserializer': None, + 'value_deserializer': None, + 'fetch_min_bytes': 1024, + 'fetch_max_wait_ms': 500, + 'max_partition_fetch_bytes': 1048576, + 'check_crcs': True, + } + + def __init__(self, client, subscriptions, **configs): #metrics=None, #metric_group_prefix='consumer', + self.config = copy.copy(self.DEFAULT_CONFIG) + for key in self.config: + if key in configs: + self.config[key] = configs[key] self._client = client self._subscriptions = subscriptions - for config in ('key_deserializer', 'value_deserializer', - 'fetch_min_bytes', 'fetch_max_wait_ms', - 'max_partition_fetch_bytes', 'check_crcs', - 'retry_backoff_ms'): - if config in kwargs: - setattr(self, '_' + config, kwargs.pop(config)) - self._records = collections.deque() # (offset, topic_partition, messages) self._unauthorized_topics = set() self._offset_out_of_range_partitions = dict() # {topic_partition: offset} @@ -204,7 +203,8 @@ class Fetcher(object): " and hence cannot be ever returned." " Increase the fetch size, or decrease the maximum message" " size the broker will allow.", - copied_record_too_large_partitions, self._max_partition_fetch_bytes) + copied_record_too_large_partitions, + self.config['max_partition_fetch_bytes']) def fetched_records(self): """Returns previously fetched records and updates consumed offsets @@ -255,7 +255,7 @@ class Fetcher(object): for offset, size, msg in messages: if msg.attributes: raise Errors.KafkaError('Compressed messages not supported yet') - elif self._check_crcs and not msg.validate_crc(): + elif self.config['check_crcs'] and not msg.validate_crc(): raise Errors.InvalidMessageError(msg) key, value = self._deserialize(msg) @@ -269,12 +269,12 @@ class Fetcher(object): return dict(drained) def _deserialize(self, msg): - if self._key_deserializer: - key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable + if self.config['key_deserializer']: + key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable else: key = msg.key - if self._value_deserializer: - value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable + if self.config['value_deserializer']: + value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable else: value = msg.value return key, value @@ -376,7 +376,7 @@ class Fetcher(object): partition_info = ( partition.partition, fetched, - self._max_partition_fetch_bytes + self.config['max_partition_fetch_bytes'] ) fetchable[node_id][partition.topic].append(partition_info) else: @@ -388,8 +388,8 @@ class Fetcher(object): for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest( -1, # replica_id - self._fetch_max_wait_ms, - self._fetch_min_bytes, + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], partition_data.items()) return requests |