summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-05 13:04:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-06 14:48:13 -0700
commit3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 (patch)
tree169a0e740992d6bff7b4e46dbf047d14429b5d82 /kafka/consumer/fetcher.py
parent331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (diff)
downloadkafka-python-kafka-2136.tar.gz
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)kafka-2136
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 8ce573b..1f0619b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -37,6 +37,7 @@ class Fetcher(six.Iterator):
'max_partition_fetch_bytes': 1048576,
'check_crcs': True,
'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'api_version': (0, 8, 0),
}
def __init__(self, client, subscriptions, **configs):
@@ -531,7 +532,7 @@ class Fetcher(six.Iterator):
FetchRequests skipped if no leader, or node has requests in flight
Returns:
- dict: {node_id: FetchRequest, ...}
+ dict: {node_id: FetchRequest, ...} (version depends on api_version)
"""
# create the fetch info as a dict of lists of partition info tuples
# which can be passed to FetchRequest() via .items()
@@ -564,9 +565,10 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
+ version = 1 if self.config['api_version'] >= (0, 9) else 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
- requests[node_id] = FetchRequest[0](
+ requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],