summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py4
-rw-r--r--test/test_client.py25
2 files changed, 27 insertions, 2 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 38136af..8e62e0d 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -132,8 +132,8 @@ class KafkaClient(object):
Params
======
- payloads: list of object-like entities with a topic and
- partition attribute
+ payloads: list of object-like entities with a topic (str) and
+ partition (int) attribute
encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
keyword arguments
diff --git a/test/test_client.py b/test/test_client.py
index 7744ede..bc11857 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -295,3 +295,28 @@ class TestKafkaClient(unittest2.TestCase):
with self.assertRaises(LeaderNotAvailableError):
client.send_produce_request(requests)
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ requests = [ProduceRequest(
+ "topic_doesnt_exist", 0,
+ [create_message("a"), create_message("b")])]
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.send_produce_request(requests)
+