author     Omar Ghishan <omar.ghishan@rd.io>    2013-12-18 17:51:22 -0800
committer  Omar Ghishan <omar.ghishan@rd.io>    2014-01-06 15:14:50 -0800
commit     8c8ca5fa573c21e0f03c892154ba42e187153600 (patch)
tree       5939b4442eb8e72c76f3f48bf9f874d8eb2a0aad /kafka
parent     0f2b08d80217fb82860c51e05e819012f6acb521 (diff)
* Guarantee reading the expected number of bytes from the socket every time
* Remove bufsize from client and conn, since it is not actually enforced
Notes:
This commit changes behavior slightly: when no data is received for the
message size, a BufferUnderflowError is now raised instead of a ConnectionError.
Since bufsize is not actually enforced at the socket level but is used by the
consumer when creating fetch requests, it has been moved to the consumer until
a better solution is implemented.
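
The core of the change is a read-exactly-N-bytes loop around socket.recv(), since recv(n) may legally return fewer than n bytes. The sketch below illustrates that pattern in isolation; it is not the actual KafkaConnection._read_bytes method. The helper names read_exact and read_response, the bare sock argument, and the locally defined exception class are illustrative stand-ins, and the sketch uses Python 3 bytes semantics rather than the Python 2 str handling in the diff.

    import struct


    class BufferUnderflowError(Exception):
        """Stand-in for kafka-python's BufferUnderflowError (illustration only)."""


    def read_exact(sock, num_bytes):
        """Read exactly num_bytes from a blocking socket or raise.

        socket.recv(n) may return fewer than n bytes, so keep calling it until
        the full payload has been accumulated; an empty result means the peer
        closed the connection before sending everything.
        """
        chunks = []
        bytes_left = num_bytes
        while bytes_left:
            data = sock.recv(bytes_left)
            if not data:
                raise BufferUnderflowError("Not enough data to read this response")
            bytes_left -= len(data)
            chunks.append(data)
        return b''.join(chunks)


    def read_response(sock):
        # Mirrors _consume_response after this commit: a 4-byte big-endian
        # size header followed by exactly that many payload bytes.
        (size,) = struct.unpack('>i', read_exact(sock, 4))
        return read_exact(sock, size)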
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/client.py    |  7
-rw-r--r--  kafka/conn.py      | 48
-rw-r--r--  kafka/consumer.py  |  7
3 files changed, 30 insertions(+), 32 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 9659364..bd3a214 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -19,13 +19,12 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()

-    def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10):
+    def __init__(self, host, port, client_id=CLIENT_ID, timeout=10):
         # We need one connection to bootstrap
-        self.bufsize = bufsize
         self.client_id = client_id
         self.timeout = timeout
         self.conns = {               # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize, timeout=timeout)
+            (host, port): KafkaConnection(host, port, timeout=timeout)
         }
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
@@ -42,7 +41,7 @@ class KafkaClient(object):
         """
         if (broker.host, broker.port) not in self.conns:
             self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout)
+                KafkaConnection(broker.host, broker.port, timeout=self.timeout)

         return self.conns[(broker.host, broker.port)]

diff --git a/kafka/conn.py b/kafka/conn.py
index 1997804..ca62f52 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -19,11 +19,10 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4098, timeout=10):
+    def __init__(self, host, port, timeout=10):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
-        self.bufsize = bufsize
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((host, port))
         self._sock.settimeout(timeout)
@@ -36,38 +35,35 @@ class KafkaConnection(local):
     #   Private API   #
     ###################

-    def _consume_response(self):
-        """
-        Fully consume the response iterator
-        """
-        return "".join(self._consume_response_iter())
+    def _read_bytes(self, num_bytes):
+        bytes_left = num_bytes
+        resp = ''
+        log.debug("About to read %d bytes from Kafka", num_bytes)
+
+        while bytes_left:
+            data = self._sock.recv(bytes_left)
+            if data == '':
+                raise BufferUnderflowError("Not enough data to read this response")
+            bytes_left -= len(data)
+            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+            resp += data
+
+        return resp

-    def _consume_response_iter(self):
+    def _consume_response(self):
         """
         This method handles the response header and error messages. It
-        then returns an iterator for the chunks of the response
+        then returns the response
         """
-        log.debug("Handling response from Kafka")
-
+        log.debug("Expecting response from Kafka")
         # Read the size off of the header
-        resp = self._sock.recv(4)
-        if resp == "":
-            self._raise_connection_error()
-        (size,) = struct.unpack('>i', resp)
+        resp = self._read_bytes(4)

-        log.debug("About to read %d bytes from Kafka", size)
+        (size,) = struct.unpack('>i', resp)

         # Read the remainder of the response
-        total = 0
-        while total < size:
-            resp = self._sock.recv(self.bufsize)
-            log.debug("Read %d bytes from Kafka", len(resp))
-            if resp == "":
-                raise BufferUnderflowError(
-                    "Not enough data to read this response")
-
-            total += len(resp)
-            yield resp
+        resp = self._read_bytes(size)
+        return str(resp)

     def _raise_connection_error(self):
         self._dirty = True
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 57b5b97..bead1dd 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -22,6 +22,7 @@ AUTO_COMMIT_INTERVAL = 5000
 FETCH_DEFAULT_BLOCK_TIMEOUT = 1
 FETCH_MAX_WAIT_TIME = 100
 FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096


 class FetchContext(object):
@@ -216,8 +217,10 @@ class SimpleConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 fetch_size_bytes=FETCH_MIN_BYTES):
+                 fetch_size_bytes=FETCH_MIN_BYTES,
+                 buffer_size=FETCH_BUFFER_SIZE_BYTES):

+        self.buffer_size = buffer_size
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
@@ -364,7 +367,7 @@ class SimpleConsumer(Consumer):
             # use MaxBytes = client's bufsize since we're only
             # fetching one topic + partition
             req = FetchRequest(
-                self.topic, partition, offset, self.client.bufsize)
+                self.topic, partition, offset, self.buffer_size)

             (resp,) = self.client.send_fetch_request(
                 [req],
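
After this change, the fetch buffer size is tuned on the consumer rather than on the client. A minimal usage sketch follows; the broker address "localhost:9092" and the "my-group"/"my-topic" names are placeholders, and it assumes SimpleConsumer's iterator interface, which this commit does not touch.

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    # bufsize= is no longer accepted by KafkaClient/KafkaConnection...
    client = KafkaClient("localhost", 9092, timeout=10)

    # ...and the MaxBytes used for each FetchRequest now comes from the
    # consumer's buffer_size (default FETCH_BUFFER_SIZE_BYTES = 4096).
    consumer = SimpleConsumer(client, "my-group", "my-topic", buffer_size=4096)

    for message in consumer:
        print(message)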