summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/abstract.py26
1 files changed, 13 insertions, 13 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 10df9a5..2f7b144 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -176,7 +176,7 @@ class AbstractCoordinator(object):
self._client.poll()
continue
- future = self.send_group_metadata_request()
+ future = self._send_group_metadata_request()
self._client.poll(future=future)
if future.failed():
@@ -205,7 +205,7 @@ class AbstractCoordinator(object):
while self.need_rejoin():
self.ensure_coordinator_known()
- future = self.perform_group_join()
+ future = self._perform_group_join()
self._client.poll(future=future)
if future.succeeded():
@@ -224,7 +224,7 @@ class AbstractCoordinator(object):
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self._retry_backoff_ms / 1000.0)
- def perform_group_join(self):
+ def _perform_group_join(self):
"""Join the group and return the assignment for the next generation.
This function handles both JoinGroup and SyncGroup, delegating to
@@ -269,9 +269,9 @@ class AbstractCoordinator(object):
self.protocol = response.group_protocol
#self.sensors.join_latency.record(response.requestLatencyMs())
if response.leader_id == response.member_id:
- self.on_join_leader(response).chain(future)
+ self._on_join_leader(response).chain(future)
else:
- self.on_join_follower().chain(future)
+ self._on_join_follower().chain(future)
elif error_type is Errors.GroupLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator is"
@@ -308,7 +308,7 @@ class AbstractCoordinator(object):
log.error("Unexpected error in join group response: %s", error)
future.failure(error)
- def on_join_follower(self):
+ def _on_join_follower(self):
# send follower's sync group with an empty assignment
request = SyncGroupRequest(
self.group_id,
@@ -317,9 +317,9 @@ class AbstractCoordinator(object):
{})
log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
- return self.send_sync_group_request(request)
+ return self._send_sync_group_request(request)
- def on_join_leader(self, response):
+ def _on_join_leader(self, response):
"""
Perform leader synchronization and send back the assignment
for the group via SyncGroupRequest
@@ -342,9 +342,9 @@ class AbstractCoordinator(object):
log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
request, self.coordinator_id)
- return self.send_sync_group_request(request)
+ return self._send_sync_group_request(request)
- def send_sync_group_request(self, request):
+ def _send_sync_group_request(self, request):
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError())
future = Future()
@@ -389,7 +389,7 @@ class AbstractCoordinator(object):
log.error("Unexpected error from SyncGroup: %s", error)
future.failure(error)
- def send_group_metadata_request(self):
+ def _send_group_metadata_request(self):
"""Discover the current coordinator for the group.
Sends a GroupMetadata request to one of the brokers. The returned future
@@ -477,7 +477,7 @@ class AbstractCoordinator(object):
else:
log.error("LeaveGroup request failed: %s", error_type())
- def send_heartbeat_request(self):
+ def _send_heartbeat_request(self):
"""Send a heartbeat request now (visible only for testing)."""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
future = Future()
@@ -568,7 +568,7 @@ class HeartbeatTask(object):
log.debug("Sending HeartbeatRequest")
self._heartbeat.sent_heartbeat()
self._request_in_flight = True
- future = self._coordinator.send_heartbeat_request()
+ future = self._coordinator._send_heartbeat_request()
future.add_callback(self._handle_heartbeat_success)
future.add_errback(self._handle_heartbeat_failure)