summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-17 23:28:35 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-17 23:28:35 -0800
commit04a3a32eb724fee8c6c21195ea005fe8f35e9fb7 (patch)
tree9526d8cb2571ffe97dac71935eb74d3e281e4da7
parentba4ec47af00d4e45f9a48d36be16b20aece6556e (diff)
parentf89b9da1f3dc15cc728be22d532e4927d33c47f7 (diff)
downloadkafka-python-04a3a32eb724fee8c6c21195ea005fe8f35e9fb7.tar.gz
Merge pull request #489 from dpkp/producer_new_topic
Handle new topic auto-creation in SimpleProducer.send_messages
-rw-r--r--kafka/producer/simple.py2
-rw-r--r--test/test_producer.py21
-rw-r--r--test/test_producer_integration.py15
3 files changed, 18 insertions, 20 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 13e60d9..78d5a4d 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -33,7 +33,7 @@ class SimpleProducer(Producer):
def _next_partition(self, topic):
if topic not in self.partition_cycles:
if not self.client.has_metadata_for_topic(topic):
- self.client.load_metadata_for_topics(topic)
+ self.client.ensure_topic_exists(topic)
self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
diff --git a/test/test_producer.py b/test/test_producer.py
index 31282bf..cc65a0a 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -97,19 +97,20 @@ class TestKafkaProducer(unittest.TestCase):
def test_producer_sync_fail_on_error(self):
error = FailedPayloadsError('failure')
with patch.object(KafkaClient, 'load_metadata_for_topics'):
- with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
- with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+ with patch.object(KafkaClient, 'ensure_topic_exists'):
+ with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
- client = KafkaClient(MagicMock())
- producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+ client = KafkaClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
- # This should not raise
- (response,) = producer.send_messages('foobar', b'test message')
- self.assertEqual(response, error)
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
- producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages('foobar', b'test message')
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
def test_cleanup_is_not_called_on_stopped_producer(self):
producer = Producer(MagicMock(), async=True)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 34963d3..e522e00 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -133,6 +133,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# SimpleProducer Tests #
############################
+ def test_simple_producer_new_topic(self):
+ producer = SimpleProducer(self.client)
+ resp = producer.send_messages('new_topic', self.msg('foobar'))
+ self.assert_produce_response(resp, 0)
+ producer.stop()
+
def test_simple_producer(self):
partitions = self.client.get_partition_ids_for_topic(self.topic)
start_offsets = [self.current_offset(self.topic, p) for p in partitions]
@@ -157,15 +163,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer.stop()
- def test_produce__new_topic_fails_with_reasonable_error(self):
- new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
- producer = SimpleProducer(self.client, random_start=False)
-
- # At first it doesn't exist
- with self.assertRaises((UnknownTopicOrPartitionError,
- LeaderNotAvailableError)):
- producer.send_messages(new_topic, self.msg("one"))
-
def test_producer_random_order(self):
producer = SimpleProducer(self.client, random_start=True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))