summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index cbc1773..f62b97a 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -10,7 +10,7 @@ from . import unittest
from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.common import (
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
- ProduceResponsePayload, RetryOptions, TopicAndPartition
+ ProduceResponsePayload, RetryOptions, TopicPartition
)
from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
@@ -156,7 +156,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# lets create a queue and add 10 messages for 1 partition
for i in range(10):
- self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+ self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
self._run_process()
@@ -172,7 +172,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# lets create a queue and add 10 messages for 10 different partitions
# to show how retries should work ideally
for i in range(10):
- self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
# Mock offsets counter for closure
offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
@@ -206,7 +206,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# lets create a queue and add 10 messages for 10 different partitions
# to show how retries should work ideally
for i in range(10):
- self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
+ self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
def send_side_effect(reqs, *args, **kwargs):
return [FailedPayloadsError(req) for req in reqs]
@@ -226,7 +226,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
def test_async_producer_not_leader(self):
for i in range(10):
- self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
# Mock offsets counter for closure
offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))