-rw-r--r--  kafka/codec.py                     |  55
-rw-r--r--  kafka/common.py                    |  49
-rw-r--r--  kafka/consumer/base.py             |   1
-rw-r--r--  kafka/consumer/multiprocess.py     | 109
-rw-r--r--  kafka/consumer/simple.py           |   2
-rw-r--r--  kafka/producer/base.py             |   4
-rw-r--r--  kafka/protocol.py                  |   6
-rw-r--r--  pylint.rc                          |   2
-rw-r--r--  test/test_consumer_integration.py  |   5
-rw-r--r--  test/test_producer_integration.py  |  16
-rw-r--r--  test/test_protocol.py              |  66
-rw-r--r--  tox.ini                            |   2
12 files changed, 205 insertions(+), 112 deletions(-)
diff --git a/kafka/codec.py b/kafka/codec.py
index 2279200..19f405b 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -1,8 +1,7 @@
-from io import BytesIO
 import gzip
+from io import BytesIO
 import struct
 
-import six
 from six.moves import xrange
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
@@ -10,9 +9,9 @@ _XERIAL_V1_FORMAT = 'bccccccBii'
 
 try:
     import snappy
-    _has_snappy = True
+    _HAS_SNAPPY = True
 except ImportError:
-    _has_snappy = False
+    _HAS_SNAPPY = False
 
 
 def has_gzip():
@@ -20,26 +19,36 @@ def has_gzip():
     return True
 
 
 def has_snappy():
-    return _has_snappy
+    return _HAS_SNAPPY
 
 
 def gzip_encode(payload):
-    buffer = BytesIO()
-    handle = gzip.GzipFile(fileobj=buffer, mode="w")
-    handle.write(payload)
-    handle.close()
-    buffer.seek(0)
-    result = buffer.read()
-    buffer.close()
+    with BytesIO() as buf:
+
+        # Gzip context manager introduced in python 2.7
+        # so old-fashioned way until we decide to not support 2.6
+        gzipper = gzip.GzipFile(fileobj=buf, mode="w")
+        try:
+            gzipper.write(payload)
+        finally:
+            gzipper.close()
+
+        result = buf.getvalue()
+
     return result
 
 
 def gzip_decode(payload):
-    buffer = BytesIO(payload)
-    handle = gzip.GzipFile(fileobj=buffer, mode='r')
-    result = handle.read()
-    handle.close()
-    buffer.close()
+    with BytesIO(payload) as buf:
+
+        # Gzip context manager introduced in python 2.7
+        # so old-fashioned way until we decide to not support 2.6
+        gzipper = gzip.GzipFile(fileobj=buf, mode='r')
+        try:
+            result = gzipper.read()
+        finally:
+            gzipper.close()
+
     return result
 
 
@@ -47,8 +56,8 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024)
     """Encodes the given data with snappy if xerial_compatible is set then the
        stream is encoded in a fashion compatible with the xerial snappy library
 
-       The block size (xerial_blocksize) controls how frequent the blocking occurs
-       32k is the default in the xerial library.
+       The block size (xerial_blocksize) controls how frequent the blocking
+       occurs 32k is the default in the xerial library.
 
        The format winds up being
         +-------------+------------+--------------+------------+--------------+
@@ -63,7 +72,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024)
        length will always be <= blocksize.
     """
-    if not _has_snappy:
+    if not has_snappy():
         raise NotImplementedError("Snappy codec is not available")
 
     if xerial_compatible:
@@ -74,7 +83,7 @@ def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024)
         out = BytesIO()
 
         header = b''.join([struct.pack('!' + fmt, dat) for fmt, dat
-                          in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+                           in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
 
         out.write(header)
         for chunk in _chunker():
@@ -113,13 +122,13 @@ def _detect_xerial_stream(payload):
     """
 
     if len(payload) > 16:
-        header = header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
         return header == _XERIAL_V1_HEADER
     return False
 
 
 def snappy_decode(payload):
-    if not _has_snappy:
+    if not has_snappy():
         raise NotImplementedError("Snappy codec is not available")
 
     if _detect_xerial_stream(payload):
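Aside (not part of the commit): the 16-byte xerial header that _detect_xerial_stream compares against is simply the two module constants packed field-by-field. A minimal sketch, assuming only the constants shown above:

import struct

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'

# Pack each header field with its matching format character, exactly as
# snappy_encode does when xerial_compatible=True: a magic byte, the
# ASCII tag b'SNAPPY', and three version/compatibility integers.
header = b''.join(struct.pack('!' + fmt, dat)
                  for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER))

assert len(header) == 16
assert struct.unpack('!' + _XERIAL_V1_FORMAT, header) == _XERIAL_V1_HEADER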
diff --git a/kafka/common.py b/kafka/common.py
index f40c9fa..8207bec 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -6,43 +6,46 @@ from collections import namedtuple
 # Structs #
 ###############
 
-# Request payloads
-ProduceRequest = namedtuple("ProduceRequest",
-                            ["topic", "partition", "messages"])
-
-FetchRequest = namedtuple("FetchRequest",
-                          ["topic", "partition", "offset", "max_bytes"])
-
-OffsetRequest = namedtuple("OffsetRequest",
-                           ["topic", "partition", "time", "max_offsets"])
-
-OffsetCommitRequest = namedtuple("OffsetCommitRequest",
-                                 ["topic", "partition", "offset", "metadata"])
-
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
 MetadataRequest = namedtuple("MetadataRequest", ["topics"])
 
-OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
-
 MetadataResponse = namedtuple("MetadataResponse", ["brokers", "topics"])
 
-# Response payloads
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
+ProduceRequest = namedtuple("ProduceRequest",
+    ["topic", "partition", "messages"])
+
 ProduceResponse = namedtuple("ProduceResponse",
-                             ["topic", "partition", "error", "offset"])
+    ["topic", "partition", "error", "offset"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
+FetchRequest = namedtuple("FetchRequest",
+    ["topic", "partition", "offset", "max_bytes"])
 
-FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
-                                             "highwaterMark", "messages"])
+FetchResponse = namedtuple("FetchResponse",
+    ["topic", "partition", "error", "highwaterMark", "messages"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+OffsetRequest = namedtuple("OffsetRequest",
+    ["topic", "partition", "time", "max_offsets"])
 
 OffsetResponse = namedtuple("OffsetResponse",
-                            ["topic", "partition", "error", "offsets"])
+    ["topic", "partition", "error", "offsets"])
+
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+OffsetCommitRequest = namedtuple("OffsetCommitRequest",
+    ["topic", "partition", "offset", "metadata"])
 
 OffsetCommitResponse = namedtuple("OffsetCommitResponse",
-                                  ["topic", "partition", "error"])
+    ["topic", "partition", "error"])
+
+OffsetFetchRequest = namedtuple("OffsetFetchRequest",
+    ["topic", "partition"])
 
 OffsetFetchResponse = namedtuple("OffsetFetchResponse",
-                                 ["topic", "partition", "offset",
-                                  "metadata", "error"])
+    ["topic", "partition", "offset", "metadata", "error"])
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 9cdcf89..efc9404 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -25,6 +25,7 @@ MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
 
 ITER_TIMEOUT_SECONDS = 60
 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
 
 
 class Consumer(object):
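Aside (not part of the commit): the structs above are plain namedtuples, so regrouping them under per-API links changes nothing at runtime. A quick illustration with a hypothetical topic; time=-2 is the protocol's "earliest offset" sentinel:

from kafka.common import FetchRequest, OffsetRequest

fetch = FetchRequest(topic=b"example-topic", partition=0,
                     offset=0, max_bytes=4096)
earliest = OffsetRequest(topic=b"example-topic", partition=0,
                         time=-2, max_offsets=1)
print(fetch.topic, earliest.time)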
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 4dc04dc..5ce8b4d 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -2,23 +2,27 @@ from __future__ import absolute_import
 
 import logging
 import time
-from multiprocessing import Process, Queue as MPQueue, Event, Value
+
+from collections import namedtuple
+from multiprocessing import Process, Manager as MPManager
 
 try:
-    from Queue import Empty
+    from Queue import Empty, Full
 except ImportError:  # python 2
-    from queue import Empty
+    from queue import Empty, Full
 
 from .base import (
     AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
-    NO_MESSAGES_WAIT_TIME_SECONDS
+    NO_MESSAGES_WAIT_TIME_SECONDS,
+    FULL_QUEUE_WAIT_TIME_SECONDS
 )
 from .simple import Consumer, SimpleConsumer
 
-log = logging.getLogger("kafka")
+Events = namedtuple("Events", ["start", "pause", "exit"])
 
+log = logging.getLogger("kafka")
 
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+
+def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
     """
     A child process worker which consumes messages based on the
     notifications given by the controller process
@@ -34,20 +38,20 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
     # We will start consumers without auto-commit. Auto-commit will be
     # done by the master controller process.
     consumer = SimpleConsumer(client, group, topic,
-                              partitions=chunk,
                               auto_commit=False,
                               auto_commit_every_n=None,
-                              auto_commit_every_t=None)
+                              auto_commit_every_t=None,
+                              **consumer_options)
 
     # Ensure that the consumer provides the partition information
     consumer.provide_partition_info()
 
     while True:
         # Wait till the controller indicates us to start consumption
-        start.wait()
+        events.start.wait()
 
         # If we are asked to quit, do so
-        if exit.is_set():
+        if events.exit.is_set():
             break
 
         # Consume messages and add them to the queue. If the controller
@@ -56,7 +60,13 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
         message = consumer.get_message()
         if message:
-            queue.put(message)
+            while True:
+                try:
+                    queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
+                    break
+                except Full:
+                    if events.exit.is_set(): break
+
             count += 1
 
             # We have reached the required size. The controller might have
@@ -65,7 +75,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
             # loop consuming all available messages before the controller
             # can reset the 'start' event
             if count == size.value:
-                pause.wait()
+                events.pause.wait()
 
         else:
             # In case we did not receive any message, give up the CPU for
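Aside (not part of the commit): the put-with-timeout loop above is what keeps a full queue from wedging shutdown; the worker wakes every FULL_QUEUE_WAIT_TIME_SECONDS and can notice the exit event. A standalone sketch of the same pattern, with illustrative names:

from multiprocessing import Manager

try:
    from Queue import Full
except ImportError:
    from queue import Full  # python 3

manager = Manager()
queue = manager.Queue(1)  # deliberately tiny so it is already full
exit_event = manager.Event()

queue.put("first")   # fills the one-slot queue
exit_event.set()     # simulate a shutdown request from the controller

# A bare queue.put() would now block forever; the timeout gives the
# worker a chance to observe the exit event and give up instead.
while True:
    try:
        queue.put("second", timeout=0.1)
        break
    except Full:
        if exit_event.is_set():
            break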
@@ -105,7 +115,8 @@ class MultiProcessConsumer(Consumer):
     def __init__(self, client, group, topic, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL,
-                 num_procs=1, partitions_per_proc=0):
+                 num_procs=1, partitions_per_proc=0,
+                 **simple_consumer_options):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(
@@ -117,36 +128,42 @@ class MultiProcessConsumer(Consumer):
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
-        self.queue = MPQueue(1024)  # Child consumers dump messages into this
-        self.start = Event()        # Indicates the consumers to start fetch
-        self.exit = Event()         # Requests the consumers to shutdown
-        self.pause = Event()        # Requests the consumers to pause fetch
-        self.size = Value('i', 0)   # Indicator of number of messages to fetch
-
-        partitions = self.offsets.keys()
-
-        # If unspecified, start one consumer per partition
+        manager = MPManager()
+        self.queue = manager.Queue(1024)  # Child consumers dump messages into this
+        self.events = Events(
+            start = manager.Event(),  # Indicates the consumers to start fetch
+            exit = manager.Event(),   # Requests the consumers to shutdown
+            pause = manager.Event())  # Requests the consumers to pause fetch
+        self.size = manager.Value('i', 0)  # Indicator of number of messages to fetch
+
+        # dict.keys() returns a view in py3 + it's not a thread-safe operation
+        # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
+        # It's safer to copy dict as it only runs during the init.
+        partitions = list(self.offsets.copy().keys())
+
+        # By default, start one consumer process for all partitions
         # The logic below ensures that
         # * we do not cross the num_procs limit
         # * we have an even distribution of partitions among processes
-        if not partitions_per_proc:
-            partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
-            if partitions_per_proc < num_procs * 0.5:
-                partitions_per_proc += 1
+
+        if partitions_per_proc:
+            num_procs = len(partitions) / partitions_per_proc
+            if num_procs * partitions_per_proc < len(partitions):
+                num_procs += 1
 
         # The final set of chunks
-        chunker = lambda *x: [] + list(x)
-        chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+        chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
 
         self.procs = []
         for chunk in chunks:
-            chunk = filter(lambda x: x is not None, chunk)
-            args = (client.copy(),
-                    group, topic, list(chunk),
-                    self.queue, self.start, self.exit,
-                    self.pause, self.size)
-
-            proc = Process(target=_mp_consume, args=args)
+            options = {'partitions': list(chunk)}
+            if simple_consumer_options:
+                simple_consumer_options.pop('partitions', None)
+                options.update(simple_consumer_options)
+
+            args = (client.copy(), group, topic, self.queue,
+                    self.size, self.events)
+            proc = Process(target=_mp_consume, args=args, kwargs=options)
             proc.daemon = True
             proc.start()
             self.procs.append(proc)
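Aside (not part of the commit): the new chunks expression is a stride slice that deals partitions out round-robin, replacing the old chunker lambda. Worked example:

partitions = [0, 1, 2, 3, 4, 5, 6]
num_procs = 3

chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
assert chunks == [[0, 3, 6], [1, 4], [2, 5]]
# Every partition lands in exactly one chunk, and chunk sizes differ
# by at most one, which is the even distribution the comment promises.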
@@ -157,9 +174,9 @@ class MultiProcessConsumer(Consumer):
 
     def stop(self):
         # Set exit and start off all waiting consumers
-        self.exit.set()
-        self.pause.set()
-        self.start.set()
+        self.events.exit.set()
+        self.events.pause.set()
+        self.events.start.set()
 
         for proc in self.procs:
             proc.join()
@@ -174,10 +191,10 @@ class MultiProcessConsumer(Consumer):
         # Trigger the consumer procs to start off.
         # We will iterate till there are no more messages available
         self.size.value = 0
-        self.pause.set()
+        self.events.pause.set()
 
         while True:
-            self.start.set()
+            self.events.start.set()
             try:
                 # We will block for a small while so that the consumers get
                 # a chance to run and put some messages in the queue
@@ -189,12 +206,12 @@ class MultiProcessConsumer(Consumer):
 
             # Count, check and commit messages if necessary
             self.offsets[partition] = message.offset + 1
-            self.start.clear()
+            self.events.start.clear()
             self.count_since_commit += 1
             self._auto_commit()
             yield message
 
-        self.start.clear()
+        self.events.start.clear()
 
     def get_messages(self, count=1, block=True, timeout=10):
         """
@@ -214,7 +231,7 @@ class MultiProcessConsumer(Consumer):
         # necessary, but these will not be committed to kafka. Also, the extra
         # messages can be provided in subsequent runs
         self.size.value = count
-        self.pause.clear()
+        self.events.pause.clear()
 
         if timeout is not None:
             max_time = time.time() + timeout
@@ -226,7 +243,7 @@ class MultiProcessConsumer(Consumer):
             # go into overdrive and keep consuming thousands of
             # messages when the user might need only a few
             if self.queue.empty():
-                self.start.set()
+                self.events.start.set()
 
             try:
                 partition, message = self.queue.get(block, timeout)
@@ -240,8 +257,8 @@ class MultiProcessConsumer(Consumer):
                 timeout = max_time - time.time()
 
         self.size.value = 0
-        self.start.clear()
-        self.pause.set()
+        self.events.start.clear()
+        self.events.pause.set()
 
         # Update and commit offsets if necessary
         self.offsets.update(new_offsets)
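Aside (not part of the commit): with the **simple_consumer_options passthrough, child SimpleConsumer behaviour can now be tuned from the MultiProcessConsumer constructor. A hypothetical usage sketch (broker address, group, and topic are placeholders):

from kafka import KafkaClient, MultiProcessConsumer

client = KafkaClient("localhost:9092")

# fetch_size_bytes and buffer_size are SimpleConsumer options; they are
# forwarded to every child process via **simple_consumer_options.
consumer = MultiProcessConsumer(client, b"example-group", b"example-topic",
                                num_procs=2,
                                fetch_size_bytes=4096,
                                buffer_size=4096)

for message in consumer.get_messages(count=5, block=True, timeout=10):
    print(message)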
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 3d250ea..b50de61 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -214,8 +214,8 @@ class SimpleConsumer(Consumer):
 
         # Reset queue and fetch offsets since they are invalid
         self.fetch_offsets = self.offsets.copy()
+        self.count_since_commit += 1
         if self.auto_commit:
-            self.count_since_commit += 1
             self.commit()
 
         self.queue = Queue()
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e32b168..6a5a94e 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -58,7 +58,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             # Adjust the timeout to match the remaining period
             count -= 1
             timeout = send_at - time.time()
-            msgset[topic_partition].append(msg)
+            msgset[topic_partition].append((msg, key))
 
     # Send collected requests upstream
     reqs = []
@@ -192,7 +192,7 @@ class Producer(object):
                 self.queue.put((TopicAndPartition(topic, partition), m, key))
             resp = []
         else:
-            messages = create_message_set(msg, self.codec, key)
+            messages = create_message_set([(m, key) for m in msg], self.codec, key)
             req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 2a39de6..b34a95d 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):
 
     """
     message_set = KafkaProtocol._encode_message_set(
-        [create_message(payload, key) for payload in payloads])
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
 
     gzipped = gzip_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
@@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):
 
     """
     message_set = KafkaProtocol._encode_message_set(
-        [create_message(payload, key) for payload in payloads])
+        [create_message(payload, pl_key) for payload, pl_key in payloads])
 
     snapped = snappy_encode(message_set)
     codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
@@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
     return a list containing a single codec-encoded message.
     """
     if codec == CODEC_NONE:
-        return [create_message(m, key) for m in messages]
+        return [create_message(m, k) for m, k in messages]
     elif codec == CODEC_GZIP:
         return [create_gzip_message(messages, key)]
     elif codec == CODEC_SNAPPY:
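Aside (not part of the commit): after this change create_message_set takes (value, key) pairs rather than bare values, which is what lets a compressed wrapper message carry per-message keys. A small sketch:

from kafka.protocol import CODEC_GZIP, CODEC_NONE, create_message_set

msgs = [(b"v1", b"k1"), (b"v2", b"k2")]

plain = create_message_set(msgs, CODEC_NONE)    # two messages, keys preserved
gzipped = create_message_set(msgs, CODEC_GZIP)  # one gzip wrapper message

assert len(plain) == 2 and plain[0].key == b"k1"
assert len(gzipped) == 1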
""" if codec == CODEC_NONE: - return [create_message(m, key) for m in messages] + return [create_message(m, k) for m, k in messages] elif codec == CODEC_GZIP: return [create_gzip_message(messages, key)] elif codec == CODEC_SNAPPY: diff --git a/pylint.rc b/pylint.rc new file mode 100644 index 0000000..1e76d8c --- /dev/null +++ b/pylint.rc @@ -0,0 +1,2 @@ +[TYPECHECK] +ignored-classes=SyncManager diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 9c89190..d3df56a 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -61,7 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): group = kwargs.pop('group', self.id().encode('utf-8')) topic = kwargs.pop('topic', self.topic) - if consumer_class == SimpleConsumer: + if consumer_class in [SimpleConsumer, MultiProcessConsumer]: kwargs.setdefault('iter_timeout', 0) return consumer_class(self.client, group, topic, **kwargs) @@ -243,7 +243,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", self.topic, + auto_commit=False, iter_timeout=0) self.assertEqual(consumer.pending(), 20) self.assertEqual(consumer.pending(partitions=[0]), 10) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 38df69f..1804af0 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -71,9 +71,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) message1 = create_gzip_message([ - ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)]) + (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)]) message2 = create_gzip_message([ - ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)]) + (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)]) self.assert_produce_request( [ message1, message2 ], @@ -87,8 +87,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ - create_snappy_message(["Snappy 1 %d" % i for i in range(100)]), - create_snappy_message(["Snappy 2 %d" % i for i in range(100)]), + create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]), + create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]), ], start_offset, 200, @@ -102,13 +102,13 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): messages = [ create_message(b"Just a plain message"), create_gzip_message([ - ("Gzipped %d" % i).encode('utf-8') for i in range(100)]), + (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]), ] # All snappy integration tests fail with nosnappyjava if False and has_snappy(): msg_count += 100 - messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)])) + messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)])) self.assert_produce_request(messages, start_offset, msg_count) @@ -118,7 +118,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): self.assert_produce_request([ create_gzip_message([ - ("Gzipped batch 1, message %d" % i).encode('utf-8') + (("Gzipped batch 1, message %d" % i).encode('utf-8'), None) for i in range(50000)]) ], start_offset, @@ -127,7 +127,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): 
         self.assert_produce_request([
             create_gzip_message([
-                ("Gzipped batch 1, message %d" % i).encode('utf-8')
+                (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
                 for i in range(50000)])
             ],
             start_offset+50000,
diff --git a/test/test_protocol.py b/test/test_protocol.py
index d20f591..0938228 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -32,7 +32,7 @@ class TestProtocol(unittest.TestCase):
         self.assertEqual(msg.value, payload)
 
     def test_create_gzip(self):
-        payloads = [b"v1", b"v2"]
+        payloads = [(b"v1", None), (b"v2", None)]
         msg = create_gzip_message(payloads)
         self.assertEqual(msg.magic, 0)
         self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
@@ -59,9 +59,39 @@ class TestProtocol(unittest.TestCase):
 
         self.assertEqual(decoded, expect)
 
+    def test_create_gzip_keyed(self):
+        payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+        msg = create_gzip_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
+        self.assertEqual(msg.key, None)
+        # Need to decode to check since gzipped payload is non-deterministic
+        decoded = gzip_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", 1474775406),  # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k1",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v1",                          # Value
+
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", -16383415),   # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k2",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v2",                          # Value
+        ])
+
+        self.assertEqual(decoded, expect)
+
     @unittest.skipUnless(has_snappy(), "Snappy not available")
     def test_create_snappy(self):
-        payloads = [b"v1", b"v2"]
+        payloads = [(b"v1", None), (b"v2", None)]
         msg = create_snappy_message(payloads)
         self.assertEqual(msg.magic, 0)
         self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
@@ -87,6 +117,36 @@ class TestProtocol(unittest.TestCase):
 
         self.assertEqual(decoded, expect)
 
+    @unittest.skipUnless(has_snappy(), "Snappy not available")
+    def test_create_snappy_keyed(self):
+        payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
+        msg = create_snappy_message(payloads)
+        self.assertEqual(msg.magic, 0)
+        self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
+        self.assertEqual(msg.key, None)
+        decoded = snappy_decode(msg.value)
+        expect = b"".join([
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", 1474775406),  # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k1",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v1",                          # Value
+
+            struct.pack(">q", 0),           # MsgSet Offset
+            struct.pack(">i", 18),          # Msg Size
+            struct.pack(">i", -16383415),   # CRC
+            struct.pack(">bb", 0, 0),       # Magic, flags
+            struct.pack(">i", 2),           # Length of key
+            b"k2",                          # Key
+            struct.pack(">i", 2),           # Length of value
+            b"v2",                          # Value
+        ])
+
+        self.assertEqual(decoded, expect)
+
     def test_encode_message_header(self):
         expect = b"".join([
             struct.pack(">h", 10),             # API Key
@@ -701,7 +761,7 @@ class TestProtocol(unittest.TestCase):
             yield
 
     def test_create_message_set(self):
-        messages = [1, 2, 3]
+        messages = [(1, "k1"), (2, "k2"), (3, "k3")]
 
         # Default codec is CODEC_NONE. Expect list of regular messages.
         expect = [sentinel.message] * len(messages)
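Aside (not part of the commit): the hard-coded CRC constants in the keyed tests (1474775406 and -16383415) are crc32 over everything that follows the CRC field, i.e. magic, attributes, key length, key, value length, value -- the same layout the expect blocks spell out. A sketch of the recomputation, assuming that layout:

import struct
import zlib

def message_crc(key, value, magic=0, attributes=0):
    body = b"".join([
        struct.pack(">bb", magic, attributes),
        struct.pack(">i", len(key)), key,
        struct.pack(">i", len(value)), value,
    ])
    crc = zlib.crc32(body)
    if crc >= 2 ** 31:  # zlib.crc32 is unsigned on python 3;
        crc -= 2 ** 32  # fold to the signed value packed with ">i"
    return crc

print(message_crc(b"k1", b"v1"), message_crc(b"k2", b"v2"))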
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -37,7 +37,7 @@ deps =
     unittest2
     mock
     pylint
-commands = pylint {posargs: -E kafka test}
+commands = pylint --rcfile=pylint.rc {posargs: -E kafka test}
 
 [testenv:docs]
 deps =