diff options
-rw-r--r-- | kafka/producer/simple.py | 2 | ||||
-rw-r--r-- | test/test_producer.py | 21 | ||||
-rw-r--r-- | test/test_producer_integration.py | 15 |
3 files changed, 18 insertions, 20 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 13e60d9..78d5a4d 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -33,7 +33,7 @@ class SimpleProducer(Producer): def _next_partition(self, topic): if topic not in self.partition_cycles: if not self.client.has_metadata_for_topic(topic): - self.client.load_metadata_for_topics(topic) + self.client.ensure_topic_exists(topic) self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) diff --git a/test/test_producer.py b/test/test_producer.py index 31282bf..cc65a0a 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -97,19 +97,20 @@ class TestKafkaProducer(unittest.TestCase): def test_producer_sync_fail_on_error(self): error = FailedPayloadsError('failure') with patch.object(KafkaClient, 'load_metadata_for_topics'): - with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]): - with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]): + with patch.object(KafkaClient, 'ensure_topic_exists'): + with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]): + with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]): - client = KafkaClient(MagicMock()) - producer = SimpleProducer(client, async=False, sync_fail_on_error=False) + client = KafkaClient(MagicMock()) + producer = SimpleProducer(client, async=False, sync_fail_on_error=False) - # This should not raise - (response,) = producer.send_messages('foobar', b'test message') - self.assertEqual(response, error) + # This should not raise + (response,) = producer.send_messages('foobar', b'test message') + self.assertEqual(response, error) - producer = SimpleProducer(client, async=False, sync_fail_on_error=True) - with self.assertRaises(FailedPayloadsError): - producer.send_messages('foobar', b'test message') + producer = SimpleProducer(client, async=False, sync_fail_on_error=True) + with self.assertRaises(FailedPayloadsError): + producer.send_messages('foobar', b'test message') def test_cleanup_is_not_called_on_stopped_producer(self): producer = Producer(MagicMock(), async=True) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 34963d3..e522e00 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -133,6 +133,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # SimpleProducer Tests # ############################ + def test_simple_producer_new_topic(self): + producer = SimpleProducer(self.client) + resp = producer.send_messages('new_topic', self.msg('foobar')) + self.assert_produce_response(resp, 0) + producer.stop() + def test_simple_producer(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] @@ -157,15 +163,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() - def test_produce__new_topic_fails_with_reasonable_error(self): - new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') - producer = SimpleProducer(self.client, random_start=False) - - # At first it doesn't exist - with self.assertRaises((UnknownTopicOrPartitionError, - LeaderNotAvailableError)): - producer.send_messages(new_topic, self.msg("one")) - def test_producer_random_order(self): producer = SimpleProducer(self.client, random_start=True) resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) |