diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index caa88cf..58f22d4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -201,10 +201,15 @@ class KafkaClient(object): if key in configs: self.config[key] = configs[key] + # these properties need to be set on top of the initialization pipeline + # because they are used when __del__ method is called + self._closed = False + self._wake_r, self._wake_w = socket.socketpair() + self._selector = self.config['selector']() + self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata self._metadata_refresh_in_progress = False - self._selector = self.config['selector']() self._conns = Dict() # object to support weakrefs self._api_versions = None self._connecting = set() @@ -212,7 +217,6 @@ class KafkaClient(object): self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._wake_r, self._wake_w = socket.socketpair() self._wake_r.setblocking(False) self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) self._wake_lock = threading.Lock() @@ -226,7 +230,6 @@ class KafkaClient(object): self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) - self._closed = False self._sensors = None if self.config['metrics']: self._sensors = KafkaClientMetrics(self.config['metrics'], |