summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-28 15:05:08 -0800
committerDana Powers <dana.powers@rd.io>2015-12-28 15:18:43 -0800
commitbaab076c7e70a721d958f588c4199acbaae41481 (patch)
tree4a5db85a6c55f480272745d878fe524de0f56718
parent9820c5d55398bdb49ffbcd0e6a997bde9f8891fe (diff)
downloadkafka-python-baab076c7e70a721d958f588c4199acbaae41481.tar.gz
AbstractCoordinator (modelled after upstream Java class)
- Heartbeat class manages HeartbeatRequest / Response state - metrics are still TODO
-rw-r--r--kafka/common.py5
-rw-r--r--kafka/coordinator/__init__.py0
-rw-r--r--kafka/coordinator/abstract.py642
-rw-r--r--kafka/coordinator/heartbeat.py43
4 files changed, 690 insertions, 0 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 33f24a4..597fb5c 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -111,6 +111,11 @@ class DisconnectError(KafkaError):
invalid_metadata = True
+class NoBrokersAvailable(KafkaError):
+ retriable = True
+ invalid_metadata = True
+
+
class NodeNotReadyError(KafkaError):
retriable = True
diff --git a/kafka/coordinator/__init__.py b/kafka/coordinator/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/kafka/coordinator/__init__.py
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
new file mode 100644
index 0000000..1713433
--- /dev/null
+++ b/kafka/coordinator/abstract.py
@@ -0,0 +1,642 @@
+import abc
+import logging
+import time
+
+import six
+
+import kafka.common as Errors
+from kafka.future import Future
+from kafka.protocol.commit import (GroupCoordinatorRequest,
+ OffsetCommitRequest_v2 as OffsetCommitRequest)
+from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
+ LeaveGroupRequest, SyncGroupRequest)
+from .heartbeat import Heartbeat
+
+log = logging.getLogger(__name__)
+
+
+class AbstractCoordinator(object):
+ """
+ AbstractCoordinator implements group management for a single group member
+ by interacting with a designated Kafka broker (the coordinator). Group
+ semantics are provided by extending this class. See ConsumerCoordinator
+ for example usage.
+
+ From a high level, Kafka's group management protocol consists of the
+ following sequence of actions:
+
+ 1. Group Registration: Group members register with the coordinator providing
+ their own metadata (such as the set of topics they are interested in).
+
+ 2. Group/Leader Selection: The coordinator select the members of the group
+ and chooses one member as the leader.
+
+ 3. State Assignment: The leader collects the metadata from all the members
+ of the group and assigns state.
+
+ 4. Group Stabilization: Each member receives the state assigned by the
+ leader and begins processing.
+
+ To leverage this protocol, an implementation must define the format of
+ metadata provided by each member for group registration in group_protocols()
+ and the format of the state assignment provided by the leader in
+ _perform_assignment() and which becomes available to members in
+ _on_join_complete().
+ """
+
+ _session_timeout_ms = 30000
+ _heartbeat_interval_ms = 3000
+ _retry_backoff_ms = 100
+
+ def __init__(self, client, group_id, **kwargs):
+ if not client:
+ raise Errors.IllegalStateError('a client is required to use'
+ ' Group Coordinator')
+ if not group_id:
+ raise Errors.IllegalStateError('a group_id is required to use'
+ ' Group Coordinator')
+ for config in ('session_timeout_ms',
+ 'heartbeat_interval_ms',
+ 'retry_backoff_ms'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs.pop(config))
+
+ self._client = client
+ self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
+ self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.group_id = group_id
+ self.coordinator_id = None
+ self.rejoin_needed = True
+ self.needs_join_prepare = True
+ self.heartbeat = Heartbeat(
+ session_timeout_ms=self._session_timeout_ms,
+ heartbeat_interval_ms=self._heartbeat_interval_ms)
+ self.heartbeat_task = HeartbeatTask(self)
+ #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+
+ @abc.abstractmethod
+ def protocol_type(self):
+ """
+ Unique identifier for the class of protocols implements
+ (e.g. "consumer" or "connect").
+
+ @return str protocol type name
+ """
+ pass
+
+ @abc.abstractmethod
+ def group_protocols(self):
+ """Return the list of supported group protocols and metadata.
+
+ This list is submitted by each group member via a JoinGroupRequest.
+ The order of the protocols in the list indicates the preference of the
+ protocol (the first entry is the most preferred). The coordinator takes
+ this preference into account when selecting the generation protocol
+ (generally more preferred protocols will be selected as long as all
+ members support them and there is no disagreement on the preference).
+
+ Note: metadata must be type bytes or support an encode() method
+
+ @return [(protocol, metadata), ...]
+ """
+ pass
+
+ @abc.abstractmethod
+ def _on_join_prepare(self, generation, member_id):
+ """Invoked prior to each group join or rejoin.
+
+ This is typically used to perform any cleanup from the previous
+ generation (such as committing offsets for the consumer)
+
+ @param generation The previous generation or -1 if there was none
+ @param member_id The identifier of this member in the previous group
+ or '' if there was none
+ """
+ pass
+
+ @abc.abstractmethod
+ def _perform_assignment(self, leader_id, protocol, members):
+ """Perform assignment for the group.
+
+ This is used by the leader to push state to all the members of the group
+ (e.g. to push partition assignments in the case of the new consumer)
+
+ @param leader_id: The id of the leader (which is this member)
+ @param protocol: the chosen group protocol (assignment strategy)
+ @param members: [(member_id, metadata_bytes)] from JoinGroupResponse.
+ metadata_bytes are associated with the chosen group
+ protocol, and the Coordinator subclass is responsible
+ for decoding metadata_bytes based on that protocol.
+
+ @return dict of {member_id: assignment}; assignment must either be bytes
+ or have an encode() method to convert to bytes
+ """
+ pass
+
+ @abc.abstractmethod
+ def _on_join_complete(self, generation, member_id, protocol,
+ member_assignment_bytes):
+ """Invoked when a group member has successfully joined a group.
+
+ @param generation The generation that was joined
+ @param member_id The identifier for the local member in the group
+ @param protocol The protocol selected by the coordinator
+ @param member_assignment_bytes The protocol-encoded assignment
+ propagated from the group leader. The Coordinator instance is
+ responsible for decoding based on the chosen protocol.
+ """
+ pass
+
+ def coordinator_unknown(self):
+ """
+ Check if we know who the coordinator is and we have an active connection
+
+ Side-effect: reset coordinator_id to None if connection failed
+
+ @return True if the coordinator is unknown
+ """
+ if self.coordinator_id is None:
+ return True
+
+ if self._client.connection_failed(self.coordinator_id):
+ self.coordinator_dead()
+ return True
+
+ return not self._client.ready(self.coordinator_id)
+
+ def ensure_coordinator_known(self):
+ """Block until the coordinator for this group is known
+ (and we have an active connection -- java client uses unsent queue).
+ """
+ while self.coordinator_unknown():
+
+ # Dont look for a new coordinator node if we are just waiting
+ # for connection to finish
+ if self.coordinator_id is not None:
+ self._client.poll()
+ continue
+
+ future = self.send_group_metadata_request()
+ self._client.poll(future=future)
+
+ if future.failed():
+ if future.retriable():
+ metadata_update = self._client.cluster.request_update()
+ self._client.poll(future=metadata_update)
+ else:
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ def need_rejoin(self):
+ """
+ Check whether the group should be rejoined (e.g. if metadata changes)
+ @return True if it should, False otherwise
+ """
+ return self.rejoin_needed
+
+ def ensure_active_group(self):
+ """Ensure that the group is active (i.e. joined and synced)"""
+ if not self.need_rejoin():
+ return
+
+ if self.needs_join_prepare:
+ self._on_join_prepare(self.generation, self.member_id)
+ self.needs_join_prepare = False
+
+ while self.need_rejoin():
+ self.ensure_coordinator_known()
+
+ future = self.perform_group_join()
+ self._client.poll(future=future)
+
+ if future.succeeded():
+ member_assignment_bytes = future.value
+ self._on_join_complete(self.generation, self.member_id,
+ self.protocol, member_assignment_bytes)
+ self.needs_join_prepare = True
+ self.heartbeat_task.reset()
+ else:
+ exception = future.exception
+ if isinstance(exception, (Errors.UnknownMemberIdError,
+ Errors.RebalanceInProgressError,
+ Errors.IllegalGenerationError)):
+ continue
+ elif not future.retriable():
+ raise exception # pylint: disable-msg=raising-bad-type
+ time.sleep(self._retry_backoff_ms / 1000.0)
+
+ def perform_group_join(self):
+ """Join the group and return the assignment for the next generation.
+
+ This function handles both JoinGroup and SyncGroup, delegating to
+ _perform_assignment() if elected leader by the coordinator.
+
+ @return Future() of the assignment returned from the group leader
+ """
+ if self.coordinator_unknown():
+ e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
+ return Future().failure(e)
+
+ # send a join group request to the coordinator
+ log.debug("(Re-)joining group %s", self.group_id)
+ request = JoinGroupRequest(
+ self.group_id,
+ self._session_timeout_ms,
+ self.member_id,
+ self.protocol_type(),
+ [(protocol,
+ metadata if isinstance(metadata, bytes) else metadata.encode())
+ for protocol, metadata in self.group_protocols()])
+
+ # create the request for the coordinator
+ log.debug("Issuing request (%s) to coordinator %s", request, self.coordinator_id)
+ future = Future()
+ _f = self._client.send(self.coordinator_id, request)
+ _f.add_callback(self._handle_join_group_response, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _failed_request(self, future, error):
+ self.coordinator_dead()
+ future.failure(error)
+
+ def _handle_join_group_response(self, future, response):
+ error_type = Errors.for_code(response.error_code)
+ if error_type is Errors.NoError:
+ log.debug("Joined group: %s", response)
+ self.member_id = response.member_id
+ self.generation = response.generation_id
+ self.rejoin_needed = False
+ self.protocol = response.group_protocol
+ #self.sensors.join_latency.record(response.requestLatencyMs())
+ if response.leader_id == response.member_id:
+ self.on_join_leader(response).chain(future)
+ else:
+ self.on_join_follower().chain(future)
+
+ elif error_type is Errors.GroupLoadInProgressError:
+ log.debug("Attempt to join group %s rejected since coordinator is"
+ " loading the group.", self.group_id)
+ # backoff and retry
+ future.failure(error_type(response))
+ elif error_type is Errors.UnknownMemberIdError:
+ # reset the member id and retry immediately
+ error = error_type(self.member_id)
+ self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ log.info("Attempt to join group %s failed due to unknown member id,"
+ " resetting and retrying.", self.group_id)
+ future.failure(error)
+ elif error_type in (Errors.GroupCoordinatorNotAvailableError,
+ Errors.NotCoordinatorForGroupError):
+ # re-discover the coordinator and retry with backoff
+ self.coordinator_dead()
+ log.info("Attempt to join group %s failed due to obsolete "
+ "coordinator information, retrying.", self.group_id)
+ future.failure(error_type())
+ elif error_type in (Errors.InconsistentGroupProtocolError,
+ Errors.InvalidSessionTimeoutError,
+ Errors.InvalidGroupIdError):
+ # log the error and re-throw the exception
+ error = error_type(response)
+ log.error("Attempt to join group %s failed due to: %s",
+ self.group_id, error)
+ future.failure(error)
+ elif error_type is Errors.GroupAuthorizationFailedError:
+ future.failure(error_type(self.group_id))
+ else:
+ # unexpected error, throw the exception
+ error = error_type()
+ log.error("Unexpected error in join group response: %s", error)
+ future.failure(error)
+
+ def on_join_follower(self):
+ # send follower's sync group with an empty assignment
+ request = SyncGroupRequest(
+ self.group_id,
+ self.generation,
+ self.member_id,
+ {})
+ log.debug("Issuing follower SyncGroup (%s) to coordinator %s",
+ request, self.coordinator_id)
+ return self.send_sync_group_request(request)
+
+ def on_join_leader(self, response):
+ """
+ Perform leader synchronization and send back the assignment
+ for the group via SyncGroupRequest
+ """
+ try:
+ group_assignment = self._perform_assignment(response.leader_id,
+ response.group_protocol,
+ response.members)
+ except Exception as e:
+ raise
+ return Future().failure(e)
+
+ request = SyncGroupRequest(
+ self.group_id,
+ self.generation,
+ self.member_id,
+ [(member_id,
+ assignment if isinstance(assignment, bytes) else assignment.encode())
+ for member_id, assignment in six.iteritems(group_assignment)])
+
+ log.debug("Issuing leader SyncGroup (%s) to coordinator %s",
+ request, self.coordinator_id)
+ return self.send_sync_group_request(request)
+
+ def send_sync_group_request(self, request):
+ if self.coordinator_unknown():
+ return Future().failure(Errors.GroupCoordinatorNotAvailableError())
+ future = Future()
+ _f = self._client.send(self.coordinator_id, request)
+ _f.add_callback(self._handle_sync_group_response, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _handle_sync_group_response(self, future, response):
+ error_type = Errors.for_code(response.error_code)
+ if error_type is Errors.NoError:
+ log.debug("Received successful sync group response for group %s: %s",
+ self.group_id, response)
+ #self.sensors.syncLatency.record(response.requestLatencyMs())
+ future.success(response.member_assignment)
+ return
+
+ # Always rejoin on error
+ self.rejoin_needed = True
+ if error_type is Errors.GroupAuthorizationFailedError:
+ future.failure(error_type(self.group_id))
+ elif error_type is Errors.RebalanceInProgressError:
+ log.info("SyncGroup for group %s failed due to coordinator"
+ " rebalance, rejoining the group", self.group_id)
+ future.failure(error_type(self.group_id))
+ elif error_type in (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError):
+ error = error_type()
+ log.info("SyncGroup for group %s failed due to %s,"
+ " rejoining the group", self.group_id, error)
+ self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ future.failure(error)
+ elif error_type in (Errors.GroupCoordinatorNotAvailableError,
+ Errors.NotCoordinatorForGroupError):
+ error = error_type()
+ log.info("SyncGroup for group %s failed due to %s, will find new"
+ " coordinator and rejoin", self.group_id, error)
+ self.coordinator_dead()
+ future.failure(error)
+ else:
+ error = error_type()
+ log.error("Unexpected error from SyncGroup: %s", error)
+ future.failure(error)
+
+ def send_group_metadata_request(self):
+ """Discover the current coordinator for the group.
+
+ Sends a GroupMetadata request to one of the brokers. The returned future
+ should be polled to get the result of the request.
+
+ @return future indicating the completion of the metadata request
+ """
+ node_id = self._client.least_loaded_node()
+ if node_id is None or not self._client.ready(node_id):
+ return Future().failure(Errors.NoBrokersAvailable())
+
+ log.debug("Issuing group metadata request to broker %s", node_id)
+ request = GroupCoordinatorRequest(self.group_id)
+ future = Future()
+ _f = self._client.send(node_id, request)
+ _f.add_callback(self._handle_group_coordinator_response, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _handle_group_coordinator_response(self, future, response):
+ log.debug("Group metadata response %s", response)
+ if not self.coordinator_unknown():
+ # We already found the coordinator, so ignore the request
+ log.debug("Coordinator already known -- ignoring metadata response")
+ future.success(self.coordinator_id)
+ return
+
+ error_type = Errors.for_code(response.error_code)
+ if error_type is Errors.NoError:
+ ok = self._client.cluster.add_group_coordinator(self.group_id, response)
+ if not ok:
+ # This could happen if coordinator metadata is different
+ # than broker metadata
+ future.failure(Errors.IllegalStateError())
+ return
+
+ self.coordinator_id = response.coordinator_id
+ self._client.ready(self.coordinator_id)
+
+ # start sending heartbeats only if we have a valid generation
+ if self.generation > 0:
+ self.heartbeat_task.reset()
+ future.success(None)
+ elif error_type is Errors.GroupAuthorizationFailedError:
+ error = error_type(self.group_id)
+ log.error("Group Coordinator Request failed: %s", error)
+ future.failure(error)
+ else:
+ error = error_type()
+ log.error("Unrecognized failure in Group Coordinator Request: %s",
+ error)
+ future.failure(error)
+
+ def coordinator_dead(self, error=None):
+ """Mark the current coordinator as dead."""
+ if self.coordinator_id is not None:
+ log.info("Marking the coordinator dead (node %s): %s.",
+ self.coordinator_id, error)
+ self.coordinator_id = None
+
+ def close(self):
+ """Close the coordinator, leave the current group
+ and reset local generation/memberId."""
+ try:
+ self._client.unschedule(self.heartbeat_task)
+ except KeyError:
+ pass
+ if not self.coordinator_unknown() and self.generation > 0:
+ # this is a minimal effort attempt to leave the group. we do not
+ # attempt any resending if the request fails or times out.
+ request = LeaveGroupRequest(self.group_id, self.member_id)
+ future = self._client.send(self.coordinator_id, request)
+ future.add_callback(self._handle_leave_group_response)
+ future.add_errback(log.error, "LeaveGroup request failed: %s")
+ self._client.poll(future=future)
+
+ self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
+ self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.rejoin_needed = True
+
+ def _handle_leave_group_response(self, response):
+ error_type = Errors.for_code(response.error_code)
+ if error_type is Errors.NoError:
+ log.info("LeaveGroup request succeeded")
+ else:
+ log.error("LeaveGroup request failed: %s", error_type())
+
+ def send_heartbeat_request(self):
+ """Send a heartbeat request now (visible only for testing)."""
+ request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
+ future = Future()
+ _f = self._client.send(self.coordinator_id, request)
+ _f.add_callback(self._handle_heartbeat_response, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _handle_heartbeat_response(self, future, response):
+ #self.sensors.heartbeat_latency.record(response.requestLatencyMs())
+ error_type = Errors.for_code(response.error_code)
+ if error_type is Errors.NoError:
+ log.debug("Received successful heartbeat response.")
+ future.success(None)
+ elif error_type in (Errors.GroupCoordinatorNotAvailableError,
+ Errors.NotCoordinatorForGroupError):
+ log.info("Attempt to heart beat failed since coordinator is either"
+ " not started or not valid; marking it as dead.")
+ self.coordinator_dead()
+ future.failure(error_type())
+ elif error_type is Errors.RebalanceInProgressError:
+ log.info("Attempt to heart beat failed since the group is"
+ " rebalancing; try to re-join group.")
+ self.rejoin_needed = True
+ future.failure(error_type())
+ elif error_type is Errors.IllegalGenerationError:
+ log.info("Attempt to heart beat failed since generation id"
+ " is not legal; try to re-join group.")
+ self.rejoin_needed = True
+ future.failure(error_type())
+ elif error_type is Errors.UnknownMemberIdError:
+ log.info("Attempt to heart beat failed since member id"
+ " is not valid; reset it and try to re-join group.")
+ self.member_id = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ self.rejoin_needed = True
+ future.failure(error_type)
+ elif error_type is Errors.GroupAuthorizationFailedError:
+ error = error_type(self.group_id)
+ log.error("Attempt to heart beat failed authorization: %s", error)
+ future.failure(error)
+ else:
+ error = error_type()
+ log.error("Unknown error in heart beat response: %s", error)
+ future.failure(error)
+
+
+class HeartbeatTask(object):
+ def __init__(self, coordinator):
+ self._coordinator = coordinator
+ self._heartbeat = coordinator.heartbeat
+ self._client = coordinator._client
+ self._request_in_flight = False
+
+ def reset(self):
+ # start or restart the heartbeat task to be executed at the next chance
+ self._heartbeat.reset_session_timeout()
+ try:
+ self._client.unschedule(self)
+ except KeyError:
+ pass
+ if not self._request_in_flight:
+ self._client.schedule(self, time.time())
+
+ def __call__(self):
+ log.debug("Running Heartbeat task")
+ if (self._coordinator.generation < 0 or
+ self._coordinator.need_rejoin() or
+ self._coordinator.coordinator_unknown()):
+ # no need to send the heartbeat we're not using auto-assignment
+ # or if we are awaiting a rebalance
+ log.debug("Skipping heartbeat: no auto-assignment"
+ " or waiting on rebalance")
+ return
+
+ if self._heartbeat.session_expired():
+ # we haven't received a successful heartbeat in one session interval
+ # so mark the coordinator dead
+ log.error("Heartbeat session expired")
+ self._coordinator.coordinator_dead()
+ return
+
+ if not self._heartbeat.should_heartbeat():
+ # we don't need to heartbeat now, so reschedule for when we do
+ ttl = self._heartbeat.ttl()
+ log.debug("Heartbeat unneeded now, retrying in %s", ttl)
+ self._client.schedule(self, time.time() + ttl)
+ else:
+ log.debug("Sending HeartbeatRequest")
+ self._heartbeat.sent_heartbeat()
+ self._request_in_flight = True
+ future = self._coordinator.send_heartbeat_request()
+ future.add_callback(self._handle_heartbeat_success)
+ future.add_errback(self._handle_heartbeat_failure)
+
+ def _handle_heartbeat_success(self, v):
+ log.debug("Received successful heartbeat")
+ self.request_in_flight = False
+ self._heartbeat.received_heartbeat()
+ ttl = self._heartbeat.ttl()
+ self._client.schedule(self, time.time() + ttl)
+
+ def _handle_heartbeat_failure(self, e):
+ log.debug("Heartbeat failed; retrying")
+ self._request_in_flight = False
+ etd = time.time() + self._coordinator._retry_backoff_ms / 1000.0
+ self._client.schedule(self, etd)
+
+
+class GroupCoordinatorMetrics(object):
+ def __init__(self, metrics, prefix, tags=None):
+ self.metrics = metrics
+ self.group_name = prefix + "-coordinator-metrics"
+
+ self.heartbeat_latency = metrics.sensor("heartbeat-latency")
+ self.heartbeat_latency.add(metrics.metricName(
+ "heartbeat-response-time-max", self.group_name,
+ "The max time taken to receive a response to a heartbeat request",
+ tags), metrics.Max())
+ self.heartbeat_latency.add(metrics.metricName(
+ "heartbeat-rate", self.group_name,
+ "The average number of heartbeats per second",
+ tags), metrics.Rate(metrics.Count()))
+
+ self.join_latency = metrics.sensor("join-latency")
+ self.join_latency.add(metrics.metricName(
+ "join-time-avg", self.group_name,
+ "The average time taken for a group rejoin",
+ tags), metrics.Avg())
+ self.join_latency.add(metrics.metricName(
+ "join-time-max", self.group_name,
+ "The max time taken for a group rejoin",
+ tags), metrics.Avg())
+ self.join_latency.add(metrics.metricName(
+ "join-rate", self.group_name,
+ "The number of group joins per second",
+ tags), metrics.Rate(metrics.Count()))
+
+ self.sync_latency = metrics.sensor("sync-latency")
+ self.sync_latency.add(metrics.metricName(
+ "sync-time-avg", self.group_name,
+ "The average time taken for a group sync",
+ tags), metrics.Avg())
+ self.sync_latency.add(metrics.MetricName(
+ "sync-time-max", self.group_name,
+ "The max time taken for a group sync",
+ tags), metrics.Avg())
+ self.sync_latency.add(metrics.metricName(
+ "sync-rate", self.group_name,
+ "The number of group syncs per second",
+ tags), metrics.Rate(metrics.Count()))
+
+ """
+ lastHeartbeat = Measurable(
+ measure=lambda _, value: value - heartbeat.last_heartbeat_send()
+ )
+ metrics.addMetric(metrics.metricName(
+ "last-heartbeat-seconds-ago", self.group_name,
+ "The number of seconds since the last controller heartbeat",
+ tags), lastHeartbeat)
+ """
+
+
diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py
new file mode 100644
index 0000000..c153ddd
--- /dev/null
+++ b/kafka/coordinator/heartbeat.py
@@ -0,0 +1,43 @@
+import time
+
+import kafka.common as Errors
+
+
+class Heartbeat(object):
+ _heartbeat_interval_ms = 3000
+ _session_timeout_ms = 30000
+
+ def __init__(self, **kwargs):
+ for config in ('heartbeat_interval_ms', 'session_timeout_ms'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs.pop(config))
+
+ if self._heartbeat_interval_ms > self._session_timeout_ms:
+ raise Errors.IllegalArgumentError("Heartbeat interval must be set"
+ " lower than the session timeout")
+
+ self.interval = self._heartbeat_interval_ms / 1000.0
+ self.timeout = self._session_timeout_ms / 1000.0
+ self.last_send = 0
+ self.last_receive = 0
+ self.last_reset = time.time()
+
+ def sent_heartbeat(self):
+ self.last_send = time.time()
+
+ def received_heartbeat(self):
+ self.last_receive = time.time()
+
+ def ttl(self):
+ last_beat = max(self.last_send, self.last_reset)
+ return max(0, last_beat + self.interval - time.time())
+
+ def should_heartbeat(self):
+ return self.ttl() == 0
+
+ def session_expired(self):
+ last_recv = max(self.last_receive, self.last_reset)
+ return (time.time() - last_recv) > self.timeout
+
+ def reset_session_timeout(self):
+ self.last_reset = time.time()