Diffstat (limited to 'test/test_producer_integration.py')
 test/test_producer_integration.py | 24 +++++++++++++-----------
 1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 7d3a180..19d3a6d 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -2,11 +2,18 @@ import os
import time
import uuid
-from kafka import * # noqa
-from kafka.common import * # noqa
-from kafka.codec import has_gzip, has_snappy
-from fixtures import ZookeeperFixture, KafkaFixture
-from testutil import *
+from kafka import (
+ SimpleProducer, KeyedProducer,
+ create_message, create_gzip_message, create_snappy_message,
+ RoundRobinPartitioner, HashedPartitioner
+)
+from kafka.common import (
+ FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
+)
+from kafka.codec import has_snappy
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
topic = 'produce_topic'
@@ -149,7 +156,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
# At first it doesn't exist
with self.assertRaises(UnknownTopicOrPartitionError):
- resp = producer.send_messages(new_topic, self.msg("one"))
+ producer.send_messages(new_topic, self.msg("one"))
@kafka_versions("all")
def test_producer_random_order(self):
@@ -219,7 +226,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -231,7 +237,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -244,7 +249,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_acks_cluster_commit(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(
self.client,
@@ -360,7 +364,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client, async=True)
resp = producer.send_messages(self.topic, self.msg("one"))
@@ -373,7 +376,6 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_async_keyed_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- start_offset1 = self.current_offset(self.topic, 1)
producer = KeyedProducer(self.client, partitioner = RoundRobinPartitioner, async=True)