diff options
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 3ef106c..1bd3def 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,8 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest, + OffsetFetchRequest, OffsetCommitRequest, + OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError @@ -333,9 +334,9 @@ class KafkaConsumer(object): 'No fetch offsets found when calling fetch_messages' ) - fetches = [FetchRequest(topic, partition, - self._offsets.fetch[(topic, partition)], - max_bytes) + fetches = [FetchRequestPayload(topic, partition, + self._offsets.fetch[(topic, partition)], + max_bytes) for (topic, partition) in self._topics] # send_fetch_request will batch topic/partition requests by leader @@ -425,7 +426,7 @@ class KafkaConsumer(object): topic / partition. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI """ - reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] + reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)] (resp,) = self._client.send_offset_request(reqs) |