-rw-r--r--  MANIFEST.in                           1
-rw-r--r--  kafka/__init__.py                     2
-rw-r--r--  kafka/client.py                      75
-rw-r--r--  kafka/codec.py                        7
-rw-r--r--  kafka/conn.py                         9
-rw-r--r--  kafka/consumer/base.py                9
-rw-r--r--  kafka/consumer/kafka.py               5
-rw-r--r--  kafka/consumer/multiprocess.py      121
-rw-r--r--  kafka/consumer/simple.py             32
-rw-r--r--  kafka/partitioner/hashed.py          22
-rw-r--r--  kafka/producer/base.py               31
-rw-r--r--  kafka/protocol.py                     8
-rw-r--r--  test/test_conn.py                    17
-rw-r--r--  test/test_consumer_integration.py    18
-rw-r--r--  test/test_partitioner.py             23
-rw-r--r--  test/test_producer.py                38
-rw-r--r--  test/test_producer_integration.py    22
17 files changed, 313 insertions, 127 deletions
diff --git a/MANIFEST.in b/MANIFEST.in
index bdd6505..1731afa 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1 +1,2 @@
recursive-include kafka *.py
+include LICENSE
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 396a8b8..2fc59c6 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -10,7 +10,7 @@ from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message
)
from kafka.producer import SimpleProducer, KeyedProducer
-from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
from kafka.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer
__all__ = [
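
Murmur2Partitioner is now re-exported from the package root alongside the existing partitioners, so it can be imported straight from kafka and handed to a KeyedProducer. A minimal sketch (the broker address is illustrative):

    from kafka import KafkaClient, KeyedProducer, Murmur2Partitioner

    client = KafkaClient('localhost:9092')  # broker address is illustrative
    # Partition keyed messages with the Java-compatible murmur2 hash rather
    # than the default hashed partitioner.
    producer = KeyedProducer(client, partitioner=Murmur2Partitioner)
    producer.send_messages(b'my-topic', b'some-key', b'some payload')
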
diff --git a/kafka/client.py b/kafka/client.py
index cf180b4..810fa46 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -2,6 +2,7 @@ import collections
import copy
import functools
import logging
+import select
import time
import kafka.common
@@ -181,17 +182,26 @@ class KafkaClient(object):
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
+ responses = {}
for payload in payloads:
- leader = self._get_leader_for_partition(payload.topic,
- payload.partition)
-
- payloads_by_broker[leader].append(payload)
- brokers_for_payloads.append(leader)
+ try:
+ leader = self._get_leader_for_partition(payload.topic,
+ payload.partition)
+ payloads_by_broker[leader].append(payload)
+ brokers_for_payloads.append(leader)
+ except KafkaUnavailableError as e:
+ log.warning('KafkaUnavailableError attempting to send request '
+ 'on topic %s partition %d', payload.topic, payload.partition)
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
# For each broker, send the list of request payloads
# and collect the responses and errors
- responses = {}
broker_failures = []
+
+ # For each KafkaConnection keep the real socket so that we can use
+ # select() to perform non-blocking I/O
+ connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
@@ -225,27 +235,34 @@ class KafkaClient(object):
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
+ else:
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker)
- try:
- response = conn.recv(requestId)
- except ConnectionError as e:
- broker_failures.append(broker)
- log.warning('ConnectionError attempting to receive a '
- 'response to request %s from server %s: %s',
- requestId, broker, e)
+ conn = None
+ while connections_by_socket:
+ sockets = connections_by_socket.keys()
+ rlist, _, _ = select.select(sockets, [], [], None)
+ conn, broker = connections_by_socket.pop(rlist[0])
+ try:
+ response = conn.recv(requestId)
+ except ConnectionError as e:
+ broker_failures.append(broker)
+ log.warning('ConnectionError attempting to receive a '
+ 'response to request %s from server %s: %s',
+ requestId, broker, e)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
+ for payload in payloads_by_broker[broker]:
+ topic_partition = (payload.topic, payload.partition)
+ responses[topic_partition] = FailedPayloadsError(payload)
- else:
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ else:
+ _resps = []
+ for payload_response in decoder_fn(response):
+ topic_partition = (payload_response.topic,
+ payload_response.partition)
+ responses[topic_partition] = payload_response
+ _resps.append(payload_response)
+ log.debug('Response %s: %s', requestId, _resps)
# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
@@ -473,8 +490,8 @@ class KafkaClient(object):
resp = self.send_metadata_request(topics)
- log.info('Updating broker metadata: %s', resp.brokers)
- log.info('Updating topic metadata: %s', resp.topics)
+ log.debug('Updating broker metadata: %s', resp.brokers)
+ log.debug('Updating topic metadata: %s', resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -590,7 +607,11 @@ class KafkaClient(object):
else:
decoder = KafkaProtocol.decode_produce_response
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
+ try:
+ resps = self._send_broker_aware_request(payloads, encoder, decoder)
+ except Exception:
+ if fail_on_error:
+ raise
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and
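
The client change above stops reading broker responses one connection at a time: it records the connected socket of every KafkaConnection it wrote to, select()s over all of them, and services whichever broker answers first, while leader-lookup and receive failures are recorded as FailedPayloadsError per topic-partition. A simplified sketch of that fan-in pattern (the function and mapping below are illustrative, not the exact client internals):

    import select

    def recv_from_brokers(connections_by_socket, decoder_fn, responses):
        # connections_by_socket: raw socket -> (KafkaConnection, broker, request_id)
        while connections_by_socket:
            # Block until at least one broker socket has data ready to read.
            rlist, _, _ = select.select(list(connections_by_socket), [], [], None)
            conn, broker, request_id = connections_by_socket.pop(rlist[0])
            response = conn.recv(request_id)
            for payload_response in decoder_fn(response):
                key = (payload_response.topic, payload_response.partition)
                responses[key] = payload_response
        return responses
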
diff --git a/kafka/codec.py b/kafka/codec.py
index 19f405b..a9373c7 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -22,12 +22,15 @@ def has_snappy():
return _HAS_SNAPPY
-def gzip_encode(payload):
+def gzip_encode(payload, compresslevel=None):
+ if not compresslevel:
+ compresslevel = 9
+
with BytesIO() as buf:
# Gzip context manager introduced in python 2.6
# so old-fashioned way until we decide to not support 2.6
- gzipper = gzip.GzipFile(fileobj=buf, mode="w")
+ gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
try:
gzipper.write(payload)
finally:
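
gzip_encode() now takes a compresslevel, and any falsy value (including the None default) falls back to gzip's maximum level of 9, matching the old behaviour. The patched helper assembles to roughly the following (the tail of the function is reconstructed from the unchanged code, not shown in the hunk):

    import gzip
    from io import BytesIO

    def gzip_encode(payload, compresslevel=None):
        if not compresslevel:
            compresslevel = 9  # falsy -> keep the previous default of maximum compression

        with BytesIO() as buf:
            # GzipFile is closed manually instead of via a context manager to
            # keep Python 2.6 support.
            gzipper = gzip.GzipFile(fileobj=buf, mode="w", compresslevel=compresslevel)
            try:
                gzipper.write(payload)
            finally:
                gzipper.close()
            return buf.getvalue()
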
diff --git a/kafka/conn.py b/kafka/conn.py
index 432e10b..9514e48 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -118,6 +118,11 @@ class KafkaConnection(local):
# TODO multiplex socket communication to allow for multi-threaded clients
+ def get_connected_socket(self):
+ if not self._sock:
+ self.reinit()
+ return self._sock
+
def send(self, request_id, payload):
"""
Send a request to Kafka
@@ -151,6 +156,10 @@ class KafkaConnection(local):
"""
log.debug("Reading response %d from Kafka" % request_id)
+ # Make sure we have a connection
+ if not self._sock:
+ self.reinit()
+
# Read the size off of the header
resp = self._read_bytes(4)
(size,) = struct.unpack('>i', resp)
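
get_connected_socket() exposes the underlying socket, reconnecting lazily if the connection had been dropped, which is what lets the client hand broker connections to select(); recv() now performs the same lazy reconnect. A small sketch (host and port are illustrative):

    import select
    from kafka.conn import KafkaConnection

    conn = KafkaConnection('localhost', 9092)
    sock = conn.get_connected_socket()  # calls reinit() first if the connection was dirtied

    # Wait up to one second for the broker to have data before recv()ing.
    readable, _, _ = select.select([sock], [], [], 1.0)
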
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 0800327..c9f6e48 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -29,6 +29,7 @@ ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
+MAX_BACKOFF_SECONDS = 60
class Consumer(object):
"""
@@ -83,6 +84,14 @@ class Consumer(object):
self._cleanup_func = cleanup
atexit.register(cleanup, self)
+ self.partition_info = False # Do not return partition info in msgs
+
+ def provide_partition_info(self):
+ """
+ Indicates that partition info must be returned by the consumer
+ """
+ self.partition_info = True
+
def fetch_last_known_offsets(self, partitions=None):
if self.group is None:
raise ValueError('KafkaClient.group must not be None')
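
provide_partition_info() and the partition_info flag now live on the shared Consumer base class, so both SimpleConsumer and MultiProcessConsumer can return (partition, message) pairs, and MAX_BACKOFF_SECONDS caps the retry backoff used by the multiprocess consumer further down. A hedged usage sketch (broker address, group and topic are illustrative):

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(client, b'my-group', b'my-topic')

    # Ask the consumer to include partition info with each message.
    consumer.provide_partition_info()
    for partition, message in consumer.get_messages(count=5, block=True, timeout=5):
        # message is an OffsetAndMessage; its .message carries the payload.
        print(partition, message.offset, message.message.value)
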
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 11c4221..2141101 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -53,6 +53,7 @@ DEPRECATED_CONFIG_KEYS = {
class KafkaConsumer(object):
"""A simpler kafka consumer"""
+ DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG)
def __init__(self, *topics, **configs):
self.configure(**configs)
@@ -111,8 +112,8 @@ class KafkaConsumer(object):
"""
configs = self._deprecate_configs(**configs)
self._config = {}
- for key in DEFAULT_CONSUMER_CONFIG:
- self._config[key] = configs.pop(key, DEFAULT_CONSUMER_CONFIG[key])
+ for key in self.DEFAULT_CONFIG:
+ self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key])
if configs:
raise KafkaConfigurationError('Unknown configuration key(s): ' +
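
Making the defaults a DEFAULT_CONFIG class attribute (a deep copy of DEFAULT_CONSUMER_CONFIG) lets subclasses ship their own defaults without mutating the module-level dict that every consumer shares. A sketch, assuming a key such as 'auto_commit_enable' exists in the default config map:

    from copy import deepcopy
    from kafka import KafkaConsumer

    class MyConsumer(KafkaConsumer):
        # configure() now reads self.DEFAULT_CONFIG, so overriding it here
        # changes the defaults for this subclass only.
        DEFAULT_CONFIG = deepcopy(KafkaConsumer.DEFAULT_CONFIG)
        DEFAULT_CONFIG['auto_commit_enable'] = False
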
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index d03eb95..18a5014 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -4,16 +4,18 @@ from collections import namedtuple
import logging
from multiprocessing import Process, Manager as MPManager
try:
- from Queue import Empty, Full # python 3
+ import queue # python 3
except ImportError:
- from queue import Empty, Full # python 2
+ import Queue as queue # python 2
import time
+from ..common import KafkaError
from .base import (
Consumer,
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
NO_MESSAGES_WAIT_TIME_SECONDS,
- FULL_QUEUE_WAIT_TIME_SECONDS
+ FULL_QUEUE_WAIT_TIME_SECONDS,
+ MAX_BACKOFF_SECONDS,
)
from .simple import SimpleConsumer
@@ -33,57 +35,67 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
functionality breaks unless this function is kept outside of a class
"""
- # Make the child processes open separate socket connections
- client.reinit()
+ # Initial interval for retries in seconds.
+ interval = 1
+ while not events.exit.is_set():
+ try:
+ # Make the child processes open separate socket connections
+ client.reinit()
- # We will start consumers without auto-commit. Auto-commit will be
- # done by the master controller process.
- consumer = SimpleConsumer(client, group, topic,
- auto_commit=False,
- auto_commit_every_n=None,
- auto_commit_every_t=None,
- **consumer_options)
+ # We will start consumers without auto-commit. Auto-commit will be
+ # done by the master controller process.
+ consumer = SimpleConsumer(client, group, topic,
+ auto_commit=False,
+ auto_commit_every_n=None,
+ auto_commit_every_t=None,
+ **consumer_options)
- # Ensure that the consumer provides the partition information
- consumer.provide_partition_info()
+ # Ensure that the consumer provides the partition information
+ consumer.provide_partition_info()
- while True:
- # Wait till the controller indicates us to start consumption
- events.start.wait()
+ while True:
+ # Wait till the controller indicates us to start consumption
+ events.start.wait()
- # If we are asked to quit, do so
- if events.exit.is_set():
- break
+ # If we are asked to quit, do so
+ if events.exit.is_set():
+ break
- # Consume messages and add them to the queue. If the controller
- # indicates a specific number of messages, follow that advice
- count = 0
+ # Consume messages and add them to the queue. If the controller
+ # indicates a specific number of messages, follow that advice
+ count = 0
- message = consumer.get_message()
- if message:
- while True:
- try:
- queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
- break
- except Full:
- if events.exit.is_set(): break
+ message = consumer.get_message()
+ if message:
+ while True:
+ try:
+ queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+ break
+ except queue.Full:
+ if events.exit.is_set(): break
+
+ count += 1
- count += 1
+ # We have reached the required size. The controller might have
+ # more than it needs. Wait for a while.
+ # Without this logic, it is possible that we run into a big
+ # loop consuming all available messages before the controller
+ # can reset the 'start' event
+ if count == size.value:
+ events.pause.wait()
- # We have reached the required size. The controller might have
- # more than what he needs. Wait for a while.
- # Without this logic, it is possible that we run into a big
- # loop consuming all available messages before the controller
- # can reset the 'start' event
- if count == size.value:
- events.pause.wait()
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
- else:
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+ consumer.stop()
- consumer.stop()
+ except KafkaError as e:
+ # Retry with exponential backoff
+ log.error("Problem communicating with Kafka (%s), retrying in %d seconds..." % (e, interval))
+ time.sleep(interval)
+ interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
class MultiProcessConsumer(Consumer):
@@ -208,7 +220,7 @@ class MultiProcessConsumer(Consumer):
# TODO: This is a hack and will make the consumer block for
# at least one second. Need to find a better way of doing this
partition, message = self.queue.get(block=True, timeout=1)
- except Empty:
+ except queue.Empty:
break
# Count, check and commit messages if necessary
@@ -226,10 +238,12 @@ class MultiProcessConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages are
+ fetched. If None, it will block forever.
"""
messages = []
@@ -252,12 +266,15 @@ class MultiProcessConsumer(Consumer):
if self.queue.empty():
self.events.start.set()
+ block_next_call = block is True or block > len(messages)
try:
- partition, message = self.queue.get(block, timeout)
- except Empty:
+ partition, message = self.queue.get(block_next_call,
+ timeout)
+ except queue.Empty:
break
- messages.append(message)
+ _msg = (partition, message) if self.partition_info else message
+ messages.append(_msg)
new_offsets[partition] = message.offset + 1
count -= 1
if timeout is not None:
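
The child process no longer dies on the first Kafka error: the whole consume loop is wrapped in a retry loop that reconnects the client and rebuilds the SimpleConsumer, starting with a one-second delay and doubling it up to MAX_BACKOFF_SECONDS. The retry shape, reduced to its essentials (function names are illustrative):

    import time

    MAX_BACKOFF_SECONDS = 60

    def consume_with_backoff(do_consume, exit_event):
        interval = 1  # initial retry interval in seconds
        while not exit_event.is_set():
            try:
                do_consume()        # reinit the client, build the consumer, pump messages
            except Exception as e:  # the real loop catches KafkaError specifically
                print("Problem communicating with Kafka (%s), retrying in %d seconds..."
                      % (e, interval))
                time.sleep(interval)
                interval = min(interval * 2, MAX_BACKOFF_SECONDS)
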
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 733baa8..aad229a 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -6,9 +6,9 @@ except ImportError:
from itertools import izip_longest as izip_longest, repeat # python 2
import logging
try:
- from Queue import Empty, Queue # python 3
+ import queue # python 3
except ImportError:
- from queue import Empty, Queue # python 2
+ import Queue as queue # python 2
import sys
import time
@@ -131,13 +131,12 @@ class SimpleConsumer(Consumer):
(buffer_size, max_buffer_size))
self.buffer_size = buffer_size
self.max_buffer_size = max_buffer_size
- self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.auto_offset_reset = auto_offset_reset
- self.queue = Queue()
+ self.queue = queue.Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
@@ -182,12 +181,6 @@ class SimpleConsumer(Consumer):
self.fetch_offsets[partition] = resp.offsets[0]
return resp.offsets[0]
- def provide_partition_info(self):
- """
- Indicates that partition info must be returned by the consumer
- """
- self.partition_info = True
-
def seek(self, offset, whence=None, partition=None):
"""
Alter the current offset in the consumer, similar to fseek
@@ -264,7 +257,7 @@ class SimpleConsumer(Consumer):
if self.auto_commit:
self.commit()
- self.queue = Queue()
+ self.queue = queue.Queue()
def get_messages(self, count=1, block=True, timeout=0.1):
"""
@@ -272,10 +265,12 @@ class SimpleConsumer(Consumer):
Keyword Arguments:
count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
+ block: If True, the API will block till all messages are fetched.
+ If block is a positive integer the API will block until that
+ many messages are fetched.
+ timeout: When blocking is requested the function will block for
+ the specified time (in seconds) until count messages are
+ fetched. If None, it will block forever.
"""
messages = []
if timeout is not None:
@@ -286,12 +281,13 @@ class SimpleConsumer(Consumer):
while len(messages) < count:
block_time = timeout - time.time()
log.debug('calling _get_message block=%s timeout=%s', block, block_time)
- result = self._get_message(block, block_time,
+ block_next_call = block is True or block > len(messages)
+ result = self._get_message(block_next_call, block_time,
get_partition_info=True,
update_offset=False)
log.debug('got %s from _get_messages', result)
if not result:
- if block and (timeout is None or time.time() <= timeout):
+ if block_next_call and (timeout is None or time.time() <= timeout):
continue
break
@@ -345,7 +341,7 @@ class SimpleConsumer(Consumer):
return partition, message
else:
return message
- except Empty:
+ except queue.Empty:
log.debug('internal queue empty after fetch - returning None')
return None
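
get_messages() now also accepts an integer for block: block=True keeps blocking until count messages arrive or the timeout expires, while block=N only blocks while fewer than N messages have been fetched, which is what the new integration tests exercise. For example (connection details are illustrative):

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')
    consumer = SimpleConsumer(client, b'my-group', b'my-topic')

    # Returns as soon as a single message is available (or after 1 second with
    # whatever has arrived), instead of waiting the full second for all ten.
    messages = consumer.get_messages(count=10, block=1, timeout=1)
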
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
index 6393ce2..d5d6d27 100644
--- a/kafka/partitioner/hashed.py
+++ b/kafka/partitioner/hashed.py
@@ -1,3 +1,5 @@
+import six
+
from .base import Partitioner
@@ -43,14 +45,16 @@ def murmur2(key):
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
- key: if not a bytearray, converted via bytearray(str(key))
+ key: if not a bytes type, encoded using default encoding
Returns: MurmurHash2 of key bytearray
"""
- # Convert key to a bytearray
- if not isinstance(key, bytearray):
- data = bytearray(str(key))
+ # Convert key to bytes or bytearray
+ if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
+ data = key
+ else:
+ data = bytearray(str(key).encode())
length = len(data)
seed = 0x9747b28c
@@ -61,7 +65,7 @@ def murmur2(key):
# Initialize the hash to a random value
h = seed ^ length
- length4 = length / 4
+ length4 = length // 4
for i in range(length4):
i4 = i * 4
@@ -84,15 +88,13 @@ def murmur2(key):
# Handle the last few bytes of the input array
extra_bytes = length % 4
- if extra_bytes == 3:
+ if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff
-
- if extra_bytes == 2:
+ if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff
-
- if extra_bytes == 1:
+ if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m
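
The murmur2 changes accept bytes and bytearray keys on both Python 2 and 3, use integer division for the four-byte block count, and let the trailing 1-3 bytes fall through correctly, so partition assignments now agree with the Java client. The expectations below are taken from the new test/test_partitioner.py:

    from kafka.partitioner import Murmur2Partitioner

    p = Murmur2Partitioner(range(1000))

    # bytes and bytearray keys hash identically, and the assignments match
    # org.apache.kafka.clients.producer.Partitioner for the same 1000 partitions.
    assert p.partition(bytearray(b'test')) == p.partition(b'test')
    assert p.partition(b'abc') == 107
    assert p.partition(b'123456789') == 566
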
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 3c826cd..8774c66 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -47,7 +47,8 @@ SYNC_FAIL_ON_ERROR_DEFAULT = True
def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, retry_options, stop_event,
log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
- stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+ stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
+ codec_compresslevel=None):
"""Private method to manage producing messages asynchronously
Listens on the queue for a specified number of messages or until
@@ -123,7 +124,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Send collected requests upstream
for topic_partition, msg in msgset.items():
- messages = create_message_set(msg, codec, key)
+ messages = create_message_set(msg, codec, key, codec_compresslevel)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
tuple(messages))
@@ -185,7 +186,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# refresh topic metadata before next retry
if retry_state['do_refresh']:
log.warn('Async producer forcing metadata refresh metadata before retrying')
- client.load_metadata_for_topics()
+ try:
+ client.load_metadata_for_topics()
+ except Exception as e:
+ log.error("Async producer couldn't reload topic metadata. Error: `%s`", e.message)
# Apply retry limit, dropping messages that are over
request_tries = dict(
@@ -267,6 +271,7 @@ class Producer(object):
req_acks=ACK_AFTER_LOCAL_WRITE,
ack_timeout=DEFAULT_ACK_TIMEOUT,
codec=None,
+ codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
batch_send=False, # deprecated, use async
@@ -297,6 +302,7 @@ class Producer(object):
raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
self.codec = codec
+ self.codec_compresslevel = codec_compresslevel
if self.async:
# Messages are sent through this queue
@@ -314,7 +320,8 @@ class Producer(object):
self.req_acks, self.ack_timeout,
async_retry_options, self.thread_stop_event),
kwargs={'log_messages_on_error': async_log_messages_on_error,
- 'stop_timeout': async_stop_timeout}
+ 'stop_timeout': async_stop_timeout,
+ 'codec_compresslevel': self.codec_compresslevel}
)
# Thread will die if main thread exits
@@ -322,7 +329,7 @@ class Producer(object):
self.thread.start()
def cleanup(obj):
- if obj.stopped:
+ if not obj.stopped:
obj.stop()
self._cleanup_func = cleanup
atexit.register(cleanup, self)
@@ -355,9 +362,15 @@ class Producer(object):
if not isinstance(msg, (list, tuple)):
raise TypeError("msg is not a list or tuple!")
- # Raise TypeError if any message is not encoded as bytes
- if any(not isinstance(m, six.binary_type) for m in msg):
- raise TypeError("all produce message payloads must be type bytes")
+ for m in msg:
+ # The protocol allows both the key and the payload to be null
+ # (https://goo.gl/o694yN), but a (null, null) pair doesn't make sense.
+ if m is None:
+ if key is None:
+ raise TypeError("key and payload can't be null in one")
+ # Raise TypeError if any non-null message is not encoded as bytes
+ elif not isinstance(m, six.binary_type):
+ raise TypeError("all produce message payloads must be null or type bytes")
# Raise TypeError if topic is not encoded as bytes
if not isinstance(topic, six.binary_type):
@@ -382,7 +395,7 @@ class Producer(object):
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
- messages = create_message_set([(m, key) for m in msg], self.codec, key)
+ messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request(
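
On the producer side, codec_compresslevel is threaded from the constructor through both the sync and async paths into create_message_set(), the atexit cleanup now only stops producers that are still running, and a None payload is accepted as long as a key is supplied. A hedged sketch, assuming SimpleProducer and KeyedProducer forward these keyword arguments to the base Producer (the broker address is illustrative):

    from kafka import KafkaClient, SimpleProducer, KeyedProducer
    from kafka.protocol import CODEC_GZIP

    client = KafkaClient('localhost:9092')

    # Trade compression ratio for speed by using gzip level 1.
    producer = SimpleProducer(client, codec=CODEC_GZIP, codec_compresslevel=1)
    producer.send_messages(b'my-topic', b'payload one', b'payload two')

    # A null payload is now allowed when a key is present (key and payload may
    # not both be null), e.g. as a delete marker for a log-compacted topic.
    keyed = KeyedProducer(client)
    keyed.send_messages(b'my-topic', b'some-key', None)
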
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 1f3ea2f..412a957 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -588,7 +588,7 @@ def create_message(payload, key=None):
return Message(0, 0, key, payload)
-def create_gzip_message(payloads, key=None):
+def create_gzip_message(payloads, key=None, compresslevel=None):
"""
Construct a Gzipped Message containing multiple Messages
@@ -603,7 +603,7 @@ def create_gzip_message(payloads, key=None):
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, pl_key) for payload, pl_key in payloads])
- gzipped = gzip_encode(message_set)
+ gzipped = gzip_encode(message_set, compresslevel=compresslevel)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
return Message(0, 0x00 | codec, key, gzipped)
@@ -630,7 +630,7 @@ def create_snappy_message(payloads, key=None):
return Message(0, 0x00 | codec, key, snapped)
-def create_message_set(messages, codec=CODEC_NONE, key=None):
+def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
"""Create a message set using the given codec.
If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
@@ -639,7 +639,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
if codec == CODEC_NONE:
return [create_message(m, k) for m, k in messages]
elif codec == CODEC_GZIP:
- return [create_gzip_message(messages, key)]
+ return [create_gzip_message(messages, key, compresslevel)]
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
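
create_message_set() passes the compression level through to create_gzip_message(); the snappy path is unchanged. Roughly:

    from kafka.protocol import create_message_set, CODEC_NONE, CODEC_GZIP

    # Each entry is a (payload, key) pair, as elsewhere in the protocol module.
    msgs = [(b'value-1', None), (b'value-2', None)]

    plain = create_message_set(msgs, CODEC_NONE)  # one Message per payload
    # A single wrapper Message whose value is the gzip-compressed inner
    # message set, compressed at level 1 here.
    gzipped = create_message_set(msgs, CODEC_GZIP, key=None, compresslevel=1)
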
diff --git a/test/test_conn.py b/test/test_conn.py
index 2b70344..1bdfc1e 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -165,6 +165,23 @@ class ConnTest(unittest.TestCase):
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
+ def test_get_connected_socket(self):
+ s = self.conn.get_connected_socket()
+
+ self.assertEqual(s, self.MockCreateConn())
+
+ def test_get_connected_socket_on_dirty_conn(self):
+ # Dirty the connection
+ try:
+ self.conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Test that get_connected_socket tries to connect
+ self.assertEqual(self.MockCreateConn.call_count, 0)
+ self.conn.get_connected_socket()
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+
def test_close__object_is_reusable(self):
# test that sending to a closed connection
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 52b3e85..fee53f5 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -204,6 +204,14 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get 5 back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ self.assert_message_count(messages, 5)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")
@@ -272,6 +280,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.assert_message_count(messages, 5)
self.assertGreaterEqual(t.interval, 1)
+ # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
+ # second, get at least one back, no blocking
+ self.send_messages(0, range(0, 5))
+ with Timer() as t:
+ messages = consumer.get_messages(count=10, block=1, timeout=1)
+ received_message_count = len(messages)
+ self.assertGreaterEqual(received_message_count, 1)
+ self.assert_message_count(messages, received_message_count)
+ self.assertLessEqual(t.interval, 1)
+
consumer.stop()
@kafka_versions("all")
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
new file mode 100644
index 0000000..67cd83b
--- /dev/null
+++ b/test/test_partitioner.py
@@ -0,0 +1,23 @@
+import six
+from . import unittest
+
+from kafka.partitioner import (Murmur2Partitioner)
+
+class TestMurmurPartitioner(unittest.TestCase):
+ def test_hash_bytes(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
+
+ def test_hash_encoding(self):
+ p = Murmur2Partitioner(range(1000))
+ self.assertEqual(p.partition('test'), p.partition(u'test'))
+
+ def test_murmur2_java_compatibility(self):
+ p = Murmur2Partitioner(range(1000))
+ # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+ self.assertEqual(681, p.partition(b''))
+ self.assertEqual(524, p.partition(b'a'))
+ self.assertEqual(434, p.partition(b'ab'))
+ self.assertEqual(107, p.partition(b'abc'))
+ self.assertEqual(566, p.partition(b'123456789'))
+ self.assertEqual(742, p.partition(b'\x00 '))
diff --git a/test/test_producer.py b/test/test_producer.py
index 27272f6..3c026e8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ import time
from mock import MagicMock, patch
from . import unittest
-from kafka import KafkaClient, SimpleProducer
+from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
ProduceResponse, RetryOptions, TopicAndPartition
@@ -33,7 +33,8 @@ class TestKafkaProducer(unittest.TestCase):
topic = b"test-topic"
partition = 0
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'], ('a', 'tuple'), {'a': 'dict'})
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'}, None,)
for m in bad_data_types:
with self.assertRaises(TypeError):
logging.debug("attempting to send message of type %s", type(m))
@@ -44,6 +45,25 @@ class TestKafkaProducer(unittest.TestCase):
# This should not raise an exception
producer.send_messages(topic, partition, m)
+ def test_keyedproducer_message_types(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+ producer = KeyedProducer(client)
+ topic = b"test-topic"
+ key = b"testkey"
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'},)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, key, m)
+
+ good_data_types = (b'a string!', None,)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, key, m)
+
def test_topic_message_types(self):
client = MagicMock()
@@ -91,6 +111,20 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
producer.send_messages('foobar', b'test message')
+ def test_cleanup_stop_is_called_on_not_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = True
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 0)
+
+ def test_cleanup_stop_is_not_called_on_stopped_object(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = False
+ with patch('kafka.producer.base.Producer.stop') as base_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(base_stop.call_count, 1)
+
class TestKafkaProducerSendUpstream(unittest.TestCase):
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index abf34c3..46b6851 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -341,6 +341,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# KeyedProducer Tests #
############################
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_keyedproducer_null_payload(self):
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+ start_offsets = [self.current_offset(self.topic, p) for p in partitions]
+
+ producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
+ key = "test"
+
+ resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+ self.assert_produce_response(resp, start_offsets[0])
+ resp = producer.send_messages(self.topic, self.key("key2"), None)
+ self.assert_produce_response(resp, start_offsets[1])
+ resp = producer.send_messages(self.topic, self.key("key3"), None)
+ self.assert_produce_response(resp, start_offsets[0]+1)
+ resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
+ self.assert_produce_response(resp, start_offsets[1]+1)
+
+ self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
+ self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
+
+ producer.stop()
+
@kafka_versions("all")
def test_round_robin_partitioner(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)