diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 90 |
1 files changed, 53 insertions, 37 deletions
diff --git a/kafka/client.py b/kafka/client.py index b09927d..7f9969e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -3,7 +3,6 @@ import copy import functools import logging import random -import select import time import six @@ -15,7 +14,9 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, UnknownError, LeaderNotAvailableError, UnknownTopicOrPartitionError, NotLeaderForPartitionError, ReplicaNotAvailableError) -from kafka.conn import collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from kafka.conn import ( + collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS, + ConnectionStates) from kafka.protocol import KafkaProtocol @@ -45,7 +46,6 @@ class KafkaClient(object): self.load_metadata_for_topics() # bootstrap with all metadata - ################## # Private API # ################## @@ -56,11 +56,14 @@ class KafkaClient(object): if host_key not in self._conns: self._conns[host_key] = BrokerConnection( host, port, - timeout=self.timeout, + request_timeout_ms=self.timeout * 1000, client_id=self.client_id ) - return self._conns[host_key] + conn = self._conns[host_key] + while conn.connect() == ConnectionStates.CONNECTING: + pass + return conn def _get_leader_for_partition(self, topic, partition): """ @@ -137,16 +140,23 @@ class KafkaClient(object): for (host, port) in hosts: conn = self._get_conn(host, port) + if not conn.connected(): + log.warning("Skipping unconnected connection: %s", conn) + continue request = encoder_fn(payloads=payloads) - correlation_id = conn.send(request) - if correlation_id is None: + future = conn.send(request) + + # Block + while not future.is_done: + conn.recv() + + if future.failed(): + log.error("Request failed: %s", future.exception) continue - response = conn.recv() - if response is not None: - decoded = decoder_fn(response) - return decoded - raise KafkaUnavailableError('All servers failed to process request') + return decoder_fn(future.value) + + raise KafkaUnavailableError('All servers failed to process request: %s' % hosts) def _payloads_by_broker(self, payloads): payloads_by_broker = collections.defaultdict(list) @@ -204,55 +214,59 @@ class KafkaClient(object): # For each BrokerConnection keep the real socket so that we can use # a select to perform unblocking I/O - connections_by_socket = {} + connections_by_future = {} for broker, broker_payloads in six.iteritems(payloads_by_broker): if broker is None: failed_payloads(broker_payloads) continue conn = self._get_conn(broker.host, broker.port) + conn.connect() + if not conn.connected(): + refresh_metadata = True + failed_payloads(broker_payloads) + continue + request = encoder_fn(payloads=broker_payloads) # decoder_fn=None signal that the server is expected to not # send a response. This probably only applies to # ProduceRequest w/ acks = 0 expect_response = (decoder_fn is not None) - correlation_id = conn.send(request, expect_response=expect_response) + future = conn.send(request, expect_response=expect_response) - if correlation_id is None: + if future.failed(): refresh_metadata = True failed_payloads(broker_payloads) - log.warning('Error attempting to send request %s ' - 'to server %s', correlation_id, broker) continue if not expect_response: - log.debug('Request %s does not expect a response ' - '(skipping conn.recv)', correlation_id) for payload in broker_payloads: topic_partition = (str(payload.topic), payload.partition) responses[topic_partition] = None continue - connections_by_socket[conn._read_fd] = (conn, broker) + connections_by_future[future] = (conn, broker) conn = None - while connections_by_socket: - sockets = connections_by_socket.keys() - rlist, _, _ = select.select(sockets, [], [], None) - conn, broker = connections_by_socket.pop(rlist[0]) - correlation_id = conn.next_correlation_id_recv() - response = conn.recv() - if response is None: - refresh_metadata = True - failed_payloads(payloads_by_broker[broker]) - log.warning('Error receiving response to request %s ' - 'from server %s', correlation_id, broker) - continue + while connections_by_future: + futures = list(connections_by_future.keys()) + for future in futures: + + if not future.is_done: + conn, _ = connections_by_future[future] + conn.recv() + continue - for payload_response in decoder_fn(response): - topic_partition = (str(payload_response.topic), - payload_response.partition) - responses[topic_partition] = payload_response + _, broker = connections_by_future.pop(future) + if future.failed(): + refresh_metadata = True + failed_payloads(payloads_by_broker[broker]) + + else: + for payload_response in decoder_fn(future.value): + topic_partition = (str(payload_response.topic), + payload_response.partition) + responses[topic_partition] = payload_response if refresh_metadata: self.reset_all_metadata() @@ -392,7 +406,9 @@ class KafkaClient(object): def reinit(self): for conn in self._conns.values(): - conn.reinit() + conn.close() + while conn.connect() == ConnectionStates.CONNECTING: + pass def reset_topic_metadata(self, *topics): for topic in topics: |