-rw-r--r--   kafka/codec.py                         55
-rw-r--r--   kafka/common.py                        49
-rw-r--r--   kafka/consumer/base.py                  1
-rw-r--r--   kafka/consumer/multiprocess.py        109
-rw-r--r--   kafka/consumer/simple.py                2
-rw-r--r--   kafka/producer/base.py                  4
-rw-r--r--   kafka/protocol.py                       6
-rw-r--r--   pylint.rc                               2
-rw-r--r--   test/test_consumer_integration.py       5
-rw-r--r--   test/test_producer_integration.py      16
-rw-r--r--   test/test_protocol.py                  66
-rw-r--r--   tox.ini                                 2
12 files changed, 205 insertions, 112 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 2279200..19f405b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,7 @@
-from io import BytesIO
import gzip
+from io import BytesIO
import struct
-import six
from six.moves import xrange
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
@@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
try:
import snappy
- _has_snappy = True
+ _HAS_SNAPPY = True
except ImportError:
- _has_snappy = False
+ _HAS_SNAPPY = False
def has_gzip():
@@ -20,26 +19,36 @@ def has_gzip():
def has_snappy():
- return _has_snappy
+ return _HAS_SNAPPY
def gzip_encode(payload):
- buffer = BytesIO()
- handle = gzip.GzipFile(fileobj=buffer, mode="w")
- handle.write(payload)
- handle.close()
- buffer.seek(0)
- result = buffer.read()
- buffer.close()
+ with BytesIO() as buf:
+
+ # Gzip context manager introduced in python 2.6
+ # so old-fashioned way until we decide to not support 2.6
+ gzipper = gzip.GzipFile(fileobj=buf, mode="w")
+ try:
+ gzipper.write(payload)
+ finally:
+ gzipper.close()
+
+ result = buf.getvalue()
+
return result
def gzip_decode(payload):
- buffer = BytesIO(payload)
- handle = gzip.GzipFile(fileobj=buffer, mode='r')
- result = handle.read()
- handle.close()
- buffer.close()
+ with BytesIO(payload) as buf:
+
+ # Gzip context manager introduced in python 2.6
+ # so old-fashioned way until we decide to not support 2.6
+ gzipper = gzip.GzipFile(fileobj=buf, mode='r')
+ try:
+ result = gzipper.read()
+ finally:
+ gzipper.close()
+
return result
@@ -47,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
"""Encodes the given data with snappy if xerial_compatible is set then the
stream is encoded in a fashion compatible with the xerial snappy library
- The block size (xerial_blocksize) controls how frequent the blocking occurs
- 32k is the default in the xerial library.
+ The block size (xerial_blocksize) controls how frequent the blocking
+ occurs 32k is the default in the xerial library.
The format winds up being
+-------------+------------+--------------+------------+--------------+
@@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
length will always be <= blocksize.
"""
- if not _has_snappy:
+ if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if xerial_compatible:
@@ -74,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
out = BytesIO()
header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
- in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+ in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
out.write(header)
for chunk in _chunker():
@@ -113,13 +122,13 @@ def _detect_xerial_stream(payload):
"""
if len(payload) > 16:
- header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+ header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
return header == _XERIAL_V1_HEADER
return False
def snappy_decode(payload):
- if not _has_snappy:
+ if not has_snappy():
raise NotImplementedError("Snappy codec is not available")
if _detect_xerial_stream(payload):
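
For reference, a minimal sketch of how the reworked codec helpers are used after this change; the round-trip below is illustrative only and assumes the python-snappy package is installed for the snappy branch:

    # Illustrative round-trip, not part of the commit.
    from kafka.codec import (gzip_encode, gzip_decode,
                             has_snappy, snappy_encode, snappy_decode)

    payload = b"some message bytes"

    # gzip buffers are now handled via BytesIO context managers / try-finally
    assert gzip_decode(gzip_encode(payload)) == payload

    if has_snappy():  # use the public helper rather than the private flag
        # xerial_compatible framing splits the payload into 32 KiB blocks
        framed = snappy_encode(payload, xerial_compatible=True,
                               xerial_blocksize=32 * 1024)
        assert snappy_decode(framed) == payload
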
diff --git a/kafka/common.py b/kafka/common.py
index f40c9fa..8207bec 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -6,43 +6,46 @@ from collections import namedtuple
# Structs #
###############
-# Request payloads
-ProduceRequest = namedtuple("ProduceRequest",
- ["topic", "partition", "messages"])
-
-FetchRequest = namedtuple("FetchRequest",
- ["topic", "partition", "offset", "max_bytes"])
-
-OffsetRequest = namedtuple("OffsetRequest",
- ["topic", "partition", "time", "max_offsets"])
-
-OffsetCommitRequest = namedtuple("OffsetCommitRequest",
- ["topic", "partition", "offset", "metadata"])
-
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
MetadataRequest = namedtuple("MetadataRequest",
["topics"])
-OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
-
MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
-# Response payloads
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
+ProduceRequest = namedtuple("ProduceRequest",
+ ["topic", "partition", "messages"])
+
ProduceResponse = namedtuple("ProduceResponse",
- ["topic", "partition", "error", "offset"])
+ ["topic", "partition", "error", "offset"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
+FetchRequest = namedtuple("FetchRequest",
+ ["topic", "partition", "offset", "max_bytes"])
-FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
- "highwaterMark", "messages"])
+FetchResponse = namedtuple("FetchResponse",
+ ["topic", "partition", "error", "highwaterMark", "messages"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+OffsetRequest = namedtuple("OffsetRequest",
+ ["topic", "partition", "time", "max_offsets"])
OffsetResponse = namedtuple("OffsetResponse",
- ["topic", "partition", "error", "offsets"])
+ ["topic", "partition", "error", "offsets"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+OffsetCommitRequest = namedtuple("OffsetCommitRequest",
+ ["topic", "partition", "offset", "metadata"])
OffsetCommitResponse = namedtuple("OffsetCommitResponse",
- ["topic", "partition", "error"])
+ ["topic", "partition", "error"])
+
+OffsetFetchRequest = namedtuple("OffsetFetchRequest",
+ ["topic", "partition"])
OffsetFetchResponse = namedtuple("OffsetFetchResponse",
- ["topic", "partition", "offset",
- "metadata", "error"])
+ ["topic", "partition", "offset", "metadata", "error"])
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 9cdcf89..efc9404 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
class Consumer(object):
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 4dc04dc..5ce8b4d 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -2,23 +2,27 @@ from __future__ import absolute_import
import logging
import time
-from multiprocessing import Process, Queue as MPQueue, Event, Value
+
+from collections import namedtuple
+from multiprocessing import Process, Manager as MPManager
try:
- from Queue import Empty
+ from Queue import Empty, Full
except ImportError: # python 2
- from queue import Empty
+ from queue import Empty, Full
from .base import (
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
- NO_MESSAGES_WAIT_TIME_SECONDS
+ NO_MESSAGES_WAIT_TIME_SECONDS,
+ FULL_QUEUE_WAIT_TIME_SECONDS
)
from .simple import Consumer, SimpleConsumer
-log = logging.getLogger("kafka")
+Events = namedtuple("Events", ["start", "pause", "exit"])
+log = logging.getLogger("kafka")
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
"""
A child process worker which consumes messages based on the
notifications given by the controller process
@@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# We will start consumers without auto-commit. Auto-commit will be
# done by the master controller process.
consumer = SimpleConsumer(client, group, topic,
- partitions=chunk,
auto_commit=False,
auto_commit_every_n=None,
- auto_commit_every_t=None)
+ auto_commit_every_t=None,
+ **consumer_options)
# Ensure that the consumer provides the partition information
consumer.provide_partition_info()
while True:
# Wait till the controller indicates us to start consumption
- start.wait()
+ events.start.wait()
# If we are asked to quit, do so
- if exit.is_set():
+ if events.exit.is_set():
break
# Consume messages and add them to the queue. If the controller
@@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
message = consumer.get_message()
if message:
- queue.put(message)
+ while True:
+ try:
+ queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+ break
+ except Full:
+ if events.exit.is_set(): break
+
count += 1
# We have reached the required size. The controller might have
@@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
# loop consuming all available messages before the controller
# can reset the 'start' event
if count == size.value:
- pause.wait()
+ events.pause.wait()
else:
# In case we did not receive any message, give up the CPU for
@@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- num_procs=1, partitions_per_proc=0):
+ num_procs=1, partitions_per_proc=0,
+ **simple_consumer_options):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
@@ -117,36 +128,42 @@ class MultiProcessConsumer(Consumer):
# Variables for managing and controlling the data flow from
# consumer child process to master
- self.queue = MPQueue(1024) # Child consumers dump messages into this
- self.start = Event() # Indicates the consumers to start fetch
- self.exit = Event() # Requests the consumers to shutdown
- self.pause = Event() # Requests the consumers to pause fetch
- self.size = Value('i', 0) # Indicator of number of messages to fetch
-
- partitions = self.offsets.keys()
-
- # If unspecified, start one consumer per partition
+ manager = MPManager()
+ self.queue = manager.Queue(1024) # Child consumers dump messages into this
+ self.events = Events(
+ start = manager.Event(), # Indicates the consumers to start fetch
+ exit = manager.Event(), # Requests the consumers to shutdown
+ pause = manager.Event()) # Requests the consumers to pause fetch
+ self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
+
+ # dict.keys() returns a view in py3 + it's not a thread-safe operation
+ # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
+ # It's safer to copy dict as it only runs during the init.
+ partitions = list(self.offsets.copy().keys())
+
+ # By default, start one consumer process for all partitions
# The logic below ensures that
# * we do not cross the num_procs limit
# * we have an even distribution of partitions among processes
- if not partitions_per_proc:
- partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
- if partitions_per_proc < num_procs * 0.5:
- partitions_per_proc += 1
+
+ if partitions_per_proc:
+ num_procs = len(partitions) / partitions_per_proc
+ if num_procs * partitions_per_proc < len(partitions):
+ num_procs += 1
# The final set of chunks
- chunker = lambda *x: [] + list(x)
- chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+ chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
self.procs = []
for chunk in chunks:
- chunk = filter(lambda x: x is not None, chunk)
- args = (client.copy(),
- group, topic, list(chunk),
- self.queue, self.start, self.exit,
- self.pause, self.size)
-
- proc = Process(target=_mp_consume, args=args)
+ options = {'partitions': list(chunk)}
+ if simple_consumer_options:
+ simple_consumer_options.pop('partitions', None)
+ options.update(simple_consumer_options)
+
+ args = (client.copy(), group, topic, self.queue,
+ self.size, self.events)
+ proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
proc.start()
self.procs.append(proc)
@@ -157,9 +174,9 @@ class MultiProcessConsumer(Consumer):
def stop(self):
# Set exit and start off all waiting consumers
- self.exit.set()
- self.pause.set()
- self.start.set()
+ self.events.exit.set()
+ self.events.pause.set()
+ self.events.start.set()
for proc in self.procs:
proc.join()
@@ -174,10 +191,10 @@ class MultiProcessConsumer(Consumer):
# Trigger the consumer procs to start off.
# We will iterate till there are no more messages available
self.size.value = 0
- self.pause.set()
+ self.events.pause.set()
while True:
- self.start.set()
+ self.events.start.set()
try:
# We will block for a small while so that the consumers get
# a chance to run and put some messages in the queue
@@ -189,12 +206,12 @@ class MultiProcessConsumer(Consumer):
# Count, check and commit messages if necessary
self.offsets[partition] = message.offset + 1
- self.start.clear()
+ self.events.start.clear()
self.count_since_commit += 1
self._auto_commit()
yield message
- self.start.clear()
+ self.events.start.clear()
def get_messages(self, count=1, block=True, timeout=10):
"""
@@ -214,7 +231,7 @@ class MultiProcessConsumer(Consumer):
# necessary, but these will not be committed to kafka. Also, the extra
# messages can be provided in subsequent runs
self.size.value = count
- self.pause.clear()
+ self.events.pause.clear()
if timeout is not None:
max_time = time.time() + timeout
@@ -226,7 +243,7 @@ class MultiProcessConsumer(Consumer):
# go into overdrive and keep consuming thousands of
# messages when the user might need only a few
if self.queue.empty():
- self.start.set()
+ self.events.start.set()
try:
partition, message = self.queue.get(block, timeout)
@@ -240,8 +257,8 @@ class MultiProcessConsumer(Consumer):
timeout = max_time - time.time()
self.size.value = 0
- self.start.clear()
- self.pause.set()
+ self.events.start.clear()
+ self.events.pause.set()
# Update and commit offsets if necessary
self.offsets.update(new_offsets)
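
A small sketch of the new round-robin chunking of partitions across consumer processes (partition ids below are made up):

    # Each process gets every num_procs-th partition, as in the diff above.
    partitions = [0, 1, 2, 3, 4, 5, 6]          # hypothetical partition ids
    num_procs = 3
    chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
    # chunks == [[0, 3, 6], [1, 4], [2, 5]] -- every partition is assigned
    # exactly once and chunk sizes differ by at most one.
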
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 3d250ea..b50de61 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -214,8 +214,8 @@ class SimpleConsumer(Consumer):
# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
+ self.count_since_commit += 1
if self.auto_commit:
- self.count_since_commit += 1
self.commit()
self.queue = Queue()
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e32b168..6a5a94e 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -58,7 +58,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
- msgset[topic_partition].append(msg)
+ msgset[topic_partition].append((msg, key))
# Send collected requests upstream
reqs = []
@@ -192,7 +192,7 @@ class Producer(object):
self.queue.put((TopicAndPartition(topic, partition), m, key))
resp = []
else:
- messages = create_message_set(msg, self.codec, key)
+ messages = create_message_set([(m, key) for m in msg], self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
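
To clarify the producer change: both the async queue path and the synchronous path now carry an explicit (message, key) pair per message, matching what create_message_set expects. A hypothetical pairing:

    # Illustrative: every message in the batch is paired with the same key.
    msg = [b"m1", b"m2"]
    key = b"k"
    pairs = [(m, key) for m in msg]   # [(b"m1", b"k"), (b"m2", b"k")]
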
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 2a39de6..b34a95d 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload, key) for payload in payloads])
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
gzipped = gzip_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
@@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload, key) for payload in payloads])
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
@@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
return a list containing a single codec-encoded message.
"""
if codec == CODEC_NONE:
- return [create_message(m, key) for m in messages]
+ return [create_message(m, k) for m, k in messages]
elif codec == CODEC_GZIP:
return [create_gzip_message(messages, key)]
elif codec == CODEC_SNAPPY:
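
A sketch of the keyed payload convention that the protocol helpers now expect (values are illustrative):

    # create_message_set and the codec wrappers now take (payload, key) pairs,
    # so per-message keys survive inside compressed message sets.
    from kafka.protocol import create_message_set, CODEC_NONE, CODEC_GZIP

    keyed = [(b"v1", b"k1"), (b"v2", b"k2")]
    plain = create_message_set(keyed, CODEC_NONE)    # list of one Message per pair
    wrapped = create_message_set(keyed, CODEC_GZIP)  # list with a single gzip-wrapped Message
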
diff --git a/pylint.rc b/pylint.rc
new file mode 100644
index 0000000..1e76d8c
--- /dev/null
+++ b/pylint.rc
@@ -0,0 +1,2 @@
+[TYPECHECK]
+ignored-classes=SyncManager
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9c89190..d3df56a 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -61,7 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
group = kwargs.pop('group', self.id().encode('utf-8'))
topic = kwargs.pop('topic', self.topic)
- if consumer_class == SimpleConsumer:
+ if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
kwargs.setdefault('iter_timeout', 0)
return consumer_class(self.client, group, topic, **kwargs)
@@ -243,7 +243,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
+ consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+ auto_commit=False, iter_timeout=0)
self.assertEqual(consumer.pending(), 20)
self.assertEqual(consumer.pending(partitions=[0]), 10)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 38df69f..1804af0 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -71,9 +71,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
message1 = create_gzip_message([
- ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+ (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
message2 = create_gzip_message([
- ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
+ (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
self.assert_produce_request(
[ message1, message2 ],
@@ -87,8 +87,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
- create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
- create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
+ create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
+ create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
],
start_offset,
200,
@@ -102,13 +102,13 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
messages = [
create_message(b"Just a plain message"),
create_gzip_message([
- ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
+ (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
]
# All snappy integration tests fail with nosnappyjava
if False and has_snappy():
msg_count += 100
- messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
+ messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
self.assert_produce_request(messages, start_offset, msg_count)
@@ -118,7 +118,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_produce_request([
create_gzip_message([
- ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
for i in range(50000)])
],
start_offset,
@@ -127,7 +127,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
self.assert_produce_request([
create_gzip_message([
- ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
for i in range(50000)])
],
start_offset+50000,
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d20f591..0938228 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(msg.value, payload)
def test_create_gzip(self):
- payloads = [b"v1", b"v2"]
+ payloads = [(b"v1", None), (b"v2", None)]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
@@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(decoded, expect)
+ def test_create_gzip_keyed(self):
+ payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+ msg = create_gzip_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
+ self.assertEqual(msg.key, None)
+ # Need to decode to check since gzipped payload is non-deterministic
+ decoded = gzip_decode(msg.value)
+ expect = b"".join([
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", 1474775406), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k1", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v1", # Value
+
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k2", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v2", # Value
+ ])
+
+ self.assertEqual(decoded, expect)
+
@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
- payloads = [b"v1", b"v2"]
+ payloads = [(b"v1", None), (b"v2", None)]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
@@ -87,6 +117,36 @@ class TestProtocol(unittest.TestCase):
self.assertEqual(decoded, expect)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_create_snappy_keyed(self):
+ payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+ msg = create_snappy_message(payloads)
+ self.assertEqual(msg.magic, 0)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
+ self.assertEqual(msg.key, None)
+ decoded = snappy_decode(msg.value)
+ expect = b"".join([
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", 1474775406), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k1", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v1", # Value
+
+ struct.pack(">q", 0), # MsgSet Offset
+ struct.pack(">i", 18), # Msg Size
+ struct.pack(">i", -16383415), # CRC
+ struct.pack(">bb", 0, 0), # Magic, flags
+ struct.pack(">i", 2), # Length of key
+ b"k2", # Key
+ struct.pack(">i", 2), # Length of value
+ b"v2", # Value
+ ])
+
+ self.assertEqual(decoded, expect)
+
def test_encode_message_header(self):
expect = b"".join([
struct.pack(">h", 10), # API Key
@@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase):
yield
def test_create_message_set(self):
- messages = [1, 2, 3]
+ messages = [(1, "k1"), (2, "k2"), (3, "k3")]
# Default codec is CODEC_NONE. Expect list of regular messages.
expect = [sentinel.message] * len(messages)
diff --git a/tox.ini b/tox.ini
index 71565fd..fba7d8e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -37,7 +37,7 @@ deps =
unittest2
mock
pylint
-commands = pylint {posargs: -E kafka test}
+commands = pylint --rcfile=pylint.rc {posargs: -E kafka test}
[testenv:docs]
deps =