summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py26
1 files changed, 10 insertions, 16 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 7c63246..1c2aee6 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, KafkaError, OffsetRequest,
+ FetchRequestPayload, KafkaError, OffsetRequestPayload,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -153,9 +153,9 @@ class SimpleConsumer(Consumer):
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
- reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
elif self.auto_offset_reset == 'smallest':
- reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+ reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
@@ -224,23 +224,17 @@ class SimpleConsumer(Consumer):
for tmp_partition in self.offsets.keys():
if whence == 0:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -2,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic,
- tmp_partition,
- -1,
- 1))
+ reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
else:
pass
else:
deltas[partition] = offset
if whence == 0:
- reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
elif whence == 2:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
else:
pass
@@ -370,9 +364,9 @@ class SimpleConsumer(Consumer):
while partitions:
requests = []
for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequest(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
+ requests.append(FetchRequestPayload(self.topic, partition,
+ self.fetch_offsets[partition],
+ buffer_size))
# Send request
responses = self.client.send_fetch_request(
requests,