diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 7d3a180..19d3a6d 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -2,11 +2,18 @@ import os import time import uuid -from kafka import * # noqa -from kafka.common import * # noqa -from kafka.codec import has_gzip, has_snappy -from fixtures import ZookeeperFixture, KafkaFixture -from testutil import * +from kafka import ( + SimpleProducer, KeyedProducer, + create_message, create_gzip_message, create_snappy_message, + RoundRobinPartitioner, HashedPartitioner +) +from kafka.common import ( + FetchRequest, ProduceRequest, UnknownTopicOrPartitionError +) +from kafka.codec import has_snappy + +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, kafka_versions class TestKafkaProducerIntegration(KafkaIntegrationTestCase): topic = 'produce_topic' @@ -149,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # At first it doesn't exist with self.assertRaises(UnknownTopicOrPartitionError): - resp = producer.send_messages(new_topic, self.msg("one")) + producer.send_messages(new_topic, self.msg("one")) @kafka_versions("all") def test_producer_random_order(self): @@ -219,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_none(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED) resp = producer.send_messages(self.topic, self.msg("one")) @@ -231,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_local_write(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE) resp = producer.send_messages(self.topic, self.msg("one")) @@ -244,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_acks_cluster_commit(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer( self.client, @@ -360,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_async_simple_producer(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client, async=True) resp = producer.send_messages(self.topic, self.msg("one")) @@ -373,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_async_keyed_producer(self): start_offset0 = self.current_offset(self.topic, 0) - start_offset1 = self.current_offset(self.topic, 1) producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True) |