summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py37
1 files changed, 26 insertions, 11 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2bd2324..6a1a63b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -67,8 +67,12 @@ class SimpleClient(object):
)
conn = self._conns[host_key]
- while conn.connect() == ConnectionStates.CONNECTING:
- pass
+ timeout = time.time() + self.timeout
+ while time.time() < timeout:
+ if conn.connect() is ConnectionStates.CONNECTED:
+ break
+ else:
+ raise ConnectionError("%s:%s (%s)" % (host, port, afi))
return conn
def _get_leader_for_partition(self, topic, partition):
@@ -149,9 +153,11 @@ class SimpleClient(object):
random.shuffle(hosts)
for (host, port, afi) in hosts:
- conn = self._get_conn(host, port, afi)
- if not conn.connected():
- log.warning("Skipping unconnected connection: %s", conn)
+ try:
+ conn = self._get_conn(host, port, afi)
+ except ConnectionError:
+ log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
+ host, port, afi)
continue
request = encoder_fn(payloads=payloads)
future = conn.send(request)
@@ -233,9 +239,9 @@ class SimpleClient(object):
host, port, afi = get_ip_port_afi(broker.host)
- conn = self._get_conn(host, broker.port, afi)
- conn.connect()
- if not conn.connected():
+ try:
+ conn = self._get_conn(host, broker.port, afi)
+ except ConnectionError:
refresh_metadata = True
failed_payloads(broker_payloads)
continue
@@ -419,10 +425,19 @@ class SimpleClient(object):
return c
def reinit(self):
- for conn in self._conns.values():
+ timeout = time.time() + self.timeout
+ conns = set(self._conns.values())
+ for conn in conns:
conn.close()
- while conn.connect() == ConnectionStates.CONNECTING:
- pass
+ conn.connect()
+
+ while time.time() < timeout:
+ for conn in list(conns):
+ conn.connect()
+ if conn.connected():
+ conns.remove(conn)
+ if not conns:
+ break
def reset_topic_metadata(self, *topics):
for topic in topics: