summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py69
1 files changed, 60 insertions, 9 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 8bd2f5e..2089f48 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -1,22 +1,30 @@
+import contextlib
+from contextlib import contextmanager
import struct
import unittest2
+import mock
+from mock import sentinel
+
from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
- OffsetAndMessage, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError, ProtocolError,
- LeaderUnavailableError, PartitionUnavailableError
+ ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
+ BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
+ ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+ UnsupportedCodecError
)
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
+import kafka.protocol
from kafka.protocol import (
- create_gzip_message, create_message, create_snappy_message, KafkaProtocol
+ ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+ create_message, create_gzip_message, create_snappy_message,
+ create_message_set
)
class TestProtocol(unittest2.TestCase):
@@ -33,8 +41,7 @@ class TestProtocol(unittest2.TestCase):
payloads = ["v1", "v2"]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
- KafkaProtocol.CODEC_GZIP)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
@@ -63,8 +70,7 @@ class TestProtocol(unittest2.TestCase):
payloads = ["v1", "v2"]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
- KafkaProtocol.CODEC_SNAPPY)
+ self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
self.assertEqual(msg.key, None)
decoded = snappy_decode(msg.value)
expect = "".join([
@@ -692,3 +698,48 @@ class TestProtocol(unittest2.TestCase):
OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
]))
+
+ @contextmanager
+ def mock_create_message_fns(self):
+ patches = contextlib.nested(
+ mock.patch.object(kafka.protocol, "create_message",
+ return_value=sentinel.message),
+ mock.patch.object(kafka.protocol, "create_gzip_message",
+ return_value=sentinel.gzip_message),
+ mock.patch.object(kafka.protocol, "create_snappy_message",
+ return_value=sentinel.snappy_message),
+ )
+
+ with patches:
+ yield
+
+ def test_create_message_set(self):
+ messages = [1, 2, 3]
+
+ # Default codec is CODEC_NONE. Expect list of regular messages.
+ expect = [sentinel.message] * len(messages)
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_NONE: Expect list of regular messages.
+ expect = [sentinel.message] * len(messages)
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_NONE)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_GZIP: Expect list of one gzip-encoded message.
+ expect = [sentinel.gzip_message]
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_GZIP)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_SNAPPY: Expect list of one snappy-encoded message.
+ expect = [sentinel.snappy_message]
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_SNAPPY)
+ self.assertEqual(message_set, expect)
+
+ # Unknown codec should raise UnsupportedCodecError.
+ with self.assertRaises(UnsupportedCodecError):
+ create_message_set(messages, -1)