summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-28 15:16:55 -0800
committerDana Powers <dana.powers@rd.io>2015-12-28 15:18:43 -0800
commit161d9ffcf8e879bf65c44ea55851c72ef0b80aa6 (patch)
tree9c7c081745f950d86649d8491fc6ccf8606e8a32
parentbaab076c7e70a721d958f588c4199acbaae41481 (diff)
downloadkafka-python-161d9ffcf8e879bf65c44ea55851c72ef0b80aa6.tar.gz
ConsumerCoordinator (based on upstream Java client)
- Use RoundRobinPartitionAssignor by default - Define AbstractPartitionAssignor for custom assignors - metrics still TODO
-rw-r--r--kafka/coordinator/assignors/__init__.py0
-rw-r--r--kafka/coordinator/assignors/abstract.py35
-rw-r--r--kafka/coordinator/assignors/roundrobin.py63
-rw-r--r--kafka/coordinator/consumer.py605
4 files changed, 703 insertions, 0 deletions
diff --git a/kafka/coordinator/assignors/__init__.py b/kafka/coordinator/assignors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/kafka/coordinator/assignors/__init__.py
diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py
new file mode 100644
index 0000000..ed09a6e
--- /dev/null
+++ b/kafka/coordinator/assignors/abstract.py
@@ -0,0 +1,35 @@
+import abc
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class AbstractPartitionAssignor(object):
+ """
+ Abstract assignor implementation which does some common grunt work (in particular collecting
+ partition counts which are always needed in assignors).
+ """
+
+ @abc.abstractproperty
+ def name(self):
+ """.name should be a string identifying the assignor"""
+ pass
+
+ @abc.abstractmethod
+ def assign(self, cluster, members):
+ """Perform group assignment given cluster metadata and member subscriptions
+
+ @param cluster: cluster metadata
+ @param members: {member_id: subscription}
+ @return {member_id: MemberAssignment}
+ """
+ pass
+
+ @abc.abstractmethod
+ def metadata(self, topics):
+ """return ProtocolMetadata to be submitted via JoinGroupRequest"""
+ pass
+
+ @abc.abstractmethod
+ def on_assignment(self, assignment):
+ pass
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
new file mode 100644
index 0000000..2927f3e
--- /dev/null
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -0,0 +1,63 @@
+import collections
+import itertools
+import logging
+
+import six
+
+from .abstract import AbstractPartitionAssignor
+from ...common import TopicPartition
+from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+
+log = logging.getLogger(__name__)
+
+
+class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
+ name = 'roundrobin'
+ version = 0
+
+ @classmethod
+ def assign(cls, cluster, member_metadata):
+ all_topics = set()
+ for metadata in six.itervalues(member_metadata):
+ all_topics.update(metadata.subscription)
+
+ all_topic_partitions = []
+ for topic in all_topics:
+ partitions = cluster.partitions_for_topic(topic)
+ if partitions is None:
+ log.warning('No partition metadata for topic %s', topic)
+ continue
+ for partition in partitions:
+ all_topic_partitions.append(TopicPartition(topic, partition))
+ all_topic_partitions.sort()
+
+ # construct {member_id: {topic: [partition, ...]}}
+ assignment = collections.defaultdict(lambda: collections.defaultdict(list))
+
+ member_iter = itertools.cycle(sorted(member_metadata.keys()))
+ for partition in all_topic_partitions:
+ member_id = member_iter.next()
+
+ # Because we constructed all_topic_partitions from the set of
+ # member subscribed topics, we should be safe assuming that
+ # each topic in all_topic_partitions is in at least one member
+ # subscription; otherwise this could yield an infinite loop
+ while partition.topic not in member_metadata[member_id].subscription:
+ member_id = member_iter.next()
+ assignment[member_id][partition.topic].append(partition.partition)
+
+ protocol_assignment = {}
+ for member_id in member_metadata:
+ protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
+ cls.version,
+ assignment[member_id].items(),
+ b'')
+ return protocol_assignment
+
+ @classmethod
+ def metadata(cls, topics):
+ return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+
+ @classmethod
+ def on_assignment(cls, assignment):
+ pass
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
new file mode 100644
index 0000000..c17c593
--- /dev/null
+++ b/kafka/coordinator/consumer.py
@@ -0,0 +1,605 @@
+import collections
+import logging
+import time
+
+import six
+
+from .abstract import AbstractCoordinator
+import kafka.common as Errors
+from kafka.common import OffsetAndMetadata, TopicPartition
+from kafka.future import Future
+from kafka.protocol.commit import OffsetCommitRequest_v2, OffsetFetchRequest_v1
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+
+log = logging.getLogger(__name__)
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))),
+ ('user_data', Bytes))
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes))
+
+ def partitions(self):
+ return [TopicPartition(topic, partition)
+ for topic, partitions in self.assignment # pylint: disable-msg=no-member
+ for partition in partitions]
+
+
+class ConsumerProtocol(object):
+ PROTOCOL_TYPE = 'consumer'
+ ASSIGNMENT_STRATEGIES = ('roundrobin',)
+ METADATA = ConsumerProtocolMemberMetadata
+ ASSIGNMENT = ConsumerProtocolMemberAssignment
+
+
+class ConsumerCoordinator(AbstractCoordinator):
+ """This class manages the coordination process with the consumer coordinator."""
+ _enable_auto_commit = True
+ _auto_commit_interval_ms = 60 * 1000
+ _default_offset_commit_callback = lambda offsets, error: True
+ _assignors = ()
+ #_heartbeat_interval_ms = 3000
+ #_session_timeout_ms = 30000
+ #_retry_backoff_ms = 100
+
+ def __init__(self, client, group_id, subscription, **kwargs):
+ """Initialize the coordination manager."""
+ super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs)
+ for config in ('enable_auto_commit', 'auto_commit_interval_ms',
+ 'default_offset_commit_callback', 'assignors'):
+ if config in kwargs:
+ setattr(self, '_' + config, kwargs.pop(config))
+
+ self._cluster = client.cluster
+ self._subscription = subscription
+ self._partitions_per_topic = {}
+ self._auto_commit_task = None
+ if not self._assignors:
+ raise Errors.IllegalStateError('Coordinator requires assignors')
+
+ self._cluster.request_update()
+ self._cluster.add_listener(self._handle_metadata_update) #TODO
+
+ if self._enable_auto_commit:
+ interval = self._auto_commit_interval_ms / 1000.0
+ self._auto_commit_task = AutoCommitTask(self, interval)
+
+ # metrics=None,
+ # metric_group_prefix=None,
+ # metric_tags=None,
+ # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+
+ def protocol_type(self):
+ return ConsumerProtocol.PROTOCOL_TYPE
+
+ def group_protocols(self):
+ """Returns list of preferred (protocols, metadata)"""
+ topics = self._subscription.subscription
+ metadata_list = []
+ for assignor in self._assignors:
+ metadata = assignor.metadata(topics)
+ group_protocol = (assignor.name, metadata)
+ metadata_list.append(group_protocol)
+ return metadata_list
+
+ def _handle_metadata_update(self, cluster):
+ # if we encounter any unauthorized topics, raise an exception
+ # TODO
+ #if self._cluster.unauthorized_topics:
+ # raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics)
+
+ if self._subscription.subscribed_pattern:
+ topics = []
+ for topic in cluster.topics():
+ if self._subscription.subscribed_pattern.match(topic):
+ topics.append(topic)
+
+ self._subscription.change_subscription(topics)
+ self._client.set_topics(self._subscription.group_subscription())
+
+ # check if there are any changes to the metadata which should trigger a rebalance
+ if self._subscription_metadata_changed():
+ self._subscription.mark_for_reassignment()
+
+ def _subscription_metadata_changed(self):
+ if not self._subscription.partitions_auto_assigned():
+ return False
+
+ old_partitions_per_topic = self._partitions_per_topic
+ self._partitions_per_topic = {}
+ for topic in self._subscription.group_subscription():
+ self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
+
+ if self._partitions_per_topic != old_partitions_per_topic:
+ return True
+ return False
+
+ def _lookup_assignor(self, name):
+ for assignor in self._assignors:
+ if assignor.name == name:
+ return assignor
+ return None
+
+ def _on_join_complete(self, generation, member_id, protocol,
+ member_assignment_bytes):
+ assignor = self._lookup_assignor(protocol)
+ if not assignor:
+ raise Errors.IllegalStateError("Coordinator selected invalid"
+ " assignment protocol: %s"
+ % protocol)
+
+ assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
+
+ # set the flag to refresh last committed offsets
+ self._subscription.needs_fetch_committed_offsets = True
+
+ # update partition assignment
+ self._subscription.assign_from_subscribed(assignment.partitions())
+
+ # give the assignor a chance to update internal state
+ # based on the received assignment
+ assignor.on_assignment(assignment)
+
+ # restart the autocommit task if needed
+ if self._enable_auto_commit:
+ self._auto_commit_task.enable()
+
+ assigned = set(self._subscription.assigned_partitions())
+ log.debug("Set newly assigned partitions %s", assigned)
+
+ # execute the user's callback after rebalance
+ if self._subscription.listener:
+ try:
+ self._subscriptions.listener.on_partitions_assigned(assigned)
+ except Exception:
+ log.exception("User provided listener failed on partition"
+ " assignment: %s", assigned)
+
+ def _perform_assignment(self, leader_id, assignment_strategy, members):
+ assignor = self._lookup_assignor(assignment_strategy)
+ if not assignor:
+ raise Errors.IllegalStateError("Coordinator selected invalid"
+ " assignment protocol: %s"
+ % assignment_strategy)
+ member_metadata = {}
+ all_subscribed_topics = set()
+ for member_id, metadata_bytes in members:
+ metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
+ member_metadata[member_id] = metadata
+ all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
+
+ # the leader will begin watching for changes to any of the topics
+ # the group is interested in, which ensures that all metadata changes
+ # will eventually be seen
+ self._subscription.group_subscribe(all_subscribed_topics)
+ future = self._client.set_topics(self._subscription.group_subscription())
+ self._client.poll(future=future)
+
+ log.debug("Performing %s assignment for subscriptions %s",
+ assignor.name, member_metadata)
+
+ assignments = assignor.assign(self._cluster, member_metadata)
+
+ log.debug("Finished assignment: %s", assignments)
+
+ group_assignment = {}
+ for member_id, assignment in six.iteritems(assignments):
+ group_assignment[member_id] = assignment
+ return group_assignment
+
+ def _on_join_prepare(self, generation, member_id):
+ # commit offsets prior to rebalance if auto-commit enabled
+ self._maybe_auto_commit_offsets_sync()
+
+ # execute the user's callback before rebalance
+ log.debug("Revoking previously assigned partitions %s",
+ self._subscription.assigned_partitions())
+ if self._subscription.listener:
+ try:
+ revoked = set(self._subscription.assigned_partitions())
+ self._subscription.listener.on_partitions_revoked(revoked)
+ except Exception:
+ log.exception("User provided subscription listener failed"
+ " on_partitions_revoked")
+
+ self._subscription.mark_for_reassignment()
+
+ def need_rejoin(self):
+ """
+ Check whether the group should be rejoined (e.g. if metadata changes)
+ @return True if it should, False otherwise
+ """
+ return (self._subscription.partitions_auto_assigned() and
+ (super(ConsumerCoordinator, self).need_rejoin() or
+ self._subscription.needs_partition_assignment))
+
+ def refresh_committed_offsets_if_needed(self):
+ """Fetch committed offsets for assigned partitions."""
+ if self._subscription.needs_fetch_committed_offsets:
+ offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
+ for partition, offset in six.iteritems(offsets):
+ # verify assignment is still active
+ if self._subscription.is_assigned(partition):
+ self._subscription.assignment[partition].committed = offset.offset
+ self._subscription.needs_fetch_committed_offsets = False
+
+ def fetch_committed_offsets(self, partitions):
+ """
+ Fetch the current committed offsets from the coordinator for a set of
+ partitions.
+
+ @param partitions The partitions to fetch offsets for
+ @return dict of {TopicPartition: OffsetMetadata}
+ """
+ while True:
+ self.ensure_coordinator_known()
+
+ # contact coordinator to fetch committed offsets
+ future = self._send_offset_fetch_request(partitions)
+ self._client.poll(future=future)
+
+ if future.succeeded():
+ return future.value
+
+ if not future.retriable():
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ time.sleep(self._retry_backoff_ms / 1000.0)
+
+ def ensure_partition_assignment(self):
+ """Ensure that we have a valid partition assignment from the coordinator."""
+ if self._subscription.partitions_auto_assigned():
+ self.ensure_active_group()
+
+ def close(self):
+ try:
+ self._maybe_auto_commit_offsets_sync()
+ finally:
+ super(ConsumerCoordinator, self).close()
+
+ def commit_offsets_async(self, offsets, callback=None):
+ """
+ @param offsets: dict of {TopicPartition: OffsetAndMetadata} to commit
+ @param callback: called as callback(offsets, response), with response
+ as either an Exception or a OffsetCommitResponse
+ struct. This callback can be used to trigger custom
+ actions when a commit request completes.
+ @returns Future
+ """
+ self._subscription.needs_fetch_committed_offsets = True
+ future = self._send_offset_commit_request(offsets)
+ cb = callback if callback else self._default_offset_commit_callback
+ future.add_both(cb, offsets)
+
+ def commit_offsets_sync(self, offsets):
+ """
+ Commit offsets synchronously. This method will retry until the commit
+ completes successfully or an unrecoverable error is encountered.
+
+ @param offsets dict of {TopicPartition: OffsetAndMetadata} to commit
+ @raises TopicAuthorizationError if the consumer is not authorized to the
+ group or to any of the specified partitions
+ @raises CommitFailedError if an unrecoverable error occurs before the
+ commit can be completed
+ """
+ if not offsets:
+ return
+
+ while True:
+ self.ensure_coordinator_known()
+
+ future = self._send_offset_commit_request(offsets)
+ self._client.poll(future=future)
+
+ if future.succeeded():
+ return
+
+ if not future.retriable():
+ raise future.exception # pylint: disable-msg=raising-bad-type
+
+ time.sleep(self._retry_backoff_ms / 1000.0)
+
+ def _maybe_auto_commit_offsets_sync(self):
+ if self._enable_auto_commit:
+ # disable periodic commits prior to committing synchronously. note that they will
+ # be re-enabled after a rebalance completes
+ self._auto_commit_task.disable()
+
+ try:
+ self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+ except Exception:
+ # consistent with async auto-commit failures, we do not propagate the exception
+ log.exception("Auto offset commit failed")
+
+ def _send_offset_commit_request(self, offsets):
+ """Commit offsets for the specified list of topics and partitions.
+
+ This is a non-blocking call which returns a request future that can be
+ polled in the case of a synchronous commit or ignored in the
+ asynchronous case.
+
+ @param offsets dict of {TopicPartition: OffsetAndMetadata} that should
+ be committed
+ @return Future indicating whether the commit was successful or not
+ """
+ if self.coordinator_unknown():
+ return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+
+ if not offsets:
+ return Future().failure(None)
+
+ # create the offset commit request
+ offset_data = collections.defaultdict(dict)
+ for tp, offset in six.iteritems(offsets):
+ offset_data[tp.topic][tp.partition] = offset
+
+ request = OffsetCommitRequest_v2(
+ self.group_id,
+ self.generation,
+ self.member_id,
+ OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+ [(
+ topic, [(
+ partition,
+ offset.offset,
+ offset.metadata
+ ) for partition, offset in six.iteritems(partitions)]
+ ) for topic, partitions in six.iteritems(offset_data)]
+ )
+
+ log.debug("Sending offset-commit request with %s to %s",
+ offsets, self.coordinator_id)
+
+ future = Future()
+ _f = self._client.send(self.coordinator_id, request)
+ _f.add_callback(self._handle_offset_commit_response, offsets, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _handle_offset_commit_response(self, offsets, future, response):
+ #self.sensors.commit_latency.record(response.requestLatencyMs())
+ unauthorized_topics = set()
+
+ for topic, partitions in response.topics:
+ for partition, error_code in partitions:
+ tp = TopicPartition(topic, partition)
+ offset = offsets[tp]
+
+ error_type = Errors.for_code(error_code)
+ if error_type is Errors.NoError:
+ log.debug("Committed offset %s for partition %s", offset, tp)
+ if self._subscription.is_assigned(tp):
+ self._subscription.assignment[tp].committed = offset.offset
+ elif error_type is Errors.GroupAuthorizationFailedError:
+ log.error("Unauthorized to commit for group %s", self.group_id)
+ future.failure(error_type(self.group_id))
+ return
+ elif error_type is Errors.TopicAuthorizationFailedError:
+ unauthorized_topics.add(topic)
+ elif error_type in (Errors.OffsetMetadataTooLargeError,
+ Errors.InvalidCommitOffsetSizeError):
+ # raise the error to the user
+ error = error_type()
+ log.info("Offset commit for group %s failed on partition"
+ " %s due to %s will retry", self.group_id, tp, error)
+ future.failure(error)
+ return
+ elif error_type is Errors.GroupLoadInProgressError:
+ # just retry
+ error = error_type(self.group_id)
+ log.info("Offset commit for group %s failed due to %s,"
+ " will retry", self.group_id, error)
+ future.failure(error)
+ return
+ elif error_type in (Errors.GroupCoordinatorNotAvailableError,
+ Errors.NotCoordinatorForGroupError,
+ Errors.RequestTimedOutError):
+ error = error_type(self.group_id)
+ log.info("Offset commit for group %s failed due to %s,"
+ " will find new coordinator and retry",
+ self.group_id, error)
+ self.coordinator_dead()
+ future.failure(error)
+ return
+ elif error_type in (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError,
+ Errors.RebalanceInProgressError):
+ # need to re-join group
+ error = error_type(self.group_id)
+ log.error("Error %s occurred while committing offsets for"
+ " group %s", error, self.group_id)
+ self._subscription.mark_for_reassignment()
+ # Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
+ future.failure(error)
+ return
+ else:
+ error = error_type()
+ log.error("Unexpected error committing partition %s at"
+ " offset %s: %s", tp, offset, error)
+ future.failure(error)
+ return
+
+ if unauthorized_topics:
+ log.error("Unauthorized to commit to topics %s", unauthorized_topics)
+ future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
+ else:
+ future.success(True)
+
+ def _send_offset_fetch_request(self, partitions):
+ """Fetch the committed offsets for a set of partitions.
+
+ This is a non-blocking call. The returned future can be polled to get
+ the actual offsets returned from the broker.
+
+ @param partitions list of TopicPartitions
+ @return Future of committed offsets dict: {TopicPartition: offset}
+ """
+ if self.coordinator_unknown():
+ return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+
+ log.debug("Fetching committed offsets for partitions: %s", partitions)
+ # construct the request
+ topic_partitions = collections.defaultdict(set)
+ for tp in partitions:
+ topic_partitions[tp.topic].add(tp.partition)
+ request = OffsetFetchRequest_v1(
+ self.group_id,
+ list(topic_partitions.items())
+ )
+
+ # send the request with a callback
+ future = Future()
+ _f = self._client.send(self.coordinator_id, request)
+ _f.add_callback(self._handle_offset_fetch_response, future)
+ _f.add_errback(self._failed_request, future)
+ return future
+
+ def _handle_offset_fetch_response(self, future, response):
+ offsets = {}
+ for topic, partitions in response.topics:
+ for partition, offset, metadata, error_code in partitions:
+ tp = TopicPartition(topic, partition)
+ error_type = Errors.for_code(error_code)
+ if error_type is not Errors.NoError:
+ error = error_type()
+ log.debug("Error fetching offset for %s: %s", tp, error_type())
+ if error_type is Errors.GroupLoadInProgressError:
+ # just retry
+ future.failure(error)
+ elif error_type is Errors.NotCoordinatorForGroupError:
+ # re-discover the coordinator and retry
+ self.coordinator_dead()
+ future.failure(error)
+ elif error_type in (Errors.UnknownMemberIdError,
+ Errors.IllegalGenerationError):
+ # need to re-join group
+ self._subscription.mark_for_reassignment()
+ future.failure(error)
+ else:
+ log.error("Unknown error fetching offsets for %s: %s",
+ tp, error)
+ future.failure(error)
+ return
+ elif offset >= 0:
+ # record the position with the offset (-1 indicates no committed offset to fetch)
+ offsets[tp] = OffsetAndMetadata(offset, metadata)
+ else:
+ log.debug("No committed offset for partition %s", tp)
+ future.success(offsets)
+
+
+class AutoCommitTask(object):
+ def __init__(self, coordinator, interval):
+ self._coordinator = coordinator
+ self._client = coordinator._client
+ self._interval = interval
+ self._enabled = False
+ self._request_in_flight = False
+
+ def enable(self):
+ if self._enabled:
+ log.warning("AutoCommitTask is already enabled")
+ return
+
+ self._enabled = True
+ if not self._request_in_flight:
+ self._client.schedule(self, time.time() + self._interval)
+
+ def disable(self):
+ self._enabled = False
+ try:
+ self._client.unschedule(self)
+ except KeyError:
+ log.warning("AutoCommitTask was not previously scheduled")
+
+ def _reschedule(self, at):
+ if self._enabled:
+ self._client.schedule(self, at)
+ else:
+ raise Errors.IllegalStateError('AutoCommitTask not enabled')
+
+ def __call__(self):
+ if not self._enabled:
+ return
+
+ if self._coordinator.coordinator_unknown():
+ log.debug("Cannot auto-commit offsets because the coordinator is"
+ " unknown, will retry after backoff")
+ next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0
+ self._client.schedule(self, next_at)
+ return
+
+ self._request_in_flight = True
+ self._coordinator.commit_offsets_async(
+ self._coordinator._subscription.all_consumed_offsets(),
+ self._handle_commit_response)
+
+ def _handle_commit_response(self, offsets, result):
+ self._request_in_flight = False
+ if result is True:
+ log.debug("Successfully auto-committed offsets")
+ next_at = time.time() + self._interval
+ elif not isinstance(result, BaseException):
+ raise Errors.IllegalStateError(
+ 'Unrecognized result in _handle_commit_response: %s'
+ % result)
+ elif hasattr(result, 'retriable') and result.retriable:
+ log.debug("Failed to auto-commit offsets: %s, will retry"
+ " immediately", result)
+ next_at = time.time()
+ else:
+ log.warning("Auto offset commit failed: %s", result)
+ next_at = time.time() + self._interval
+
+ if not self._enabled:
+ log.warning("Skipping auto-commit reschedule -- it is disabled")
+ return
+ self._reschedule(next_at)
+
+
+# TODO
+"""
+class ConsumerCoordinatorMetrics(object):
+ def __init__(self, metrics, prefix, tags):
+ self.metrics = metrics
+ self.group_name = prefix + "-coordinator-metrics"
+
+ self.commit_latency = metrics.sensor("commit-latency")
+ self.commit_latency.add(metrics.MetricName(
+ "commit-latency-avg", self.group_name,
+ "The average time taken for a commit request",
+ tags), metrics.Avg())
+ self.commit_latency.add(metrics.MetricName(
+ "commit-latency-max", self.group_name,
+ "The max time taken for a commit request",
+ tags), metrics.Max())
+ self.commit_latency.add(metrics.MetricName(
+ "commit-rate", self.group_name,
+ "The number of commit calls per second",
+ tags), metrics.Rate(metrics.Count()))
+
+ '''
+ def _num_partitions(config, now):
+ new Measurable() {
+ public double measure(MetricConfig config, long now) {
+ return subscriptions.assignedPartitions().size();
+ }
+ };
+ metrics.addMetric(new MetricName("assigned-partitions",
+ this.metricGrpName,
+ "The number of partitions currently assigned to this consumer",
+ tags),
+ numParts);
+ '''
+"""