summaryrefslogtreecommitdiff
path: root/kafka/protocol/legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/legacy.py')
-rw-r--r--kafka/protocol/legacy.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index e4745f1..2eddf3b 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -136,7 +136,7 @@ class KafkaProtocol(object):
if acks not in (1, 0, -1):
raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
- return kafka.protocol.produce.ProduceRequest(
+ return kafka.protocol.produce.ProduceRequest[0](
required_acks=acks,
timeout=timeout,
topics=[(
@@ -180,7 +180,7 @@ class KafkaProtocol(object):
Return: FetchRequest
"""
- return kafka.protocol.fetch.FetchRequest(
+ return kafka.protocol.fetch.FetchRequest[0](
replica_id=-1,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
@@ -212,7 +212,7 @@ class KafkaProtocol(object):
@classmethod
def encode_offset_request(cls, payloads=()):
- return kafka.protocol.offset.OffsetRequest(
+ return kafka.protocol.offset.OffsetRequest[0](
replica_id=-1,
topics=[(
topic,
@@ -250,7 +250,7 @@ class KafkaProtocol(object):
if payloads is not None:
topics = payloads
- return kafka.protocol.metadata.MetadataRequest(topics)
+ return kafka.protocol.metadata.MetadataRequest[0](topics)
@classmethod
def decode_metadata_response(cls, response):
@@ -297,7 +297,7 @@ class KafkaProtocol(object):
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequestPayload
"""
- return kafka.protocol.commit.OffsetCommitRequest_v0(
+ return kafka.protocol.commit.OffsetCommitRequest[0](
consumer_group=group,
topics=[(
topic,
@@ -337,11 +337,11 @@ class KafkaProtocol(object):
from_kafka: bool, default False, set True for Kafka-committed offsets
"""
if from_kafka:
- request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ version = 1
else:
- request_class = kafka.protocol.commit.OffsetFetchRequest_v0
+ version = 0
- return request_class(
+ return kafka.protocol.commit.OffsetFetchRequest[version](
consumer_group=group,
topics=[(
topic,