diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-29 16:58:08 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-29 17:05:07 -0800 |
commit | 357073cc180e3d80fa1f7ece58fc67dbb8f42af7 (patch) | |
tree | 502c4cc34539274db8c2fa53e954fd5d8c6bcd0a | |
parent | 6dea93dfdb0b15270d5fd0ed630e21f319d9cea8 (diff) | |
download | kafka-python-357073cc180e3d80fa1f7ece58fc67dbb8f42af7.tar.gz |
Make more methods private(ish) in AbstractCoordinator
-rw-r--r-- | kafka/coordinator/abstract.py | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py index 10df9a5..2f7b144 100644 --- a/kafka/coordinator/abstract.py +++ b/kafka/coordinator/abstract.py @@ -176,7 +176,7 @@ class AbstractCoordinator(object): self._client.poll() continue - future = self.send_group_metadata_request() + future = self._send_group_metadata_request() self._client.poll(future=future) if future.failed(): @@ -205,7 +205,7 @@ class AbstractCoordinator(object): while self.need_rejoin(): self.ensure_coordinator_known() - future = self.perform_group_join() + future = self._perform_group_join() self._client.poll(future=future) if future.succeeded(): @@ -224,7 +224,7 @@ class AbstractCoordinator(object): raise exception # pylint: disable-msg=raising-bad-type time.sleep(self._retry_backoff_ms / 1000.0) - def perform_group_join(self): + def _perform_group_join(self): """Join the group and return the assignment for the next generation. This function handles both JoinGroup and SyncGroup, delegating to @@ -269,9 +269,9 @@ class AbstractCoordinator(object): self.protocol = response.group_protocol #self.sensors.join_latency.record(response.requestLatencyMs()) if response.leader_id == response.member_id: - self.on_join_leader(response).chain(future) + self._on_join_leader(response).chain(future) else: - self.on_join_follower().chain(future) + self._on_join_follower().chain(future) elif error_type is Errors.GroupLoadInProgressError: log.debug("Attempt to join group %s rejected since coordinator is" @@ -308,7 +308,7 @@ class AbstractCoordinator(object): log.error("Unexpected error in join group response: %s", error) future.failure(error) - def on_join_follower(self): + def _on_join_follower(self): # send follower's sync group with an empty assignment request = SyncGroupRequest( self.group_id, @@ -317,9 +317,9 @@ class AbstractCoordinator(object): {}) log.debug("Issuing follower SyncGroup (%s) to coordinator %s", request, self.coordinator_id) - return self.send_sync_group_request(request) + return self._send_sync_group_request(request) - def on_join_leader(self, response): + def _on_join_leader(self, response): """ Perform leader synchronization and send back the assignment for the group via SyncGroupRequest @@ -342,9 +342,9 @@ class AbstractCoordinator(object): log.debug("Issuing leader SyncGroup (%s) to coordinator %s", request, self.coordinator_id) - return self.send_sync_group_request(request) + return self._send_sync_group_request(request) - def send_sync_group_request(self, request): + def _send_sync_group_request(self, request): if self.coordinator_unknown(): return Future().failure(Errors.GroupCoordinatorNotAvailableError()) future = Future() @@ -389,7 +389,7 @@ class AbstractCoordinator(object): log.error("Unexpected error from SyncGroup: %s", error) future.failure(error) - def send_group_metadata_request(self): + def _send_group_metadata_request(self): """Discover the current coordinator for the group. Sends a GroupMetadata request to one of the brokers. The returned future @@ -477,7 +477,7 @@ class AbstractCoordinator(object): else: log.error("LeaveGroup request failed: %s", error_type()) - def send_heartbeat_request(self): + def _send_heartbeat_request(self): """Send a heartbeat request now (visible only for testing).""" request = HeartbeatRequest(self.group_id, self.generation, self.member_id) future = Future() @@ -568,7 +568,7 @@ class HeartbeatTask(object): log.debug("Sending HeartbeatRequest") self._heartbeat.sent_heartbeat() self._request_in_flight = True - future = self._coordinator.send_heartbeat_request() + future = self._coordinator._send_heartbeat_request() future.add_callback(self._handle_heartbeat_success) future.add_errback(self._handle_heartbeat_failure) |