diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d3ee26e..ddd7567 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) +# Isolation levels +READ_UNCOMMITTED = 0 +READ_COMMITTED = 1 + ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) @@ -114,6 +118,7 @@ class Fetcher(six.Iterator): self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) + self._isolation_level = READ_UNCOMMITTED def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have @@ -670,7 +675,9 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) - if self.config['api_version'] >= (0, 10, 1): + if self.config['api_version'] >= (0, 11, 0): + version = 4 + elif self.config['api_version'] >= (0, 10, 1): version = 3 elif self.config['api_version'] >= (0, 10): version = 2 @@ -696,12 +703,21 @@ class Fetcher(six.Iterator): # dicts retain insert order. partition_data = list(partition_data.items()) random.shuffle(partition_data) - requests[node_id] = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - partition_data) + if version == 3: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + partition_data) + else: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + self._isolation_level, + partition_data) return requests def _handle_fetch_response(self, request, send_time, response): |