diff options
author | Dana Powers <dana.powers@gmail.com> | 2017-10-22 00:26:26 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-12-21 11:39:25 -0800 |
commit | 1247a6fb087079ab5dc09350c27563db4823cef5 (patch) | |
tree | 921ebab851c9fa22817b4af32a8ef132e6f3f59e | |
parent | 7a34ccefc494be3c729fac8c296964388981310e (diff) | |
download | kafka-python-1247a6fb087079ab5dc09350c27563db4823cef5.tar.gz |
Handle no group, no broker support, and/or manually-assigned partitions in coordinator.poll()
-rw-r--r-- | kafka/consumer/group.py | 14 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 55 |
2 files changed, 31 insertions, 38 deletions
```diff
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 4755b70..e080251 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -604,12 +604,7 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: Map of topic to list of records (may be empty).
         """
-        if self._use_consumer_group():
-            self._coordinator.poll()
-
-        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-            self._coordinator.ensure_coordinator_ready()
+        self._coordinator.poll()
 
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -1032,12 +1027,7 @@ class KafkaConsumer(six.Iterator):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
 
-            if self._use_consumer_group():
-                self._coordinator.poll()
-
-            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_ready()
+            self._coordinator.poll()
 
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 51fff23..9e680c1 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -232,35 +232,39 @@ class ConsumerCoordinator(BaseCoordinator):
 
     def poll(self):
         """
-        Poll for coordinator events. This ensures that the coordinator is
-        known and that the consumer has joined the group (if it is using
-        group management). This also handles periodic offset commits if
-        they are enabled.
+        Poll for coordinator events. Only applicable if group_id is set, and
+        broker version supports GroupCoordinators. This ensures that the
+        coordinator is known, and if using automatic partition assignment,
+        ensures that the consumer has joined the group. This also handles
+        periodic offset commits if they are enabled.
         """
+        if self.group_id is None or self.config['api_version'] < (0, 8, 2):
+            return
+
         self._invoke_completed_offset_commit_callbacks()
+        self.ensure_coordinator_ready()
 
-        if self._subscription.partitions_auto_assigned() and self.coordinator_unknown():
-            self.ensure_coordinator_ready()
+        if self._subscription.partitions_auto_assigned():
+            if self.need_rejoin():
+                # due to a race condition between the initial metadata fetch and the
+                # initial rebalance, we need to ensure that the metadata is fresh
+                # before joining initially, and then request the metadata update. If
+                # metadata update arrives while the rebalance is still pending (for
+                # example, when the join group is still inflight), then we will lose
+                # track of the fact that we need to rebalance again to reflect the
+                # change to the topic subscription. Without ensuring that the
+                # metadata is fresh, any metadata update that changes the topic
+                # subscriptions and arrives while a rebalance is in progress will
+                # essentially be ignored. See KAFKA-3949 for the complete
+                # description of the problem.
+                if self._subscription.subscribed_pattern:
+                    metadata_update = self._client.cluster.request_update()
+                    self._client.poll(future=metadata_update)
+
+                self.ensure_active_group()
+
+            self.poll_heartbeat()
 
-        if self._subscription.partitions_auto_assigned() and self.need_rejoin():
-            # due to a race condition between the initial metadata fetch and the
-            # initial rebalance, we need to ensure that the metadata is fresh
-            # before joining initially, and then request the metadata update. If
-            # metadata update arrives while the rebalance is still pending (for
-            # example, when the join group is still inflight), then we will lose
-            # track of the fact that we need to rebalance again to reflect the
-            # change to the topic subscription. Without ensuring that the
-            # metadata is fresh, any metadata update that changes the topic
-            # subscriptions and arrives while a rebalance is in progress will
-            # essentially be ignored. See KAFKA-3949 for the complete
-            # description of the problem.
-            if self._subscription.subscribed_pattern:
-                metadata_update = self._client.cluster.request_update()
-                self._client.poll(future=metadata_update)
-
-            self.ensure_active_group()
-
-        self.poll_heartbeat()
         self._maybe_auto_commit_offsets_async()
 
     def time_to_next_poll(self):
@@ -420,7 +424,6 @@ class ConsumerCoordinator(BaseCoordinator):
             future = self.lookup_coordinator()
             future.add_callback(self._do_commit_offsets_async, offsets, callback)
             if callback:
-
                 future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))
 
         # ensure the commit has a chance to be transmitted (without blocking on
```