summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
committerMahendra M <mahendra.m@gmail.com>2013-10-08 08:50:52 +0530
commitf9cf62816ff2c2255d414a2d9f3dd32d8c81418b (patch)
treeb43b90fcdaaef0839329b20a02c79f8229773b26 /kafka
parent75de0f00956eb7cf0394fcfabb6a7d63057409fe (diff)
parenteb2c1735f26ce11540fb92ea94817f43b9b3a798 (diff)
downloadkafka-python-f9cf62816ff2c2255d414a2d9f3dd32d8c81418b.tar.gz
Merge branch 'master' into prod-windows
Conflicts: kafka/producer.py
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py24
-rw-r--r--kafka/common.py5
-rw-r--r--kafka/conn.py28
-rw-r--r--kafka/producer.py11
4 files changed, 49 insertions, 19 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b7ceb2e..71ededa 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -3,9 +3,11 @@ from collections import defaultdict
from functools import partial
from itertools import count
import logging
+import socket
import time
from kafka.common import ErrorMapping, TopicAndPartition
+from kafka.common import ConnectionError, FailedPayloadsException
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -71,7 +73,7 @@ class KafkaClient(object):
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)
- self.brokers.update(brokers)
+ self.brokers = brokers
self.topics_to_brokers = {}
for topic, partitions in topics.items():
@@ -147,13 +149,15 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
-
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
# Accumulate the responses in a dictionary
acc = {}
+ # keep a list of payloads that were failed to be sent to brokers
+ failed_payloads = []
+
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn_for_broker(broker)
@@ -162,15 +166,23 @@ class KafkaClient(object):
correlation_id=requestId, payloads=payloads)
# Send the request, recv the response
- conn.send(requestId, request)
-
- if decoder_fn is None:
+ try:
+ conn.send(requestId, request)
+ if decoder_fn is None:
+ continue
+ response = conn.recv(requestId)
+ except ConnectionError, e: # ignore BufferUnderflow for now
+ log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ failed_payloads += payloads
+ self.topics_to_brokers = {} # reset metadata
continue
- response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
+ if failed_payloads:
+ raise FailedPayloadsException(failed_payloads)
+
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
diff --git a/kafka/common.py b/kafka/common.py
index 8f3154c..6f0dd32 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,6 +69,11 @@ class ErrorMapping(object):
# Exceptions #
#################
+class FailedPayloadsException(Exception):
+ pass
+
+class ConnectionError(Exception):
+ pass
class BufferUnderflowError(Exception):
pass
diff --git a/kafka/conn.py b/kafka/conn.py
index 194a19c..14aebc6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,7 +5,7 @@ import struct
from threading import local
from kafka.common import BufferUnderflowError
-
+from kafka.common import ConnectionError
log = logging.getLogger("kafka")
@@ -27,6 +27,7 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
+ self._dirty = False
def __str__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -54,7 +55,7 @@ class KafkaConnection(local):
# Read the size off of the header
resp = self._sock.recv(4)
if resp == "":
- raise Exception("Got no response from Kafka")
+ self._raise_connection_error()
(size,) = struct.unpack('>i', resp)
messagesize = size - 4
@@ -72,6 +73,10 @@ class KafkaConnection(local):
total += len(resp)
yield resp
+ def _raise_connection_error(self):
+ self._dirty = True
+ raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+
##################
# Public API #
##################
@@ -80,14 +85,16 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"Send a request to Kafka"
-
- log.debug(
- "About to send %d bytes to Kafka, request %d" %
- (len(payload), request_id))
-
- sent = self._sock.sendall(payload)
- if sent is not None:
- raise RuntimeError("Kafka went away")
+ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+ try:
+ if self._dirty:
+ self.reinit()
+ sent = self._sock.sendall(payload)
+ if sent is not None:
+ self._raise_connection_error()
+ except socket.error:
+ log.exception('Unable to send payload to Kafka')
+ self._raise_connection_error()
def recv(self, request_id):
"""
@@ -121,3 +128,4 @@ class KafkaConnection(local):
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((self.host, self.port))
self._sock.settimeout(10)
+ self._dirty = False
diff --git a/kafka/producer.py b/kafka/producer.py
index a7bfe28..7ef7896 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -7,6 +7,7 @@ import logging
import sys
from kafka.common import ProduceRequest
+from kafka.common import FailedPayloadsException
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
@@ -67,7 +68,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
acks=req_acks,
timeout=ack_timeout)
except Exception as exp:
- log.error("Error sending message", exc_info=sys.exc_info())
+ log.exception("Unable to send message")
class Producer(object):
@@ -140,8 +141,12 @@ class Producer(object):
else:
messages = [create_message(m) for m in msg]
req = ProduceRequest(self.topic, partition, messages)
- resp = self.client.send_produce_request([req], acks=self.req_acks,
- timeout=self.ack_timeout)
+ try:
+ resp = self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as e:
+ log.exception("Unable to send messages")
+ raise e
return resp
def stop(self, timeout=1):