diff options
| author | Dana Powers <dana.powers@gmail.com> | 2017-10-11 17:11:31 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-10-11 17:11:31 -0700 |
| commit | cfddc6bd179e236874e00a899e9349d5c9a54400 (patch) | |
| tree | 5b3c851f0d127f53adfb1c58680f6c2e6ff2fa9f /kafka/coordinator | |
| parent | f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff) | |
| download | kafka-python-cfddc6bd179e236874e00a899e9349d5c9a54400.tar.gz | |
KAFKA-4034: Avoid unnecessary consumer coordinator lookup (#1254)
Diffstat (limited to 'kafka/coordinator')
| -rw-r--r-- | kafka/coordinator/base.py | 23 | ||||
| -rw-r--r-- | kafka/coordinator/consumer.py | 28 |
2 files changed, 43 insertions, 8 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index af0936c..53b3e1d 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -88,6 +88,7 @@ class BaseCoordinator(object): self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = self.config['group_id'] self.coordinator_id = None + self._find_coordinator_future = None self.rejoin_needed = True self.rejoining = False self.heartbeat = Heartbeat(**self.config) @@ -195,12 +196,11 @@ class BaseCoordinator(object): return False - def ensure_coordinator_known(self): + def ensure_coordinator_ready(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ while self.coordinator_unknown(): - # Prior to 0.8.2 there was no group coordinator # so we will just pick a node at random and treat # it as the "coordinator" @@ -210,7 +210,7 @@ class BaseCoordinator(object): self._client.ready(self.coordinator_id) continue - future = self._send_group_coordinator_request() + future = self.lookup_coordinator() self._client.poll(future=future) if future.failed(): @@ -224,6 +224,16 @@ class BaseCoordinator(object): else: raise future.exception # pylint: disable-msg=raising-bad-type + def _reset_find_coordinator_future(self, result): + self._find_coordinator_future = None + + def lookup_coordinator(self): + if self._find_coordinator_future is None: + self._find_coordinator_future = self._send_group_coordinator_request() + + self._find_coordinator_future.add_both(self._reset_find_coordinator_future) + return self._find_coordinator_future + def need_rejoin(self): """Check whether the group should be rejoined (e.g. if metadata changes) @@ -234,6 +244,11 @@ class BaseCoordinator(object): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" + # always ensure that the coordinator is ready because we may have been + # disconnected when sending heartbeats and does not necessarily require + # us to rejoin the group. + self.ensure_coordinator_ready() + if not self.need_rejoin(): return @@ -242,7 +257,7 @@ class BaseCoordinator(object): self.rejoining = True while self.need_rejoin(): - self.ensure_coordinator_known() + self.ensure_coordinator_ready() # ensure that there are no pending requests to the coordinator. # This is important in particular to avoid resending a pending diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 84c62df..0328837 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -315,7 +315,7 @@ class ConsumerCoordinator(BaseCoordinator): return {} while True: - self.ensure_coordinator_known() + self.ensure_coordinator_ready() # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) @@ -353,9 +353,29 @@ class ConsumerCoordinator(BaseCoordinator): response will be either an Exception or a OffsetCommitResponse struct. This callback can be used to trigger custom actions when a commit request completes. - Returns: - Future: indicating whether the commit was successful or not """ + if not self.coordinator_unknown(): + self._do_commit_offsets_async(offsets, callback) + else: + # we don't know the current coordinator, so try to find it and then + # send the commit or fail (we don't want recursive retries which can + # cause offset commits to arrive out of order). Note that there may + # be multiple offset commits chained to the same coordinator lookup + # request. This is fine because the listeners will be invoked in the + # same order that they were added. Note also that BaseCoordinator + # prevents multiple concurrent coordinator lookup requests. + future = self.lookup_coordinator() + future.add_callback(self._do_commit_offsets_async, offsets, callback) + if callback: + future.add_errback(callback) + + # ensure the commit has a chance to be transmitted (without blocking on + # its completion). Note that commits are treated as heartbeats by the + # coordinator, so there is no need to explicitly allow heartbeats + # through delayed task execution. + self._client.poll() # no wakeup if we add that feature + + def _do_commit_offsets_async(self, offsets, callback=None): assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), @@ -386,7 +406,7 @@ class ConsumerCoordinator(BaseCoordinator): return while True: - self.ensure_coordinator_known() + self.ensure_coordinator_ready() future = self._send_offset_commit_request(offsets) self._client.poll(future=future) |
