summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py16
1 files changed, 7 insertions, 9 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 3ce7570..cd3d48a 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -14,9 +14,7 @@ from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import ConsumerProtocol
from .. import errors as Errors
from ..future import Future
-from ..protocol.commit import (
- OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
- OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from ..structs import OffsetAndMetadata, TopicPartition
from ..util import WeakMethod
@@ -430,11 +428,11 @@ class ConsumerCoordinator(BaseCoordinator):
offset_data[tp.topic][tp.partition] = offset
if self.config['api_version'] >= (0, 9):
- request = OffsetCommitRequest_v2(
+ request = OffsetCommitRequest[2](
self.group_id,
self.generation,
self.member_id,
- OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+ OffsetCommitRequest[2].DEFAULT_RETENTION_TIME,
[(
topic, [(
partition,
@@ -444,7 +442,7 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
elif self.config['api_version'] >= (0, 8, 2):
- request = OffsetCommitRequest_v1(
+ request = OffsetCommitRequest[1](
self.group_id, -1, '',
[(
topic, [(
@@ -456,7 +454,7 @@ class ConsumerCoordinator(BaseCoordinator):
) for topic, partitions in six.iteritems(offset_data)]
)
elif self.config['api_version'] >= (0, 8, 1):
- request = OffsetCommitRequest_v0(
+ request = OffsetCommitRequest[0](
self.group_id,
[(
topic, [(
@@ -593,12 +591,12 @@ class ConsumerCoordinator(BaseCoordinator):
topic_partitions[tp.topic].add(tp.partition)
if self.config['api_version'] >= (0, 8, 2):
- request = OffsetFetchRequest_v1(
+ request = OffsetFetchRequest[1](
self.group_id,
list(topic_partitions.items())
)
else:
- request = OffsetFetchRequest_v0(
+ request = OffsetFetchRequest[0](
self.group_id,
list(topic_partitions.items())
)