summaryrefslogtreecommitdiff
path: root/test/test_protocol.py
diff options
context:
space:
mode:
authorPatrick Lucas <plucas@yelp.com>2014-05-07 10:02:57 -0700
committerPatrick Lucas <plucas@yelp.com>2014-05-07 10:08:14 -0700
commit805b52a34da9ce0dead80a64d7315412f2034673 (patch)
tree8d9a6d6b378e40ce3177d6fe7ee4030abf362718 /test/test_protocol.py
parent39796ec49162a0895be98a8aeb0e42e8319e5e30 (diff)
downloadkafka-python-805b52a34da9ce0dead80a64d7315412f2034673.tar.gz
Improve error handling and tests w.r.t. codecs
Add function kafka.protocol.create_message_set() that takes a list of payloads and a codec and returns a message set with the desired encoding. Introduce kafka.common.UnsupportedCodecError, raised if an unknown codec is specified. Include a test for the new function.
Diffstat (limited to 'test/test_protocol.py')
-rw-r--r--test/test_protocol.py64
1 files changed, 58 insertions, 6 deletions
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 854a439..2089f48 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -1,23 +1,30 @@
+import contextlib
+from contextlib import contextmanager
import struct
import unittest2
+import mock
+from mock import sentinel
+
from kafka import KafkaClient
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
ProduceRequest, FetchRequest, Message, ChecksumError,
- ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
- OffsetAndMessage, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError, ProtocolError,
- LeaderUnavailableError, PartitionUnavailableError
+ ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage,
+ BrokerMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError,
+ ProtocolError, LeaderUnavailableError, PartitionUnavailableError,
+ UnsupportedCodecError
)
from kafka.codec import (
has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
)
+import kafka.protocol
from kafka.protocol import (
- create_gzip_message, create_message, create_snappy_message, KafkaProtocol,
- ATTRIBUTE_CODEC_MASK, CODEC_GZIP, CODEC_SNAPPY
+ ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
+ create_message, create_gzip_message, create_snappy_message,
+ create_message_set
)
class TestProtocol(unittest2.TestCase):
@@ -691,3 +698,48 @@ class TestProtocol(unittest2.TestCase):
OffsetFetchResponse(topic = 'topic1', partition = 2, offset = 4, error = 0, metadata = "meta"),
OffsetFetchResponse(topic = 'topic1', partition = 4, offset = 8, error = 0, metadata = "meta"),
]))
+
+ @contextmanager
+ def mock_create_message_fns(self):
+ patches = contextlib.nested(
+ mock.patch.object(kafka.protocol, "create_message",
+ return_value=sentinel.message),
+ mock.patch.object(kafka.protocol, "create_gzip_message",
+ return_value=sentinel.gzip_message),
+ mock.patch.object(kafka.protocol, "create_snappy_message",
+ return_value=sentinel.snappy_message),
+ )
+
+ with patches:
+ yield
+
+ def test_create_message_set(self):
+ messages = [1, 2, 3]
+
+ # Default codec is CODEC_NONE. Expect list of regular messages.
+ expect = [sentinel.message] * len(messages)
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_NONE: Expect list of regular messages.
+ expect = [sentinel.message] * len(messages)
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_NONE)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_GZIP: Expect list of one gzip-encoded message.
+ expect = [sentinel.gzip_message]
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_GZIP)
+ self.assertEqual(message_set, expect)
+
+ # CODEC_SNAPPY: Expect list of one snappy-encoded message.
+ expect = [sentinel.snappy_message]
+ with self.mock_create_message_fns():
+ message_set = create_message_set(messages, CODEC_SNAPPY)
+ self.assertEqual(message_set, expect)
+
+ # Unknown codec should raise UnsupportedCodecError.
+ with self.assertRaises(UnsupportedCodecError):
+ create_message_set(messages, -1)