Diffstat (limited to 'kafka/consumer/fetcher.py')
 kafka/consumer/fetcher.py | 50 +++++++++++++++++++++++++-------------------------
 1 file changed, 25 insertions(+), 25 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index ea9c8b9..39e1244 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import collections
+import copy
 import logging
 
 import six
@@ -28,27 +29,25 @@ class RecordTooLargeError(Errors.KafkaError):
 
 
 class Fetcher(object):
-    _key_deserializer = None
-    _value_deserializer = None
-    _fetch_min_bytes = 1024
-    _fetch_max_wait_ms = 500
-    _max_partition_fetch_bytes = 1048576
-    _check_crcs = True
-    _retry_backoff_ms = 100
-
-    def __init__(self, client, subscriptions, **kwargs):
+    DEFAULT_CONFIG = {
+        'key_deserializer': None,
+        'value_deserializer': None,
+        'fetch_min_bytes': 1024,
+        'fetch_max_wait_ms': 500,
+        'max_partition_fetch_bytes': 1048576,
+        'check_crcs': True,
+    }
+
+    def __init__(self, client, subscriptions, **configs):
         #metrics=None,
         #metric_group_prefix='consumer',
+        self.config = copy.copy(self.DEFAULT_CONFIG)
+        for key in self.config:
+            if key in configs:
+                self.config[key] = configs[key]
         self._client = client
         self._subscriptions = subscriptions
 
-        for config in ('key_deserializer', 'value_deserializer',
-                       'fetch_min_bytes', 'fetch_max_wait_ms',
-                       'max_partition_fetch_bytes', 'check_crcs',
-                       'retry_backoff_ms'):
-            if config in kwargs:
-                setattr(self, '_' + config, kwargs.pop(config))
-
         self._records = collections.deque() # (offset, topic_partition, messages)
         self._unauthorized_topics = set()
         self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
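The hunk above replaces loose class attributes with the DEFAULT_CONFIG pattern: __init__ copies the class-level defaults and overrides only the keys the caller actually passes, so unrecognized options are ignored rather than becoming stray attributes. A minimal sketch of that behavior (the Example class and its option names are illustrative stand-ins, not the real Fetcher):

    import copy

    class Example(object):
        DEFAULT_CONFIG = {
            'fetch_min_bytes': 1024,
            'check_crcs': True,
        }

        def __init__(self, **configs):
            # Copy the class-level defaults, then apply only recognized
            # keys; unknown keys in `configs` are silently ignored.
            self.config = copy.copy(self.DEFAULT_CONFIG)
            for key in self.config:
                if key in configs:
                    self.config[key] = configs[key]

    e = Example(fetch_min_bytes=1, unknown_option=42)
    assert e.config['fetch_min_bytes'] == 1
    assert e.config['check_crcs'] is True
    assert 'unknown_option' not in e.config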
@@ -204,7 +203,8 @@ class Fetcher(object):
                 " and hence cannot be ever returned."
                 " Increase the fetch size, or decrease the maximum message"
                 " size the broker will allow.",
-                copied_record_too_large_partitions, self._max_partition_fetch_bytes)
+                copied_record_too_large_partitions,
+                self.config['max_partition_fetch_bytes'])
 
     def fetched_records(self):
         """Returns previously fetched records and updates consumed offsets
@@ -255,7 +255,7 @@ class Fetcher(object):
             for offset, size, msg in messages:
                 if msg.attributes:
                     raise Errors.KafkaError('Compressed messages not supported yet')
-                elif self._check_crcs and not msg.validate_crc():
+                elif self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
 
                 key, value = self._deserialize(msg)
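The check_crcs option guards against corrupted messages: each Kafka message carries a CRC32 of its payload, and validate_crc recomputes it on receipt. A rough sketch of the idea using Python's binascii, not kafka-python's actual wire layout:

    import binascii

    def crc_ok(stored_crc, payload):
        # A message is intact when the checksum stored in its header
        # matches a fresh CRC32 of the payload bytes.
        return (binascii.crc32(payload) & 0xffffffff) == stored_crc

    payload = b'some message bytes'
    crc = binascii.crc32(payload) & 0xffffffff
    assert crc_ok(crc, payload)
    assert not crc_ok(crc, payload + b'corrupted')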
@@ -269,12 +269,12 @@ class Fetcher(object):
         return dict(drained)
 
     def _deserialize(self, msg):
-        if self._key_deserializer:
-            key = self._key_deserializer(msg.key) # pylint: disable-msg=not-callable
+        if self.config['key_deserializer']:
+            key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
         else:
             key = msg.key
-        if self._value_deserializer:
-            value = self._value_deserializer(msg.value) # pylint: disable-msg=not-callable
+        if self.config['value_deserializer']:
+            value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
         else:
             value = msg.value
         return key, value
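_deserialize now reads both callables out of self.config; when a deserializer is None, the raw bytes pass through untouched. A hedged example of configs a caller might supply (the lambdas are illustrative; any callable over the raw bytes works):

    import json

    configs = {
        'key_deserializer': lambda k: k.decode('utf-8') if k is not None else None,
        'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
    }

    # Mirrors what _deserialize does with msg.key / msg.value:
    key = configs['key_deserializer'](b'user-1')             # 'user-1'
    value = configs['value_deserializer'](b'{"clicks": 3}')  # {'clicks': 3}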
@@ -376,7 +376,7 @@ class Fetcher(object):
                 partition_info = (
                     partition.partition,
                     fetched,
-                    self._max_partition_fetch_bytes
+                    self.config['max_partition_fetch_bytes']
                 )
                 fetchable[node_id][partition.topic].append(partition_info)
             else:
@@ -388,8 +388,8 @@ class Fetcher(object):
         for node_id, partition_data in six.iteritems(fetchable):
             requests[node_id] = FetchRequest(
                 -1, # replica_id
-                self._fetch_max_wait_ms,
-                self._fetch_min_bytes,
+                self.config['fetch_max_wait_ms'],
+                self.config['fetch_min_bytes'],
                 partition_data.items())
         return requests
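Both config values flow straight into the wire request: fetch_min_bytes tells the broker how much data must accumulate before it answers, and fetch_max_wait_ms caps how long it may hold the request open waiting for that minimum. A self-contained sketch of the payload shape, with a namedtuple standing in for the real FetchRequest class imported elsewhere in fetcher.py (topic name and offsets are made up):

    import collections

    # Positional stand-in matching the FetchRequest call above (hypothetical).
    FetchRequest = collections.namedtuple(
        'FetchRequest', ['replica_id', 'max_wait_time', 'min_bytes', 'topics'])

    partition_data = {
        'my-topic': [
            # (partition, fetch_offset, max_partition_fetch_bytes)
            (0, 1234, 1048576),
        ],
    }
    request = FetchRequest(
        -1,    # replica_id of -1 marks an ordinary consumer, not a follower broker
        500,   # fetch_max_wait_ms: longest the broker may wait for min_bytes
        1024,  # fetch_min_bytes: broker responds once this much data is ready
        partition_data.items())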