summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py156
1 files changed, 77 insertions, 79 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4cd9e24..9eb8a0d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -150,8 +150,10 @@ class KafkaClient(object):
List of response objects in the same order as the supplied payloads
"""
+ log.debug("Sending Payloads: %s" % payloads)
+
# Group the requests by topic+partition
- original_keys = []
+ brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
for payload in payloads:
@@ -159,67 +161,89 @@ class KafkaClient(object):
payload.partition)
payloads_by_broker[leader].append(payload)
- original_keys.append((payload.topic, payload.partition))
-
- # Accumulate the responses in a dictionary
- acc = {}
-
- # keep a list of payloads that were failed to be sent to brokers
- failed_payloads = []
+ brokers_for_payloads.append(leader)
# For each broker, send the list of request payloads
+ # and collect the responses and errors
+ responses_by_broker = collections.defaultdict(list)
+ broker_failures = []
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
- failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
+ except ConnectionError as e:
+ broker_failures.append(broker)
+ log.warning("Could not send request [%s] to server %s: %s",
+ binascii.b2a_hex(request), conn, e)
+
+ for payload in payloads:
+ responses_by_broker[broker].append(FailedPayloadsError(payload))
+
+ # No exception, try to get response
+ else:
+
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
if decoder_fn is None:
+ for payload in payloads:
+ responses_by_broker[broker].append(None)
continue
try:
response = conn.recv(requestId)
except ConnectionError as e:
+ broker_failures.append(broker)
log.warning("Could not receive response to request [%s] "
- "from server %s: %s", binascii.b2a_hex(request), conn, e)
- failed = True
- except ConnectionError as e:
- log.warning("Could not send request [%s] to server %s: %s",
- binascii.b2a_hex(request), conn, e)
- failed = True
+ "from server %s: %s",
+ binascii.b2a_hex(request), conn, e)
- if failed:
- failed_payloads += payloads
- self.reset_all_metadata()
- continue
+ for payload in payloads:
+ responses_by_broker[broker].append(FailedPayloadsError(payload))
- for response in decoder_fn(response):
- acc[(response.topic, response.partition)] = response
+ else:
+
+ for payload_response in decoder_fn(response):
+ responses_by_broker[broker].append(payload_response)
- if failed_payloads:
- raise FailedPayloadsError(failed_payloads)
+ # Connection errors generally mean stale metadata
+ # although sometimes it means incorrect api request
+ # Unfortunately there is no good way to tell the difference
+ # so we'll just reset metadata on all errors to be safe
+ if broker_failures:
+ self.reset_all_metadata()
- # Order the accumulated responses by the original key order
- return (acc[k] for k in original_keys) if acc else ()
+ # Return responses in the same order as provided
+ responses_by_payload = [responses_by_broker[broker].pop(0)
+ for broker in brokers_for_payloads]
+ log.debug('Responses: %s' % responses_by_payload)
+ return responses_by_payload
def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
def _raise_on_response_error(self, resp):
+
+ # Response can be an unraised exception object (FailedPayloadsError)
+ if isinstance(resp, Exception):
+ raise resp
+
+ # Or a server api error response
try:
kafka.common.check_error(resp)
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
self.reset_topic_metadata(resp.topic)
raise
+ # Return False if no error to enable list comprehensions
+ return False
+
#################
# Public API #
#################
@@ -396,14 +420,25 @@ class KafkaClient(object):
same order as the list of payloads specified
Arguments:
- payloads: list of ProduceRequest
- fail_on_error: boolean, should we raise an Exception if we
- encounter an API error?
- callback: function, instead of returning the ProduceResponse,
- first pass it through this function
+ payloads (list of ProduceRequest): produce requests to send to kafka
+ acks (int, optional): how many acks the servers should receive from replica
+ brokers before responding to the request. If it is 0, the server
+ will not send any response. If it is 1, the server will wait
+ until the data is written to the local log before sending a
+ response. If it is -1, the server will wait until the message
+ is committed by all in-sync replicas before sending a response.
+ For any value > 1, the server will wait for this number of acks to
+ occur (but the server will never wait for more acknowledgements than
+ there are in-sync replicas). defaults to 1.
+ timeout (int, optional): maximum time in milliseconds the server can
+ await the receipt of the number of acks, defaults to 1000.
+ fail_on_error (bool, optional): raise exceptions on connection and
+ server response errors, defaults to True.
+ callback (function, optional): instead of returning the ProduceResponse,
+ first pass it through this function, defaults to None.
Returns:
- list of ProduceResponse or callback(ProduceResponse), in the
+ list of ProduceResponses, or callback results if supplied, in the
order of input payloads
"""
@@ -419,16 +454,9 @@ class KafkaClient(object):
resps = self._send_broker_aware_request(payloads, encoder, decoder)
- out = []
- for resp in resps:
- if fail_on_error is True:
- self._raise_on_response_error(resp)
-
- if callback is not None:
- out.append(callback(resp))
- else:
- out.append(resp)
- return out
+ return [resp if not callback else callback(resp) for resp in resps
+ if resp is not None and
+ (not fail_on_error or not self._raise_on_response_error(resp))]
def send_fetch_request(self, payloads=[], fail_on_error=True,
callback=None, max_wait_time=100, min_bytes=4096):
@@ -447,16 +475,8 @@ class KafkaClient(object):
payloads, encoder,
KafkaProtocol.decode_fetch_response)
- out = []
- for resp in resps:
- if fail_on_error is True:
- self._raise_on_response_error(resp)
-
- if callback is not None:
- out.append(callback(resp))
- else:
- out.append(resp)
- return out
+ return [resp if not callback else callback(resp) for resp in resps
+ if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
@@ -465,15 +485,8 @@ class KafkaClient(object):
KafkaProtocol.encode_offset_request,
KafkaProtocol.decode_offset_response)
- out = []
- for resp in resps:
- if fail_on_error is True:
- self._raise_on_response_error(resp)
- if callback is not None:
- out.append(callback(resp))
- else:
- out.append(resp)
- return out
+ return [resp if not callback else callback(resp) for resp in resps
+ if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_commit_request(self, group, payloads=[],
fail_on_error=True, callback=None):
@@ -482,16 +495,8 @@ class KafkaClient(object):
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
- out = []
- for resp in resps:
- if fail_on_error is True:
- self._raise_on_response_error(resp)
-
- if callback is not None:
- out.append(callback(resp))
- else:
- out.append(resp)
- return out
+ return [resp if not callback else callback(resp) for resp in resps
+ if not fail_on_error or not self._raise_on_response_error(resp)]
def send_offset_fetch_request(self, group, payloads=[],
fail_on_error=True, callback=None):
@@ -501,12 +506,5 @@ class KafkaClient(object):
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
- out = []
- for resp in resps:
- if fail_on_error is True:
- self._raise_on_response_error(resp)
- if callback is not None:
- out.append(callback(resp))
- else:
- out.append(resp)
- return out
+ return [resp if not callback else callback(resp) for resp in resps
+ if not fail_on_error or not self._raise_on_response_error(resp)]