diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-10-08 08:50:52 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-10-08 08:50:52 +0530 |
commit | f9cf62816ff2c2255d414a2d9f3dd32d8c81418b (patch) | |
tree | b43b90fcdaaef0839329b20a02c79f8229773b26 /kafka | |
parent | 75de0f00956eb7cf0394fcfabb6a7d63057409fe (diff) | |
parent | eb2c1735f26ce11540fb92ea94817f43b9b3a798 (diff) | |
download | kafka-python-f9cf62816ff2c2255d414a2d9f3dd32d8c81418b.tar.gz |
Merge branch 'master' into prod-windows
Conflicts:
kafka/producer.py
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 24 | ||||
-rw-r--r-- | kafka/common.py | 5 | ||||
-rw-r--r-- | kafka/conn.py | 28 | ||||
-rw-r--r-- | kafka/producer.py | 11 |
4 files changed, 49 insertions, 19 deletions
diff --git a/kafka/client.py b/kafka/client.py index b7ceb2e..71ededa 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -3,9 +3,11 @@ from collections import defaultdict from functools import partial from itertools import count import logging +import socket import time from kafka.common import ErrorMapping, TopicAndPartition +from kafka.common import ConnectionError, FailedPayloadsException from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -71,7 +73,7 @@ class KafkaClient(object): log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) - self.brokers.update(brokers) + self.brokers = brokers self.topics_to_brokers = {} for topic, partitions in topics.items(): @@ -147,13 +149,15 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) # Accumulate the responses in a dictionary acc = {} + # keep a list of payloads that were failed to be sent to brokers + failed_payloads = [] + # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): conn = self._get_conn_for_broker(broker) @@ -162,15 +166,23 @@ class KafkaClient(object): correlation_id=requestId, payloads=payloads) # Send the request, recv the response - conn.send(requestId, request) - - if decoder_fn is None: + try: + conn.send(requestId, request) + if decoder_fn is None: + continue + response = conn.recv(requestId) + except ConnectionError, e: # ignore BufferUnderflow for now + log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + failed_payloads += payloads + self.topics_to_brokers = {} # reset metadata continue - response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response + if failed_payloads: + raise FailedPayloadsException(failed_payloads) + # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () diff --git a/kafka/common.py b/kafka/common.py index 8f3154c..6f0dd32 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,6 +69,11 @@ class ErrorMapping(object): # Exceptions # ################# +class FailedPayloadsException(Exception): + pass + +class ConnectionError(Exception): + pass class BufferUnderflowError(Exception): pass diff --git a/kafka/conn.py b/kafka/conn.py index 194a19c..14aebc6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,7 +5,7 @@ import struct from threading import local from kafka.common import BufferUnderflowError - +from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -27,6 +27,7 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) + self._dirty = False def __str__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -54,7 +55,7 @@ class KafkaConnection(local): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + self._raise_connection_error() (size,) = struct.unpack('>i', resp) messagesize = size - 4 @@ -72,6 +73,10 @@ class KafkaConnection(local): total += len(resp) yield resp + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + ################## # Public API # ################## @@ -80,14 +85,16 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - - log.debug( - "About to send %d bytes to Kafka, request %d" % - (len(payload), request_id)) - - sent = self._sock.sendall(payload) - if sent is not None: - raise RuntimeError("Kafka went away") + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + try: + if self._dirty: + self.reinit() + sent = self._sock.sendall(payload) + if sent is not None: + self._raise_connection_error() + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() def recv(self, request_id): """ @@ -121,3 +128,4 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) + self._dirty = False diff --git a/kafka/producer.py b/kafka/producer.py index a7bfe28..7ef7896 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -7,6 +7,7 @@ import logging import sys from kafka.common import ProduceRequest +from kafka.common import FailedPayloadsException from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner @@ -67,7 +68,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) except Exception as exp: - log.error("Error sending message", exc_info=sys.exc_info()) + log.exception("Unable to send message") class Producer(object): @@ -140,8 +141,12 @@ class Producer(object): else: messages = [create_message(m) for m in msg] req = ProduceRequest(self.topic, partition, messages) - resp = self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) + try: + resp = self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as e: + log.exception("Unable to send messages") + raise e return resp def stop(self, timeout=1): |