diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:37:17 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:44:15 -0800 |
commit | a3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (patch) | |
tree | eaebf6dc87ffb83d7256497355c5559f5eec5d72 /test | |
parent | ad030ccd4df57305bb624b03eddaa2641f956160 (diff) | |
download | kafka-python-a3ec9bd8e8c730c9f6715b536c0c590230fc2e28.tar.gz |
Update references to kafka.common Request/Response (now Payload)
Diffstat (limited to 'test')
-rw-r--r-- | test/test_client_integration.py | 14 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 4 | ||||
-rw-r--r-- | test/test_producer.py | 10 | ||||
-rw-r--r-- | test/test_producer_integration.py | 6 | ||||
-rw-r--r-- | test/testutil.py | 4 |
5 files changed, 19 insertions, 19 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 8853350..70da4a3 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,8 +1,8 @@ import os from kafka.common import ( - FetchRequest, OffsetCommitRequest, OffsetFetchRequest, - KafkaTimeoutError, ProduceRequest + FetchRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + KafkaTimeoutError, ProduceRequestPayload ) from kafka.protocol import create_message @@ -29,7 +29,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_consume_none(self): - fetch = FetchRequest(self.bytes_topic, 0, 0, 1024) + fetch = FetchRequestPayload(self.bytes_topic, 0, 0, 1024) fetch_resp, = self.client.send_fetch_request([fetch]) self.assertEqual(fetch_resp.error, 0) @@ -57,16 +57,16 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): self.client.ensure_topic_exists(b'bar') requests = [ - ProduceRequest( + ProduceRequestPayload( b'foo', 0, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'bar', 1, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'foo', 1, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'bar', 0, [create_message(b'a'), create_message(b'b')]), ] diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fee53f5..d536537 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -7,7 +7,7 @@ from kafka import ( KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message ) from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, + ProduceRequestPayload, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -41,7 +41,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def send_messages(self, partition, messages): messages = [ create_message(self.msg(str(msg))) for msg in messages ] - produce = ProduceRequest(self.bytes_topic, partition, messages = messages) + produce = ProduceRequestPayload(self.bytes_topic, partition, messages = messages) resp, = self.client.send_produce_request([produce]) self.assertEqual(resp.error, 0) diff --git a/test/test_producer.py b/test/test_producer.py index 31282bf..cbc1773 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -10,7 +10,7 @@ from . import unittest from kafka import KafkaClient, SimpleProducer, KeyedProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, - ProduceResponse, RetryOptions, TopicAndPartition + ProduceResponsePayload, RetryOptions, TopicAndPartition ) from kafka.producer.base import Producer, _send_upstream from kafka.protocol import CODEC_NONE @@ -186,7 +186,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): offset = offsets[req.topic][req.partition] offsets[req.topic][req.partition] += len(req.messages) responses.append( - ProduceResponse(req.topic, req.partition, 0, offset) + ProduceResponsePayload(req.topic, req.partition, 0, offset) ) return responses @@ -234,8 +234,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False - return [ProduceResponse(req.topic, req.partition, - NotLeaderForPartitionError.errno, -1) + return [ProduceResponsePayload(req.topic, req.partition, + NotLeaderForPartitionError.errno, -1) for req in reqs] responses = [] @@ -243,7 +243,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): offset = offsets[req.topic][req.partition] offsets[req.topic][req.partition] += len(req.messages) responses.append( - ProduceResponse(req.topic, req.partition, 0, offset) + ProduceResponsePayload(req.topic, req.partition, 0, offset) ) return responses diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c99ed63..ee0b2fd 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -11,7 +11,7 @@ from kafka import ( ) from kafka.codec import has_snappy from kafka.common import ( - FetchRequest, ProduceRequest, + FetchRequestPayload, ProduceRequestPayload, UnknownTopicOrPartitionError, LeaderNotAvailableError ) from kafka.producer.base import Producer @@ -488,7 +488,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def assert_produce_request(self, messages, initial_offset, message_ct, partition=0): - produce = ProduceRequest(self.bytes_topic, partition, messages=messages) + produce = ProduceRequestPayload(self.bytes_topic, partition, messages=messages) # There should only be one response message from the server. # This will throw an exception if there's more than one. @@ -506,7 +506,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # There should only be one response message from the server. # This will throw an exception if there's more than one. - resp, = self.client.send_fetch_request([ FetchRequest(self.bytes_topic, partition, start_offset, 1024) ]) + resp, = self.client.send_fetch_request([FetchRequestPayload(self.bytes_topic, partition, start_offset, 1024)]) self.assertEqual(resp.error, 0) self.assertEqual(resp.partition, partition) diff --git a/test/testutil.py b/test/testutil.py index 3a1d2ba..b5b2529 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -11,7 +11,7 @@ from six.moves import xrange from . import unittest from kafka import KafkaClient -from kafka.common import OffsetRequest +from kafka.common import OffsetRequestPayload from kafka.util import kafka_bytestring __all__ = [ @@ -81,7 +81,7 @@ class KafkaIntegrationTestCase(unittest.TestCase): def current_offset(self, topic, partition): try: - offsets, = self.client.send_offset_request([ OffsetRequest(kafka_bytestring(topic), partition, -1, 1) ]) + offsets, = self.client.send_offset_request([OffsetRequestPayload(kafka_bytestring(topic), partition, -1, 1)]) except: # XXX: We've seen some UnknownErrors here and cant debug w/o server logs self.zk.child.dump_logs() |