diff options
Diffstat (limited to 'kafka')
| -rw-r--r-- | kafka/__init__.py | 2 | ||||
| -rw-r--r-- | kafka/client.py | 62 | ||||
| -rw-r--r-- | kafka/conn.py | 23 | ||||
| -rw-r--r-- | kafka/consumer/base.py | 10 | ||||
| -rw-r--r-- | kafka/consumer/kafka.py | 246 | ||||
| -rw-r--r-- | kafka/consumer/multiprocess.py | 39 | ||||
| -rw-r--r-- | kafka/consumer/simple.py | 128 | ||||
| -rw-r--r-- | kafka/context.py | 5 | ||||
| -rw-r--r-- | kafka/partitioner/base.py | 11 | ||||
| -rw-r--r-- | kafka/partitioner/hashed.py | 4 | ||||
| -rw-r--r-- | kafka/partitioner/roundrobin.py | 4 | ||||
| -rw-r--r-- | kafka/producer/base.py | 26 | ||||
| -rw-r--r-- | kafka/producer/keyed.py | 24 | ||||
| -rw-r--r-- | kafka/producer/simple.py | 38 | ||||
| -rw-r--r-- | kafka/protocol.py | 125 | ||||
| -rw-r--r-- | kafka/queue.py | 219 | ||||
| -rw-r--r-- | kafka/util.py | 20 |
17 files changed, 448 insertions, 538 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py index 8ccdb4c..3536084 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -4,7 +4,7 @@ import pkg_resources __version__ = pkg_resources.require('kafka-python')[0].version __author__ = 'David Arthur' __license__ = 'Apache License 2.0' -__copyright__ = 'Copyright 2014, David Arthur under Apache License, v2.0' +__copyright__ = 'Copyright 2015, David Arthur under Apache License, v2.0' from kafka.client import KafkaClient from kafka.conn import KafkaConnection diff --git a/kafka/client.py b/kafka/client.py index 7a0cf18..48a534e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -15,6 +15,7 @@ from kafka.common import (TopicAndPartition, BrokerMetadata, from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol +from kafka.util import kafka_bytestring log = logging.getLogger("kafka") @@ -30,7 +31,7 @@ class KafkaClient(object): def __init__(self, hosts, client_id=CLIENT_ID, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): # We need one connection to bootstrap - self.client_id = client_id + self.client_id = kafka_bytestring(client_id) self.timeout = timeout self.hosts = collect_hosts(hosts) @@ -85,7 +86,7 @@ class KafkaClient(object): self.load_metadata_for_topics(topic) # If the partition doesn't actually exist, raise - if partition not in self.topic_partitions[topic]: + if partition not in self.topic_partitions.get(topic, []): raise UnknownTopicOrPartitionError(key) # If there's no leader for the partition, raise @@ -131,19 +132,21 @@ class KafkaClient(object): the leader broker for that partition using the supplied encode/decode functions - Params - ====== + Arguments: + payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute + encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments + must accept client_id, correlation_id, and payloads as + keyword arguments + decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes + The response objects must be object-like and have topic + and partition attributes + + Returns: - Return - ====== List of response objects in the same order as the supplied payloads """ @@ -175,8 +178,13 @@ class KafkaClient(object): # Send the request, recv the response try: conn.send(requestId, request) + + # decoder_fn=None signal that the server is expected to not + # send a response. This probably only applies to + # ProduceRequest w/ acks = 0 if decoder_fn is None: continue + try: response = conn.recv(requestId) except ConnectionError as e: @@ -257,9 +265,9 @@ class KafkaClient(object): def get_partition_ids_for_topic(self, topic): if topic not in self.topic_partitions: - return None + return [] - return list(self.topic_partitions[topic]) + return sorted(list(self.topic_partitions[topic])) def ensure_topic_exists(self, topic, timeout = 30): start_time = time.time() @@ -285,9 +293,9 @@ class KafkaClient(object): This method should be called after receiving any error - @param: *topics (optional) - If a list of topics is provided, the metadata refresh will be limited - to the specified topics only. + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. Exceptions: ---------- @@ -384,18 +392,16 @@ class KafkaClient(object): sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified - Params - ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function - - Return - ====== - list of ProduceResponse or callback(ProduceResponse), in the - order of input payloads + Arguments: + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we + encounter an API error? + callback: function, instead of returning the ProduceResponse, + first pass it through this function + + Returns: + list of ProduceResponse or callback(ProduceResponse), in the + order of input payloads """ encoder = functools.partial( diff --git a/kafka/conn.py b/kafka/conn.py index ddfee8b..30debec 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -47,10 +47,11 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. - host: the host name or IP address of a kafka broker - port: the port number the kafka broker is listening on - timeout: default 120. The socket timeout for sending and receiving data - in seconds. None means no timeout, so a request can block forever. + Arguments: + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() @@ -116,8 +117,10 @@ class KafkaConnection(local): def send(self, request_id, payload): """ Send a request to Kafka - param: request_id -- can be any int (used only for debug logging...) - param: payload -- an encoded kafka packet (see KafkaProtocol) + + Arguments:: + request_id (int): can be any int (used only for debug logging...) + payload: an encoded kafka packet (see KafkaProtocol) """ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) @@ -135,8 +138,12 @@ class KafkaConnection(local): def recv(self, request_id): """ Get a response packet from Kafka - param: request_id -- can be any int (only used for debug logging...) - returns encoded kafka packet response from server as type str + + Arguments: + request_id: can be any int (only used for debug logging...) + + Returns: + str: Encoded kafka packet response from server """ log.debug("Reading response %d from Kafka" % request_id) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 2464aaf..9cdcf89 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -32,9 +32,11 @@ class Consumer(object): Base class to be used by other consumers. Not to be used directly This base class provides logic for + * initialization and fetching metadata of partitions * Auto-commit logic * APIs for fetching pending message count + """ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, @@ -93,8 +95,9 @@ class Consumer(object): """ Commit offsets for this consumer - partitions: list of partitions to commit, default is to commit - all of them + Keyword Arguments: + partitions (list): list of partitions to commit, default is to commit + all of them """ # short circuit if nothing happened. This check is kept outside @@ -148,7 +151,8 @@ class Consumer(object): """ Gets the pending message count - partitions: list of partitions to check for, default is to check all + Keyword Arguments: + partitions (list): list of partitions to check for, default is to check all """ if not partitions: partitions = self.offsets.keys() diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index f16b526..49ffa7b 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -47,79 +47,86 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } -BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id') - class KafkaConsumer(object): """ A simpler kafka consumer - ``` - # A very basic 'tail' consumer, with no stored offset management - kafka = KafkaConsumer('topic1') - for m in kafka: - print m - - # Alternate interface: next() - print kafka.next() - - # Alternate interface: batch iteration - while True: - for m in kafka.fetch_messages(): - print m - print "Done with batch - let's do another!" - ``` - - ``` - # more advanced consumer -- multiple topics w/ auto commit offset management - kafka = KafkaConsumer('topic1', 'topic2', - group_id='my_consumer_group', - auto_commit_enable=True, - auto_commit_interval_ms=30 * 1000, - auto_offset_reset='smallest') - - # Infinite iteration - for m in kafka: - process_message(m) - kafka.task_done(m) - - # Alternate interface: next() - m = kafka.next() - process_message(m) - kafka.task_done(m) - - # If auto_commit_enable is False, remember to commit() periodically - kafka.commit() - - # Batch process interface - while True: - for m in kafka.fetch_messages(): + .. code:: python + + # A very basic 'tail' consumer, with no stored offset management + kafka = KafkaConsumer('topic1', + metadata_broker_list=['localhost:9092']) + for m in kafka: + print m + + # Alternate interface: next() + print kafka.next() + + # Alternate interface: batch iteration + while True: + for m in kafka.fetch_messages(): + print m + print "Done with batch - let's do another!" + + + .. code:: python + + # more advanced consumer -- multiple topics w/ auto commit offset + # management + kafka = KafkaConsumer('topic1', 'topic2', + metadata_broker_list=['localhost:9092'], + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') + + # Infinite iteration + for m in kafka: + process_message(m) + kafka.task_done(m) + + # Alternate interface: next() + m = kafka.next() process_message(m) kafka.task_done(m) - ``` + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() + + # Batch process interface + while True: + for m in kafka.fetch_messages(): + process_message(m) + kafka.task_done(m) + messages (m) are namedtuples with attributes: - m.topic: topic name (str) - m.partition: partition number (int) - m.offset: message offset on topic-partition log (int) - m.key: key (bytes - can be None) - m.value: message (output of deserializer_class - default is raw bytes) + + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) Configuration settings can be passed to constructor, otherwise defaults will be used: - client_id='kafka.consumer.kafka', - group_id=None, - fetch_message_max_bytes=1024*1024, - fetch_min_bytes=1, - fetch_wait_max_ms=100, - refresh_leader_backoff_ms=200, - metadata_broker_list=None, - socket_timeout_ms=30*1000, - auto_offset_reset='largest', - deserializer_class=lambda msg: msg, - auto_commit_enable=False, - auto_commit_interval_ms=60 * 1000, - consumer_timeout_ms=-1 + + .. code:: python + + client_id='kafka.consumer.kafka', + group_id=None, + fetch_message_max_bytes=1024*1024, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + refresh_leader_backoff_ms=200, + metadata_broker_list=None, + socket_timeout_ms=30*1000, + auto_offset_reset='largest', + deserializer_class=lambda msg: msg, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + consumer_timeout_ms=-1 Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi @@ -133,6 +140,9 @@ class KafkaConsumer(object): """ Configuration settings can be passed to constructor, otherwise defaults will be used: + + .. code:: python + client_id='kafka.consumer.kafka', group_id=None, fetch_message_max_bytes=1024*1024, @@ -159,13 +169,6 @@ class KafkaConsumer(object): raise KafkaConfigurationError('Unknown configuration key(s): ' + str(list(configs.keys()))) - # Handle str/bytes conversions - for config_key in BYTES_CONFIGURATION_KEYS: - if isinstance(self._config[config_key], six.string_types): - logger.warning("Converting configuration key '%s' to bytes" % - config_key) - self._config[config_key] = self._config[config_key].encode('utf-8') - if self._config['auto_commit_enable']: if not self._config['group_id']: raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') @@ -189,28 +192,35 @@ class KafkaConsumer(object): Optionally specify offsets to start from Accepts types: - str (utf-8): topic name (will consume all available partitions) - tuple: (topic, partition) - dict: { topic: partition } - { topic: [partition list] } - { topic: (partition tuple,) } + + * str (utf-8): topic name (will consume all available partitions) + * tuple: (topic, partition) + * dict: + - { topic: partition } + - { topic: [partition list] } + - { topic: (partition tuple,) } Optionally, offsets can be specified directly: - tuple: (topic, partition, offset) - dict: { (topic, partition): offset, ... } - Ex: - kafka = KafkaConsumer() + * tuple: (topic, partition, offset) + * dict: { (topic, partition): offset, ... } + + Example: + + .. code:: python + + kafka = KafkaConsumer() - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) """ self._topics = [] self._client.load_metadata_for_topics() @@ -309,10 +319,12 @@ class KafkaConsumer(object): Otherwise blocks indefinitely Note that this is also the method called internally during iteration: - ``` - for m in consumer: - pass - ``` + + .. code:: python + + for m in consumer: + pass + """ self._set_consumer_timeout_start() while True: @@ -336,11 +348,12 @@ class KafkaConsumer(object): OffsetOutOfRange, per the configured `auto_offset_reset` policy Key configuration parameters: - `fetch_message_max_bytes` - `fetch_max_wait_ms` - `fetch_min_bytes` - `deserializer_class` - `auto_offset_reset` + + * `fetch_message_max_bytes` + * `fetch_max_wait_ms` + * `fetch_min_bytes` + * `deserializer_class` + * `auto_offset_reset` """ max_bytes = self._config['fetch_message_max_bytes'] @@ -408,6 +421,10 @@ class KafkaConsumer(object): offset, message.key, self._config['deserializer_class'](message.value)) + if offset < self._offsets.fetch[topic_partition]: + logger.debug('Skipping message %s because its offset is less than the consumer offset', + msg) + continue # Only increment fetch offset if we safely got the message and deserialized self._offsets.fetch[topic_partition] = offset + 1 @@ -418,20 +435,18 @@ class KafkaConsumer(object): """ Request available fetch offsets for a single topic/partition - @param topic (str) - @param partition (int) - @param request_time_ms (int) -- Used to ask for all messages before a - certain time (ms). There are two special - values. Specify -1 to receive the latest - offset (i.e. the offset of the next coming - message) and -2 to receive the earliest - available offset. Note that because offsets - are pulled in descending order, asking for - the earliest offset will always return you - a single element. - @param max_num_offsets (int) - - @return offsets (list) + Arguments: + topic (str) + partition (int) + request_time_ms (int): Used to ask for all messages before a + certain time (ms). There are two special values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming message) and -2 to receive the earliest + available offset. Note that because offsets are pulled in descending order, asking for + the earliest offset will always return you a single element. + max_num_offsets (int) + + Returns: + offsets (list) """ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] @@ -448,9 +463,12 @@ class KafkaConsumer(object): def offsets(self, group=None): """ - Returns a copy of internal offsets struct - optional param: group [fetch|commit|task_done|highwater] - if no group specified, returns all groups + Keyword Arguments: + group: Either "fetch", "commit", "task_done", or "highwater". + If no group specified, returns all groups. + + Returns: + A copy of internal offsets struct """ if not group: return { @@ -498,8 +516,8 @@ class KafkaConsumer(object): Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group. - Note -- this functionality requires server version >=0.8.1.1 - see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + **Note**: this functionality requires server version >=0.8.1.1 + See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_. """ if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') @@ -531,7 +549,7 @@ class KafkaConsumer(object): if commits: logger.info('committing consumer offsets to group %s', self._config['group_id']) - resps = self._client.send_offset_commit_request(self._config['group_id'], + resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']), commits, fail_on_error=False) @@ -595,7 +613,7 @@ class KafkaConsumer(object): logger.info("Consumer fetching stored offsets") for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( - self._config['group_id'], + kafka_bytestring(self._config['group_id']), [OffsetFetchRequest(topic_partition[0], topic_partition[1])], fail_on_error=False) try: diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 912e64b..4dc04dc 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer): A consumer implementation that consumes partitions for a topic in parallel using multiple processes - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..3d250ea 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -8,6 +8,7 @@ import logging import time import six +import sys try: from Queue import Empty, Queue @@ -16,7 +17,9 @@ except ImportError: # python 2 from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -67,24 +70,36 @@ class SimpleConsumer(Consumer): A simple consumer implementation that consumes all/specified partitions for a topic - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + + auto_commit_every_n: default 100. How many messages to consume + before a commit + + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. + + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + Otherwise, do not reset the offsets and raise OffsetOutOfRangeError. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -98,7 +113,8 @@ class SimpleConsumer(Consumer): fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + auto_offset_reset='largest'): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +133,38 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.auto_offset_reset = auto_offset_reset self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.auto_offset_reset == 'largest': + reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + elif self.auto_offset_reset == 'smallest': + reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + else: + # Let's raise an reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise + + # send_offset_request + (resp, ) = self.client.send_offset_request(reqs) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -133,11 +175,13 @@ class SimpleConsumer(Consumer): """ Alter the current offset in the consumer, similar to fseek - offset: how much to modify the offset - whence: where to modify it from - 0 is relative to the earliest available offset (head) - 1 is relative to the current offset - 2 is relative to the latest known offset (tail) + Arguments: + offset: how much to modify the offset + whence: where to modify it from + + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) """ if whence == 1: # relative to current position @@ -180,11 +224,12 @@ class SimpleConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] if timeout is not None: @@ -286,14 +331,35 @@ class SimpleConsumer(Consumer): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + # Retry this partition + retry_partitions[resp.partition] = partitions[resp.partition] + continue + partition = resp.partition buffer_size = partitions[partition] try: for message in resp.messages: + if message.offset < self.fetch_offsets[partition]: + log.debug('Skipping message %s because its offset is less than the consumer offset', + message) + continue # Put the message in our queue self.queue.put((partition, message)) self.fetch_offsets[partition] = message.offset + 1 diff --git a/kafka/context.py b/kafka/context.py index 98ed7b3..ade4db8 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -18,6 +18,8 @@ class OffsetCommitContext(object): Example: + .. code:: python + consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() consumer.fetch_last_known_offsets() @@ -57,7 +59,10 @@ class OffsetCommitContext(object): In order to know the current partition, it is helpful to initialize the consumer to provide partition info via: + .. code:: python + consumer.provide_partition_info() + """ max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py index c62b7ed..857f634 100644 --- a/kafka/partitioner/base.py +++ b/kafka/partitioner/base.py @@ -7,17 +7,18 @@ class Partitioner(object): """ Initialize the partitioner - partitions - A list of available partitions (during startup) + Arguments: + partitions: A list of available partitions (during startup) """ self.partitions = partitions - def partition(self, key, partitions): + def partition(self, key, partitions=None): """ Takes a string key and num_partitions as argument and returns a partition to be used for the message - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing + Arguments: + key: the key to use for partitioning + partitions: (optional) a list of partitions. """ raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py index 587a3de..fb5e598 100644 --- a/kafka/partitioner/hashed.py +++ b/kafka/partitioner/hashed.py @@ -5,7 +5,9 @@ class HashedPartitioner(Partitioner): Implements a partitioner which selects the target partition based on the hash of the key """ - def partition(self, key, partitions): + def partition(self, key, partitions=None): + if not partitions: + partitions = self.partitions size = len(partitions) idx = hash(key) % size diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py index 54d00da..6439e53 100644 --- a/kafka/partitioner/roundrobin.py +++ b/kafka/partitioner/roundrobin.py @@ -15,9 +15,9 @@ class RoundRobinPartitioner(Partitioner): self.partitions = partitions self.iterpart = cycle(partitions) - def partition(self, key, partitions): + def partition(self, key, partitions=None): # Refresh the partition list if necessary - if self.partitions != partitions: + if partitions and self.partitions != partitions: self._set_partitions(partitions) return next(self.iterpart) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92..695f195 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -85,20 +85,20 @@ class Producer(object): """ Base class to be used by producers - Params: - client - The Kafka client instance to use - async - If set to true, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + async: If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these WARNING!!! current implementation of async producer does not guarantee message delivery. Use at your own risk! Or help us improve with a PR! - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ ACK_NOT_REQUIRED = 0 # No ack is required @@ -127,6 +127,7 @@ class Producer(object): self.async = async self.req_acks = req_acks self.ack_timeout = ack_timeout + self.stopped = False if codec is None: codec = CODEC_NONE @@ -212,3 +213,8 @@ class Producer(object): if self.proc.is_alive(): self.proc.terminate() + self.stopped = True + + def __del__(self): + if not self.stopped: + self.stop() diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d9..36328ed 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -15,17 +15,19 @@ class KeyedProducer(Producer): """ A producer which distributes messages to partitions based on the key - Args: - client - The kafka client instance - partitioner - A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner - async - If True, the messages are sent asynchronously via another + Arguments: + client: The kafka client instance + + Keyword Arguments: + partitioner: A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, @@ -52,7 +54,7 @@ class KeyedProducer(Producer): self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) partitioner = self.partitioners[topic] - return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic)) + return partitioner.partition(key) def send_messages(self,topic,key,*msg): partition = self._next_partition(topic, key) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 401b79b..2699cf2 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import logging import random +import six from itertools import cycle @@ -19,21 +20,23 @@ class SimpleProducer(Producer): """ A simple, round-robin producer. Each message goes to exactly one partition - Params: - client - The Kafka client instance to use - async - If True, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + + Keyword Arguments: + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - random_start - If true, randomize the initial partition which the - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout + random_start: If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, @@ -66,8 +69,13 @@ class SimpleProducer(Producer): return next(self.partition_cycles[topic]) def send_messages(self, topic, *msg): + if not isinstance(topic, six.binary_type): + topic = topic.encode('utf-8') + partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages(topic, partition, *msg) + return super(SimpleProducer, self).send_messages( + topic, partition, *msg + ) def __repr__(self): return '<SimpleProducer batch=%s>' % self.async diff --git a/kafka/protocol.py b/kafka/protocol.py index a85c7eb..2a39de6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -185,18 +185,18 @@ class KafkaProtocol(object): """ Encode some ProduceRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of ProduceRequest - acks: How "acky" you want the request to be - 0: immediate response - 1: written to disk by the leader - 2+: waits for this many number of replicas to sync - -1: waits for all replicas to be in sync - timeout: Maximum time the server will wait for acks from replicas. - This is _not_ a socket timeout + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. + This is _not_ a socket timeout + """ payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) @@ -225,9 +225,9 @@ class KafkaProtocol(object): """ Decode bytes to a ProduceResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode + """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -248,14 +248,13 @@ class KafkaProtocol(object): """ Encodes some FetchRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of FetchRequest - max_wait_time: int, how long to block waiting on min_bytes of data - min_bytes: int, the minimum number of bytes to accumulate before - returning the response + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response """ payloads = [] if payloads is None else payloads @@ -284,9 +283,8 @@ class KafkaProtocol(object): """ Decode bytes to a FetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -333,9 +331,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -360,11 +357,10 @@ class KafkaProtocol(object): """ Encode a MetadataRequest - Params - ====== - client_id: string - correlation_id: int - topics: list of strings + Arguments: + client_id: string + correlation_id: int + topics: list of strings """ if payloads is None: topics = [] if topics is None else topics @@ -388,9 +384,8 @@ class KafkaProtocol(object): """ Decode bytes to a MetadataResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) @@ -439,12 +434,11 @@ class KafkaProtocol(object): """ Encode some OffsetCommitRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -470,9 +464,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetCommitResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -491,12 +484,11 @@ class KafkaProtocol(object): """ Encode some OffsetFetchRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -522,9 +514,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetFetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) @@ -547,10 +538,10 @@ def create_message(payload, key=None): """ Construct a Message - Params - ====== - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ return Message(0, 0, key, payload) @@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) @@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) diff --git a/kafka/queue.py b/kafka/queue.py deleted file mode 100644 index ada495f..0000000 --- a/kafka/queue.py +++ /dev/null @@ -1,219 +0,0 @@ -from __future__ import absolute_import - -from copy import copy -import logging -from multiprocessing import Process, Queue, Event -from Queue import Empty -import time - -from kafka.client import KafkaClient, FetchRequest, ProduceRequest - -log = logging.getLogger("kafka") - -raise NotImplementedError("Still need to refactor this class") - - -class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, - consumer_fetch_size=1024, consumer_sleep=200): - self.client = copy(client) - self.topic = topic - self.partition = partition - self.out_queue = out_queue - self.barrier = barrier - self.consumer_fetch_size = consumer_fetch_size - self.consumer_sleep = consumer_sleep / 1000. - log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaConsumerProcess: topic=%s, \ - partition=%s, sleep=%s]" % \ - (self.topic, self.partition, self.consumer_sleep) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, - offset=0, size=self.consumer_fetch_size) - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s" % self) - self.client.close() - break - - lastOffset = fetchRequest.offset - (messages, fetchRequest) = self.client.get_message_set(fetchRequest) - - if fetchRequest.offset == lastOffset: - log.debug("No more data for this partition, " - "sleeping a bit (200ms)") - time.sleep(self.consumer_sleep) - continue - - for message in messages: - self.out_queue.put(message) - - -class KafkaProducerProcess(Process): - def __init__(self, client, topic, in_queue, barrier, - producer_flush_buffer=500, - producer_flush_timeout=2000, - producer_timeout=100): - - self.client = copy(client) - self.topic = topic - self.in_queue = in_queue - self.barrier = barrier - self.producer_flush_buffer = producer_flush_buffer - self.producer_flush_timeout = producer_flush_timeout / 1000. - self.producer_timeout = producer_timeout / 1000. - log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaProducerProcess: topic=%s, \ - flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \ - (self.topic, - self.producer_flush_buffer, - self.producer_flush_timeout, - self.producer_timeout) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - messages = [] - last_produce = time.time() - - def flush(messages): - self.client.send_message_set(ProduceRequest(self.topic, -1, - messages)) - del messages[:] - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s, flushing messages" % self) - flush(messages) - self.client.close() - break - - if len(messages) > self.producer_flush_buffer: - log.debug("Message count threshold reached. Flushing messages") - flush(messages) - last_produce = time.time() - - elif (time.time() - last_produce) > self.producer_flush_timeout: - log.debug("Producer timeout reached. Flushing messages") - flush(messages) - last_produce = time.time() - - try: - msg = KafkaClient.create_message( - self.in_queue.get(True, self.producer_timeout)) - messages.append(msg) - - except Empty: - continue - - -class KafkaQueue(object): - def __init__(self, client, topic, partitions, - producer_config=None, consumer_config=None): - """ - KafkaQueue a Queue-like object backed by a Kafka producer and some - number of consumers - - Messages are eagerly loaded by the consumer in batches of size - consumer_fetch_size. - Messages are buffered in the producer thread until - producer_flush_timeout or producer_flush_buffer is reached. - - Params - ====== - client: KafkaClient object - topic: str, the topic name - partitions: list of ints, the partions to consume from - producer_config: dict, see below - consumer_config: dict, see below - - Consumer Config - =============== - consumer_fetch_size: int, number of bytes to fetch in one call - to Kafka. Default is 1024 - consumer_sleep: int, time in milliseconds a consumer should sleep - when it reaches the end of a partition. Default is 200 - - Producer Config - =============== - producer_timeout: int, time in milliseconds a producer should - wait for messages to enqueue for producing. - Default is 100 - producer_flush_timeout: int, time in milliseconds a producer - should allow messages to accumulate before - sending to Kafka. Default is 2000 - producer_flush_buffer: int, number of messages a producer should - allow to accumulate. Default is 500 - - """ - producer_config = {} if producer_config is None else producer_config - consumer_config = {} if consumer_config is None else consumer_config - - self.in_queue = Queue() - self.out_queue = Queue() - self.consumers = [] - self.barrier = Event() - - # Initialize and start consumer threads - for partition in partitions: - consumer = KafkaConsumerProcess(client, topic, partition, - self.in_queue, self.barrier, - **consumer_config) - consumer.start() - self.consumers.append(consumer) - - # Initialize and start producer thread - self.producer = KafkaProducerProcess(client, topic, self.out_queue, - self.barrier, **producer_config) - self.producer.start() - - # Trigger everything to start - self.barrier.set() - - def get(self, block=True, timeout=None): - """ - Consume a message from Kafka - - Params - ====== - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - - Returns - ======= - msg: str, the payload from Kafka - """ - return self.in_queue.get(block, timeout).payload - - def put(self, msg, block=True, timeout=None): - """ - Send a message to Kafka - - Params - ====== - msg: std, the message to send - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - """ - self.out_queue.put(msg, block, timeout) - - def close(self): - """ - Close the internal queues and Kafka consumers/producer - """ - self.in_queue.close() - self.out_queue.close() - self.barrier.clear() - self.producer.join() - for consumer in self.consumers: - consumer.join() diff --git a/kafka/util.py b/kafka/util.py index 72ac521..78c3607 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -103,10 +103,12 @@ class ReentrantTimer(object): A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function + Arguments: + + t: timer interval in milliseconds + fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ def __init__(self, t, fn, *args, **kwargs): @@ -124,7 +126,11 @@ class ReentrantTimer(object): self.active = None def _timer(self, active): - while not active.wait(self.t): + # python2.6 Event.wait() always returns None + # python2.7 and greater returns the flag value (true/false) + # we want the flag value, so add an 'or' here for python2.6 + # this is redundant for later python versions (FLAG OR FLAG == FLAG) + while not (active.wait(self.t) or active.is_set()): self.fn(*self.args, **self.kwargs) def start(self): @@ -144,3 +150,7 @@ class ReentrantTimer(object): self.thread.join(self.t + 1) # noinspection PyAttributeOutsideInit self.timer = None + self.fn = None + + def __del__(self): + self.stop() |
