diff options
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3ce7570..cd3d48a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor from .protocol import ConsumerProtocol from .. import errors as Errors from ..future import Future -from ..protocol.commit import ( - OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, - OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest from ..structs import OffsetAndMetadata, TopicPartition from ..util import WeakMethod @@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator): offset_data[tp.topic][tp.partition] = offset if self.config['api_version'] >= (0, 9): - request = OffsetCommitRequest_v2( + request = OffsetCommitRequest[2]( self.group_id, self.generation, self.member_id, - OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, + OffsetCommitRequest[2].DEFAULT_RETENTION_TIME, [( topic, [( partition, @@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 2): - request = OffsetCommitRequest_v1( + request = OffsetCommitRequest[1]( self.group_id, -1, '', [( topic, [( @@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator): ) for topic, partitions in six.iteritems(offset_data)] ) elif self.config['api_version'] >= (0, 8, 1): - request = OffsetCommitRequest_v0( + request = OffsetCommitRequest[0]( self.group_id, [( topic, [( @@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator): topic_partitions[tp.topic].add(tp.partition) if self.config['api_version'] >= (0, 8, 2): - request = OffsetFetchRequest_v1( + request = OffsetFetchRequest[1]( self.group_id, list(topic_partitions.items()) ) else: - request = OffsetFetchRequest_v0( + request = OffsetFetchRequest[0]( self.group_id, list(topic_partitions.items()) ) |