diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-25 09:38:27 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-10-24 15:12:11 +0000 |
commit | 3af66bc542efff3f7010bec18b72d844e09488c4 (patch) | |
tree | a20623631b36230c9425b08ee95b85afbc9a9455 /kafka/consumer/fetcher.py | |
parent | e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff) | |
download | kafka-python-v2_records.tar.gz |
Add DefaultRecordBatch implementation aka V2 message format parser/builder.v2_records
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d3ee26e..ddd7567 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -23,6 +23,10 @@ from kafka.structs import TopicPartition, OffsetAndTimestamp log = logging.getLogger(__name__) +# Isolation levels +READ_UNCOMMITTED = 0 +READ_COMMITTED = 1 + ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) @@ -114,6 +118,7 @@ class Fetcher(six.Iterator): self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) + self._isolation_level = READ_UNCOMMITTED def send_fetches(self): """Send FetchRequests for all assigned partitions that do not already have @@ -670,7 +675,9 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) - if self.config['api_version'] >= (0, 10, 1): + if self.config['api_version'] >= (0, 11, 0): + version = 4 + elif self.config['api_version'] >= (0, 10, 1): version = 3 elif self.config['api_version'] >= (0, 10): version = 2 @@ -696,12 +703,21 @@ class Fetcher(six.Iterator): # dicts retain insert order. partition_data = list(partition_data.items()) random.shuffle(partition_data) - requests[node_id] = FetchRequest[version]( - -1, # replica_id - self.config['fetch_max_wait_ms'], - self.config['fetch_min_bytes'], - self.config['fetch_max_bytes'], - partition_data) + if version == 3: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + partition_data) + else: + requests[node_id] = FetchRequest[version]( + -1, # replica_id + self.config['fetch_max_wait_ms'], + self.config['fetch_min_bytes'], + self.config['fetch_max_bytes'], + self._isolation_level, + partition_data) return requests def _handle_fetch_response(self, request, send_time, response): |