diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-14 08:35:24 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-14 20:43:03 -0700 |
commit | 594f7079da4fc1598966dcc82caaf73532dea4d4 (patch) | |
tree | a1349c4a71fcee681d1670509255d2cf4e0e3ad8 /test/test_buffer.py | |
parent | 1eb7e05c323322818fb60192f638d6b83f2fd1ef (diff) | |
download | kafka-python-594f7079da4fc1598966dcc82caaf73532dea4d4.tar.gz |
Test MessageSetBuffer close -- cover double close compression bug
Diffstat (limited to 'test/test_buffer.py')
-rw-r--r-- | test/test_buffer.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/test/test_buffer.py b/test/test_buffer.py new file mode 100644 index 0000000..c8e283d --- /dev/null +++ b/test/test_buffer.py @@ -0,0 +1,70 @@ +# pylint: skip-file +from __future__ import absolute_import + +import io + +import pytest + +from kafka.producer.buffer import MessageSetBuffer +from kafka.protocol.message import Message, MessageSet + + +def test_buffer_close(): + records = MessageSetBuffer(io.BytesIO(), 100000) + orig_msg = Message(b'foobar') + records.append(1234, orig_msg) + records.close() + + msgset = MessageSet.decode(records.buffer()) + assert len(msgset) == 1 + (offset, size, msg) = msgset[0] + assert offset == 1234 + assert msg == orig_msg + + # Closing again should work fine + records.close() + + msgset = MessageSet.decode(records.buffer()) + assert len(msgset) == 1 + (offset, size, msg) = msgset[0] + assert offset == 1234 + assert msg == orig_msg + + +@pytest.mark.parametrize('compression', [ + 'gzip', + 'snappy', + pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26 +]) +def test_compressed_buffer_close(compression): + records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression) + orig_msg = Message(b'foobar') + records.append(1234, orig_msg) + records.close() + + msgset = MessageSet.decode(records.buffer()) + assert len(msgset) == 1 + (offset, size, msg) = msgset[0] + assert offset == 0 + assert msg.is_compressed() + + msgset = msg.decompress() + (offset, size, msg) = msgset[0] + assert not msg.is_compressed() + assert offset == 1234 + assert msg == orig_msg + + # Closing again should work fine + records.close() + + msgset = MessageSet.decode(records.buffer()) + assert len(msgset) == 1 + (offset, size, msg) = msgset[0] + assert offset == 0 + assert msg.is_compressed() + + msgset = msg.decompress() + (offset, size, msg) = msgset[0] + assert not msg.is_compressed() + assert offset == 1234 + assert msg == orig_msg |