diff options
Diffstat (limited to 'kafka/coordinator/base.py')
-rw-r--r-- | kafka/coordinator/base.py | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 3c7ea21..7ff7a04 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -8,8 +8,7 @@ import six import kafka.errors as Errors from kafka.future import Future -from kafka.protocol.commit import (GroupCoordinatorRequest, - OffsetCommitRequest_v2 as OffsetCommitRequest) +from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest) from .heartbeat import Heartbeat @@ -79,8 +78,8 @@ class BaseCoordinator(object): self.config[key] = configs[key] self._client = client - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.group_id = self.config['group_id'] self.coordinator_id = None self.rejoin_needed = True @@ -269,7 +268,7 @@ class BaseCoordinator(object): # send a join group request to the coordinator log.info("(Re-)joining group %s", self.group_id) - request = JoinGroupRequest( + request = JoinGroupRequest[0]( self.group_id, self.config['session_timeout_ms'], self.member_id, @@ -324,7 +323,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self.member_id) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID log.debug("Attempt to join group %s failed due to unknown member id", self.group_id) future.failure(error) @@ -354,7 +353,7 @@ class BaseCoordinator(object): def _on_join_follower(self): # send follower's sync group with an empty assignment - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -381,7 +380,7 @@ class BaseCoordinator(object): except Exception as e: return Future().failure(e) - request = SyncGroupRequest( + request = SyncGroupRequest[0]( self.group_id, self.generation, self.member_id, @@ -425,7 +424,7 @@ class BaseCoordinator(object): Errors.IllegalGenerationError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID future.failure(error) elif error_type in (Errors.GroupCoordinatorNotAvailableError, Errors.NotCoordinatorForGroupError): @@ -450,7 +449,7 @@ class BaseCoordinator(object): log.debug("Sending group coordinator request for group %s to broker %s", self.group_id, node_id) - request = GroupCoordinatorRequest(self.group_id) + request = GroupCoordinatorRequest[0](self.group_id) future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) @@ -514,14 +513,14 @@ class BaseCoordinator(object): if not self.coordinator_unknown() and self.generation > 0: # this is a minimal effort attempt to leave the group. we do not # attempt any resending if the request fails or times out. - request = LeaveGroupRequest(self.group_id, self.member_id) + request = LeaveGroupRequest[0](self.group_id, self.member_id) future = self._client.send(self.coordinator_id, request) future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") self._client.poll(future=future) - self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.generation = OffsetCommitRequest[2].DEFAULT_GENERATION_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True def _handle_leave_group_response(self, response): @@ -533,7 +532,7 @@ class BaseCoordinator(object): def _send_heartbeat_request(self): """Send a heartbeat request""" - request = HeartbeatRequest(self.group_id, self.generation, self.member_id) + request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id) log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member future = Future() _f = self._client.send(self.coordinator_id, request) @@ -569,7 +568,7 @@ class BaseCoordinator(object): elif error_type is Errors.UnknownMemberIdError: log.warning("Heartbeat: local member_id was not recognized;" " this consumer needs to re-join") - self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID + self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.rejoin_needed = True future.failure(error_type) elif error_type is Errors.GroupAuthorizationFailedError: |