summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-08 01:28:59 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-08 01:28:59 -0700
commitf0def436c6c9499aa384d8a3fe5319e0c8b9d7da (patch)
treeb652b45eb9001e76242c18e2ea84f17d8f44dd3c
parentd59cbf62067d5991c92ba388d31814e61cf3f3fa (diff)
downloadkafka-python-f0def436c6c9499aa384d8a3fe5319e0c8b9d7da.tar.gz
Explicit testing of protocol errors. Make tests more explicit, and start working on intermittent failures in test_encode_fetch_request and test_encode_produc_request
-rw-r--r--kafka/common.py4
-rw-r--r--kafka/protocol.py7
-rw-r--r--test/test_protocol.py47
3 files changed, 41 insertions, 17 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 005e6dd..830e34d 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -118,3 +118,7 @@ class ConsumerFetchSizeTooSmall(KafkaError):
class ConsumerNoMoreData(KafkaError):
pass
+
+
+class ProtocolError(KafkaError):
+ pass
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 25be023..9b8f3b3 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -8,7 +8,7 @@ from kafka.codec import (
from kafka.common import (
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
ProduceResponse, FetchResponse, OffsetResponse,
- OffsetCommitResponse, OffsetFetchResponse,
+ OffsetCommitResponse, OffsetFetchResponse, ProtocolError,
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
)
from kafka.util import (
@@ -68,8 +68,7 @@ class KafkaProtocol(object):
message_set = ""
for message in messages:
encoded_message = KafkaProtocol._encode_message(message)
- message_set += struct.pack('>qi%ds' % len(encoded_message), 0,
- len(encoded_message), encoded_message)
+ message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
return message_set
@classmethod
@@ -96,7 +95,7 @@ class KafkaProtocol(object):
crc = zlib.crc32(msg)
msg = struct.pack('>i%ds' % len(msg), crc, msg)
else:
- raise Exception("Unexpected magic number: %d" % message.magic)
+ raise ProtocolError("Unexpected magic number: %d" % message.magic)
return msg
@classmethod
diff --git a/test/test_protocol.py b/test/test_protocol.py
index 818363c..430e65e 100644
--- a/test/test_protocol.py
+++ b/test/test_protocol.py
@@ -6,7 +6,7 @@ from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata,
- TopicAndPartition, KafkaUnavailableError,
+ TopicAndPartition, KafkaUnavailableError, ProtocolError,
LeaderUnavailableError, PartitionUnavailableError
)
from kafka.codec import (
@@ -93,12 +93,20 @@ class TestProtocol(unittest.TestCase):
def test_encode_message(self):
message = create_message("test", "key")
encoded = KafkaProtocol._encode_message(message)
- expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test"
+ expect = (
+ "\xaa\xf1\x8f\x5b" # CRC
+ "\x00" # Magic
+ "\x00" # Flags
+ "\x00\x00\x00\x03" # Key Length
+ "key" # Key contents
+ "\x00\x00\x00\x04" # Msg Length
+ "test" # Msg contents
+ )
self.assertEqual(encoded, expect)
def test_encode_message_failure(self):
- self.assertRaises(Exception, KafkaProtocol._encode_message,
- Message(1, 0, "key", "test"))
+ with self.assertRaises(ProtocolError):
+ KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
def test_encode_message_set(self):
message_set = [create_message("v1", "k1"), create_message("v2", "k2")]
@@ -224,15 +232,28 @@ class TestProtocol(unittest.TestCase):
def test_encode_fetch_request(self):
requests = [FetchRequest("topic1", 0, 10, 1024),
FetchRequest("topic2", 1, 20, 100)]
- expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
- 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
- '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
- '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
- 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
- '\x00\x00\x14\x00\x00\x00d')
- encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2,
- 100)
- self.assertEqual(encoded, expect)
+
+ possibility1 = (
+ '\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
+ 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
+ '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
+ 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
+ '\x00\x00\x14\x00\x00\x00d'
+ )
+
+ # Todo, this isn't currently different
+ possibility2 = (
+ '\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07'
+ 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00'
+ '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00'
+ '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06'
+ 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00'
+ '\x00\x00\x14\x00\x00\x00d'
+ )
+
+ encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, 100)
+ self.assertIn(encoded, [ possibility1, possibility2 ])
def test_decode_fetch_response(self):
t1 = "topic1"