diff options
author | Omar <omar.ghishan@rd.io> | 2014-01-13 13:52:02 -0800 |
---|---|---|
committer | Omar <omar.ghishan@rd.io> | 2014-01-13 13:52:02 -0800 |
commit | 87c7f9dedfc008e3fff7a010cc4e708eeec5bebe (patch) | |
tree | c55c3c5fea1fab6eef77f5213909ed2c2f8acc92 /kafka/client.py | |
parent | 354fcdbdd9b34b3454b964e6dc0d4a746744bbcd (diff) | |
parent | a0c7141e2cc7399a9472a8169ea5f730f0407386 (diff) | |
download | kafka-python-87c7f9dedfc008e3fff7a010cc4e708eeec5bebe.tar.gz |
Merge pull request #88 from rdiomar/rdiomar_changes
Various changes/fixes, including:
* Allow customizing socket timeouts
* Read the correct number of bytes from kafka
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn
* SimpleConsumer flow changes
* Fix some error handling
* Add optional upper limit to consumer fetch buffer size
* Add and fix unit and integration tests
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/kafka/client.py b/kafka/client.py index 71ededa..33c6d77 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -3,11 +3,12 @@ from collections import defaultdict from functools import partial from itertools import count import logging -import socket import time -from kafka.common import ErrorMapping, TopicAndPartition -from kafka.common import ConnectionError, FailedPayloadsException +from kafka.common import ( + ErrorMapping, TopicAndPartition, ConnectionError, + FailedPayloadsException +) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -19,12 +20,12 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, host, port, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id + self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) + (host, port): KafkaConnection(host, port, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -41,7 +42,7 @@ class KafkaClient(object): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + KafkaConnection(broker.host, broker.port, timeout=self.timeout) return self.conns[(broker.host, broker.port)] @@ -165,14 +166,24 @@ class KafkaClient(object): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except ConnectionError, e: # ignore BufferUnderflow for now - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except ConnectionError, e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads self.topics_to_brokers = {} # reset metadata continue |