summary | refs | log | tree | commit | diff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r-- kafka/client.py 211
-rw-r--r-- kafka/common.py 35
-rw-r--r-- kafka/conn.py 93
-rw-r--r-- kafka/consumer.py 337
-rw-r--r-- kafka/producer.py 104
-rw-r--r-- kafka/protocol.py 30
-rw-r--r-- kafka/queue.py 2
7 files changed, 471 insertions, 341 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 81eec7d..33c4419 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,16 @@
import copy
+import logging
+
from collections import defaultdict
from functools import partial
from itertools import count
-import logging
-import socket
-import time
-from kafka.common import ErrorMapping, TopicAndPartition
-from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import collect_hosts, KafkaConnection
+from kafka.common import (ErrorMapping, TopicAndPartition,
+ ConnectionError, FailedPayloadsError,
+ BrokerResponseError, PartitionUnavailableError,
+ KafkaUnavailableError, KafkaRequestError)
+
+from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
log = logging.getLogger("kafka")
@@ -19,19 +21,21 @@ class KafkaClient(object):
CLIENT_ID = "kafka-python"
ID_GEN = count()
- def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
+ # NOTE: The timeout given to the client should always be greater than the
+ # one passed to SimpleConsumer.get_message(), otherwise you can get a
+ # socket timeout.
+ def __init__(self, host, port, client_id=CLIENT_ID,
+ timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
- self.bufsize = bufsize
self.client_id = client_id
-
- self.hosts = collect_hosts(hosts)
-
- # create connections only when we need them
- self.conns = {}
+ self.timeout = timeout
+ self.conns = { # (host, port) -> KafkaConnection
+ (host, port): KafkaConnection(host, port, timeout=timeout)
+ }
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
- self._load_metadata_for_topics()
+ self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.load_metadata_for_topics() # bootstrap with all metadata
##################
# Private API #
@@ -47,62 +51,25 @@ class KafkaClient(object):
return self.conns[host_key]
def _get_conn_for_broker(self, broker):
- "Get or create a connection to a broker"
+ """
+ Get or create a connection to a broker
+ """
+ if (broker.host, broker.port) not in self.conns:
+ self.conns[(broker.host, broker.port)] = \
+ KafkaConnection(broker.host, broker.port, timeout=self.timeout)
return self._get_conn(broker.host, broker.port)
def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
if key not in self.topics_to_brokers:
- self._load_metadata_for_topics(topic)
+ self.load_metadata_for_topics(topic)
if key not in self.topics_to_brokers:
- raise Exception("Partition does not exist: %s" % str(key))
+ raise KafkaRequestError("Partition does not exist: %s" % str(key))
return self.topics_to_brokers[key]
- def _load_metadata_for_topics(self, *topics):
- """
- Discover brokers and metadata for a set of topics. This method will
- recurse in the event of a retry.
- """
- request_id = self._next_id()
- request = KafkaProtocol.encode_metadata_request(self.client_id,
- request_id, topics)
-
- response = self._send_broker_unaware_request(request_id, request)
- if response is None:
- raise Exception("All servers failed to process request")
-
- (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
-
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
-
- self.brokers = brokers
- self.topics_to_brokers = {}
-
- for topic, partitions in topics.items():
- # Clear the list once before we add it. This removes stale entries
- # and avoids duplicates
- self.topic_partitions.pop(topic, None)
-
- if not partitions:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- break
-
- for partition, meta in partitions.items():
- if meta.leader == -1:
- log.info("Partition is unassigned, delay for 1s and retry")
- time.sleep(1)
- self._load_metadata_for_topics(topic)
- else:
- topic_part = TopicAndPartition(topic, partition)
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
- self.topic_partitions[topic].append(partition)
-
def _next_id(self):
"""
Generate a new correlation id
@@ -125,7 +92,7 @@ class KafkaClient(object):
"trying next server: %s" % (request, conn, e))
continue
- return None
+ raise KafkaUnavailableError("All servers failed to process request")
def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
"""
@@ -156,6 +123,8 @@ class KafkaClient(object):
for payload in payloads:
leader = self._get_leader_for_partition(payload.topic,
payload.partition)
+ if leader == -1:
+ raise PartitionUnavailableError("Leader is unassigned for %s-%s" % (payload.topic, payload.partition))
payloads_by_broker[leader].append(payload)
original_keys.append((payload.topic, payload.partition))
@@ -172,30 +141,73 @@ class KafkaClient(object):
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)
+ failed = False
# Send the request, recv the response
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
- response = conn.recv(requestId)
- except ConnectionError, e: # ignore BufferUnderflow for now
- log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError, e:
+ log.warning("Could not receive response to request [%s] "
+ "from server %s: %s", request, conn, e)
+ failed = True
+ except ConnectionError, e:
+ log.warning("Could not send request [%s] to server %s: %s",
+ request, conn, e)
+ failed = True
+
+ if failed:
failed_payloads += payloads
- self.topics_to_brokers = {} # reset metadata
+ self.reset_all_metadata()
continue
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
if failed_payloads:
- raise FailedPayloadsException(failed_payloads)
+ raise FailedPayloadsError(failed_payloads)
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()
+ def __repr__(self):
+ return '<KafkaClient client_id=%s>' % (self.client_id)
+
+ def _raise_on_response_error(self, resp):
+ if resp.error == ErrorMapping.NO_ERROR:
+ return
+
+ if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON,
+ ErrorMapping.NOT_LEADER_FOR_PARTITION):
+ self.reset_topic_metadata(resp.topic)
+
+ raise BrokerResponseError(
+ "Request for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error))
+
#################
# Public API #
#################
+ def reset_topic_metadata(self, *topics):
+ for topic in topics:
+ try:
+ partitions = self.topic_partitions[topic]
+ except KeyError:
+ continue
+
+ for partition in partitions:
+ self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
+
+ del self.topic_partitions[topic]
+
+ def reset_all_metadata(self):
+ self.topics_to_brokers.clear()
+ self.topic_partitions.clear()
+
+ def has_metadata_for_topic(self, topic):
+ return topic in self.topic_partitions
def close(self):
for conn in self.conns.values():
@@ -215,6 +227,36 @@ class KafkaClient(object):
for conn in self.conns.values():
conn.reinit()
+ def load_metadata_for_topics(self, *topics):
+ """
+ Discover brokers and metadata for a set of topics. This function is called
+ lazily whenever metadata is unavailable.
+ """
+ request_id = self._next_id()
+ request = KafkaProtocol.encode_metadata_request(self.client_id,
+ request_id, topics)
+
+ response = self._send_broker_unaware_request(request_id, request)
+
+ (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
+ log.debug("Broker metadata: %s", brokers)
+ log.debug("Topic metadata: %s", topics)
+
+ self.brokers = brokers
+
+ for topic, partitions in topics.items():
+ self.reset_topic_metadata(topic)
+
+ if not partitions:
+ continue
+
+ self.topic_partitions[topic] = []
+ for partition, meta in partitions.items():
+ topic_part = TopicAndPartition(topic, partition)
+ self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topic_partitions[topic].append(partition)
+
def send_produce_request(self, payloads=[], acks=1, timeout=1000,
fail_on_error=True, callback=None):
"""
@@ -252,14 +294,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "ProduceRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -285,14 +322,9 @@ class KafkaClient(object):
out = []
for resp in resps:
- # Check for errors
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception(
- "FetchRequest for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition),
- resp.error))
-
- # Run the callback
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
+
if callback is not None:
out.append(callback(resp))
else:
@@ -308,9 +340,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
@@ -326,9 +357,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with "
- "errorcode=%s", resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
@@ -346,9 +376,8 @@ class KafkaClient(object):
out = []
for resp in resps:
- if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("OffsetCommitRequest failed with errorcode=%s",
- resp.error)
+ if fail_on_error is True:
+ self._raise_on_response_error(resp)
if callback is not None:
out.append(callback(resp))
else:
diff --git a/kafka/common.py b/kafka/common.py
index 6f0dd32..c0a1a6a 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -69,23 +69,46 @@ class ErrorMapping(object):
# Exceptions #
#################
-class FailedPayloadsException(Exception):
+
+class KafkaError(RuntimeError):
+ pass
+
+
+class KafkaRequestError(KafkaError):
+ pass
+
+
+class KafkaUnavailableError(KafkaError):
+ pass
+
+
+class BrokerResponseError(KafkaError):
pass
-class ConnectionError(Exception):
+
+class PartitionUnavailableError(KafkaError):
+ pass
+
+
+class FailedPayloadsError(KafkaError):
pass
-class BufferUnderflowError(Exception):
+
+class ConnectionError(KafkaError):
+ pass
+
+
+class BufferUnderflowError(KafkaError):
pass
-class ChecksumError(Exception):
+class ChecksumError(KafkaError):
pass
-class ConsumerFetchSizeTooSmall(Exception):
+class ConsumerFetchSizeTooSmall(KafkaError):
pass
-class ConsumerNoMoreData(Exception):
+class ConsumerNoMoreData(KafkaError):
pass
diff --git a/kafka/conn.py b/kafka/conn.py
index 614b1bb..de2d385 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,11 +5,11 @@ import struct
from random import shuffle
from threading import local
-from kafka.common import BufferUnderflowError
from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
def collect_hosts(hosts, randomize=True):
"""
@@ -39,64 +39,53 @@ class KafkaConnection(local):
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
+
+ host: the host name or IP address of a kafka broker
+ port: the port number the kafka broker is listening on
+ timeout: default 120. The socket timeout for sending and receiving data
+ in seconds. None means no timeout, so a request can block forever.
"""
- def __init__(self, host, port, bufsize=4096, timeout=10):
+ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self.bufsize = bufsize
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((host, port))
self.timeout = timeout
-
- self._sock = socket.create_connection((host, port), timeout=timeout)
+ self._sock.settimeout(self.timeout)
self._dirty = False
- def __str__(self):
+ def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
###################
# Private API #
###################
- def _consume_response(self):
- """
- Fully consumer the response iterator
- """
- data = ""
- for chunk in self._consume_response_iter():
- data += chunk
- return data
-
- def _consume_response_iter(self):
- """
- This method handles the response header and error messages. It
- then returns an iterator for the chunks of the response
- """
- log.debug("Handling response from Kafka")
-
- # Read the size off of the header
- resp = self._sock.recv(4)
- if resp == "":
- self._raise_connection_error()
- (size,) = struct.unpack('>i', resp)
-
- messagesize = size - 4
- log.debug("About to read %d bytes from Kafka", messagesize)
-
- # Read the remainder of the response
- total = 0
- while total < messagesize:
- resp = self._sock.recv(self.bufsize)
- log.debug("Read %d bytes from Kafka", len(resp))
- if resp == "":
- raise BufferUnderflowError(
- "Not enough data to read this response")
-
- total += len(resp)
- yield resp
-
def _raise_connection_error(self):
self._dirty = True
- raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port))
+ raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+
+ def _read_bytes(self, num_bytes):
+ bytes_left = num_bytes
+ resp = ''
+ log.debug("About to read %d bytes from Kafka", num_bytes)
+ if self._dirty:
+ self.reinit()
+ while bytes_left:
+ try:
+ data = self._sock.recv(bytes_left)
+ except socket.error:
+ log.exception('Unable to receive data from Kafka')
+ self._raise_connection_error()
+ if data == '':
+ log.error("Not enough data to read this response")
+ self._raise_connection_error()
+ bytes_left -= len(data)
+ log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
+ resp += data
+
+ return resp
##################
# Public API #
@@ -113,7 +102,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error:
+ except socket.error, e:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()
@@ -122,8 +111,14 @@ class KafkaConnection(local):
Get a response from Kafka
"""
log.debug("Reading response %d from Kafka" % request_id)
- self.data = self._consume_response()
- return self.data
+ # Read the size off of the header
+ resp = self._read_bytes(4)
+
+ (size,) = struct.unpack('>i', resp)
+
+ # Read the remainder of the response
+ resp = self._read_bytes(size)
+ return str(resp)
def copy(self):
"""
@@ -146,5 +141,7 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
+ self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self._sock.connect((self.host, self.port))
+ self._sock.settimeout(self.timeout)
self._dirty = False
diff --git a/kafka/consumer.py b/kafka/consumer.py
index f2898ad..28b53ec 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,10 +1,11 @@
-from collections import defaultdict
+from __future__ import absolute_import
+
from itertools import izip_longest, repeat
import logging
import time
from threading import Lock
-from multiprocessing import Process, Queue, Event, Value
-from Queue import Empty
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+from Queue import Empty, Queue
from kafka.common import (
ErrorMapping, FetchRequest,
@@ -22,6 +23,11 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
class FetchContext(object):
@@ -32,13 +38,15 @@ class FetchContext(object):
self.consumer = consumer
self.block = block
- if block and not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
-
- self.timeout = timeout * 1000
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
def __enter__(self):
"""Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
if self.block:
self.consumer.fetch_max_wait_time = self.timeout
self.consumer.fetch_min_bytes = 1
@@ -46,9 +54,9 @@ class FetchContext(object):
self.consumer.fetch_min_bytes = 0
def __exit__(self, type, value, traceback):
- """Reset values to default"""
- self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class Consumer(object):
@@ -67,7 +75,7 @@ class Consumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client._load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic)
self.offsets = {}
if not partitions:
@@ -204,8 +212,14 @@ class SimpleConsumer(Consumer):
before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
wait before commit
-
fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 32K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
Auto commit details:
If both auto_commit_every_n and auto_commit_every_t are set, they will
@@ -216,13 +230,10 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
-
- self.partition_info = False # Do not return partition info in msgs
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_started = defaultdict(bool) # defaults to false
-
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -230,6 +241,23 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = fetch_size_bytes
+ self.fetch_offsets = self.offsets.copy()
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
+
+ def __repr__(self):
+ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+ (self.group, self.topic, str(self.offsets.keys()))
+
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -265,12 +293,6 @@ class SimpleConsumer(Consumer):
reqs.append(OffsetRequest(self.topic, partition, -2, 1))
elif whence == 2:
reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- # The API returns back the next available offset
- # For eg: if the current offset is 18, the API will return
- # back 19. So, if we have to seek 5 points before, we will
- # end up going back to 14, instead of 13. Adjust this
- deltas[partition] -= 1
else:
pass
@@ -281,128 +303,148 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ self.queue = Queue()
+
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages are fetched. If None,
+ it will block forever.
"""
messages = []
- iterator = self.__iter__()
-
- # HACK: This splits the timeout between available partitions
- timeout = timeout * 1.0 / len(self.offsets)
-
- with FetchContext(self, block, timeout):
- while count > 0:
- try:
- messages.append(next(iterator))
- except StopIteration:
- break
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
count -= 1
-
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout is not None:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
return messages
- def __iter__(self):
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
"""
- Create an iterate per partition. Iterate through them calling next()
- until they are all exhausted.
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
"""
- iters = {}
- for partition, offset in self.offsets.items():
- iters[partition] = self.__iter_partition__(partition, offset)
-
- if len(iters) == 0:
- return
-
- while True:
- if len(iters) == 0:
- break
-
- for partition, it in iters.items():
- try:
- if self.partition_info:
- yield (partition, it.next())
- else:
- yield it.next()
- except StopIteration:
- log.debug("Done iterating over partition %s" % partition)
- del iters[partition]
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ partition, message = self.queue.get_nowait()
- # skip auto-commit since we didn't yield anything
- continue
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
# Count, check and commit messages if necessary
self.count_since_commit += 1
self._auto_commit()
- def __iter_partition__(self, partition, offset):
- """
- Iterate over the messages in a partition. Create a FetchRequest
- to get back a batch of messages, yield them one at a time.
- After a batch is exhausted, start a new batch unless we've reached
- the end of this partition.
- """
-
- # The offset that is stored in the consumer is the offset that
- # we have consumed. In subsequent iterations, we are supposed to
- # fetch the next message (that is from the next offset)
- # However, for the 0th message, the offset should be as-is.
- # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
- # problematic, since 0 is offset of a message which we have not yet
- # consumed.
- if self.fetch_started[partition]:
- offset += 1
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
+ except Empty:
+ return None
- fetch_size = self.fetch_min_bytes
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
while True:
- # use MaxBytes = client's bufsize since we're only
- # fetching one topic + partition
- req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
-
- (resp,) = self.client.send_fetch_request(
- [req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
-
- assert resp.topic == self.topic
- assert resp.partition == partition
-
- next_offset = None
- try:
- for message in resp.messages:
- next_offset = message.offset
-
- # update the offset before the message is yielded. This
- # is so that the consumer state is not lost in certain
- # cases.
- #
- # For eg: the message is yielded and consumed by the
- # caller, but the caller does not come back into the
- # generator again. The message will be consumed but the
- # status will not be updated in the consumer
- self.fetch_started[partition] = True
- self.offsets[partition] = message.offset
- yield message
- except ConsumerFetchSizeTooSmall, e:
- fetch_size *= 1.5
- log.warn(
- "Fetch size too small, increasing to %d (1.5x) and retry",
- fetch_size)
- continue
- except ConsumerNoMoreData, e:
- log.debug("Iteration was ended by %r", e)
-
- if next_offset is None:
- break
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
else:
- offset = next_offset + 1
+ # Timed out waiting for a message
+ break
+    def _fetch(self):
+        """Queue messages from every partition, retrying with a larger buffer."""
+        partitions = self.fetch_offsets.keys()
+        while partitions:
+            # Rebuilt every round so a retry only re-requests failed partitions
+            requests = [FetchRequest(self.topic, partition,
+                                     self.fetch_offsets[partition],
+                                     self.buffer_size)
+                        for partition in partitions]
+            responses = self.client.send_fetch_request(
+                requests,
+                max_wait_time=int(self.fetch_max_wait_time),
+                min_bytes=self.fetch_min_bytes)
+
+            retry_partitions = set()
+            for resp in responses:
+                partition = resp.partition
+                try:
+                    for message in resp.messages:
+                        # Put the message in our queue
+                        self.queue.put((partition, message))
+                        self.fetch_offsets[partition] = message.offset + 1
+                except ConsumerFetchSizeTooSmall, e:
+                    if (self.max_buffer_size is not None and
+                            self.buffer_size == self.max_buffer_size):
+                        log.error("Max fetch size %d too small",
+                                  self.max_buffer_size)
+                        raise e
+                    if self.max_buffer_size is None:
+                        self.buffer_size *= 2
+                    else:
+                        # Double the buffer, but never exceed the configured cap
+                        self.buffer_size = min(self.buffer_size * 2,
+                                               self.max_buffer_size)
+                    log.warn("Fetch size too small, increase to %d (2x) "
+                             "and retry", self.buffer_size)
+                    retry_partitions.add(partition)
+                except ConsumerNoMoreData, e:
+                    log.debug("Iteration was ended by %r", e)
+                except StopIteration:
+                    # Stop iterating through this partition
+                    log.debug("Done iterating over partition %s" % partition)
+            partitions = retry_partitions
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
"""
@@ -440,8 +482,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# indicates a specific number of messages, follow that advice
count = 0
- for partition, message in consumer:
- queue.put((partition, message))
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
count += 1
# We have reached the required size. The controller might have
@@ -451,12 +494,11 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# can reset the 'start' event
if count == size.value:
pause.wait()
- break
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- if count == 0:
- time.sleep(0.1)
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
consumer.stop()
@@ -501,7 +543,7 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = Queue(1024) # Child consumers dump messages into this
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
self.start = Event() # Indicates the consumers to start fetch
self.exit = Event() # Requests the consumers to shutdown
self.pause = Event() # Requests the consumers to pause fetch
@@ -535,6 +577,10 @@ class MultiProcessConsumer(Consumer):
proc.start()
self.procs.append(proc)
+ def __repr__(self):
+ return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
+ (self.group, self.topic, len(self.procs))
+
def stop(self):
# Set exit and start off all waiting consumers
self.exit.set()
@@ -568,12 +614,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -583,8 +628,9 @@ class MultiProcessConsumer(Consumer):
count: Indicates the maximum number of messages to be fetched
block: If True, the API will block till some messages are fetched.
- timeout: If None, and block=True, the API will block infinitely.
- If >0, API will block for specified time (in seconds)
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages are fetched. If None,
+ it will block forever.
"""
messages = []
@@ -595,7 +641,11 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- while count > 0:
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
# go into overdrive and keep consuming thousands of
@@ -609,15 +659,18 @@ class MultiProcessConsumer(Consumer):
break
messages.append(message)
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
- self.count_since_commit += 1
- self._auto_commit()
+ new_offsets[partition] = message.offset + 1
count -= 1
+ if timeout is not None:
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
self.pause.set()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
return messages
diff --git a/kafka/producer.py b/kafka/producer.py
index 7ef7896..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,15 +1,16 @@
+from __future__ import absolute_import
+
+import logging
+import time
+
+from Queue import Empty
from collections import defaultdict
-from datetime import datetime, timedelta
from itertools import cycle
from multiprocessing import Queue, Process
-from Queue import Empty
-import logging
-import sys
-from kafka.common import ProduceRequest
-from kafka.common import FailedPayloadsException
-from kafka.protocol import create_message
+from kafka.common import ProduceRequest, TopicAndPartition
from kafka.partitioner import HashedPartitioner
+from kafka.protocol import create_message
log = logging.getLogger("kafka")
@@ -19,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
STOP_ASYNC_PRODUCER = -1
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
req_acks, ack_timeout):
"""
Listen on the queue for a specified number of messages or till
@@ -36,38 +37,41 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
while not stop:
timeout = batch_time
count = batch_size
- send_at = datetime.now() + timedelta(seconds=timeout)
+ send_at = time.time() + timeout
msgset = defaultdict(list)
# Keep fetching till we gather enough messages or a
# timeout is reached
while count > 0 and timeout >= 0:
try:
- partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg = queue.get(timeout=timeout)
+
except Empty:
break
# Check if the controller has requested us to stop
- if partition == STOP_ASYNC_PRODUCER:
+ if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
break
# Adjust the timeout to match the remaining period
count -= 1
- timeout = (send_at - datetime.now()).total_seconds()
- msgset[partition].append(msg)
+ timeout = send_at - time.time()
+ msgset[topic_partition].append(msg)
# Send collected requests upstream
reqs = []
- for partition, messages in msgset.items():
- req = ProduceRequest(topic, partition, messages)
+ for topic_partition, messages in msgset.items():
+ req = ProduceRequest(topic_partition.topic,
+ topic_partition.partition,
+ messages)
reqs.append(req)
try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
- except Exception as exp:
+ except Exception:
log.exception("Unable to send message")
@@ -77,7 +81,6 @@ class Producer(object):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -118,8 +121,7 @@ class Producer(object):
if self.async:
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
- args=(self.topic,
- self.queue,
+ args=(self.queue,
self.client.copy(),
batch_send_every_t,
batch_send_every_n,
@@ -130,23 +132,24 @@ class Producer(object):
self.proc.daemon = True
self.proc.start()
- def send_messages(self, partition, *msg):
+ def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
"""
if self.async:
for m in msg:
- self.queue.put((partition, create_message(m)))
+ self.queue.put((TopicAndPartition(topic, partition),
+ create_message(m)))
resp = []
else:
messages = [create_message(m) for m in msg]
- req = ProduceRequest(self.topic, partition, messages)
+ req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
- except Exception as e:
+ except Exception:
log.exception("Unable to send messages")
- raise e
+ raise
return resp
def stop(self, timeout=1):
@@ -168,7 +171,6 @@ class SimpleProducer(Producer):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -179,24 +181,31 @@ class SimpleProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, async=False,
+ def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client._load_metadata_for_topics(topic)
- self.next_partition = cycle(client.topic_partitions[topic])
-
+ self.partition_cycles = {}
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send_messages(self, *msg):
- partition = self.next_partition.next()
- return super(SimpleProducer, self).send_messages(partition, *msg)
+ def _next_partition(self, topic):
+ if topic not in self.partition_cycles:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ return self.partition_cycles[topic].next()
+
+ def send_messages(self, topic, *msg):
+ partition = self._next_partition(topic)
+ return super(SimpleProducer, self).send_messages(topic, partition, *msg)
+
+ def __repr__(self):
+ return '<SimpleProducer batch=%s>' % self.async
class KeyedProducer(Producer):
@@ -205,7 +214,6 @@ class KeyedProducer(Producer):
Args:
client - The kafka client instance
- topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async - If True, the messages are sent asynchronously via another
@@ -216,26 +224,34 @@ class KeyedProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, partitioner=None, async=False,
+ def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client._load_metadata_for_topics(topic)
-
if not partitioner:
partitioner = HashedPartitioner
-
- self.partitioner = partitioner(client.topic_partitions[topic])
+ self.partitioner_class = partitioner
+ self.partitioners = {}
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send(self, key, msg):
- partitions = self.client.topic_partitions[self.topic]
- partition = self.partitioner.partition(key, partitions)
- return self.send_messages(partition, msg)
+ def _next_partition(self, topic, key):
+ if topic not in self.partitioners:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partitioners[topic] = \
+ self.partitioner_class(self.client.topic_partitions[topic])
+ partitioner = self.partitioners[topic]
+ return partitioner.partition(key, self.client.topic_partitions[topic])
+
+ def send(self, topic, key, msg):
+ partition = self._next_partition(topic, key)
+ return self.send_messages(topic, partition, msg)
+
+ def __repr__(self):
+ return '<KeyedProducer batch=%s>' % self.async
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..25be023 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -29,8 +29,8 @@ class KafkaProtocol(object):
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
- OFFSET_COMMIT_KEY = 6
- OFFSET_FETCH_KEY = 7
+ OFFSET_COMMIT_KEY = 8
+ OFFSET_FETCH_KEY = 9
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
@@ -119,9 +119,17 @@ class KafkaProtocol(object):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
+ # NOTE: Not sure this is correct error handling:
+ # Is it possible to get a BufferUnderflowError if the message set is somewhere
+ # in the middle of the fetch response? If so, we probably have
+ # an issue that's not fetch size too small.
+ # Aren't we ignoring errors if we fail to unpack data by
+ # raising StopIteration()?
+ # If _decode_message() raises a ChecksumError, couldn't that
+ # also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
- # yielded anyhting there's a problem
+ # yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@@ -171,7 +179,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
@@ -231,7 +239,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before
@@ -338,7 +346,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
topics: list of strings
"""
topics = [] if topics is None else topics
@@ -376,12 +384,16 @@ class KafkaProtocol(object):
topic_metadata = {}
for i in range(num_topics):
+ # NOTE: topic_error is discarded. Should probably be returned with
+ # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = {}
for j in range(num_partitions):
+ # NOTE: partition_error_code is discarded. Should probably be
+ # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
@@ -408,7 +420,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
@@ -439,7 +451,6 @@ class KafkaProtocol(object):
data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_topics):
@@ -459,7 +470,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
@@ -490,7 +501,6 @@ class KafkaProtocol(object):
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in range(num_topics):
diff --git a/kafka/queue.py b/kafka/queue.py
index a996369..ada495f 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -1,3 +1,5 @@
+from __future__ import absolute_import
+
from copy import copy
import logging
from multiprocessing import Process, Queue, Event