diff options
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/base.py | 6 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 6 |
2 files changed, 6 insertions, 6 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 034d35c..5859d36 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( - OffsetRequestPayload, OffsetCommitRequest, OffsetFetchRequest, + OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, UnknownTopicOrPartitionError, check_error, KafkaError ) @@ -101,7 +101,7 @@ class Consumer(object): responses = self.client.send_offset_fetch_request( self.group, - [OffsetFetchRequest(self.topic, p) for p in partitions], + [OffsetFetchRequestPayload(self.topic, p) for p in partitions], fail_on_error=False ) @@ -155,7 +155,7 @@ class Consumer(object): 'group=%s, topic=%s, partition=%s', offset, self.group, self.topic, partition) - reqs.append(OffsetCommitRequest(self.topic, partition, + reqs.append(OffsetCommitRequestPayload(self.topic, partition, offset, None)) try: diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 1bd3def..fa70124 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -11,7 +11,7 @@ import six from kafka.client import KafkaClient from kafka.common import ( - OffsetFetchRequest, OffsetCommitRequest, + OffsetFetchRequestPayload, OffsetCommitRequestPayload, OffsetRequestPayload, FetchRequestPayload, check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, @@ -546,7 +546,7 @@ class KafkaConsumer(object): continue commits.append( - OffsetCommitRequest(topic_partition[0], topic_partition[1], + OffsetCommitRequestPayload(topic_partition[0], topic_partition[1], commit_offset, metadata) ) @@ -618,7 +618,7 @@ class KafkaConsumer(object): for topic_partition in self._topics: (resp,) = self._client.send_offset_fetch_request( kafka_bytestring(self._config['group_id']), - [OffsetFetchRequest(topic_partition[0], topic_partition[1])], + [OffsetFetchRequestPayload(topic_partition[0], topic_partition[1])], fail_on_error=False) try: check_error(resp) |