author     Dana Powers <dana.powers@rd.io>  2014-09-17 13:49:55 -0700
committer  Dana Powers <dana.powers@rd.io>  2014-12-15 12:43:45 -0800
commit     9e1b20306919eaa14774befead190ff52df19ba4 (patch)
tree       f1fee15f9cc88714b7622c04738663ed26b42b5a /test/test_consumer_integration.py
parent     b80e83b7335a92fcbfcf25e38d51f24fc00c20ea (diff)
download   kafka-python-9e1b20306919eaa14774befead190ff52df19ba4.tar.gz
Resolve merge conflict w/ assertEqual (assertEquals is deprecated)
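The rename applied throughout this diff reflects that unittest's assertEquals is only a deprecated alias for assertEqual. A minimal sketch of the distinction (a hypothetical standalone test, not part of this repository; note the alias was removed entirely in Python 3.12, so this only runs on earlier versions):

    import unittest
    import warnings

    class AssertAliasTest(unittest.TestCase):
        def test_alias_is_deprecated(self):
            # assertEqual is the canonical name and emits no warning
            self.assertEqual(1 + 1, 2)
            # assertEquals is a deprecated alias; on Python versions that
            # still provide it, calling it emits a DeprecationWarning
            with warnings.catch_warnings(record=True) as caught:
                warnings.simplefilter("always")
                self.assertEquals(1 + 1, 2)
            self.assertTrue(
                any(issubclass(w.category, DeprecationWarning) for w in caught)
            )

    if __name__ == '__main__':
        unittest.main()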
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--  test/test_consumer_integration.py  28
1 file changed, 15 insertions, 13 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 1d28f8e..007f788 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -39,16 +39,16 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
produce = ProduceRequest(self.topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
- self.assertEquals(resp.error, 0)
+ self.assertEqual(resp.error, 0)
return [ x.value for x in messages ]
def assert_message_count(self, messages, num_messages):
# Make sure we got them all
- self.assertEquals(len(messages), num_messages)
+ self.assertEqual(len(messages), num_messages)
# Make sure there are no duplicates
- self.assertEquals(len(set(messages)), num_messages)
+ self.assertEqual(len(set(messages)), num_messages)
def consumer(self, **kwargs):
if os.environ['KAFKA_VERSION'] == "0.8.0":
@@ -140,9 +140,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ consumer = self.consumer()
+
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
# move to last message, so one partition should have 1 pending
# message and other 0
@@ -201,9 +203,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = MultiProcessConsumer(self.client, "group1", self.topic, auto_commit=False)
- self.assertEquals(consumer.pending(), 20)
- self.assertEquals(consumer.pending(partitions=[0]), 10)
- self.assertEquals(consumer.pending(partitions=[1]), 10)
+ self.assertEqual(consumer.pending(), 20)
+ self.assertEqual(consumer.pending(partitions=[0]), 10)
+ self.assertEqual(consumer.pending(partitions=[1]), 10)
consumer.stop()
@@ -251,7 +253,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
# Consume giant message successfully
message = big_consumer.get_message(block=False, timeout=10)
self.assertIsNotNone(message)
- self.assertEquals(message.message.value, huge_message)
+ self.assertEqual(message.message.value, huge_message)
big_consumer.stop()
@@ -299,7 +301,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
messages = [ message for message in consumer ]
- self.assertEquals(len(messages), 2)
+ self.assertEqual(len(messages), 2)
@kafka_versions("all")
def test_kafka_consumer(self):
@@ -319,8 +321,8 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
if n >= 200:
break
- self.assertEquals(len(messages[0]), 100)
- self.assertEquals(len(messages[1]), 100)
+ self.assertEqual(len(messages[0]), 100)
+ self.assertEqual(len(messages[1]), 100)
@kafka_versions("all")
def test_kafka_consumer__blocking(self):