summaryrefslogtreecommitdiff
path: root/test/test_unit.py
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2014-03-17 15:43:00 -0400
committermrtheb <mrlabbe@gmail.com>2014-03-17 15:43:00 -0400
commit19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e (patch)
tree421d3e2d628e8b564eecde6a4efcd4edac31d1ff /test/test_unit.py
parent828133cff064f4f8fba753183ac21619355ac005 (diff)
parent32edabdaaff6746e4926cc897b4bba917a80cb54 (diff)
downloadkafka-python-19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e.tar.gz
Merge branch 'master' into develop
Conflicts: test/test_unit.py
Diffstat (limited to 'test/test_unit.py')
-rw-r--r--test/test_unit.py151
1 files changed, 134 insertions, 17 deletions
diff --git a/test/test_unit.py b/test/test_unit.py
index 0b2b339..47ec561 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,11 +3,15 @@ import random
import struct
import unittest
+from mock import MagicMock, patch
+
+from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, LeaderUnavailableError, PartitionUnavailableError
+ TopicAndPartition, KafkaUnavailableError,
+ LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
@@ -17,10 +21,6 @@ from kafka.protocol import (
create_gzip_message, create_message, create_snappy_message, KafkaProtocol
)
-from kafka.client import KafkaClient
-
-from mock import patch
-
ITERATIONS = 1000
STRLEN = 100
@@ -55,6 +55,7 @@ class TestPackage(unittest.TestCase):
from kafka import KafkaClient as KafkaClient2
self.assertEquals(KafkaClient2.__name__, "KafkaClient")
+ from kafka.codec import snappy_encode
self.assertEquals(snappy_encode.__name__, "snappy_encode")
@@ -74,6 +75,49 @@ class TestCodec(unittest.TestCase):
s2 = snappy_decode(snappy_encode(s1))
self.assertEquals(s1, s2)
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_detect_xerial(self):
+ import kafka as kafka1
+ _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+ false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode('SNAPPY' * 50)
+ short_data = b'\x01\x02\x03\x04'
+
+ self.assertTrue(_detect_xerial_stream(header))
+ self.assertFalse(_detect_xerial_stream(b''))
+ self.assertFalse(_detect_xerial_stream(b'\x00'))
+ self.assertFalse(_detect_xerial_stream(false_header))
+ self.assertFalse(_detect_xerial_stream(random_snappy))
+ self.assertFalse(_detect_xerial_stream(short_data))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_decode_xerial(self):
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode('SNAPPY' * 50)
+ block_len = len(random_snappy)
+ random_snappy2 = snappy_encode('XERIAL' * 50)
+ block_len2 = len(random_snappy2)
+
+ to_test = header \
+ + struct.pack('!i', block_len) + random_snappy \
+ + struct.pack('!i', block_len2) + random_snappy2 \
+
+ self.assertEquals(snappy_decode(to_test), ('SNAPPY' * 50) + ('XERIAL' * 50))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_encode_xerial(self):
+ to_ensure = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01' + \
+ '\x00\x00\x00\x18' + \
+ '\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00' + \
+ '\x00\x00\x00\x18' + \
+ '\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+
+ to_test = ('SNAPPY' * 50) + ('XERIAL' * 50)
+
+ compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+ self.assertEquals(compressed, to_ensure)
class TestProtocol(unittest.TestCase):
@@ -91,9 +135,8 @@ class TestProtocol(unittest.TestCase):
payloads = ["v1", "v2"]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
- self.assertEqual(
- msg.attributes,
- KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_GZIP)
self.assertEqual(msg.key, None)
# Need to decode to check since gzipped payload is non-deterministic
decoded = gzip_decode(msg.value)
@@ -108,9 +151,8 @@ class TestProtocol(unittest.TestCase):
payloads = ["v1", "v2"]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
- self.assertEqual(
- msg.attributes,
- KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY)
+ self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK &
+ KafkaProtocol.CODEC_SNAPPY)
self.assertEqual(msg.key, None)
expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff"
"\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff"
@@ -281,7 +323,6 @@ class TestProtocol(unittest.TestCase):
len(ms3), ms3)
responses = list(KafkaProtocol.decode_fetch_response(encoded))
-
def expand_messages(response):
return FetchResponse(response.topic, response.partition,
response.error, response.highwaterMark,
@@ -369,6 +410,7 @@ class TestProtocol(unittest.TestCase):
def test_decode_offset_response(self):
pass
+
@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
@@ -388,6 +430,81 @@ class TestProtocol(unittest.TestCase):
class TestKafkaClient(unittest.TestCase):
+ def test_init_with_list(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_init_with_csv(self):
+
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(
+ hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertItemsEqual(
+ [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+ client.hosts)
+
+ def test_send_broker_unaware_request_fail(self):
+ 'Tests that call fails when all hosts are unavailable'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock()
+ }
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+ self.assertRaises(
+ KafkaUnavailableError,
+ client._send_broker_unaware_request,
+ 1, 'fake request')
+
+ for key, conn in mocked_conns.iteritems():
+ conn.send.assert_called_with(1, 'fake request')
+
+ def test_send_broker_unaware_request(self):
+ 'Tests that call works when at least one of the host is available'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock(),
+ ('kafka03', 9092): MagicMock()
+ }
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+ mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+ patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+ resp = client._send_broker_unaware_request(1, 'fake request')
+
+ self.assertEqual('valid response', resp)
+ mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
@@ -416,7 +533,7 @@ class TestKafkaClient(unittest.TestCase):
protocol.decode_metadata_response.return_value = (brokers, topics)
# client loads metadata at init
- client = KafkaClient(host='broker_1', port=4567)
+ client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({
TopicAndPartition('topic_1', 0): brokers[1],
TopicAndPartition('topic_noleader', 0): None,
@@ -440,7 +557,7 @@ class TestKafkaClient(unittest.TestCase):
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
- client = KafkaClient(host='broker_1', port=4567)
+ client = KafkaClient(hosts=['broker_1:4567'])
# topic metadata is loaded but empty
self.assertDictEqual({}, client.topics_to_brokers)
@@ -473,7 +590,7 @@ class TestKafkaClient(unittest.TestCase):
topics = {'topic_no_partitions': {}}
protocol.decode_metadata_response.return_value = (brokers, topics)
- client = KafkaClient(host='broker_1', port=4567)
+ client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({}, client.topics_to_brokers)
self.assertRaises(
@@ -499,7 +616,7 @@ class TestKafkaClient(unittest.TestCase):
}
protocol.decode_metadata_response.return_value = (brokers, topics)
- client = KafkaClient(host='broker_1', port=4567)
+ client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual(
{
TopicAndPartition('topic_noleader', 0): None,
@@ -535,7 +652,7 @@ class TestKafkaClient(unittest.TestCase):
}
protocol.decode_metadata_response.return_value = (brokers, topics)
- client = KafkaClient(host='broker_1', port=4567)
+ client = KafkaClient(hosts=['broker_1:4567'])
requests = [ProduceRequest(
"topic_noleader", 0,