diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:37:17 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:44:15 -0800 |
commit | a3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (patch) | |
tree | eaebf6dc87ffb83d7256497355c5559f5eec5d72 /kafka/consumer | |
parent | ad030ccd4df57305bb624b03eddaa2641f956160 (diff) | |
download | kafka-python-a3ec9bd8e8c730c9f6715b536c0c590230fc2e28.tar.gz |
Update references to kafka.common Request/Response (now Payload)
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/base.py | 4 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 11 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 26 |
3 files changed, 18 insertions, 23 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index c9f6e48..034d35c 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, UnknownTopicOrPartitionError, check_error, KafkaError ) @@ -217,7 +217,7 @@ class Consumer(object): reqs = [] for partition in partitions: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) resps = self.client.send_offset_request(reqs) for resp in resps: diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 3ef106c..1bd3def 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,8 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + OffsetFetchRequest, OffsetCommitRequest, + OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError @@ -333,9 +334,9 @@ class KafkaConsumer(object): 'No fetch offsets found when calling fetch_messages' ) - fetches = [FetchRequest(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) + fetches = [FetchRequestPayload(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) for (topic, partition) in self._topics] # send_fetch_request will batch topic/partition requests by leader @@ -425,7 +426,7 @@ class KafkaConsumer(object): topic / partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI """ - reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 7c63246..1c2aee6 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, KafkaError, OffsetRequest, + FetchRequestPayload, KafkaError, OffsetRequestPayload, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -153,9 +153,9 @@ class SimpleConsumer(Consumer): LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': - reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)] elif self.auto_offset_reset == 'smallest': - reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)] else: # Let's raise an reasonable exception type if user calls # outside of an exception context @@ -224,23 +224,17 @@ class SimpleConsumer(Consumer): for tmp_partition in self.offsets.keys(): if whence == 0: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -2, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -1, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1)) else: pass else: deltas[partition] = offset if whence == 0: - reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) else: pass @@ -370,9 +364,9 @@ class SimpleConsumer(Consumer): while partitions: requests = [] for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) + requests.append(FetchRequestPayload(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) # Send request responses = self.client.send_fetch_request( requests, |