diff options
author | Jim Lim <jim@quixey.com> | 2013-09-27 14:02:10 -0700 |
---|---|---|
committer | Jim Lim <jim@quixey.com> | 2013-10-04 14:49:44 -0700 |
commit | a6c99b287b1cf9c39068be74d72150808588dd43 (patch) | |
tree | 0c036a3d0691c6c7f69166bdc373fbe4e79b2705 /kafka | |
parent | cfd9f86e60429d1f7af8bcac5849808354b8719e (diff) | |
download | kafka-python-a6c99b287b1cf9c39068be74d72150808588dd43.tar.gz |
make changes to be more fault tolerant: clean up connections, brokers, failed_messages
- add integration tests for sync producer
- add integration tests for async producer w. leadership election
- use log.exception
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 24 | ||||
-rw-r--r-- | kafka/common.py | 5 | ||||
-rw-r--r-- | kafka/conn.py | 28 | ||||
-rw-r--r-- | kafka/producer.py | 11 |
4 files changed, 49 insertions, 19 deletions
diff --git a/kafka/client.py b/kafka/client.py index 965cbc5..c0a3cdb 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -2,9 +2,11 @@ from collections import defaultdict from functools import partial from itertools import count import logging +import socket import time from kafka.common import ErrorMapping, TopicAndPartition +from kafka.common import ConnectionError, FailedPayloadsException from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -70,7 +72,7 @@ class KafkaClient(object): log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) - self.brokers.update(brokers) + self.brokers = brokers self.topics_to_brokers = {} for topic, partitions in topics.items(): @@ -146,13 +148,15 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) - payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) # Accumulate the responses in a dictionary acc = {} + # keep a list of payloads that were failed to be sent to brokers + failed_payloads = [] + # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): conn = self._get_conn_for_broker(broker) @@ -161,15 +165,23 @@ class KafkaClient(object): correlation_id=requestId, payloads=payloads) # Send the request, recv the response - conn.send(requestId, request) - - if decoder_fn is None: + try: + conn.send(requestId, request) + if decoder_fn is None: + continue + response = conn.recv(requestId) + except ConnectionError, e: # ignore BufferUnderflow for now + log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + failed_payloads += payloads + self.topics_to_brokers = {} # reset metadata continue - response = conn.recv(requestId) for response in decoder_fn(response): acc[(response.topic, response.partition)] = response + if failed_payloads: + raise FailedPayloadsException(failed_payloads) + # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () diff --git a/kafka/common.py b/kafka/common.py index 8f3154c..6f0dd32 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,6 +69,11 @@ class ErrorMapping(object): # Exceptions # ################# +class FailedPayloadsException(Exception): + pass + +class ConnectionError(Exception): + pass class BufferUnderflowError(Exception): pass diff --git a/kafka/conn.py b/kafka/conn.py index e85fd11..9356731 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -4,7 +4,7 @@ import struct from threading import local from kafka.common import BufferUnderflowError - +from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -26,6 +26,7 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) + self._dirty = False def __str__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -53,7 +54,7 @@ class KafkaConnection(local): # Read the size off of the header resp = self._sock.recv(4) if resp == "": - raise Exception("Got no response from Kafka") + self._raise_connection_error() (size,) = struct.unpack('>i', resp) messagesize = size - 4 @@ -71,6 +72,10 @@ class KafkaConnection(local): total += len(resp) yield resp + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + ################## # Public API # ################## @@ -79,14 +84,16 @@ class KafkaConnection(local): def send(self, request_id, payload): "Send a request to Kafka" - - log.debug( - "About to send %d bytes to Kafka, request %d" % - (len(payload), request_id)) - - sent = self._sock.sendall(payload) - if sent is not None: - raise RuntimeError("Kafka went away") + log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) + try: + if self._dirty: + self.reinit() + sent = self._sock.sendall(payload) + if sent is not None: + self._raise_connection_error() + except socket.error: + log.exception('Unable to send payload to Kafka') + self._raise_connection_error() def recv(self, request_id): """ @@ -110,3 +117,4 @@ class KafkaConnection(local): self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) self._sock.settimeout(10) + self._dirty = False diff --git a/kafka/producer.py b/kafka/producer.py index 5f23285..cceb584 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -7,6 +7,7 @@ import logging import sys from kafka.common import ProduceRequest +from kafka.common import FailedPayloadsException from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner @@ -113,7 +114,7 @@ class Producer(object): self.client.send_produce_request(reqs, acks=self.req_acks, timeout=self.ack_timeout) except Exception: - log.error("Error sending message", exc_info=sys.exc_info()) + log.exception("Unable to send message") def send_messages(self, partition, *msg): """ @@ -126,8 +127,12 @@ class Producer(object): else: messages = [create_message(m) for m in msg] req = ProduceRequest(self.topic, partition, messages) - resp = self.client.send_produce_request([req], acks=self.req_acks, - timeout=self.ack_timeout) + try: + resp = self.client.send_produce_request([req], acks=self.req_acks, + timeout=self.ack_timeout) + except Exception as e: + log.exception("Unable to send messages") + raise e return resp def stop(self, timeout=1): |