diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 20:20:45 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-16 20:54:55 -0700 |
commit | ed0bffbcc652ee6c74297aee8a473d13af928562 (patch) | |
tree | fbe10f9d2f5eb3c8c92a1c4c2742f2d88e29d1b5 /kafka | |
parent | 947625bfa4b6462e3f7c0fdad0a0cd69708beb2c (diff) | |
download | kafka-python-config_selector.tar.gz |
Expose selector type as config optionconfig_selector
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 6 |
3 files changed, 15 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 6fa9434..fb9d9ad 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -63,6 +63,7 @@ class KafkaClient(object): 'ssl_crlfile': None, 'api_version': None, 'api_version_auto_timeout_ms': 2000, + 'selector': selectors.DefaultSelector, } API_VERSIONS = [ (0, 10), @@ -134,6 +135,9 @@ class KafkaClient(object): api_version_auto_timeout_ms (int): number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None + selector (selectors.BaseSelector): Provide a specific selector + implementation to use for I/O multiplexing. + Default: selectors.DefaultSelector """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -149,7 +153,7 @@ class KafkaClient(object): self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False self._last_no_node_available_ms = 0 - self._selector = selectors.DefaultSelector() + self._selector = self.config['selector']() self._conns = {} self._connecting = set() self._refresh_on_disconnects = True diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 982cd7b..5343a9b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -6,7 +6,7 @@ import time import six -from kafka.client_async import KafkaClient +from kafka.client_async import KafkaClient, selectors from kafka.consumer.fetcher import Fetcher from kafka.consumer.subscription_state import SubscriptionState from kafka.coordinator.consumer import ConsumerCoordinator @@ -173,6 +173,9 @@ class KafkaConsumer(six.Iterator): metrics. Default: 2 metrics_sample_window_ms (int): The maximum age in milliseconds of samples used to compute metrics. Default: 30000 + selector (selectors.BaseSelector): Provide a specific selector + implementation to use for I/O multiplexing. + Default: selectors.DefaultSelector Note: Configuration parameters are described in more detail at @@ -218,6 +221,7 @@ class KafkaConsumer(six.Iterator): 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, + 'selector': selectors.DefaultSelector, } def __init__(self, *topics, **configs): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 70c0cd0..c9e16d2 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -8,7 +8,7 @@ import time import weakref from .. import errors as Errors -from ..client_async import KafkaClient +from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet @@ -228,6 +228,9 @@ class KafkaProducer(object): metrics. Default: 2 metrics_sample_window_ms (int): The maximum age in milliseconds of samples used to compute metrics. Default: 30000 + selector (selectors.BaseSelector): Provide a specific selector + implementation to use for I/O multiplexing. + Default: selectors.DefaultSelector Note: Configuration parameters are described in more detail at @@ -267,6 +270,7 @@ class KafkaProducer(object): 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, + 'selector': selectors.DefaultSelector, } def __init__(self, **configs): |