-rw-r--r--  MANIFEST.in                        |  1
-rw-r--r--  kafka/client.py                    | 21
-rw-r--r--  kafka/codec.py                     |  7
-rw-r--r--  kafka/conn.py                      |  4
-rw-r--r--  kafka/consumer/base.py             |  9
-rw-r--r--  kafka/consumer/multiprocess.py     | 99
-rw-r--r--  kafka/consumer/simple.py           |  7
-rw-r--r--  kafka/partitioner/hashed.py        | 22
-rw-r--r--  kafka/producer/base.py             | 26
-rw-r--r--  kafka/protocol.py                  |  8
-rw-r--r--  test/test_partitioner.py           | 23
-rw-r--r--  test/test_producer.py              | 38
-rw-r--r--  test/test_producer_integration.py  | 22

13 files changed, 203 insertions, 84 deletions
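Summary of the changes below: KafkaClient now tolerates unavailable partitions when grouping payloads (recording a FailedPayloadsError instead of aborting), the multiprocess consumer worker reconnects and retries with exponential backoff, provide_partition_info() moves from SimpleConsumer onto the Consumer base class, producers accept null payloads as long as the key is not also null, the pure-Python murmur2 hash is aligned with the Java client, the producer's atexit cleanup check is fixed, and an optional gzip compression level (codec_compresslevel) is threaded from the producer down to gzip_encode(). The sketch below is illustrative only and is not part of the commit; it assumes CODEC_GZIP is importable from kafka.protocol as in this tree, and simply exercises the new compresslevel plumbing:

    # Illustrative sketch (not part of the commit): exercise the new
    # compresslevel plumbing in kafka/codec.py and kafka/protocol.py.
    from kafka.codec import gzip_encode, gzip_decode
    from kafka.protocol import create_message_set, CODEC_GZIP

    payloads = [(b'message-1', None), (b'message-2', None)]

    # create_message_set() forwards compresslevel to gzip_encode(); leaving it
    # as None keeps the old behaviour (gzip level 9, maximum compression).
    fast = create_message_set(payloads, CODEC_GZIP, key=None, compresslevel=1)
    default = create_message_set(payloads, CODEC_GZIP, key=None)

    # gzip_encode() accepts the same keyword directly.
    blob = gzip_encode(b'x' * 4096, compresslevel=1)
    assert gzip_decode(blob) == b'x' * 4096

Producer exposes the same knob through its new codec_compresslevel constructor argument, which it passes along to create_message_set() for both sync and async sends.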
diff --git a/MANIFEST.in b/MANIFEST.in
index bdd6505..1731afa 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1 +1,2 @@
 recursive-include kafka *.py
+include LICENSE
diff --git a/kafka/client.py b/kafka/client.py
index 817c621..13777a4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,16 +161,21 @@ class KafkaClient(object):
         brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)
+        responses = {}

         for payload in payloads:
-            leader = self._get_leader_for_partition(payload.topic,
-                                                    payload.partition)
-
-            payloads_by_broker[leader].append(payload)
-            brokers_for_payloads.append(leader)
+            try:
+                leader = self._get_leader_for_partition(payload.topic,
+                                                        payload.partition)
+                payloads_by_broker[leader].append(payload)
+                brokers_for_payloads.append(leader)
+            except KafkaUnavailableError as e:
+                log.warning('KafkaUnavailableError attempting to send request '
+                            'on topic %s partition %d', payload.topic, payload.partition)
+                topic_partition = (payload.topic, payload.partition)
+                responses[topic_partition] = FailedPayloadsError(payload)

         # For each broker, send the list of request payloads
         # and collect the responses and errors
-        responses = {}
         broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             requestId = self._next_id()
@@ -363,8 +368,8 @@ class KafkaClient(object):

         resp = self.send_metadata_request(topics)

-        log.info('Updating broker metadata: %s', resp.brokers)
-        log.info('Updating topic metadata: %s', resp.topics)
+        log.debug('Updating broker metadata: %s', resp.brokers)
+        log.debug('Updating topic metadata: %s', resp.topics)

         self.brokers = dict([(broker.nodeId, broker)
                              for broker in resp.brokers])
diff --git a/kafka/codec.py b/kafka/codec.py
index 19f405b..a9373c7 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -22,12 +22,15 @@ def has_snappy():
     return _HAS_SNAPPY


-def gzip_encode(payload):
+def gzip_encode(payload, compresslevel=None):
+    if not compresslevel:
+        compresslevel = 9
+
     with BytesIO() as buf:

         # Gzip context manager introduced in python 2.6
         # so old-fashioned way until we decide to not support 2.6
-        gzipper = gzip.GzipFile(fileobj=buf, mode="w")
+        gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
         try:
             gzipper.write(payload)
         finally:
diff --git a/kafka/conn.py b/kafka/conn.py
index 432e10b..e6a1f74 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -151,6 +151,10 @@ class KafkaConnection(local):
         """
         log.debug("Reading response %d from Kafka" % request_id)

+        # Make sure we have a connection
+        if not self._sock:
+            self.reinit()
+
         # Read the size off of the header
         resp = self._read_bytes(4)
         (size,) = struct.unpack('>i', resp)
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 0800327..c9f6e48 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -29,6 +29,7 @@ ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
 FULL_QUEUE_WAIT_TIME_SECONDS = 0.1

+MAX_BACKOFF_SECONDS = 60

 class Consumer(object):
     """
@@ -83,6 +84,14 @@ class Consumer(object):
             self._cleanup_func = cleanup
             atexit.register(cleanup, self)

+        self.partition_info = False     # Do not return partition info in msgs
+
+    def provide_partition_info(self):
+        """
+        Indicates that partition info must be returned by the consumer
+        """
+        self.partition_info = True
+
     def fetch_last_known_offsets(self, partitions=None):
         if self.group is None:
             raise ValueError('KafkaClient.group must not be None')
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index d03eb95..0b09102 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -9,11 +9,13 @@ except ImportError:
     from queue import Empty, Full  # python 2
 import time

+from ..common import KafkaError
 from .base import (
     Consumer,
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
     NO_MESSAGES_WAIT_TIME_SECONDS,
-    FULL_QUEUE_WAIT_TIME_SECONDS
+    FULL_QUEUE_WAIT_TIME_SECONDS,
+    MAX_BACKOFF_SECONDS,
 )
 from .simple import SimpleConsumer

@@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     functionality breaks unless this function is kept outside of a class
     """

-    # Make the child processes open separate socket connections
-    client.reinit()
+    # Initial interval for retries in seconds.
+    interval = 1
+    while not events.exit.is_set():
+        try:
+            # Make the child processes open separate socket connections
+            client.reinit()

-    # We will start consumers without auto-commit. Auto-commit will be
-    # done by the master controller process.
-    consumer = SimpleConsumer(client, group, topic,
-                              auto_commit=False,
-                              auto_commit_every_n=None,
-                              auto_commit_every_t=None,
-                              **consumer_options)
+            # We will start consumers without auto-commit. Auto-commit will be
+            # done by the master controller process.
+            consumer = SimpleConsumer(client, group, topic,
+                                      auto_commit=False,
+                                      auto_commit_every_n=None,
+                                      auto_commit_every_t=None,
+                                      **consumer_options)

-    # Ensure that the consumer provides the partition information
-    consumer.provide_partition_info()
+            # Ensure that the consumer provides the partition information
+            consumer.provide_partition_info()

-    while True:
-        # Wait till the controller indicates us to start consumption
-        events.start.wait()
+            while True:
+                # Wait till the controller indicates us to start consumption
+                events.start.wait()

-        # If we are asked to quit, do so
-        if events.exit.is_set():
-            break
+                # If we are asked to quit, do so
+                if events.exit.is_set():
+                    break

-        # Consume messages and add them to the queue. If the controller
-        # indicates a specific number of messages, follow that advice
-        count = 0
+                # Consume messages and add them to the queue. If the controller
+                # indicates a specific number of messages, follow that advice
+                count = 0

-        message = consumer.get_message()
-        if message:
-            while True:
-                try:
-                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
-                    break
-                except Full:
-                    if events.exit.is_set(): break
+                message = consumer.get_message()
+                if message:
+                    while True:
+                        try:
+                            queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                            break
+                        except Full:
+                            if events.exit.is_set(): break

-            count += 1
+                    count += 1

-            # We have reached the required size. The controller might have
-            # more than what he needs. Wait for a while.
-            # Without this logic, it is possible that we run into a big
-            # loop consuming all available messages before the controller
-            # can reset the 'start' event
-            if count == size.value:
-                events.pause.wait()
+                    # We have reached the required size. The controller might have
+                    # more than what he needs. Wait for a while.
+                    # Without this logic, it is possible that we run into a big
+                    # loop consuming all available messages before the controller
+                    # can reset the 'start' event
+                    if count == size.value:
+                        events.pause.wait()

-        else:
-            # In case we did not receive any message, give up the CPU for
-            # a while before we try again
-            time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+                else:
+                    # In case we did not receive any message, give up the CPU for
+                    # a while before we try again
+                    time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)

-    consumer.stop()
+            consumer.stop()
+
+        except KafkaError as e:
+            # Retry with exponential backoff
+            log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+            time.sleep(interval)
+            interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS


 class MultiProcessConsumer(Consumer):
@@ -257,7 +269,8 @@ class MultiProcessConsumer(Consumer):
             except Empty:
                 break

-            messages.append(message)
+            _msg = (partition, message) if self.partition_info else message
+            messages.append(_msg)
             new_offsets[partition] = message.offset + 1
             count -= 1
             if timeout is not None:
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 733baa8..9b85f8c 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -131,7 +131,6 @@ class SimpleConsumer(Consumer):
                              (buffer_size, max_buffer_size))
         self.buffer_size = buffer_size
         self.max_buffer_size = max_buffer_size
-        self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_offsets = self.offsets.copy()
@@ -182,12 +181,6 @@ class SimpleConsumer(Consumer):
             self.fetch_offsets[partition] = resp.offsets[0]
         return resp.offsets[0]

-    def provide_partition_info(self):
-        """
-        Indicates that partition info must be returned by the consumer
-        """
-        self.partition_info = True
-
     def seek(self, offset, whence=None, partition=None):
         """
         Alter the current offset in the consumer, similar to fseek
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 6393ce2..d5d6d27 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,3 +1,5 @@
+import six
+
 from .base import Partitioner


@@ -43,14 +45,16 @@ def murmur2(key):
     Based on java client, see org.apache.kafka.common.utils.Utils.murmur2

     Args:
-        key: if not a bytearray, converted via bytearray(str(key))
+        key: if not a bytes type, encoded using default encoding

     Returns: MurmurHash2 of key bytearray
     """

-    # Convert key to a bytearray
-    if not isinstance(key, bytearray):
-        data = bytearray(str(key))
+    # Convert key to bytes or bytearray
+    if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
+        data = key
+    else:
+        data = bytearray(str(key).encode())

     length = len(data)
     seed = 0x9747b28c
@@ -61,7 +65,7 @@ def murmur2(key):

     # Initialize the hash to a random value
     h = seed ^ length
-    length4 = length / 4
+    length4 = length // 4

     for i in range(length4):
         i4 = i * 4
@@ -84,15 +88,13 @@ def murmur2(key):

     # Handle the last few bytes of the input array
     extra_bytes = length % 4
-    if extra_bytes == 3:
+    if extra_bytes >= 3:
         h ^= (data[(length & ~3) + 2] & 0xff) << 16
         h &= 0xffffffff
-
-    if extra_bytes == 2:
+    if extra_bytes >= 2:
         h ^= (data[(length & ~3) + 1] & 0xff) << 8
         h &= 0xffffffff
-
-    if extra_bytes == 1:
+    if extra_bytes >= 1:
         h ^= (data[length & ~3] & 0xff)
         h &= 0xffffffff
     h *= m
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3c826cd..bfbdcf8 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -47,7 +47,8 @@ SYNC_FAIL_ON_ERROR_DEFAULT = True
 def _send_upstream(queue, client, codec, batch_time, batch_size,
                    req_acks, ack_timeout, retry_options, stop_event,
                    log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
-                   stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+                   stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
+                   codec_compresslevel=None):
     """Private method to manage producing messages asynchronously

     Listens on the queue for a specified number of messages or until
@@ -123,7 +124,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,

         # Send collected requests upstream
         for topic_partition, msg in msgset.items():
-            messages = create_message_set(msg, codec, key)
+            messages = create_message_set(msg, codec, key, codec_compresslevel)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
                                  tuple(messages))
@@ -267,6 +268,7 @@ class Producer(object):
                  req_acks=ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=DEFAULT_ACK_TIMEOUT,
                  codec=None,
+                 codec_compresslevel=None,
                  sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
                  async=False,
                  batch_send=False,  # deprecated, use async
@@ -297,6 +299,7 @@ class Producer(object):
             raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)

         self.codec = codec
+        self.codec_compresslevel = codec_compresslevel

         if self.async:
             # Messages are sent through this queue
@@ -314,7 +317,8 @@ class Producer(object):
                       self.req_acks, self.ack_timeout,
                       async_retry_options, self.thread_stop_event),
                 kwargs={'log_messages_on_error': async_log_messages_on_error,
                         'stop_timeout': async_stop_timeout}
+                kwargs={'log_messages_on_error': async_log_messages_on_error,
+                        'stop_timeout': async_stop_timeout,
+                        'codec_compresslevel': self.codec_compresslevel}
             )

             # Thread will die if main thread exits
@@ -322,7 +326,7 @@ class Producer(object):
             self.thread.start()

             def cleanup(obj):
-                if obj.stopped:
+                if not obj.stopped:
                     obj.stop()
             self._cleanup_func = cleanup
             atexit.register(cleanup, self)
@@ -355,9 +359,15 @@ class Producer(object):
         if not isinstance(msg, (list, tuple)):
             raise TypeError("msg is not a list or tuple!")

-        # Raise TypeError if any message is not encoded as bytes
-        if any(not isinstance(m, six.binary_type) for m in msg):
-            raise TypeError("all produce message payloads must be type bytes")
+        for m in msg:
+            # The protocol allows to have key & payload with null values both,
+            # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense.
+            if m is None:
+                if key is None:
+                    raise TypeError("key and payload can't be null in one")
+            # Raise TypeError if any non-null message is not encoded as bytes
+            elif not isinstance(m, six.binary_type):
+                raise TypeError("all produce message payloads must be null or type bytes")

         # Raise TypeError if topic is not encoded as bytes
         if not isinstance(topic, six.binary_type):
@@ -382,7 +392,7 @@ class Producer(object):
                     'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
-            messages = create_message_set([(m, key) for m in msg], self.codec, key)
+            messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
             req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request(
diff --git a/kafka/protocol.py b/kafka/protocol.py
index d5adf89..a916974 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -547,7 +547,7 @@ def create_message(payload, key=None):
     return Message(0, 0, key, payload)


-def create_gzip_message(payloads, key=None):
+def create_gzip_message(payloads, key=None, compresslevel=None):
     """
     Construct a Gzipped Message containing multiple Messages

@@ -562,7 +562,7 @@ def create_gzip_message(payloads, key=None):
     message_set = KafkaProtocol._encode_message_set(
         [create_message(payload, pl_key) for payload, pl_key in payloads])

-    gzipped = gzip_encode(message_set)
+    gzipped = gzip_encode(message_set, compresslevel=compresslevel)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

     return Message(0, 0x00 | codec, key, gzipped)
@@ -589,7 +589,7 @@ def create_snappy_message(payloads, key=None):
     return Message(0, 0x00 | codec, key, snapped)


-def create_message_set(messages, codec=CODEC_NONE, key=None):
+def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
     """Create a message set using the given codec.

     If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
@@ -598,7 +598,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
     if codec == CODEC_NONE:
         return [create_message(m, k) for m, k in messages]
     elif codec == CODEC_GZIP:
-        return [create_gzip_message(messages, key)]
+        return [create_gzip_message(messages, key, compresslevel)]
     elif codec == CODEC_SNAPPY:
         return [create_snappy_message(messages, key)]
     else:
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
new file mode 100644
index 0000000..67cd83b
--- /dev/null
+++ b/test/test_partitioner.py
@@ -0,0 +1,23 @@
+import six
+from . import unittest
+
+from kafka.partitioner import (Murmur2Partitioner)
+
+class TestMurmurPartitioner(unittest.TestCase):
+    def test_hash_bytes(self):
+        p = Murmur2Partitioner(range(1000))
+        self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
+
+    def test_hash_encoding(self):
+        p = Murmur2Partitioner(range(1000))
+        self.assertEqual(p.partition('test'), p.partition(u'test'))
+
+    def test_murmur2_java_compatibility(self):
+        p = Murmur2Partitioner(range(1000))
+        # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+        self.assertEqual(681, p.partition(b''))
+        self.assertEqual(524, p.partition(b'a'))
+        self.assertEqual(434, p.partition(b'ab'))
+        self.assertEqual(107, p.partition(b'abc'))
+        self.assertEqual(566, p.partition(b'123456789'))
+        self.assertEqual(742, p.partition(b'\x00 '))
diff --git a/test/test_producer.py b/test/test_producer.py
index 27272f6..3c026e8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ import time
 from mock import MagicMock, patch
 from . import unittest

-from kafka import KafkaClient, SimpleProducer
+from kafka import KafkaClient, SimpleProducer, KeyedProducer
 from kafka.common import (
     AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
     ProduceResponse, RetryOptions, TopicAndPartition
@@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase):
         topic = b"test-topic"
         partition = 0

-        bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
+        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+                          ('a', 'tuple'), {'a': 'dict'}, None,)
         for m in bad_data_types:
             with self.assertRaises(TypeError):
                 logging.debug("attempting to send message of type %s", type(m))
@@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase):
             # This should not raise an exception
             producer.send_messages(topic, partition, m)

+    def test_keyedproducer_message_types(self):
+        client = MagicMock()
+        client.get_partition_ids_for_topic.return_value = [0, 1]
+        producer = KeyedProducer(client)
+        topic = b"test-topic"
+        key = b"testkey"
+
+        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+                          ('a', 'tuple'), {'a': 'dict'},)
+        for m in bad_data_types:
+            with self.assertRaises(TypeError):
+                logging.debug("attempting to send message of type %s", type(m))
+                producer.send_messages(topic, key, m)
+
+        good_data_types = (b'a string!', None,)
+        for m in good_data_types:
+            # This should not raise an exception
+            producer.send_messages(topic, key, m)
+
     def test_topic_message_types(self):
         client = MagicMock()

@@ -91,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase):
         with self.assertRaises(FailedPayloadsError):
             producer.send_messages('foobar', b'test message')

+    def test_cleanup_stop_is_called_on_not_stopped_object(self):
+        producer = Producer(MagicMock(), async=True)
+        producer.stopped = True
+        with patch('kafka.producer.base.Producer.stop') as base_stop:
+            producer._cleanup_func(producer)
+        self.assertEqual(base_stop.call_count, 0)
+
+    def test_cleanup_stop_is_not_called_on_stopped_object(self):
+        producer = Producer(MagicMock(), async=True)
+        producer.stopped = False
+        with patch('kafka.producer.base.Producer.stop') as base_stop:
+            producer._cleanup_func(producer)
+        self.assertEqual(base_stop.call_count, 1)
+

 class TestKafkaProducerSendUpstream(unittest.TestCase):

diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index abf34c3..46b6851 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
     #   KeyedProducer Tests   #
     ############################

+    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+    def test_keyedproducer_null_payload(self):
+        partitions = self.client.get_partition_ids_for_topic(self.topic)
+        start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+        producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+        key = "test"
+
+        resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+        self.assert_produce_response(resp, start_offsets[0])
+        resp = producer.send_messages(self.topic, self.key("key2"), None)
+        self.assert_produce_response(resp, start_offsets[1])
+        resp = producer.send_messages(self.topic, self.key("key3"), None)
+        self.assert_produce_response(resp, start_offsets[0]+1)
+        resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
+        self.assert_produce_response(resp, start_offsets[1]+1)
+
+        self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
+        self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
+
+        producer.stop()
+
     @kafka_versions("all")
     def test_round_robin_partitioner(self):
         partitions = self.client.get_partition_ids_for_topic(self.topic)
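Two of the behavioural changes above are easiest to see from the calling side. The sketch below is illustrative only and not part of the commit; it assumes a reachable broker on localhost:9092 and an existing topic b'my-topic', and the group, key, and payload names are made up. It shows KeyedProducer accepting a None payload (the key must still be non-null) and MultiProcessConsumer returning (partition, message) pairs now that provide_partition_info() lives on the Consumer base class:

    # Illustrative sketch (not part of the commit). Assumes a broker on
    # localhost:9092 and an existing topic b'my-topic'; names are hypothetical.
    from kafka import KafkaClient, KeyedProducer
    from kafka.consumer import MultiProcessConsumer

    client = KafkaClient('localhost:9092')

    # Null payloads are now accepted as long as the key is not also None.
    producer = KeyedProducer(client)
    producer.send_messages(b'my-topic', b'user-42', b'some payload')
    producer.send_messages(b'my-topic', b'user-42', None)  # null payload
    producer.stop()

    # provide_partition_info() moved to the Consumer base class, so the
    # multiprocess consumer can yield (partition, message) pairs as well.
    consumer = MultiProcessConsumer(client, b'my-group', b'my-topic', num_procs=2)
    consumer.provide_partition_info()
    for partition, msg in consumer.get_messages(count=10, block=True, timeout=5):
        print(partition, msg.offset, msg.message.value)
    consumer.stop()

    client.close()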