diff options
-rw-r--r-- | kafka/consumer/base.py | 4 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 11 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 26 | ||||
-rw-r--r-- | kafka/producer/base.py | 17 | ||||
-rw-r--r-- | test/test_client_integration.py | 14 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 4 | ||||
-rw-r--r-- | test/test_producer.py | 10 | ||||
-rw-r--r-- | test/test_producer_integration.py | 6 | ||||
-rw-r--r-- | test/testutil.py | 4 |
9 files changed, 46 insertions, 50 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index c9f6e48..034d35c 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, UnknownTopicOrPartitionError, check_error, KafkaError ) @@ -217,7 +217,7 @@ class Consumer(object): reqs = [] for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) resps = self.client.send_offset_request(reqs) for resp in resps: diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 3ef106c..1bd3def 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,8 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + OffsetFetchRequest, OffsetCommitRequest, + OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError @@ -333,9 +334,9 @@ class KafkaConsumer(object): 'No fetch offsets found when calling fetch_messages' ) - fetches = [FetchRequest(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) + fetches = [FetchRequestPayload(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) for (topic, partition) in self._topics] # send_fetch_request will batch topic/partition requests by leader @@ -425,7 +426,7 @@ class KafkaConsumer(object): topic / partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI """ - reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 7c63246..1c2aee6 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, KafkaError, OffsetRequest, + FetchRequestPayload, KafkaError, OffsetRequestPayload, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -153,9 +153,9 @@ class SimpleConsumer(Consumer): LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': - reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)] elif self.auto_offset_reset == 'smallest': - reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)] else: # Let's raise an reasonable exception type if user calls # outside of an exception context @@ -224,23 +224,17 @@ class SimpleConsumer(Consumer): for tmp_partition in self.offsets.keys(): if whence == 0: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -2, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -1, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1)) else: pass else: deltas[partition] = offset if whence == 0: - reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) else: pass @@ -370,9 +364,9 @@ class SimpleConsumer(Consumer): while partitions: requests = [] for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) + requests.append(FetchRequestPayload(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) # Send request responses = self.client.send_fetch_request( requests, diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 39b1f84..3f2bba6 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -15,7 +15,7 @@ from threading import Thread, Event import six from kafka.common import ( - ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions, + ProduceRequestPayload, ProduceResponsePayload, TopicAndPartition, RetryOptions, kafka_errors, UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError, AsyncProducerQueueFull, UnknownError, RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES @@ -133,9 +133,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Send collected requests upstream for topic_partition, msg in msgset.items(): messages = create_message_set(msg, codec, key, codec_compresslevel) - req = ProduceRequest(topic_partition.topic, - topic_partition.partition, - tuple(messages)) + req = ProduceRequestPayload( + topic_partition.topic, + topic_partition.partition, + tuple(messages)) request_tries[req] = 0 if not request_tries: @@ -169,13 +170,13 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, error_cls = response.__class__ orig_req = response.payload - elif isinstance(response, ProduceResponse) and response.error: + elif isinstance(response, ProduceResponsePayload) and response.error: error_cls = kafka_errors.get(response.error, UnknownError) orig_req = requests[i] if error_cls: _handle_error(error_cls, orig_req) - log.error('%s sending ProduceRequest (#%d of %d) ' + log.error('%s sending ProduceRequestPayload (#%d of %d) ' 'to %s:%d with msgs %s', error_cls.__name__, (i + 1), len(requests), orig_req.topic, orig_req.partition, @@ -210,7 +211,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, # Log messages we are going to retry for orig_req in request_tries.keys(): - log.info('Retrying ProduceRequest to %s:%d with msgs %s', + log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s', orig_req.topic, orig_req.partition, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) @@ -404,7 +405,7 @@ class Producer(object): resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) - req = ProduceRequest(topic, partition, messages) + req = ProduceRequestPayload(topic, partition, messages) try: resp = self.client.send_produce_request( [req], acks=self.req_acks, timeout=self.ack_timeout, diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 8853350..70da4a3 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,8 +1,8 @@ import os from kafka.common import ( - FetchRequest, OffsetCommitRequest, OffsetFetchRequest, - KafkaTimeoutError, ProduceRequest + FetchRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + KafkaTimeoutError, ProduceRequestPayload ) from kafka.protocol import create_message @@ -29,7 +29,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_consume_none(self): - fetch = FetchRequest(self.bytes_topic, 0, 0, 1024) + fetch = FetchRequestPayload(self.bytes_topic, 0, 0, 1024) fetch_resp, = self.client.send_fetch_request([fetch]) self.assertEqual(fetch_resp.error, 0) @@ -57,16 +57,16 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): self.client.ensure_topic_exists(b'bar') requests = [ - ProduceRequest( + ProduceRequestPayload( b'foo', 0, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'bar', 1, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'foo', 1, [create_message(b'a'), create_message(b'b')]), - ProduceRequest( + ProduceRequestPayload( b'bar', 0, [create_message(b'a'), create_message(b'b')]), ] diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fee53f5..d536537 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -7,7 +7,7 @@ from kafka import ( KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message ) from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, + ProduceRequestPayload, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -41,7 +41,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def send_messages(self, partition, messages): messages = [ create_message(self.msg(str(msg))) for msg in messages ] - produce = ProduceRequest(self.bytes_topic, partition, messages = messages) + produce = ProduceRequestPayload(self.bytes_topic, partition, messages = messages) resp, = self.client.send_produce_request([produce]) self.assertEqual(resp.error, 0) diff --git a/test/test_producer.py b/test/test_producer.py index 31282bf..cbc1773 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -10,7 +10,7 @@ from . import unittest from kafka import KafkaClient, SimpleProducer, KeyedProducer from kafka.common import ( AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, - ProduceResponse, RetryOptions, TopicAndPartition + ProduceResponsePayload, RetryOptions, TopicAndPartition ) from kafka.producer.base import Producer, _send_upstream from kafka.protocol import CODEC_NONE @@ -186,7 +186,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): offset = offsets[req.topic][req.partition] offsets[req.topic][req.partition] += len(req.messages) responses.append( - ProduceResponse(req.topic, req.partition, 0, offset) + ProduceResponsePayload(req.topic, req.partition, 0, offset) ) return responses @@ -234,8 +234,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False - return [ProduceResponse(req.topic, req.partition, - NotLeaderForPartitionError.errno, -1) + return [ProduceResponsePayload(req.topic, req.partition, + NotLeaderForPartitionError.errno, -1) for req in reqs] responses = [] @@ -243,7 +243,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): offset = offsets[req.topic][req.partition] offsets[req.topic][req.partition] += len(req.messages) responses.append( - ProduceResponse(req.topic, req.partition, 0, offset) + ProduceResponsePayload(req.topic, req.partition, 0, offset) ) return responses diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index c99ed63..ee0b2fd 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -11,7 +11,7 @@ from kafka import ( ) from kafka.codec import has_snappy from kafka.common import ( - FetchRequest, ProduceRequest, + FetchRequestPayload, ProduceRequestPayload, UnknownTopicOrPartitionError, LeaderNotAvailableError ) from kafka.producer.base import Producer @@ -488,7 +488,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def assert_produce_request(self, messages, initial_offset, message_ct, partition=0): - produce = ProduceRequest(self.bytes_topic, partition, messages=messages) + produce = ProduceRequestPayload(self.bytes_topic, partition, messages=messages) # There should only be one response message from the server. # This will throw an exception if there's more than one. @@ -506,7 +506,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # There should only be one response message from the server. # This will throw an exception if there's more than one. - resp, = self.client.send_fetch_request([ FetchRequest(self.bytes_topic, partition, start_offset, 1024) ]) + resp, = self.client.send_fetch_request([FetchRequestPayload(self.bytes_topic, partition, start_offset, 1024)]) self.assertEqual(resp.error, 0) self.assertEqual(resp.partition, partition) diff --git a/test/testutil.py b/test/testutil.py index 3a1d2ba..b5b2529 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -11,7 +11,7 @@ from six.moves import xrange from . import unittest from kafka import KafkaClient -from kafka.common import OffsetRequest +from kafka.common import OffsetRequestPayload from kafka.util import kafka_bytestring __all__ = [ @@ -81,7 +81,7 @@ class KafkaIntegrationTestCase(unittest.TestCase): def current_offset(self, topic, partition): try: - offsets, = self.client.send_offset_request([ OffsetRequest(kafka_bytestring(topic), partition, -1, 1) ]) + offsets, = self.client.send_offset_request([OffsetRequestPayload(kafka_bytestring(topic), partition, -1, 1)]) except: # XXX: We've seen some UnknownErrors here and cant debug w/o server logs self.zk.child.dump_logs() |