diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 13:04:24 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-06 14:48:13 -0700 |
commit | 3d16f2ff5f75380c8a9fce846f35e92bb5bfb935 (patch) | |
tree | 169a0e740992d6bff7b4e46dbf047d14429b5d82 /kafka/consumer/fetcher.py | |
parent | 331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (diff) | |
download | kafka-python-kafka-2136.tar.gz |
KAFKA-2136: support Fetch and Produce v1 (throttle_time_ms)kafka-2136
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8ce573b..1f0619b 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -37,6 +37,7 @@ class Fetcher(six.Iterator): 'max_partition_fetch_bytes': 1048576, 'check_crcs': True, 'iterator_refetch_records': 1, # undocumented -- interface may change + 'api_version': (0, 8, 0), } def __init__(self, client, subscriptions, **configs): @@ -531,7 +532,7 @@ class Fetcher(six.Iterator): FetchRequests skipped if no leader, or node has requests in flight Returns: - dict: {node_id: FetchRequest, ...} + dict: {node_id: FetchRequest, ...} (version depends on api_version) """ # create the fetch info as a dict of lists of partition info tuples # which can be passed to FetchRequest() via .items() @@ -564,9 +565,10 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) + version = 1 if self.config['api_version'] >= (0, 9) else 0 requests = {} for node_id, partition_data in six.iteritems(fetchable): - requests[node_id] = FetchRequest[0]( + requests[node_id] = FetchRequest[version]( -1, # replica_id self.config['fetch_max_wait_ms'], self.config['fetch_min_bytes'], |