diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:03:30 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 16:03:30 -0800 |
commit | 5c45ec13f3e59d9c398f2d3035c762ca13589885 (patch) | |
tree | fe2ddf4e841aae1a352c91bc229be7be8a3c49db /kafka/coordinator | |
parent | fae1a227b1eb67fda2264d81c36cdbe39b49e057 (diff) | |
download | kafka-python-5c45ec13f3e59d9c398f2d3035c762ca13589885.tar.gz |
Check api_version in ConsumerCoordinator
- Full group support in 0.9
- Kafka-storage offsets w/ GroupCoordinator in 0.8.2
- Zookeeper-storage offsets in 0.8.1
- Assign all partitions locally if < 0.9
Diffstat (limited to 'kafka/coordinator')
-rw-r--r-- | kafka/coordinator/consumer.py | 126 |
1 files changed, 94 insertions, 32 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 99d62f2..673cbaf 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -9,7 +9,9 @@ from .base import BaseCoordinator import kafka.common as Errors from kafka.common import OffsetAndMetadata, TopicPartition from kafka.future import Future -from kafka.protocol.commit import OffsetCommitRequest_v2, OffsetFetchRequest_v1 +from kafka.protocol.commit import ( + OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, + OffsetFetchRequest_v0, OffsetFetchRequest_v1) from kafka.protocol.struct import Struct from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String @@ -55,6 +57,7 @@ class ConsumerCoordinator(BaseCoordinator): 'session_timeout_ms': 30000, 'heartbeat_interval_ms': 3000, 'retry_backoff_ms': 100, + 'api_version': (0, 9), } def __init__(self, client, subscription, **configs): @@ -99,14 +102,16 @@ class ConsumerCoordinator(BaseCoordinator): self._subscription = subscription self._partitions_per_topic = {} self._auto_commit_task = None - assert self.config['assignors'], 'Coordinator require assignors' + if self.config['api_version'] >= (0, 9): + assert self.config['assignors'], 'Coordinator require assignors' self._cluster.request_update() self._cluster.add_listener(self._handle_metadata_update) - if self.config['enable_auto_commit']: - interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(self, interval) + if self.config['api_version'] >= (0, 8, 1): + if self.config['enable_auto_commit']: + interval = self.config['auto_commit_interval_ms'] / 1000.0 + self._auto_commit_task = AutoCommitTask(self, interval) # metrics=None, # metric_group_prefix=None, @@ -143,7 +148,17 @@ class ConsumerCoordinator(BaseCoordinator): # check if there are any changes to the metadata which should trigger a rebalance if self._subscription_metadata_changed(): - self._subscription.mark_for_reassignment() + if self.config['api_version'] >= (0, 9): + self._subscription.mark_for_reassignment() + + # If we haven't got group coordinator support, + # just assign all partitions locally + else: + self._subscription.assign_from_subscribed([ + TopicPartition(topic, partition) + for topic in self._subscription.subscription + for partition in self._partitions_per_topic[topic] + ]) def _subscription_metadata_changed(self): if not self._subscription.partitions_auto_assigned(): @@ -273,7 +288,8 @@ class ConsumerCoordinator(BaseCoordinator): dict: {TopicPartition: OffsetAndMetadata} """ while True: - self.ensure_coordinator_known() + if self.config['api_version'] >= (0, 8, 2): + self.ensure_coordinator_known() # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) @@ -331,7 +347,8 @@ class ConsumerCoordinator(BaseCoordinator): return while True: - self.ensure_coordinator_known() + if self.config['api_version'] >= (0, 8, 2): + self.ensure_coordinator_known() future = self._send_offset_commit_request(offsets) self._client.poll(future=future) @@ -345,6 +362,8 @@ class ConsumerCoordinator(BaseCoordinator): time.sleep(self.config['retry_backoff_ms'] / 1000.0) def _maybe_auto_commit_offsets_sync(self): + if self.config['api_version'] < (0, 8, 1): + return if self.config['enable_auto_commit']: # disable periodic commits prior to committing synchronously. note that they will # be re-enabled after a rebalance completes @@ -379,8 +398,12 @@ class ConsumerCoordinator(BaseCoordinator): Returns: Future: indicating whether the commit was successful or not """ - if self.coordinator_unknown(): - return Future().failure(Errors.GroupCoordinatorNotAvailableError) + if self.config['api_version'] >= (0, 8, 2): + if self.coordinator_unknown(): + return Future().failure(Errors.GroupCoordinatorNotAvailableError) + node_id = self.coordinator_id + else: + node_id = self._client.least_loaded_node() if not offsets: return Future().failure(None) @@ -390,25 +413,49 @@ class ConsumerCoordinator(BaseCoordinator): for tp, offset in six.iteritems(offsets): offset_data[tp.topic][tp.partition] = offset - request = OffsetCommitRequest_v2( - self.group_id, - self.generation, - self.member_id, - OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, - [( - topic, [( - partition, - offset.offset, - offset.metadata - ) for partition, offset in six.iteritems(partitions)] - ) for topic, partitions in six.iteritems(offset_data)] - ) + if self.config['api_version'] >= (0, 9): + request = OffsetCommitRequest_v2( + self.group_id, + self.generation, + self.member_id, + OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME, + [( + topic, [( + partition, + offset.offset, + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) + elif self.config['api_version'] >= (0, 8, 2): + request = OffsetCommitRequest_v1( + self.group_id, -1, '', + [( + topic, [( + partition, + offset.offset, + -1, + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) + elif self.config['api_version'] >= (0, 8, 1): + request = OffsetCommitRequest_v0( + self.group_id, + [( + topic, [( + partition, + offset.offset, + offset.metadata + ) for partition, offset in six.iteritems(partitions)] + ) for topic, partitions in six.iteritems(offset_data)] + ) log.debug("Sending offset-commit request with %s to %s", - offsets, self.coordinator_id) + offsets, node_id) future = Future() - _f = self._client.send(self.coordinator_id, request) + _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_commit_response, offsets, future) _f.add_errback(self._failed_request, future) return future @@ -495,22 +542,33 @@ class ConsumerCoordinator(BaseCoordinator): Returns: Future: resolves to dict of offsets: {TopicPartition: int} """ - if self.coordinator_unknown(): - return Future().failure(Errors.GroupCoordinatorNotAvailableError) + if self.config['api_version'] >= (0, 8, 2): + if self.coordinator_unknown(): + return Future().failure(Errors.GroupCoordinatorNotAvailableError) + node_id = self.coordinator_id + else: + node_id = self._client.least_loaded_node() log.debug("Fetching committed offsets for partitions: %s", partitions) # construct the request topic_partitions = collections.defaultdict(set) for tp in partitions: topic_partitions[tp.topic].add(tp.partition) - request = OffsetFetchRequest_v1( - self.group_id, - list(topic_partitions.items()) - ) + + if self.config['api_version'] >= (0, 8, 2): + request = OffsetFetchRequest_v1( + self.group_id, + list(topic_partitions.items()) + ) + else: + request = OffsetFetchRequest_v0( + self.group_id, + list(topic_partitions.items()) + ) # send the request with a callback future = Future() - _f = self._client.send(self.coordinator_id, request) + _f = self._client.send(node_id, request) _f.add_callback(self._handle_offset_fetch_response, future) _f.add_errback(self._failed_request, future) return future @@ -536,6 +594,10 @@ class ConsumerCoordinator(BaseCoordinator): # need to re-join group self._subscription.mark_for_reassignment() future.failure(error) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.warning("OffsetFetchRequest -- unknown topic %s", + topic) + continue else: log.error("Unknown error fetching offsets for %s: %s", tp, error) |