Diffstat (limited to 'kafka/client.py')
-rw-r--r--  kafka/client.py  90
1 file changed, 53 insertions, 37 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b09927d..7f9969e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,7 +3,6 @@ import copy
import functools
import logging
import random
-import select
import time
import six
@@ -15,7 +14,9 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
-from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import (
+ collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
+ ConnectionStates)
from kafka.protocol import KafkaProtocol
@@ -45,7 +46,6 @@ class KafkaClient(object):
self.load_metadata_for_topics() # bootstrap with all metadata
-
##################
# Private API #
##################
@@ -56,11 +56,14 @@ class KafkaClient(object):
if host_key not in self._conns:
self._conns[host_key] = BrokerConnection(
host, port,
- timeout=self.timeout,
+ request_timeout_ms=self.timeout * 1000,
client_id=self.client_id
)
- return self._conns[host_key]
+ conn = self._conns[host_key]
+ while conn.connect() == ConnectionStates.CONNECTING:
+ pass
+ return conn
def _get_leader_for_partition(self, topic, partition):
"""
@@ -137,16 +140,23 @@ class KafkaClient(object):
for (host, port) in hosts:
conn = self._get_conn(host, port)
+ if not conn.connected():
+ log.warning("Skipping unconnected connection: %s", conn)
+ continue
request = encoder_fn(payloads=payloads)
- correlation_id = conn.send(request)
- if correlation_id is None:
+ future = conn.send(request)
+
+ # Block
+ while not future.is_done:
+ conn.recv()
+
+ if future.failed():
+ log.error("Request failed: %s", future.exception)
continue
- response = conn.recv()
- if response is not None:
- decoded = decoder_fn(response)
- return decoded
- raise KafkaUnavailableError('All servers failed to process request')
+ return decoder_fn(future.value)
+
+ raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
def _payloads_by_broker(self, payloads):
payloads_by_broker = collections.defaultdict(list)
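In the rewritten _send_broker_unaware_request, conn.send() no longer returns a correlation id; it returns a kafka.future.Future, and conn.recv() is what drives network I/O and eventually resolves that future. A rough sketch of the send-then-poll pattern, assuming conn is an already-connected BrokerConnection and request is any protocol request object accepted by conn.send() (the helper name send_and_wait is hypothetical):

    def send_and_wait(conn, request):
        """Send one request and block until its Future resolves."""
        future = conn.send(request)
        # recv() reads from the socket and completes the future once the
        # matching response arrives (or marks it failed on error).
        while not future.is_done:
            conn.recv()
        if future.failed():
            raise future.exception
        return future.value
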
@@ -204,55 +214,59 @@ class KafkaClient(object):
# For each BrokerConnection keep the real socket so that we can use
# a select to perform unblocking I/O
- connections_by_socket = {}
+ connections_by_future = {}
for broker, broker_payloads in six.iteritems(payloads_by_broker):
if broker is None:
failed_payloads(broker_payloads)
continue
conn = self._get_conn(broker.host, broker.port)
+ conn.connect()
+ if not conn.connected():
+ refresh_metadata = True
+ failed_payloads(broker_payloads)
+ continue
+
request = encoder_fn(payloads=broker_payloads)
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
expect_response = (decoder_fn is not None)
- correlation_id = conn.send(request, expect_response=expect_response)
+ future = conn.send(request, expect_response=expect_response)
- if correlation_id is None:
+ if future.failed():
refresh_metadata = True
failed_payloads(broker_payloads)
- log.warning('Error attempting to send request %s '
- 'to server %s', correlation_id, broker)
continue
if not expect_response:
- log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', correlation_id)
for payload in broker_payloads:
topic_partition = (str(payload.topic), payload.partition)
responses[topic_partition] = None
continue
- connections_by_socket[conn._read_fd] = (conn, broker)
+ connections_by_future[future] = (conn, broker)
conn = None
- while connections_by_socket:
- sockets = connections_by_socket.keys()
- rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = connections_by_socket.pop(rlist[0])
- correlation_id = conn.next_correlation_id_recv()
- response = conn.recv()
- if response is None:
- refresh_metadata = True
- failed_payloads(payloads_by_broker[broker])
- log.warning('Error receiving response to request %s '
- 'from server %s', correlation_id, broker)
- continue
+ while connections_by_future:
+ futures = list(connections_by_future.keys())
+ for future in futures:
+
+ if not future.is_done:
+ conn, _ = connections_by_future[future]
+ conn.recv()
+ continue
- for payload_response in decoder_fn(response):
- topic_partition = (str(payload_response.topic),
- payload_response.partition)
- responses[topic_partition] = payload_response
+ _, broker = connections_by_future.pop(future)
+ if future.failed():
+ refresh_metadata = True
+ failed_payloads(payloads_by_broker[broker])
+
+ else:
+ for payload_response in decoder_fn(future.value):
+ topic_partition = (str(payload_response.topic),
+ payload_response.partition)
+ responses[topic_partition] = payload_response
if refresh_metadata:
self.reset_all_metadata()
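The select() loop over raw sockets is replaced above by polling a dict keyed by futures: any connection whose future is still pending gets a recv() call to drive I/O, and completed futures are popped and either decoded or recorded as failures. A condensed sketch of that drain loop, assuming connections_by_future maps each pending Future to its (BrokerConnection, broker) pair as in the patch (the wait_for_all name and the results dict are illustrative):

    def wait_for_all(connections_by_future):
        results = {}
        while connections_by_future:
            for future in list(connections_by_future):
                if not future.is_done:
                    conn, _ = connections_by_future[future]
                    conn.recv()  # drive I/O; may complete this or another future
                    continue
                _, broker = connections_by_future.pop(future)
                # record the response value, or None for a failed request
                results[broker] = None if future.failed() else future.value
        return results
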
@@ -392,7 +406,9 @@ class KafkaClient(object):
def reinit(self):
for conn in self._conns.values():
- conn.reinit()
+ conn.close()
+ while conn.connect() == ConnectionStates.CONNECTING:
+ pass
def reset_topic_metadata(self, *topics):
for topic in topics: