author | Dana Powers <dana.powers@rd.io> | 2015-04-05 17:46:13 -0700
committer | Dana Powers <dana.powers@rd.io> | 2015-04-06 10:42:36 -0700
commit | 5cc051353eff67713bd4885372e1aee12fab22f0 (patch)
tree | 9c39ec2111b0cb20d991cbae8671ad7d8e746af8 /kafka/client.py
parent | 46ee816327ece5759c0125fdbf799171fb5a021f (diff)
download | kafka-python-5cc051353eff67713bd4885372e1aee12fab22f0.tar.gz
Refactor KafkaClient._send_broker_aware_request to return a list of responses
and include individual (unraised) FailedPayloadsError instances rather
than always raising a FailedPayloadsError. This should allow producers
to determine specifically which payloads succeeded and which failed.
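
With this change each payload maps to exactly one entry of the returned list: a decoded
response object, None (for a ProduceRequest with acks=0), or an unraised
FailedPayloadsError. Below is a rough sketch of the calling pattern this enables through
the public send_produce_request API when fail_on_error=False; the broker address, topic
name, and the to_retry list are illustrative only, not part of this change:

    from kafka import KafkaClient, create_message
    from kafka.common import ProduceRequest, FailedPayloadsError

    # Hypothetical broker address and topic; adjust for a real cluster
    client = KafkaClient('localhost:9092')
    payloads = [ProduceRequest('my-topic', 0, [create_message(b'hello')]),
                ProduceRequest('my-topic', 1, [create_message(b'world')])]

    # With fail_on_error=False the result may contain unraised FailedPayloadsError
    # entries, positionally aligned with the payloads that were sent
    resps = client.send_produce_request(payloads, acks=1, fail_on_error=False)

    to_retry = []
    for payload, resp in zip(payloads, resps):
        if isinstance(resp, FailedPayloadsError):
            # connection-level failure for this payload's broker; retry later
            to_retry.append(payload)
        else:
            # a decoded ProduceResponse; resp.error holds the server error code
            print(payload.topic, payload.partition, resp.error)

The same pairing works for the other send_* methods, since responses keep the order of
the supplied payloads.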
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 130
1 file changed, 57 insertions, 73 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4cd9e24..93eeb52 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -151,7 +151,7 @@ class KafkaClient(object):
         """

         # Group the requests by topic+partition
-        original_keys = []
+        brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)

         for payload in payloads:
@@ -159,67 +159,88 @@ class KafkaClient(object):
                                                     payload.partition)

             payloads_by_broker[leader].append(payload)
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary
-        acc = {}
-
-        # keep a list of payloads that were failed to be sent to brokers
-        failed_payloads = []
+            brokers_for_payloads.append(leader)

         # For each broker, send the list of request payloads
+        # and collect the responses and errors
+        responses_by_broker = collections.defaultdict(list)
+        broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
             requestId = self._next_id()
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)

-            failed = False
             # Send the request, recv the response
             try:
                 conn.send(requestId, request)

+            except ConnectionError as e:
+                broker_failures.append(broker)
+                log.warning("Could not send request [%s] to server %s: %s",
+                            binascii.b2a_hex(request), conn, e)
+
+                for payload in payloads:
+                    responses_by_broker[broker].append(FailedPayloadsError(payload))
+
+            # No exception, try to get response
+            else:
+
                 # decoder_fn=None signal that the server is expected to not
                 # send a response. This probably only applies to
                 # ProduceRequest w/ acks = 0
                 if decoder_fn is None:
+                    for payload in payloads:
+                        responses_by_broker[broker].append(None)
                     continue

                 try:
                     response = conn.recv(requestId)
                 except ConnectionError as e:
+                    broker_failures.append(broker)
                     log.warning("Could not receive response to request [%s] "
-                                "from server %s: %s", binascii.b2a_hex(request), conn, e)
-                    failed = True
-            except ConnectionError as e:
-                log.warning("Could not send request [%s] to server %s: %s",
-                            binascii.b2a_hex(request), conn, e)
-                failed = True
+                                "from server %s: %s",
+                                binascii.b2a_hex(request), conn, e)

-            if failed:
-                failed_payloads += payloads
-                self.reset_all_metadata()
-                continue
+                    for payload in payloads:
+                        responses_by_broker[broker].append(FailedPayloadsError(payload))

-            for response in decoder_fn(response):
-                acc[(response.topic, response.partition)] = response
+                else:
+
+                    for payload_response in decoder_fn(response):
+                        responses_by_broker[broker].append(payload_response)

-        if failed_payloads:
-            raise FailedPayloadsError(failed_payloads)
+        # Connection errors generally mean stale metadata
+        # although sometimes it means incorrect api request
+        # Unfortunately there is no good way to tell the difference
+        # so we'll just reset metadata on all errors to be safe
+        if broker_failures:
+            self.reset_all_metadata()

-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys) if acc else ()
+        # Return responses in the same order as provided
+        responses_by_payload = [responses_by_broker[broker].pop(0)
+                                for broker in brokers_for_payloads]
+        return responses_by_payload

     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)

     def _raise_on_response_error(self, resp):
+
+        # Response can be an unraised exception object (FailedPayloadsError)
+        if isinstance(resp, Exception):
+            raise resp
+
+        # Or a server api error response
         try:
             kafka.common.check_error(resp)
         except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
             self.reset_topic_metadata(resp.topic)
             raise

+        # Return False if no error to enable list comprehensions
+        return False
+
     #################
     #   Public API  #
     #################
@@ -419,16 +440,9 @@ class KafkaClient(object):

         resps = self._send_broker_aware_request(payloads, encoder, decoder)

-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if resp is not None and
+                (not fail_on_error or not self._raise_on_response_error(resp))]

     def send_fetch_request(self, payloads=[], fail_on_error=True,
                            callback=None, max_wait_time=100, min_bytes=4096):
@@ -447,16 +461,8 @@ class KafkaClient(object):
             payloads, encoder,
             KafkaProtocol.decode_fetch_response)

-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]

     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
@@ -465,15 +471,8 @@ class KafkaClient(object):
             KafkaProtocol.encode_offset_request,
             KafkaProtocol.decode_offset_response)

-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]

     def send_offset_commit_request(self, group, payloads=[],
                                    fail_on_error=True, callback=None):
@@ -482,16 +481,8 @@ class KafkaClient(object):
         decoder = KafkaProtocol.decode_offset_commit_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)

-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]

     def send_offset_fetch_request(self, group, payloads=[],
                                   fail_on_error=True, callback=None):
@@ -501,12 +492,5 @@ class KafkaClient(object):
         decoder = KafkaProtocol.decode_offset_fetch_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)

-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
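
Two details of the new code are worth noting. First, _raise_on_response_error now
returns False when there is no error, purely so that
`not self._raise_on_response_error(resp)` can be used as the filter clause of the new
list comprehensions in the send_* methods. Second, response ordering is preserved
without the old (topic, partition) keyed dictionary: the client records which broker
each payload was routed to, appends results per broker, then pops them back out in
payload order. A standalone sketch of that regrouping trick follows (illustrative data,
not kafka-python code):

    import collections

    # Illustrative payloads and the broker each one was routed to
    payloads = ['p0', 'p1', 'p2', 'p3']
    brokers_for_payloads = ['broker-a', 'broker-b', 'broker-a', 'broker-b']

    # Responses accumulate per broker, in the same relative order as that
    # broker's payloads (decoder_fn yields them in request order)
    responses_by_broker = collections.defaultdict(list)
    for payload, broker in zip(payloads, brokers_for_payloads):
        responses_by_broker[broker].append('resp-' + payload)

    # pop(0) walks each broker's queue front-to-back, so the flattened list
    # lines up positionally with the original payloads
    responses_by_payload = [responses_by_broker[broker].pop(0)
                            for broker in brokers_for_payloads]
    assert responses_by_payload == ['resp-p0', 'resp-p1', 'resp-p2', 'resp-p3']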