diff options
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 26 |
1 files changed, 10 insertions, 16 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 7c63246..1c2aee6 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, KafkaError, OffsetRequest, + FetchRequestPayload, KafkaError, OffsetRequestPayload, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -153,9 +153,9 @@ class SimpleConsumer(Consumer): LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': - reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)] elif self.auto_offset_reset == 'smallest': - reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)] else: # Let's raise an reasonable exception type if user calls # outside of an exception context @@ -224,23 +224,17 @@ class SimpleConsumer(Consumer): for tmp_partition in self.offsets.keys(): if whence == 0: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -2, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, - tmp_partition, - -1, - 1)) + reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1)) else: pass else: deltas[partition] = offset if whence == 0: - reqs.append(OffsetRequest(self.topic, partition, -2, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1)) elif whence == 2: - reqs.append(OffsetRequest(self.topic, partition, -1, 1)) + reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) else: pass @@ -370,9 +364,9 @@ class SimpleConsumer(Consumer): while partitions: requests = [] for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequest(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) + requests.append(FetchRequestPayload(self.topic, partition, + self.fetch_offsets[partition], + buffer_size)) # Send request responses = self.client.send_fetch_request( requests, |