summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py30
1 files changed, 23 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d3ee26e..ddd7567 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp
log = logging.getLogger(__name__)
+# Isolation levels
+READ_UNCOMMITTED = 0
+READ_COMMITTED = 1
+
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "timestamp", "timestamp_type",
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
@@ -114,6 +118,7 @@ class Fetcher(six.Iterator):
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+ self._isolation_level = READ_UNCOMMITTED
def send_fetches(self):
"""Send FetchRequests for all assigned partitions that do not already have
@@ -670,7 +675,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- if self.config['api_version'] >= (0, 10, 1):
+ if self.config['api_version'] >= (0, 11, 0):
+ version = 4
+ elif self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
version = 2
@@ -696,12 +703,21 @@ class Fetcher(six.Iterator):
# dicts retain insert order.
partition_data = list(partition_data.items())
random.shuffle(partition_data)
- requests[node_id] = FetchRequest[version](
- -1, # replica_id
- self.config['fetch_max_wait_ms'],
- self.config['fetch_min_bytes'],
- self.config['fetch_max_bytes'],
- partition_data)
+ if version == 3:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ partition_data)
+ else:
+ requests[node_id] = FetchRequest[version](
+ -1, # replica_id
+ self.config['fetch_max_wait_ms'],
+ self.config['fetch_min_bytes'],
+ self.config['fetch_max_bytes'],
+ self._isolation_level,
+ partition_data)
return requests
def _handle_fetch_response(self, request, send_time, response):