author    Dana Powers <dana.powers@gmail.com>  2017-10-22 00:26:26 -0700
committer Dana Powers <dana.powers@gmail.com>  2017-12-21 11:39:25 -0800
commit    1247a6fb087079ab5dc09350c27563db4823cef5 (patch)
tree      921ebab851c9fa22817b4af32a8ef132e6f3f59e
parent    7a34ccefc494be3c729fac8c296964388981310e (diff)
Handle no group, no broker support, and/or manually-assigned partitions in coordinator.poll()
 kafka/consumer/group.py       | 14
 kafka/coordinator/consumer.py | 55
 2 files changed, 31 insertions(+), 38 deletions(-)
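
Note on the change: the group-id and broker-version checks that previously lived at each KafkaConsumer call site move inside ConsumerCoordinator.poll() itself, so poll() degrades to a no-op (no group, old broker) or a commit-only path (manual assignment). A minimal sketch of the resulting dispatch; only group_id, api_version, and the auto-assignment flag mirror the patch, the helper and its returned action names are illustrative stand-ins:

    # Illustrative stand-in for the new ConsumerCoordinator.poll() control flow.
    def coordinator_poll_actions(group_id, api_version, auto_assigned):
        """Return, in order, the steps poll() would take for this configuration."""
        # No group, or broker predates GroupCoordinator support: safe no-op.
        if group_id is None or api_version < (0, 8, 2):
            return []

        actions = ['invoke_completed_offset_commit_callbacks',
                   'ensure_coordinator_ready']

        if auto_assigned:
            # Group management only: (re)join if needed, keep heartbeats alive.
            actions += ['rejoin_group_if_needed', 'poll_heartbeat']

        actions.append('maybe_auto_commit_offsets_async')
        return actions

    # Manually-assigned partitions with kafka-backed offsets: the coordinator
    # is still located for offset commits, but there is no join or heartbeat.
    assert coordinator_poll_actions('my-group', (0, 10, 1), auto_assigned=False) == [
        'invoke_completed_offset_commit_callbacks',
        'ensure_coordinator_ready',
        'maybe_auto_commit_offsets_async',
    ]

    # No group at all (or a pre-0.8.2 broker): poll() now returns immediately.
    assert coordinator_poll_actions(None, (0, 10, 1), auto_assigned=True) == []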
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 4755b70..e080251 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -604,12 +604,7 @@ class KafkaConsumer(six.Iterator):
         Returns:
             dict: Map of topic to list of records (may be empty).
         """
-        if self._use_consumer_group():
-            self._coordinator.poll()
-
-        # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-        elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-            self._coordinator.ensure_coordinator_ready()
+        self._coordinator.poll()

         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -1032,12 +1027,7 @@ class KafkaConsumer(six.Iterator):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
-            if self._use_consumer_group():
-                self._coordinator.poll()
-
-            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
-            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_ready()
+            self._coordinator.poll()

             # Fetch offsets for any subscribed partitions that we aren't tracking yet
             if not self._subscription.has_all_fetch_positions():
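
With the branching gone from the call sites above, these two consumer configurations now go through the same code path. A sketch using the public kafka-python API (topic name and broker address are placeholders):

    from kafka import KafkaConsumer, TopicPartition

    # Group-managed consumer: coordinator.poll() ensures the coordinator is
    # known, joins the group, and heartbeats.
    grouped = KafkaConsumer('my-topic',
                            group_id='my-group',
                            bootstrap_servers='localhost:9092')

    # Manually-assigned consumer with no group: coordinator.poll() is now a
    # no-op, so the unconditional call in KafkaConsumer.poll() is safe.
    standalone = KafkaConsumer(group_id=None,
                               bootstrap_servers='localhost:9092')
    standalone.assign([TopicPartition('my-topic', 0)])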
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 51fff23..9e680c1 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -232,35 +232,39 @@ class ConsumerCoordinator(BaseCoordinator):
     def poll(self):
         """
-        Poll for coordinator events. This ensures that the coordinator is
-        known and that the consumer has joined the group (if it is using
-        group management). This also handles periodic offset commits if
-        they are enabled.
+        Poll for coordinator events. Only applicable if group_id is set, and
+        broker version supports GroupCoordinators. This ensures that the
+        coordinator is known, and if using automatic partition assignment,
+        ensures that the consumer has joined the group. This also handles
+        periodic offset commits if they are enabled.
         """
+        if self.group_id is None or self.config['api_version'] < (0, 8, 2):
+            return
+
         self._invoke_completed_offset_commit_callbacks()
+        self.ensure_coordinator_ready()

-        if self._subscription.partitions_auto_assigned() and self.coordinator_unknown():
-            self.ensure_coordinator_ready()
+        if self._subscription.partitions_auto_assigned():
+            if self.need_rejoin():
+                # due to a race condition between the initial metadata fetch and the
+                # initial rebalance, we need to ensure that the metadata is fresh
+                # before joining initially, and then request the metadata update. If
+                # metadata update arrives while the rebalance is still pending (for
+                # example, when the join group is still inflight), then we will lose
+                # track of the fact that we need to rebalance again to reflect the
+                # change to the topic subscription. Without ensuring that the
+                # metadata is fresh, any metadata update that changes the topic
+                # subscriptions and arrives while a rebalance is in progress will
+                # essentially be ignored. See KAFKA-3949 for the complete
+                # description of the problem.
+                if self._subscription.subscribed_pattern:
+                    metadata_update = self._client.cluster.request_update()
+                    self._client.poll(future=metadata_update)
+
+                self.ensure_active_group()
+
+            self.poll_heartbeat()

-        if self._subscription.partitions_auto_assigned() and self.need_rejoin():
-            # due to a race condition between the initial metadata fetch and the
-            # initial rebalance, we need to ensure that the metadata is fresh
-            # before joining initially, and then request the metadata update. If
-            # metadata update arrives while the rebalance is still pending (for
-            # example, when the join group is still inflight), then we will lose
-            # track of the fact that we need to rebalance again to reflect the
-            # change to the topic subscription. Without ensuring that the
-            # metadata is fresh, any metadata update that changes the topic
-            # subscriptions and arrives while a rebalance is in progress will
-            # essentially be ignored. See KAFKA-3949 for the complete
-            # description of the problem.
-            if self._subscription.subscribed_pattern:
-                metadata_update = self._client.cluster.request_update()
-                self._client.poll(future=metadata_update)
-
-            self.ensure_active_group()
-
-        self.poll_heartbeat()

         self._maybe_auto_commit_offsets_async()

     def time_to_next_poll(self):
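
The KAFKA-3949 workaround in the rejoin branch comes down to "force a metadata refresh and block on its future before joining". A toy model of that wait-on-future pattern; all classes below are simplified stand-ins, only the request_update() -> poll(future=...) calling sequence mirrors the patch:

    class FakeFuture:
        def __init__(self):
            self.is_done = False

    class FakeCluster:
        def request_update(self):
            # Ask for a metadata refresh; the returned future completes
            # when the metadata response arrives.
            self.pending = FakeFuture()
            return self.pending

    class FakeClient:
        def __init__(self):
            self.cluster = FakeCluster()
        def poll(self, future):
            # Drive I/O until the given future completes; here the
            # "response" arrives on the first iteration.
            while not future.is_done:
                future.is_done = True

    client = FakeClient()

    # Pattern subscriptions must see fresh metadata before (re)joining, so
    # the pattern is matched against the cluster's topics at least once.
    metadata_update = client.cluster.request_update()
    client.poll(future=metadata_update)
    assert metadata_update.is_done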
@@ -420,7 +424,6 @@ class ConsumerCoordinator(BaseCoordinator):
         future = self.lookup_coordinator()
         future.add_callback(self._do_commit_offsets_async, offsets, callback)
         if callback:
-
             future.add_errback(lambda e: self.completed_offset_commits.appendleft((callback, offsets, e)))

         # ensure the commit has a chance to be transmitted (without blocking on
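
The errback in the last hunk feeds the same deferred-callback queue that poll() now drains first: on a failed coordinator lookup, the user callback is stored with the error and invoked from the next poll() by _invoke_completed_offset_commit_callbacks(). A minimal sketch of that queue; the deque name matches the patch, everything else is illustrative:

    from collections import deque

    completed_offset_commits = deque()

    def on_commit(offsets, exception):
        # User-supplied commit callback: receives a response or an error.
        print('commit of %s completed: %r' % (offsets, exception))

    def errback(callback, offsets, error):
        # Failure path from the patch: remember (callback, offsets, error)
        # so the callback runs on the consumer's thread at the next poll().
        completed_offset_commits.appendleft((callback, offsets, error))

    def invoke_completed_offset_commit_callbacks():
        # Called at the top of ConsumerCoordinator.poll() in the patch.
        while completed_offset_commits:
            cb, offsets, exception = completed_offset_commits.pop()
            cb(offsets, exception)

    errback(on_commit, {('my-topic', 0): 42}, RuntimeError('lookup failed'))
    invoke_completed_offset_commit_callbacks()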