Diffstat (limited to 'kafka')
-rw-r--r--   kafka/client.py    | 211
-rw-r--r--   kafka/common.py    |  35
-rw-r--r--   kafka/conn.py      |  93
-rw-r--r--   kafka/consumer.py  | 337
-rw-r--r--   kafka/producer.py  | 104
-rw-r--r--   kafka/protocol.py  |  30
-rw-r--r--   kafka/queue.py     |   2
7 files changed, 471 insertions, 341 deletions
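Taken together, the changes below make topics a per-call argument on the producers, turn metadata loading into a public, lazily invoked `load_metadata_for_topics()`, and root all exceptions in a common `KafkaError` hierarchy. The sketch below shows roughly how client code looks after this commit, based only on the signatures introduced in the diff; the broker address, topic, and group names are placeholders and error handling is deliberately simplified.

```python
import logging

from kafka.client import KafkaClient
from kafka.common import KafkaError, FailedPayloadsError
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

logging.basicConfig()

# Placeholder broker/topic/group names.
# NOTE: the client timeout should stay larger than any timeout passed to
# SimpleConsumer.get_message(), or the socket itself may time out first.
client = KafkaClient("localhost", 9092, timeout=120)

# The topic is now passed per send_messages() call, not per producer instance.
producer = SimpleProducer(client)
try:
    producer.send_messages("my-topic", "hello", "world")
except FailedPayloadsError:
    # Metadata may be stale; refresh it and let the caller decide to retry.
    client.load_metadata_for_topics("my-topic")
    raise
except KafkaError:
    logging.exception("produce failed")
    raise

consumer = SimpleConsumer(client, "my-group", "my-topic",
                          max_buffer_size=None,  # let the fetch size grow as needed
                          iter_timeout=60)       # stop iterating after 60s of silence
for message in consumer:
    # Each item is an OffsetAndMessage; the payload lives on message.message.value
    print message.message.value

consumer.stop()
producer.stop()
client.close()
```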
diff --git a/kafka/client.py b/kafka/client.py index 81eec7d..33c4419 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,14 +1,16 @@ import copy +import logging + from collections import defaultdict from functools import partial from itertools import count -import logging -import socket -import time -from kafka.common import ErrorMapping, TopicAndPartition -from kafka.common import ConnectionError, FailedPayloadsException -from kafka.conn import collect_hosts, KafkaConnection +from kafka.common import (ErrorMapping, TopicAndPartition, + ConnectionError, FailedPayloadsError, + BrokerResponseError, PartitionUnavailableError, + KafkaUnavailableError, KafkaRequestError) + +from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol log = logging.getLogger("kafka") @@ -19,19 +21,21 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID): + # NOTE: The timeout given to the client should always be greater than the + # one passed to SimpleConsumer.get_message(), otherwise you can get a + # socket timeout. + def __init__(self, host, port, client_id=CLIENT_ID, + timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id - - self.hosts = collect_hosts(hosts) - - # create connections only when we need them - self.conns = {} + self.timeout = timeout + self.conns = { # (host, port) -> KafkaConnection + (host, port): KafkaConnection(host, port, timeout=timeout) + } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] - self._load_metadata_for_topics() + self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # @@ -47,62 +51,25 @@ class KafkaClient(object): return self.conns[host_key] def _get_conn_for_broker(self, broker): - "Get or create a connection to a broker" + """ + Get or create a connection to a broker + """ + if (broker.host, broker.port) not in self.conns: + self.conns[(broker.host, broker.port)] = \ + KafkaConnection(broker.host, broker.port, timeout=self.timeout) return self._get_conn(broker.host, broker.port) def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) if key not in self.topics_to_brokers: - self._load_metadata_for_topics(topic) + self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) + raise KafkaRequestError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - - response = self._send_broker_unaware_request(request_id, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. 
This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - break - - for partition, meta in partitions.items(): - if meta.leader == -1: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - else: - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - def _next_id(self): """ Generate a new correlation id @@ -125,7 +92,7 @@ class KafkaClient(object): "trying next server: %s" % (request, conn, e)) continue - return None + raise KafkaUnavailableError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -156,6 +123,8 @@ class KafkaClient(object): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) + if leader == -1: + raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -172,30 +141,73 @@ class KafkaClient(object): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except ConnectionError, e: # ignore BufferUnderflow for now - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except ConnectionError, e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.reset_all_metadata() continue for response in decoder_fn(response): acc[(response.topic, response.partition)] = response if failed_payloads: - raise FailedPayloadsException(failed_payloads) + raise FailedPayloadsError(failed_payloads) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def __repr__(self): + return '<KafkaClient client_id=%s>' % (self.client_id) + + def _raise_on_response_error(self, resp): + if resp.error == ErrorMapping.NO_ERROR: + return + + if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, + ErrorMapping.NOT_LEADER_FOR_PARTITION): + self.reset_topic_metadata(resp.topic) + + raise BrokerResponseError( + "Request for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), resp.error)) + ################# # Public API # ################# + def reset_topic_metadata(self, *topics): + for topic in topics: + try: + partitions = self.topic_partitions[topic] + except KeyError: + continue + + for partition in partitions: + self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) + + del self.topic_partitions[topic] + + def reset_all_metadata(self): + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + return topic in self.topic_partitions def close(self): for conn in self.conns.values(): @@ -215,6 +227,36 @@ class 
KafkaClient(object): for conn in self.conns.values(): conn.reinit() + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This function is called + lazily whenever metadata is unavailable. + """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + + for topic, partitions in topics.items(): + self.reset_topic_metadata(topic) + + if not partitions: + continue + + self.topic_partitions[topic] = [] + for partition, meta in partitions.items(): + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -252,14 +294,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "ProduceRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -285,14 +322,9 @@ class KafkaClient(object): out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( - "FetchRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -308,9 +340,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: @@ -326,9 +357,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with " - "errorcode=%s", resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) @@ -346,9 +376,8 @@ class KafkaClient(object): out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/common.py b/kafka/common.py index 6f0dd32..c0a1a6a 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,23 +69,46 @@ class ErrorMapping(object): # Exceptions # ################# -class FailedPayloadsException(Exception): + +class KafkaError(RuntimeError): + pass + + +class KafkaRequestError(KafkaError): + pass + + +class KafkaUnavailableError(KafkaError): + pass + + +class BrokerResponseError(KafkaError): pass -class ConnectionError(Exception): + +class PartitionUnavailableError(KafkaError): + pass + + 
+class FailedPayloadsError(KafkaError): pass -class BufferUnderflowError(Exception): + +class ConnectionError(KafkaError): + pass + + +class BufferUnderflowError(KafkaError): pass -class ChecksumError(Exception): +class ChecksumError(KafkaError): pass -class ConsumerFetchSizeTooSmall(Exception): +class ConsumerFetchSizeTooSmall(KafkaError): pass -class ConsumerNoMoreData(Exception): +class ConsumerNoMoreData(KafkaError): pass diff --git a/kafka/conn.py b/kafka/conn.py index 614b1bb..de2d385 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,11 +5,11 @@ import struct from random import shuffle from threading import local -from kafka.common import BufferUnderflowError from kafka.common import ConnectionError log = logging.getLogger("kafka") +DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 def collect_hosts(hosts, randomize=True): """ @@ -39,64 +39,53 @@ class KafkaConnection(local): by a call to `recv` in order to get the correct response. Eventually, we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. + + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ - def __init__(self, host, port, bufsize=4096, timeout=10): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() self.host = host self.port = port - self.bufsize = bufsize + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((host, port)) self.timeout = timeout - - self._sock = socket.create_connection((host, port), timeout=timeout) + self._sock.settimeout(self.timeout) self._dirty = False - def __str__(self): + def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) ################### # Private API # ################### - def _consume_response(self): - """ - Fully consumer the response iterator - """ - data = "" - for chunk in self._consume_response_iter(): - data += chunk - return data - - def _consume_response_iter(self): - """ - This method handles the response header and error messages. 
It - then returns an iterator for the chunks of the response - """ - log.debug("Handling response from Kafka") - - # Read the size off of the header - resp = self._sock.recv(4) - if resp == "": - self._raise_connection_error() - (size,) = struct.unpack('>i', resp) - - messagesize = size - 4 - log.debug("About to read %d bytes from Kafka", messagesize) - - # Read the remainder of the response - total = 0 - while total < messagesize: - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise BufferUnderflowError( - "Not enough data to read this response") - - total += len(resp) - yield resp - def _raise_connection_error(self): self._dirty = True - raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + resp = '' + log.debug("About to read %d bytes from Kafka", num_bytes) + if self._dirty: + self.reinit() + while bytes_left: + try: + data = self._sock.recv(bytes_left) + except socket.error: + log.exception('Unable to receive data from Kafka') + self._raise_connection_error() + if data == '': + log.error("Not enough data to read this response") + self._raise_connection_error() + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + resp += data + + return resp ################## # Public API # @@ -113,7 +102,7 @@ class KafkaConnection(local): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error: + except socket.error, e: log.exception('Unable to send payload to Kafka') self._raise_connection_error() @@ -122,8 +111,14 @@ class KafkaConnection(local): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - self.data = self._consume_response() - return self.data + # Read the size off of the header + resp = self._read_bytes(4) + + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return str(resp) def copy(self): """ @@ -146,5 +141,7 @@ class KafkaConnection(local): Re-initialize the socket connection """ self.close() - self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout) + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((self.host, self.port)) + self._sock.settimeout(self.timeout) self._dirty = False diff --git a/kafka/consumer.py b/kafka/consumer.py index f2898ad..28b53ec 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,10 +1,11 @@ -from collections import defaultdict +from __future__ import absolute_import + from itertools import izip_longest, repeat import logging import time from threading import Lock -from multiprocessing import Process, Queue, Event, Value -from Queue import Empty +from multiprocessing import Process, Queue as MPQueue, Event, Value +from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, @@ -22,6 +23,11 @@ AUTO_COMMIT_INTERVAL = 5000 FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 + +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 class FetchContext(object): @@ -32,13 +38,15 @@ class FetchContext(object): self.consumer = consumer self.block = block - if block and not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT 
- - self.timeout = timeout * 1000 + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 def __enter__(self): """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes if self.block: self.consumer.fetch_max_wait_time = self.timeout self.consumer.fetch_min_bytes = 1 @@ -46,9 +54,9 @@ class FetchContext(object): self.consumer.fetch_min_bytes = 0 def __exit__(self, type, value, traceback): - """Reset values to default""" - self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes class Consumer(object): @@ -67,7 +75,7 @@ class Consumer(object): self.client = client self.topic = topic self.group = group - self.client._load_metadata_for_topics(topic) + self.client.load_metadata_for_topics(topic) self.offsets = {} if not partitions: @@ -204,8 +212,14 @@ class SimpleConsumer(Consumer): before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -216,13 +230,10 @@ class SimpleConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES): - - self.partition_info = False # Do not return partition info in msgs - self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.fetch_min_bytes = fetch_size_bytes - self.fetch_started = defaultdict(bool) # defaults to false - + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -230,6 +241,23 @@ class SimpleConsumer(Consumer): auto_commit_every_n=auto_commit_every_n, auto_commit_every_t=auto_commit_every_t) + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) + self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size + self.partition_info = False # Do not return partition info in msgs + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = fetch_size_bytes + self.fetch_offsets = self.offsets.copy() + self.iter_timeout = iter_timeout + self.queue = Queue() + + def __repr__(self): + return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ + (self.group, self.topic, str(self.offsets.keys())) + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -265,12 +293,6 @@ class SimpleConsumer(Consumer): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: 
reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - # The API returns back the next available offset - # For eg: if the current offset is 18, the API will return - # back 19. So, if we have to seek 5 points before, we will - # end up going back to 14, instead of 13. Adjust this - deltas[partition] -= 1 else: pass @@ -281,128 +303,148 @@ class SimpleConsumer(Consumer): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + # Reset queue and fetch offsets since they are invalid + self.fetch_offsets = self.offsets.copy() + self.queue = Queue() + def get_messages(self, count=1, block=True, timeout=0.1): """ Fetch the specified number of messages count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] - iterator = self.__iter__() - - # HACK: This splits the timeout between available partitions - timeout = timeout * 1.0 / len(self.offsets) - - with FetchContext(self, block, timeout): - while count > 0: - try: - messages.append(next(iterator)) - except StopIteration: - break + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): + result = self._get_message(block, timeout, get_partition_info=True, + update_offset=False) + if result: + partition, message = result + if self.partition_info: + messages.append(result) + else: + messages.append(message) + new_offsets[partition] = message.offset + 1 count -= 1 - + else: + # Ran out of messages for the last request. + if not block: + # If we're not blocking, break. + break + if timeout is not None: + # If we're blocking and have a timeout, reduce it to the + # appropriate value + timeout = max_time - time.time() + + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() return messages - def __iter__(self): + def get_message(self, block=True, timeout=0.1, get_partition_info=None): + return self._get_message(block, timeout, get_partition_info) + + def _get_message(self, block=True, timeout=0.1, get_partition_info=None, + update_offset=True): """ - Create an iterate per partition. Iterate through them calling next() - until they are all exhausted. + If no messages can be fetched, returns None. + If get_partition_info is None, it defaults to self.partition_info + If get_partition_info is True, returns (partition, message) + If get_partition_info is False, returns message """ - iters = {} - for partition, offset in self.offsets.items(): - iters[partition] = self.__iter_partition__(partition, offset) - - if len(iters) == 0: - return - - while True: - if len(iters) == 0: - break - - for partition, it in iters.items(): - try: - if self.partition_info: - yield (partition, it.next()) - else: - yield it.next() - except StopIteration: - log.debug("Done iterating over partition %s" % partition) - del iters[partition] + if self.queue.empty(): + # We're out of messages, go grab some more. 
+ with FetchContext(self, block, timeout): + self._fetch() + try: + partition, message = self.queue.get_nowait() - # skip auto-commit since we didn't yield anything - continue + if update_offset: + # Update partition offset + self.offsets[partition] = message.offset + 1 # Count, check and commit messages if necessary self.count_since_commit += 1 self._auto_commit() - def __iter_partition__(self, partition, offset): - """ - Iterate over the messages in a partition. Create a FetchRequest - to get back a batch of messages, yield them one at a time. - After a batch is exhausted, start a new batch unless we've reached - the end of this partition. - """ - - # The offset that is stored in the consumer is the offset that - # we have consumed. In subsequent iterations, we are supposed to - # fetch the next message (that is from the next offset) - # However, for the 0th message, the offset should be as-is. - # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is - # problematic, since 0 is offset of a message which we have not yet - # consumed. - if self.fetch_started[partition]: - offset += 1 + if get_partition_info is None: + get_partition_info = self.partition_info + if get_partition_info: + return partition, message + else: + return message + except Empty: + return None - fetch_size = self.fetch_min_bytes + def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout while True: - # use MaxBytes = client's bufsize since we're only - # fetching one topic + partition - req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) - - (resp,) = self.client.send_fetch_request( - [req], - max_wait_time=self.fetch_max_wait_time, - min_bytes=fetch_size) - - assert resp.topic == self.topic - assert resp.partition == partition - - next_offset = None - try: - for message in resp.messages: - next_offset = message.offset - - # update the offset before the message is yielded. This - # is so that the consumer state is not lost in certain - # cases. - # - # For eg: the message is yielded and consumed by the - # caller, but the caller does not come back into the - # generator again. 
The message will be consumed but the - # status will not be updated in the consumer - self.fetch_started[partition] = True - self.offsets[partition] = message.offset - yield message - except ConsumerFetchSizeTooSmall, e: - fetch_size *= 1.5 - log.warn( - "Fetch size too small, increasing to %d (1.5x) and retry", - fetch_size) - continue - except ConsumerNoMoreData, e: - log.debug("Iteration was ended by %r", e) - - if next_offset is None: - break + message = self.get_message(True, timeout) + if message: + yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) else: - offset = next_offset + 1 + # Timed out waiting for a message + break + def _fetch(self): + # Create fetch request payloads for all the partitions + requests = [] + partitions = self.fetch_offsets.keys() + while partitions: + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, + self.fetch_offsets[partition], + self.buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = set() + for resp in responses: + partition = resp.partition + try: + for message in resp.messages: + # Put the message in our queue + self.queue.put((partition, message)) + self.fetch_offsets[partition] = message.offset + 1 + except ConsumerFetchSizeTooSmall, e: + if (self.max_buffer_size is not None and + self.buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise e + if self.max_buffer_size is None: + self.buffer_size *= 2 + else: + self.buffer_size = max(self.buffer_size * 2, + self.max_buffer_size) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", self.buffer_size) + retry_partitions.add(partition) + except ConsumerNoMoreData, e: + log.debug("Iteration was ended by %r", e) + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) + partitions = retry_partitions def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): """ @@ -440,8 +482,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # indicates a specific number of messages, follow that advice count = 0 - for partition, message in consumer: - queue.put((partition, message)) + message = consumer.get_message() + if message: + queue.put(message) count += 1 # We have reached the required size. 
The controller might have @@ -451,12 +494,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # can reset the 'start' event if count == size.value: pause.wait() - break - # In case we did not receive any message, give up the CPU for - # a while before we try again - if count == 0: - time.sleep(0.1) + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) consumer.stop() @@ -501,7 +543,7 @@ class MultiProcessConsumer(Consumer): # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue(1024) # Child consumers dump messages into this + self.queue = MPQueue(1024) # Child consumers dump messages into this self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown self.pause = Event() # Requests the consumers to pause fetch @@ -535,6 +577,10 @@ class MultiProcessConsumer(Consumer): proc.start() self.procs.append(proc) + def __repr__(self): + return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \ + (self.group, self.topic, len(self.procs)) + def stop(self): # Set exit and start off all waiting consumers self.exit.set() @@ -568,12 +614,11 @@ class MultiProcessConsumer(Consumer): break # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + self.offsets[partition] = message.offset + 1 self.start.clear() - yield message - self.count_since_commit += 1 self._auto_commit() + yield message self.start.clear() @@ -583,8 +628,9 @@ class MultiProcessConsumer(Consumer): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. 
""" messages = [] @@ -595,7 +641,11 @@ class MultiProcessConsumer(Consumer): self.size.value = count self.pause.clear() - while count > 0: + if timeout is not None: + max_time = time.time() + timeout + + new_offsets = {} + while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not # go into overdrive and keep consuming thousands of @@ -609,15 +659,18 @@ class MultiProcessConsumer(Consumer): break messages.append(message) - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset - self.count_since_commit += 1 - self._auto_commit() + new_offsets[partition] = message.offset + 1 count -= 1 + if timeout is not None: + timeout = max_time - time.time() self.size.value = 0 self.start.clear() self.pause.set() + # Update and commit offsets if necessary + self.offsets.update(new_offsets) + self.count_since_commit += len(messages) + self._auto_commit() + return messages diff --git a/kafka/producer.py b/kafka/producer.py index 7ef7896..12a2934 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,15 +1,16 @@ +from __future__ import absolute_import + +import logging +import time + +from Queue import Empty from collections import defaultdict -from datetime import datetime, timedelta from itertools import cycle from multiprocessing import Queue, Process -from Queue import Empty -import logging -import sys -from kafka.common import ProduceRequest -from kafka.common import FailedPayloadsException -from kafka.protocol import create_message +from kafka.common import ProduceRequest, TopicAndPartition from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -19,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20 STOP_ASYNC_PRODUCER = -1 -def _send_upstream(topic, queue, client, batch_time, batch_size, +def _send_upstream(queue, client, batch_time, batch_size, req_acks, ack_timeout): """ Listen on the queue for a specified number of messages or till @@ -36,38 +37,41 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, while not stop: timeout = batch_time count = batch_size - send_at = datetime.now() + timedelta(seconds=timeout) + send_at = time.time() + timeout msgset = defaultdict(list) # Keep fetching till we gather enough messages or a # timeout is reached while count > 0 and timeout >= 0: try: - partition, msg = queue.get(timeout=timeout) + topic_partition, msg = queue.get(timeout=timeout) + except Empty: break # Check if the controller has requested us to stop - if partition == STOP_ASYNC_PRODUCER: + if topic_partition == STOP_ASYNC_PRODUCER: stop = True break # Adjust the timeout to match the remaining period count -= 1 - timeout = (send_at - datetime.now()).total_seconds() - msgset[partition].append(msg) + timeout = send_at - time.time() + msgset[topic_partition].append(msg) # Send collected requests upstream reqs = [] - for partition, messages in msgset.items(): - req = ProduceRequest(topic, partition, messages) + for topic_partition, messages in msgset.items(): + req = ProduceRequest(topic_partition.topic, + topic_partition.partition, + messages) reqs.append(req) try: client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception as exp: + except Exception: log.exception("Unable to send message") @@ -77,7 +81,6 @@ class Producer(object): Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If set to true, the messages are 
sent asynchronously via another thread (process). We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -118,8 +121,7 @@ class Producer(object): if self.async: self.queue = Queue() # Messages are sent through this queue self.proc = Process(target=_send_upstream, - args=(self.topic, - self.queue, + args=(self.queue, self.client.copy(), batch_send_every_t, batch_send_every_n, @@ -130,23 +132,24 @@ class Producer(object): self.proc.daemon = True self.proc.start() - def send_messages(self, partition, *msg): + def send_messages(self, topic, partition, *msg): """ Helper method to send produce requests """ if self.async: for m in msg: - self.queue.put((partition, create_message(m))) + self.queue.put((TopicAndPartition(topic, partition), + create_message(m))) resp = [] else: messages = [create_message(m) for m in msg] - req = ProduceRequest(self.topic, partition, messages) + req = ProduceRequest(topic, partition, messages) try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) - except Exception as e: + except Exception: log.exception("Unable to send messages") - raise e + raise return resp def stop(self, timeout=1): @@ -168,7 +171,6 @@ class SimpleProducer(Producer): Params: client - The Kafka client instance to use - topic - The topic for sending messages to async - If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these req_acks - A value indicating the acknowledgements that the server must @@ -179,24 +181,31 @@ class SimpleProducer(Producer): batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout """ - def __init__(self, client, topic, async=False, + def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client._load_metadata_for_topics(topic) - self.next_partition = cycle(client.topic_partitions[topic]) - + self.partition_cycles = {} super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send_messages(self, *msg): - partition = self.next_partition.next() - return super(SimpleProducer, self).send_messages(partition, *msg) + def _next_partition(self, topic): + if topic not in self.partition_cycles: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic]) + return self.partition_cycles[topic].next() + + def send_messages(self, topic, *msg): + partition = self._next_partition(topic) + return super(SimpleProducer, self).send_messages(topic, partition, *msg) + + def __repr__(self): + return '<SimpleProducer batch=%s>' % self.async class KeyedProducer(Producer): @@ -205,7 +214,6 @@ class KeyedProducer(Producer): Args: client - The kafka client instance - topic - The kafka topic to send messages to partitioner - A partitioner class that will be used to get the partition to send the message to. 
Must be derived from Partitioner async - If True, the messages are sent asynchronously via another @@ -216,26 +224,34 @@ class KeyedProducer(Producer): batch_send_every_n - If set, messages are send in batches of this size batch_send_every_t - If set, messages are send after this timeout """ - def __init__(self, client, topic, partitioner=None, async=False, + def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, ack_timeout=Producer.DEFAULT_ACK_TIMEOUT, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): - self.topic = topic - client._load_metadata_for_topics(topic) - if not partitioner: partitioner = HashedPartitioner - - self.partitioner = partitioner(client.topic_partitions[topic]) + self.partitioner_class = partitioner + self.partitioners = {} super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, batch_send, batch_send_every_n, batch_send_every_t) - def send(self, key, msg): - partitions = self.client.topic_partitions[self.topic] - partition = self.partitioner.partition(key, partitions) - return self.send_messages(partition, msg) + def _next_partition(self, topic, key): + if topic not in self.partitioners: + if topic not in self.client.topic_partitions: + self.client.load_metadata_for_topics(topic) + self.partitioners[topic] = \ + self.partitioner_class(self.client.topic_partitions[topic]) + partitioner = self.partitioners[topic] + return partitioner.partition(key, self.client.topic_partitions[topic]) + + def send(self, topic, key, msg): + partition = self._next_partition(topic, key) + return self.send_messages(topic, partition, msg) + + def __repr__(self): + return '<KeyedProducer batch=%s>' % self.async diff --git a/kafka/protocol.py b/kafka/protocol.py index 612acf6..25be023 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -29,8 +29,8 @@ class KafkaProtocol(object): FETCH_KEY = 1 OFFSET_KEY = 2 METADATA_KEY = 3 - OFFSET_COMMIT_KEY = 6 - OFFSET_FETCH_KEY = 7 + OFFSET_COMMIT_KEY = 8 + OFFSET_FETCH_KEY = 9 ATTRIBUTE_CODEC_MASK = 0x03 CODEC_NONE = 0x00 @@ -119,9 +119,17 @@ class KafkaProtocol(object): read_message = True yield OffsetAndMessage(offset, message) except BufferUnderflowError: + # NOTE: Not sure this is correct error handling: + # Is it possible to get a BUE if the message set is somewhere + # in the middle of the fetch response? If so, we probably have + # an issue that's not fetch size too small. + # Aren't we ignoring errors if we fail to unpack data by + # raising StopIteration()? + # If _decode_message() raises a ChecksumError, couldn't that + # also be due to the fetch size being too small? 
if read_message is False: # If we get a partial read of a message, but haven't - # yielded anyhting there's a problem + # yielded anything there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -171,7 +179,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of ProduceRequest acks: How "acky" you want the request to be 0: immediate response @@ -231,7 +239,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of FetchRequest max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before @@ -338,7 +346,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int topics: list of strings """ topics = [] if topics is None else topics @@ -376,12 +384,16 @@ class KafkaProtocol(object): topic_metadata = {} for i in range(num_topics): + # NOTE: topic_error is discarded. Should probably be returned with + # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) partition_metadata = {} for j in range(num_partitions): + # NOTE: partition_error_code is discarded. Should probably be + # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -408,7 +420,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ @@ -439,7 +451,6 @@ class KafkaProtocol(object): data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) - (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) for i in xrange(num_topics): @@ -459,7 +470,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ @@ -490,7 +501,6 @@ class KafkaProtocol(object): """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) - (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) for i in range(num_topics): diff --git a/kafka/queue.py b/kafka/queue.py index a996369..ada495f 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from copy import copy import logging from multiprocessing import Process, Queue, Event |
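For reference, here is a minimal standalone sketch of the size-prefixed framing that `KafkaConnection.recv()` now implements through `_read_bytes()`: read a 4-byte big-endian length, then loop on `recv()` until exactly that many bytes have arrived, treating an empty read as a dropped connection. The helper names below are illustrative only and are not part of the library.

```python
import struct


def read_exact(sock, num_bytes):
    """Read exactly num_bytes from sock, raising on a short read."""
    chunks = []
    bytes_left = num_bytes
    while bytes_left:
        data = sock.recv(bytes_left)
        if data == b'':
            # The peer closed the connection before the full payload arrived.
            raise IOError("connection closed with %d bytes left to read" % bytes_left)
        chunks.append(data)
        bytes_left -= len(data)
    return b''.join(chunks)


def read_framed_response(sock):
    """Read one response: a 4-byte big-endian size header, then the payload."""
    (size,) = struct.unpack('>i', read_exact(sock, 4))
    return read_exact(sock, size)
```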