summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/__init__.py2
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/common.py11
-rw-r--r--kafka/consumer/__init__.py3
-rw-r--r--kafka/consumer/kafka.py735
-rw-r--r--kafka/util.py12
-rw-r--r--test/test_client_integration.py16
-rw-r--r--test/test_codec.py8
-rw-r--r--test/test_conn.py6
-rw-r--r--test/test_consumer.py7
-rw-r--r--test/test_consumer_integration.py159
-rw-r--r--test/test_failover_integration.py2
-rw-r--r--test/test_package.py22
-rw-r--r--test/test_producer_integration.py20
14 files changed, 941 insertions, 68 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 58ca619..16b9094 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -13,7 +13,7 @@ from kafka.protocol import (
)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer
__all__ = [
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
diff --git a/kafka/client.py b/kafka/client.py
index 8c78694..bc3d853 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -119,9 +119,9 @@ class KafkaClient(object):
response = conn.recv(requestId)
return decoder_fn(response)
- except Exception as e:
- log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (requestId, host, port, e))
+ except Exception:
+ log.exception("Could not send request [%r] to server %s:%i, "
+ "trying next server" % (requestId, host, port))
raise KafkaUnavailableError("All servers failed to process request")
diff --git a/kafka/common.py b/kafka/common.py
index 008736c..e4b3b1b 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -63,6 +63,9 @@ Message = namedtuple("Message",
TopicAndPartition = namedtuple("TopicAndPartition",
["topic", "partition"])
+KafkaMessage = namedtuple("KafkaMessage",
+ ["topic", "partition", "offset", "key", "value"])
+
#################
# Exceptions #
@@ -182,6 +185,10 @@ class ConsumerNoMoreData(KafkaError):
pass
+class ConsumerTimeout(KafkaError):
+ pass
+
+
class ProtocolError(KafkaError):
pass
@@ -190,6 +197,10 @@ class UnsupportedCodecError(KafkaError):
pass
+class KafkaConfigurationError(KafkaError):
+ pass
+
+
kafka_errors = {
-1 : UnknownError,
0 : NoError,
diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py
index d2fa306..935f56e 100644
--- a/kafka/consumer/__init__.py
+++ b/kafka/consumer/__init__.py
@@ -1,6 +1,7 @@
from .simple import SimpleConsumer
from .multiprocess import MultiProcessConsumer
+from .kafka import KafkaConsumer
__all__ = [
- 'SimpleConsumer', 'MultiProcessConsumer'
+ 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'
]
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
new file mode 100644
index 0000000..f16b526
--- /dev/null
+++ b/kafka/consumer/kafka.py
@@ -0,0 +1,735 @@
+from __future__ import absolute_import
+
+from collections import namedtuple
+from copy import deepcopy
+import logging
+import random
+import sys
+import time
+
+import six
+
+from kafka.client import KafkaClient
+from kafka.common import (
+ OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
+ check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+ OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
+ FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
+)
+from kafka.util import kafka_bytestring
+
+logger = logging.getLogger(__name__)
+
+OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
+
+DEFAULT_CONSUMER_CONFIG = {
+ 'client_id': __name__,
+ 'group_id': None,
+ 'metadata_broker_list': None,
+ 'socket_timeout_ms': 30 * 1000,
+ 'fetch_message_max_bytes': 1024 * 1024,
+ 'auto_offset_reset': 'largest',
+ 'fetch_min_bytes': 1,
+ 'fetch_wait_max_ms': 100,
+ 'refresh_leader_backoff_ms': 200,
+ 'deserializer_class': lambda msg: msg,
+ 'auto_commit_enable': False,
+ 'auto_commit_interval_ms': 60 * 1000,
+ 'auto_commit_interval_messages': None,
+ 'consumer_timeout_ms': -1,
+
+ # Currently unused
+ 'socket_receive_buffer_bytes': 64 * 1024,
+ 'num_consumer_fetchers': 1,
+ 'default_fetcher_backoff_ms': 1000,
+ 'queued_max_message_chunks': 10,
+ 'rebalance_max_retries': 4,
+ 'rebalance_backoff_ms': 2000,
+}
+
+BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
+
+
+class KafkaConsumer(object):
+ """
+ A simpler kafka consumer
+
+ ```
+ # A very basic 'tail' consumer, with no stored offset management
+ kafka = KafkaConsumer('topic1')
+ for m in kafka:
+ print m
+
+ # Alternate interface: next()
+ print kafka.next()
+
+ # Alternate interface: batch iteration
+ while True:
+ for m in kafka.fetch_messages():
+ print m
+ print "Done with batch - let's do another!"
+ ```
+
+ ```
+ # more advanced consumer -- multiple topics w/ auto commit offset management
+ kafka = KafkaConsumer('topic1', 'topic2',
+ group_id='my_consumer_group',
+ auto_commit_enable=True,
+ auto_commit_interval_ms=30 * 1000,
+ auto_offset_reset='smallest')
+
+ # Infinite iteration
+ for m in kafka:
+ process_message(m)
+ kafka.task_done(m)
+
+ # Alternate interface: next()
+ m = kafka.next()
+ process_message(m)
+ kafka.task_done(m)
+
+ # If auto_commit_enable is False, remember to commit() periodically
+ kafka.commit()
+
+ # Batch process interface
+ while True:
+ for m in kafka.fetch_messages():
+ process_message(m)
+ kafka.task_done(m)
+ ```
+
+ messages (m) are namedtuples with attributes:
+ m.topic: topic name (str)
+ m.partition: partition number (int)
+ m.offset: message offset on topic-partition log (int)
+ m.key: key (bytes - can be None)
+ m.value: message (output of deserializer_class - default is raw bytes)
+
+ Configuration settings can be passed to constructor,
+ otherwise defaults will be used:
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ metadata_broker_list=None,
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
+ """
+
+ def __init__(self, *topics, **configs):
+ self.configure(**configs)
+ self.set_topic_partitions(*topics)
+
+ def configure(self, **configs):
+ """
+ Configuration settings can be passed to constructor,
+ otherwise defaults will be used:
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ metadata_broker_list=None,
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ auto_commit_interval_messages=None,
+ consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
+ """
+ self._config = {}
+ for key in DEFAULT_CONSUMER_CONFIG:
+ self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
+
+ if configs:
+ raise KafkaConfigurationError('Unknown configuration key(s): ' +
+ str(list(configs.keys())))
+
+ # Handle str/bytes conversions
+ for config_key in BYTES_CONFIGURATION_KEYS:
+ if isinstance(self._config[config_key], six.string_types):
+ logger.warning("Converting configuration key '%s' to bytes" %
+ config_key)
+ self._config[config_key] = self._config[config_key].encode('utf-8')
+
+ if self._config['auto_commit_enable']:
+ if not self._config['group_id']:
+ raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
+
+ # Check auto-commit configuration
+ if self._config['auto_commit_enable']:
+ logger.info("Configuring consumer to auto-commit offsets")
+ self._reset_auto_commit()
+
+ if self._config['metadata_broker_list'] is None:
+ raise KafkaConfigurationError('metadata_broker_list required to '
+ 'configure KafkaConsumer')
+
+ self._client = KafkaClient(self._config['metadata_broker_list'],
+ client_id=self._config['client_id'],
+ timeout=(self._config['socket_timeout_ms'] / 1000.0))
+
+ def set_topic_partitions(self, *topics):
+ """
+ Set the topic/partitions to consume
+ Optionally specify offsets to start from
+
+ Accepts types:
+ str (utf-8): topic name (will consume all available partitions)
+ tuple: (topic, partition)
+ dict: { topic: partition }
+ { topic: [partition list] }
+ { topic: (partition tuple,) }
+
+ Optionally, offsets can be specified directly:
+ tuple: (topic, partition, offset)
+ dict: { (topic, partition): offset, ... }
+
+ Ex:
+ kafka = KafkaConsumer()
+
+ # Consume topic1-all; topic2-partition2; topic3-partition0
+ kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
+
+ # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
+ # using tuples --
+ kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))
+
+ # using dict --
+ kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
+ """
+ self._topics = []
+ self._client.load_metadata_for_topics()
+
+ # Setup offsets
+ self._offsets = OffsetsStruct(fetch=dict(),
+ commit=dict(),
+ highwater=dict(),
+ task_done=dict())
+
+ # Handle different topic types
+ for arg in topics:
+
+ # Topic name str -- all partitions
+ if isinstance(arg, (six.string_types, six.binary_type)):
+ topic = kafka_bytestring(arg)
+
+ for partition in self._client.get_partition_ids_for_topic(topic):
+ self._consume_topic_partition(topic, partition)
+
+ # (topic, partition [, offset]) tuple
+ elif isinstance(arg, tuple):
+ topic = kafka_bytestring(arg[0])
+ partition = arg[1]
+ if len(arg) == 3:
+ offset = arg[2]
+ self._offsets.fetch[(topic, partition)] = offset
+ self._consume_topic_partition(topic, partition)
+
+ # { topic: partitions, ... } dict
+ elif isinstance(arg, dict):
+ for key, value in six.iteritems(arg):
+
+ # key can be string (a topic)
+ if isinstance(key, (six.string_types, six.binary_type)):
+ topic = kafka_bytestring(key)
+
+ # topic: partition
+ if isinstance(value, int):
+ self._consume_topic_partition(topic, value)
+
+ # topic: [ partition1, partition2, ... ]
+ elif isinstance(value, (list, tuple)):
+ for partition in value:
+ self._consume_topic_partition(topic, partition)
+ else:
+ raise KafkaConfigurationError('Unknown topic type (dict key must be '
+ 'int or list/tuple of ints)')
+
+ # (topic, partition): offset
+ elif isinstance(key, tuple):
+ topic = kafka_bytestring(key[0])
+ partition = key[1]
+ self._consume_topic_partition(topic, partition)
+ self._offsets.fetch[key] = value
+
+ else:
+ raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
+
+ # If we have a consumer group, try to fetch stored offsets
+ if self._config['group_id']:
+ self._get_commit_offsets()
+
+ # Update missing fetch/commit offsets
+ for topic_partition in self._topics:
+
+ # Commit offsets default is None
+ if topic_partition not in self._offsets.commit:
+ self._offsets.commit[topic_partition] = None
+
+ # Skip if we already have a fetch offset from user args
+ if topic_partition not in self._offsets.fetch:
+
+ # Fetch offsets default is (1) commit
+ if self._offsets.commit[topic_partition] is not None:
+ self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
+
+ # or (2) auto reset
+ else:
+ self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+
+ # highwater marks (received from server on fetch response)
+ # and task_done (set locally by user)
+ # should always get initialized to None
+ self._reset_highwater_offsets()
+ self._reset_task_done_offsets()
+
+ # Reset message iterator in case we were in the middle of one
+ self._reset_message_iterator()
+
+ def next(self):
+ """
+ Return a single message from the message iterator
+ If consumer_timeout_ms is set, will raise ConsumerTimeout
+ if no message is available
+ Otherwise blocks indefinitely
+
+ Note that this is also the method called internally during iteration:
+ ```
+ for m in consumer:
+ pass
+ ```
+ """
+ self._set_consumer_timeout_start()
+ while True:
+
+ try:
+ return six.next(self._get_message_iterator())
+
+ # Handle batch completion
+ except StopIteration:
+ self._reset_message_iterator()
+
+ self._check_consumer_timeout()
+
+ def fetch_messages(self):
+ """
+ Sends FetchRequests for all topic/partitions set for consumption
+ Returns a generator that yields KafkaMessage structs
+ after deserializing with the configured `deserializer_class`
+
+ Refreshes metadata on errors, and resets fetch offset on
+ OffsetOutOfRange, per the configured `auto_offset_reset` policy
+
+ Key configuration parameters:
+ `fetch_message_max_bytes`
+ `fetch_max_wait_ms`
+ `fetch_min_bytes`
+ `deserializer_class`
+ `auto_offset_reset`
+ """
+
+ max_bytes = self._config['fetch_message_max_bytes']
+ max_wait_time = self._config['fetch_wait_max_ms']
+ min_bytes = self._config['fetch_min_bytes']
+
+ # Get current fetch offsets
+ offsets = self._offsets.fetch
+ if not offsets:
+ if not self._topics:
+ raise KafkaConfigurationError('No topics or partitions configured')
+ raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
+
+ fetches = []
+ for topic_partition, offset in six.iteritems(offsets):
+ fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
+
+ # client.send_fetch_request will collect topic/partition requests by leader
+ # and send each group as a single FetchRequest to the correct broker
+ try:
+ responses = self._client.send_fetch_request(fetches,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ fail_on_error=False)
+ except FailedPayloadsError:
+ logger.warning('FailedPayloadsError attempting to fetch data from kafka')
+ self._refresh_metadata_on_error()
+ return
+
+ for resp in responses:
+ topic_partition = (resp.topic, resp.partition)
+ try:
+ check_error(resp)
+ except OffsetOutOfRangeError:
+ logger.warning('OffsetOutOfRange: topic %s, partition %d, offset %d '
+ '(Highwatermark: %d)',
+ resp.topic, resp.partition,
+ offsets[topic_partition], resp.highwaterMark)
+ # Reset offset
+ self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
+ continue
+
+ except NotLeaderForPartitionError:
+ logger.warning("NotLeaderForPartitionError for %s - %d. "
+ "Metadata may be out of date",
+ resp.topic, resp.partition)
+ self._refresh_metadata_on_error()
+ continue
+
+ except RequestTimedOutError:
+ logger.warning("RequestTimedOutError for %s - %d",
+ resp.topic, resp.partition)
+ continue
+
+ # Track server highwater mark
+ self._offsets.highwater[topic_partition] = resp.highwaterMark
+
+ # Yield each message
+ # Kafka-python could raise an exception during iteration
+ # we are not catching -- user will need to address
+ for (offset, message) in resp.messages:
+ # deserializer_class could raise an exception here
+ msg = KafkaMessage(resp.topic,
+ resp.partition,
+ offset, message.key,
+ self._config['deserializer_class'](message.value))
+
+ # Only increment fetch offset if we safely got the message and deserialized
+ self._offsets.fetch[topic_partition] = offset + 1
+
+ # Then yield to user
+ yield msg
+
+ def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
+ """
+ Request available fetch offsets for a single topic/partition
+
+ @param topic (str)
+ @param partition (int)
+ @param request_time_ms (int) -- Used to ask for all messages before a
+ certain time (ms). There are two special
+ values. Specify -1 to receive the latest
+ offset (i.e. the offset of the next coming
+ message) and -2 to receive the earliest
+ available offset. Note that because offsets
+ are pulled in descending order, asking for
+ the earliest offset will always return you
+ a single element.
+ @param max_num_offsets (int)
+
+ @return offsets (list)
+ """
+ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+
+ (resp,) = self._client.send_offset_request(reqs)
+
+ check_error(resp)
+
+ # Just for sanity..
+ # probably unnecessary
+ assert resp.topic == topic
+ assert resp.partition == partition
+
+ return resp.offsets
+
+ def offsets(self, group=None):
+ """
+ Returns a copy of internal offsets struct
+ optional param: group [fetch|commit|task_done|highwater]
+ if no group specified, returns all groups
+ """
+ if not group:
+ return {
+ 'fetch': self.offsets('fetch'),
+ 'commit': self.offsets('commit'),
+ 'task_done': self.offsets('task_done'),
+ 'highwater': self.offsets('highwater')
+ }
+ else:
+ return dict(deepcopy(getattr(self._offsets, group)))
+
+ def task_done(self, message):
+ """
+ Mark a fetched message as consumed.
+ Offsets for messages marked as "task_done" will be stored back
+ to the kafka cluster for this consumer group on commit()
+ """
+ topic_partition = (message.topic, message.partition)
+ offset = message.offset
+
+ # Warn on non-contiguous offsets
+ prev_done = self._offsets.task_done[topic_partition]
+ if prev_done is not None and offset != (prev_done + 1):
+ logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
+ offset, prev_done)
+
+ # Warn on smaller offsets than previous commit
+ # "commit" offsets are actually the offset of the next message to fetch.
+ prev_commit = self._offsets.commit[topic_partition]
+ if prev_commit is not None and ((offset + 1) <= prev_commit):
+ logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
+ offset, prev_commit)
+
+ self._offsets.task_done[topic_partition] = offset
+
+ # Check for auto-commit
+ if self._does_auto_commit_messages():
+ self._incr_auto_commit_message_count()
+
+ if self._should_auto_commit():
+ self.commit()
+
+ def commit(self):
+ """
+ Store consumed message offsets (marked via task_done())
+ to kafka cluster for this consumer_group.
+
+ Note -- this functionality requires server version >=0.8.1.1
+ see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+ """
+ if not self._config['group_id']:
+ logger.warning('Cannot commit without a group_id!')
+ raise KafkaConfigurationError('Attempted to commit offsets without a configured consumer group (group_id)')
+
+ # API supports storing metadata with each commit
+ # but for now it is unused
+ metadata = b''
+
+ offsets = self._offsets.task_done
+ commits = []
+ for topic_partition, task_done_offset in six.iteritems(offsets):
+
+ # Skip if None
+ if task_done_offset is None:
+ continue
+
+ # Commit offsets as the next offset to fetch
+ # which is consistent with the Java Client
+ # task_done is marked by messages consumed,
+ # so add one to mark the next message for fetching
+ commit_offset = (task_done_offset + 1)
+
+ # Skip if no change from previous committed
+ if commit_offset == self._offsets.commit[topic_partition]:
+ continue
+
+ commits.append(OffsetCommitRequest(topic_partition[0], topic_partition[1], commit_offset, metadata))
+
+ if commits:
+ logger.info('committing consumer offsets to group %s', self._config['group_id'])
+ resps = self._client.send_offset_commit_request(self._config['group_id'],
+ commits,
+ fail_on_error=False)
+
+ for r in resps:
+ check_error(r)
+ topic_partition = (r.topic, r.partition)
+ task_done = self._offsets.task_done[topic_partition]
+ self._offsets.commit[topic_partition] = (task_done + 1)
+
+ if self._config['auto_commit_enable']:
+ self._reset_auto_commit()
+
+ return True
+
+ else:
+ logger.info('No new offsets found to commit in group %s', self._config['group_id'])
+ return False
+
+ #
+ # Topic/partition management private methods
+ #
+
+ def _consume_topic_partition(self, topic, partition):
+ topic = kafka_bytestring(topic)
+ if not isinstance(partition, int):
+ raise KafkaConfigurationError('Unknown partition type (%s) '
+ '-- expected int' % type(partition))
+
+ if topic not in self._client.topic_partitions:
+ raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
+ if partition not in self._client.get_partition_ids_for_topic(topic):
+ raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
+ "in broker metadata" % (partition, topic))
+ logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
+ self._topics.append((topic, partition))
+
+ def _refresh_metadata_on_error(self):
+ refresh_ms = self._config['refresh_leader_backoff_ms']
+ jitter_pct = 0.20
+ sleep_ms = random.randint(
+ int((1.0 - 0.5 * jitter_pct) * refresh_ms),
+ int((1.0 + 0.5 * jitter_pct) * refresh_ms)
+ )
+ while True:
+ logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
+ time.sleep(sleep_ms / 1000.0)
+ try:
+ self._client.load_metadata_for_topics()
+ except KafkaUnavailableError:
+ logger.warning("Unable to refresh topic metadata... cluster unavailable")
+ self._check_consumer_timeout()
+ else:
+ logger.info("Topic metadata refreshed")
+ return
+
+ #
+ # Offset-managment private methods
+ #
+
+ def _get_commit_offsets(self):
+ logger.info("Consumer fetching stored offsets")
+ for topic_partition in self._topics:
+ (resp,) = self._client.send_offset_fetch_request(
+ self._config['group_id'],
+ [OffsetFetchRequest(topic_partition[0], topic_partition[1])],
+ fail_on_error=False)
+ try:
+ check_error(resp)
+ # API spec says server wont set an error here
+ # but 0.8.1.1 does actually...
+ except UnknownTopicOrPartitionError:
+ pass
+
+ # -1 offset signals no commit is currently stored
+ if resp.offset == -1:
+ self._offsets.commit[topic_partition] = None
+
+ # Otherwise we committed the stored offset
+ # and need to fetch the next one
+ else:
+ self._offsets.commit[topic_partition] = resp.offset
+
+ def _reset_highwater_offsets(self):
+ for topic_partition in self._topics:
+ self._offsets.highwater[topic_partition] = None
+
+ def _reset_task_done_offsets(self):
+ for topic_partition in self._topics:
+ self._offsets.task_done[topic_partition] = None
+
+ def _reset_partition_offset(self, topic_partition):
+ (topic, partition) = topic_partition
+ LATEST = -1
+ EARLIEST = -2
+
+ request_time_ms = None
+ if self._config['auto_offset_reset'] == 'largest':
+ request_time_ms = LATEST
+ elif self._config['auto_offset_reset'] == 'smallest':
+ request_time_ms = EARLIEST
+ else:
+
+ # Let's raise an reasonable exception type if user calls
+ # outside of an exception context
+ if sys.exc_info() == (None, None, None):
+ raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+ 'valid auto_offset_reset setting '
+ '(largest|smallest)')
+
+ # Otherwise we should re-raise the upstream exception
+ # b/c it typically includes additional data about
+ # the request that triggered it, and we do not want to drop that
+ raise
+
+ (offset, ) = self.get_partition_offsets(topic, partition,
+ request_time_ms, max_num_offsets=1)
+ return offset
+
+ #
+ # Consumer Timeout private methods
+ #
+
+ def _set_consumer_timeout_start(self):
+ self._consumer_timeout = False
+ if self._config['consumer_timeout_ms'] >= 0:
+ self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
+
+ def _check_consumer_timeout(self):
+ if self._consumer_timeout and time.time() > self._consumer_timeout:
+ raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
+
+ #
+ # Autocommit private methods
+ #
+
+ def _should_auto_commit(self):
+ if self._does_auto_commit_ms():
+ if time.time() >= self._next_commit_time:
+ return True
+
+ if self._does_auto_commit_messages():
+ if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
+ return True
+
+ return False
+
+ def _reset_auto_commit(self):
+ self._uncommitted_message_count = 0
+ self._next_commit_time = None
+ if self._does_auto_commit_ms():
+ self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
+
+ def _incr_auto_commit_message_count(self, n=1):
+ self._uncommitted_message_count += n
+
+ def _does_auto_commit_ms(self):
+ if not self._config['auto_commit_enable']:
+ return False
+
+ conf = self._config['auto_commit_interval_ms']
+ if conf is not None and conf > 0:
+ return True
+ return False
+
+ def _does_auto_commit_messages(self):
+ if not self._config['auto_commit_enable']:
+ return False
+
+ conf = self._config['auto_commit_interval_messages']
+ if conf is not None and conf > 0:
+ return True
+ return False
+
+ #
+ # Message iterator private methods
+ #
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.next()
+
+ def _get_message_iterator(self):
+ # Fetch a new batch if needed
+ if self._msg_iter is None:
+ self._msg_iter = self.fetch_messages()
+
+ return self._msg_iter
+
+ def _reset_message_iterator(self):
+ self._msg_iter = None
+
+ #
+ # python private methods
+ #
+
+ def __repr__(self):
+ return '<KafkaConsumer topics=(%s)>' % ', '.join(["%s-%d" % topic_partition
+ for topic_partition in
+ self._topics])
diff --git a/kafka/util.py b/kafka/util.py
index 1e03cf1..72ac521 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -86,6 +86,18 @@ def group_by_topic_and_partition(tuples):
return out
+def kafka_bytestring(s):
+ """
+ Takes a string or bytes instance
+ Returns bytes, encoding strings in utf-8 as necessary
+ """
+ if isinstance(s, six.binary_type):
+ return s
+ if isinstance(s, six.string_types):
+ return s.encode('utf-8')
+ raise TypeError(s)
+
+
class ReentrantTimer(object):
"""
A timer that can be restarted, unlike threading.Timer
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 0cd2c9e..cc60778 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -32,12 +32,12 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch_resp, = self.client.send_fetch_request([fetch])
- self.assertEquals(fetch_resp.error, 0)
- self.assertEquals(fetch_resp.topic, self.topic)
- self.assertEquals(fetch_resp.partition, 0)
+ self.assertEqual(fetch_resp.error, 0)
+ self.assertEqual(fetch_resp.topic, self.topic)
+ self.assertEqual(fetch_resp.partition, 0)
messages = list(fetch_resp.messages)
- self.assertEquals(len(messages), 0)
+ self.assertEqual(len(messages), 0)
@kafka_versions("all")
def test_ensure_topic_exists(self):
@@ -58,10 +58,10 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
- self.assertEquals(resp.error, 0)
+ self.assertEqual(resp.error, 0)
req = OffsetFetchRequest(self.topic, 0)
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
- self.assertEquals(resp.error, 0)
- self.assertEquals(resp.offset, 42)
- self.assertEquals(resp.metadata, b"") # Metadata isn't stored for now
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.offset, 42)
+ self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now
diff --git a/test/test_codec.py b/test/test_codec.py
index 0ea1074..2d7670a 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -15,14 +15,14 @@ class TestCodec(unittest.TestCase):
for i in xrange(1000):
s1 = random_string(100)
s2 = gzip_decode(gzip_encode(s1))
- self.assertEquals(s1, s2)
+ self.assertEqual(s1, s2)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy(self):
for i in xrange(1000):
s1 = random_string(100)
s2 = snappy_decode(snappy_encode(s1))
- self.assertEquals(s1, s2)
+ self.assertEqual(s1, s2)
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_detect_xerial(self):
@@ -53,7 +53,7 @@ class TestCodec(unittest.TestCase):
+ struct.pack('!i', block_len) + random_snappy \
+ struct.pack('!i', block_len2) + random_snappy2 \
- self.assertEquals(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
+ self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_snappy_encode_xerial(self):
@@ -68,5 +68,5 @@ class TestCodec(unittest.TestCase):
to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
- self.assertEquals(compressed, to_ensure)
+ self.assertEqual(compressed, to_ensure)
diff --git a/test/test_conn.py b/test/test_conn.py
index 7b3beb7..2c8f3b2 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -120,7 +120,7 @@ class ConnTest(unittest.TestCase):
def test_recv(self):
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
def test_recv__reconnects_on_dirty_conn(self):
@@ -151,8 +151,8 @@ class ConnTest(unittest.TestCase):
def test_recv__doesnt_consume_extra_data_in_stream(self):
# Here just test that each call to recv will return a single payload
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload'])
- self.assertEquals(self.conn.recv(self.config['request_id']), self.config['payload2'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
def test_close__object_is_reusable(self):
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 9060919..7b8f370 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -2,9 +2,14 @@
from mock import MagicMock
from . import unittest
-from kafka.consumer import SimpleConsumer
+from kafka import SimpleConsumer, KafkaConsumer
+from kafka.common import KafkaConfigurationError
class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions = [ '0' ])
+
+ def test_broker_list_required(self):
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer()
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 2762008..ea32318 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,9 +1,12 @@
+import logging
import os
from six.moves import xrange
-from kafka import SimpleConsumer, MultiProcessConsumer, create_message
-from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
+from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
+from kafka.common import (
+ ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from test.fixtures import ZookeeperFixture, KafkaFixture
@@ -36,16 +39,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
produce = ProduceRequest(self.topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
- self.assertEquals(resp.error, 0)
+ self.assertEqual(resp.error, 0)
return [ x.value for x in messages ]
def assert_message_count(self, messages, num_messages):
# Make sure we got them all
- self.assertEquals(len(messages), num_messages)
+ self.assertEqual(len(messages), num_messages)
# Make sure there are no duplicates
- self.assertEquals(len(set(messages)), num_messages)
+ self.assertEqual(len(set(messages)), num_messages)
+
+ def consumer(self, **kwargs):
+ if os.environ['KAFKA_VERSION'] == "0.8.0":
+ # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['auto_commit'] = False
+ else:
+ kwargs.setdefault('auto_commit', True)
+
+ consumer_class = kwargs.pop('consumer', SimpleConsumer)
+ group = kwargs.pop('group', self.id().encode('utf-8'))
+ topic = kwargs.pop('topic', self.topic)
+
+ if consumer_class == SimpleConsumer:
+ kwargs.setdefault('iter_timeout', 0)
+
+ return consumer_class(self.client, group, topic, **kwargs)
+
+ def kafka_consumer(self, **configs):
+ brokers = '%s:%d' % (self.server.host, self.server.port)
+ consumer = KafkaConsumer(self.topic,
+ metadata_broker_list=brokers,
+ **configs)
+ return consumer
@kafka_versions("all")
def test_simple_consumer(self):
@@ -114,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ consumer = self.consumer()
+
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
# move to last message, so one partition should have 1 pending
# message and other 0
@@ -175,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
consumer.stop()
@@ -225,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Consume giant message successfully
message = big_consumer.get_message(block=False, timeout=10)
self.assertIsNotNone(message)
- self.assertEquals(message.message.value, huge_message)
+ self.assertEqual(message.message.value, huge_message)
big_consumer.stop()
@@ -273,20 +301,101 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
messages = [ message for message in consumer ]
- self.assertEquals(len(messages), 2)
+ self.assertEqual(len(messages), 2)
- def consumer(self, **kwargs):
- if os.environ['KAFKA_VERSION'] == "0.8.0":
- # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
- kwargs['auto_commit'] = False
- else:
- kwargs.setdefault('auto_commit', True)
+ @kafka_versions("all")
+ def test_kafka_consumer(self):
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
- consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', self.id().encode('utf-8'))
- topic = kwargs.pop('topic', self.topic)
+ # Start a consumer
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=5000)
+ n = 0
+ messages = {0: set(), 1: set()}
+ logging.debug("kafka consumer offsets: %s" % consumer.offsets())
+ for m in consumer:
+ logging.debug("Consumed message %s" % repr(m))
+ n += 1
+ messages[m.partition].add(m.offset)
+ if n >= 200:
+ break
+
+ self.assertEqual(len(messages[0]), 100)
+ self.assertEqual(len(messages[1]), 100)
- if consumer_class == SimpleConsumer:
- kwargs.setdefault('iter_timeout', 0)
+ @kafka_versions("all")
+ def test_kafka_consumer__blocking(self):
+ TIMEOUT_MS = 500
+ consumer = self.kafka_consumer(auto_offset_reset='smallest',
+ consumer_timeout_ms=TIMEOUT_MS)
- return consumer_class(self.client, group, topic, **kwargs)
+ # Ask for 5 messages, nothing in queue, block 5 seconds
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ msg = consumer.next()
+ self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+ self.send_messages(0, range(0, 10))
+
+ # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
+ messages = set()
+ with Timer() as t:
+ for i in range(5):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
+
+ # Ask for 10 messages, get 5 back, block 5 seconds
+ messages = set()
+ with Timer() as t:
+ with self.assertRaises(ConsumerTimeout):
+ for i in range(10):
+ msg = consumer.next()
+ messages.add((msg.partition, msg.offset))
+ self.assertEqual(len(messages), 5)
+ self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
+
+ @kafka_versions("0.8.1", "0.8.1.1")
+ def test_kafka_consumer__offset_commit_resume(self):
+ GROUP_ID = random_string(10)
+
+ self.send_messages(0, range(0, 100))
+ self.send_messages(1, range(100, 200))
+
+ # Start a consumer
+ consumer1 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ auto_offset_reset='smallest',
+ )
+
+ # Grab the first 195 messages
+ output_msgs1 = []
+ for _ in xrange(195):
+ m = consumer1.next()
+ output_msgs1.append(m)
+ consumer1.task_done(m)
+ self.assert_message_count(output_msgs1, 195)
+
+ # The total offset across both partitions should be at 180
+ consumer2 = self.kafka_consumer(
+ group_id = GROUP_ID,
+ auto_commit_enable = True,
+ auto_commit_interval_ms = None,
+ auto_commit_interval_messages = 20,
+ consumer_timeout_ms = 100,
+ auto_offset_reset='smallest',
+ )
+
+ # 181-200
+ output_msgs2 = []
+ with self.assertRaises(ConsumerTimeout):
+ while True:
+ m = consumer2.next()
+ output_msgs2.append(m)
+ self.assert_message_count(output_msgs2, 20)
+ self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index d307d41..ca71f2d 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -121,7 +121,7 @@ class TestFailover(KafkaIntegrationTestCase):
logging.debug('_send_random_message to %s:%d -- try %d', topic, partition, j)
resp = producer.send_messages(topic, partition, random_string(10))
if len(resp) > 0:
- self.assertEquals(resp[0].error, 0)
+ self.assertEqual(resp[0].error, 0)
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)
def _kill_leader(self, topic, partition):
diff --git a/test/test_package.py b/test/test_package.py
index 9b69a7c..e91753c 100644
--- a/test/test_package.py
+++ b/test/test_package.py
@@ -3,27 +3,27 @@ from . import unittest
class TestPackage(unittest.TestCase):
def test_top_level_namespace(self):
import kafka as kafka1
- self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient")
- self.assertEquals(kafka1.client.__name__, "kafka.client")
- self.assertEquals(kafka1.codec.__name__, "kafka.codec")
+ self.assertEqual(kafka1.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(kafka1.client.__name__, "kafka.client")
+ self.assertEqual(kafka1.codec.__name__, "kafka.codec")
def test_submodule_namespace(self):
import kafka.client as client1
- self.assertEquals(client1.__name__, "kafka.client")
- self.assertEquals(client1.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(client1.__name__, "kafka.client")
+ self.assertEqual(client1.KafkaClient.__name__, "KafkaClient")
from kafka import client as client2
- self.assertEquals(client2.__name__, "kafka.client")
- self.assertEquals(client2.KafkaClient.__name__, "KafkaClient")
+ self.assertEqual(client2.__name__, "kafka.client")
+ self.assertEqual(client2.KafkaClient.__name__, "KafkaClient")
from kafka.client import KafkaClient as KafkaClient1
- self.assertEquals(KafkaClient1.__name__, "KafkaClient")
+ self.assertEqual(KafkaClient1.__name__, "KafkaClient")
from kafka.codec import gzip_encode as gzip_encode1
- self.assertEquals(gzip_encode1.__name__, "gzip_encode")
+ self.assertEqual(gzip_encode1.__name__, "gzip_encode")
from kafka import KafkaClient as KafkaClient2
- self.assertEquals(KafkaClient2.__name__, "KafkaClient")
+ self.assertEqual(KafkaClient2.__name__, "KafkaClient")
from kafka.codec import snappy_encode
- self.assertEquals(snappy_encode.__name__, "snappy_encode")
+ self.assertEqual(snappy_encode.__name__, "snappy_encode")
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index d68af72..4331d23 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -251,7 +251,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
producer.stop()
@@ -301,7 +301,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# It hasn't sent yet
self.assert_fetch_offset(0, start_offset0, [])
@@ -314,7 +314,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [
self.msg("one"),
@@ -350,7 +350,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# It hasn't sent yet
self.assert_fetch_offset(0, start_offset0, [])
@@ -363,7 +363,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
)
# Batch mode is async. No ack
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
# Wait the timeout out
time.sleep(5)
@@ -389,7 +389,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -402,7 +402,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)
resp = producer.send(self.topic, self.key("key1"), self.msg("one"))
- self.assertEquals(len(resp), 0)
+ self.assertEqual(len(resp), 0)
self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ])
@@ -429,9 +429,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
- self.assertEquals(resp.error, 0)
- self.assertEquals(resp.partition, partition)
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.partition, partition)
messages = [ x.message.value for x in resp.messages ]
self.assertEqual(messages, expected_messages)
- self.assertEquals(resp.highwaterMark, start_offset+len(expected_messages))
+ self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))