author     Dana Powers <dana.powers@rd.io>        2015-12-28 15:16:55 -0800
committer  Dana Powers <dana.powers@rd.io>        2015-12-28 15:18:43 -0800
commit     161d9ffcf8e879bf65c44ea55851c72ef0b80aa6 (patch)
tree       9c7c081745f950d86649d8491fc6ccf8606e8a32
parent     baab076c7e70a721d958f588c4199acbaae41481 (diff)
download   kafka-python-161d9ffcf8e879bf65c44ea55851c72ef0b80aa6.tar.gz
ConsumerCoordinator (based on upstream Java client)
- Use RoundRobinPartitionAssignor by default
- Define AbstractPartitionAssignor for custom assignors
- metrics still TODO
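
The round-robin strategy this commit makes the default interleaves partitions across members in sorted member-id order, skipping members that are not subscribed to a given partition's topic. A standalone toy sketch of that idea (plain Python for illustration only; toy_round_robin is not part of the library API):

import collections
import itertools

def toy_round_robin(partitions_per_topic, subscriptions):
    """partitions_per_topic: {topic: [partition, ...]}
    subscriptions: {member_id: set of subscribed topics}
    """
    all_partitions = sorted(
        (topic, partition)
        for topic, partitions in partitions_per_topic.items()
        for partition in partitions)
    assignment = collections.defaultdict(list)
    member_cycle = itertools.cycle(sorted(subscriptions))
    for topic, partition in all_partitions:
        member = next(member_cycle)
        # skip members not subscribed to this topic
        # (assumes every topic has at least one subscriber)
        while topic not in subscriptions[member]:
            member = next(member_cycle)
        assignment[member].append((topic, partition))
    return dict(assignment)

print(toy_round_robin({'t1': [0, 1, 2], 't2': [0]},
                      {'consumer-a': set(['t1', 't2']), 'consumer-b': set(['t1'])}))
# {'consumer-a': [('t1', 0), ('t1', 2), ('t2', 0)], 'consumer-b': [('t1', 1)]}
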
-rw-r--r--  kafka/coordinator/assignors/__init__.py   |   0
-rw-r--r--  kafka/coordinator/assignors/abstract.py   |  35
-rw-r--r--  kafka/coordinator/assignors/roundrobin.py |  63
-rw-r--r--  kafka/coordinator/consumer.py             | 605
4 files changed, 703 insertions, 0 deletions
diff --git a/kafka/coordinator/assignors/__init__.py b/kafka/coordinator/assignors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/kafka/coordinator/assignors/__init__.py
diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py
new file mode 100644
index 0000000..ed09a6e
--- /dev/null
+++ b/kafka/coordinator/assignors/abstract.py
@@ -0,0 +1,35 @@
+import abc
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class AbstractPartitionAssignor(object):
+    """
+    Abstract assignor implementation which does some common grunt work (in particular collecting
+    partition counts which are always needed in assignors).
+    """
+
+    @abc.abstractproperty
+    def name(self):
+        """.name should be a string identifying the assignor"""
+        pass
+
+    @abc.abstractmethod
+    def assign(self, cluster, members):
+        """Perform group assignment given cluster metadata and member subscriptions
+
+        @param cluster: cluster metadata
+        @param members: {member_id: subscription}
+        @return {member_id: MemberAssignment}
+        """
+        pass
+
+    @abc.abstractmethod
+    def metadata(self, topics):
+        """return ProtocolMetadata to be submitted via JoinGroupRequest"""
+        pass
+
+    @abc.abstractmethod
+    def on_assignment(self, assignment):
+        pass
diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py
new file mode 100644
index 0000000..2927f3e
--- /dev/null
+++ b/kafka/coordinator/assignors/roundrobin.py
@@ -0,0 +1,63 @@
+import collections
+import itertools
+import logging
+
+import six
+
+from .abstract import AbstractPartitionAssignor
+from ...common import TopicPartition
+from ..consumer import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
+
+log = logging.getLogger(__name__)
+
+
+class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
+    name = 'roundrobin'
+    version = 0
+
+    @classmethod
+    def assign(cls, cluster, member_metadata):
+        all_topics = set()
+        for metadata in six.itervalues(member_metadata):
+            all_topics.update(metadata.subscription)
+
+        all_topic_partitions = []
+        for topic in all_topics:
+            partitions = cluster.partitions_for_topic(topic)
+            if partitions is None:
+                log.warning('No partition metadata for topic %s', topic)
+                continue
+            for partition in partitions:
+                all_topic_partitions.append(TopicPartition(topic, partition))
+        all_topic_partitions.sort()
+
+        # construct {member_id: {topic: [partition, ...]}}
+        assignment = collections.defaultdict(lambda: collections.defaultdict(list))
+
+        member_iter = itertools.cycle(sorted(member_metadata.keys()))
+        for partition in all_topic_partitions:
+            member_id = next(member_iter)
+
+            # Because we constructed all_topic_partitions from the set of
+            # member subscribed topics, we should be safe assuming that
+            # each topic in all_topic_partitions is in at least one member
+            # subscription; otherwise this could yield an infinite loop
+            while partition.topic not in member_metadata[member_id].subscription:
+                member_id = next(member_iter)
+            assignment[member_id][partition.topic].append(partition.partition)
+
+        protocol_assignment = {}
+        for member_id in member_metadata:
+            protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
+                cls.version,
+                assignment[member_id].items(),
+                b'')
+        return protocol_assignment
+
+    @classmethod
+    def metadata(cls, topics):
+        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
+
+    @classmethod
+    def on_assignment(cls, assignment):
+        pass
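
A custom assignor plugs in by subclassing the AbstractPartitionAssignor added above and providing name, metadata(), assign() and on_assignment(), mirroring RoundRobinPartitionAssignor. A minimal sketch, assuming this commit's module layout and the ConsumerProtocolMember* structs defined in kafka/coordinator/consumer.py below; GreedyAssignor is a hypothetical example, not part of this commit:

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.consumer import (
    ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata)


class GreedyAssignor(AbstractPartitionAssignor):
    """Hypothetical assignor: every partition of a topic goes to the first
    member (sorted by member_id) that subscribes to that topic."""
    name = 'greedy'
    version = 0

    @classmethod
    def assign(cls, cluster, member_metadata):
        # pick a single owner per subscribed topic
        topic_owner = {}
        for member_id in sorted(member_metadata):
            for topic in member_metadata[member_id].subscription:
                topic_owner.setdefault(topic, member_id)

        # build {member_id: [(topic, [partitions]), ...]}
        topics_by_member = dict((member_id, []) for member_id in member_metadata)
        for topic, member_id in topic_owner.items():
            partitions = cluster.partitions_for_topic(topic)
            if partitions is None:
                continue
            topics_by_member[member_id].append((topic, sorted(partitions)))

        return dict((member_id, ConsumerProtocolMemberAssignment(
                        cls.version, sorted(assigned), b''))
                    for member_id, assigned in topics_by_member.items())

    @classmethod
    def metadata(cls, topics):
        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')

    @classmethod
    def on_assignment(cls, assignment):
        pass

Based on the kwargs handling in ConsumerCoordinator.__init__ below, such a class would presumably be passed as assignors=(RoundRobinPartitionAssignor, GreedyAssignor) and selected by name during the join-group protocol.
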
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
new file mode 100644
index 0000000..c17c593
--- /dev/null
+++ b/kafka/coordinator/consumer.py
@@ -0,0 +1,605 @@
+import collections
+import logging
+import time
+
+import six
+
+from .abstract import AbstractCoordinator
+import kafka.common as Errors
+from kafka.common import OffsetAndMetadata, TopicPartition
+from kafka.future import Future
+from kafka.protocol.commit import OffsetCommitRequest_v2, OffsetFetchRequest_v1
+from kafka.protocol.struct import Struct
+from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
+
+log = logging.getLogger(__name__)
+
+
+class ConsumerProtocolMemberMetadata(Struct):
+    SCHEMA = Schema(
+        ('version', Int16),
+        ('subscription', Array(String('utf-8'))),
+        ('user_data', Bytes))
+
+
+class ConsumerProtocolMemberAssignment(Struct):
+    SCHEMA = Schema(
+        ('version', Int16),
+        ('assignment', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32)))),
+        ('user_data', Bytes))
+
+    def partitions(self):
+        return [TopicPartition(topic, partition)
+                for topic, partitions in self.assignment # pylint: disable-msg=no-member
+                for partition in partitions]
+
+
+class ConsumerProtocol(object):
+    PROTOCOL_TYPE = 'consumer'
+    ASSIGNMENT_STRATEGIES = ('roundrobin',)
+    METADATA = ConsumerProtocolMemberMetadata
+    ASSIGNMENT = ConsumerProtocolMemberAssignment
+
+
+class ConsumerCoordinator(AbstractCoordinator):
+    """This class manages the coordination process with the consumer coordinator."""
+    _enable_auto_commit = True
+    _auto_commit_interval_ms = 60 * 1000
+    _default_offset_commit_callback = lambda offsets, error: True
+    _assignors = ()
+    #_heartbeat_interval_ms = 3000
+    #_session_timeout_ms = 30000
+    #_retry_backoff_ms = 100
+
+    def __init__(self, client, group_id, subscription, **kwargs):
+        """Initialize the coordination manager."""
+        super(ConsumerCoordinator, self).__init__(client, group_id, **kwargs)
+        for config in ('enable_auto_commit', 'auto_commit_interval_ms',
+                       'default_offset_commit_callback', 'assignors'):
+            if config in kwargs:
+                setattr(self, '_' + config, kwargs.pop(config))
+
+        self._cluster = client.cluster
+        self._subscription = subscription
+        self._partitions_per_topic = {}
+        self._auto_commit_task = None
+        if not self._assignors:
+            raise Errors.IllegalStateError('Coordinator requires assignors')
+
+        self._cluster.request_update()
+        self._cluster.add_listener(self._handle_metadata_update) #TODO
+
+        if self._enable_auto_commit:
+            interval = self._auto_commit_interval_ms / 1000.0
+            self._auto_commit_task = AutoCommitTask(self, interval)
+
+        # metrics=None,
+        # metric_group_prefix=None,
+        # metric_tags=None,
+        # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+
+    def protocol_type(self):
+        return ConsumerProtocol.PROTOCOL_TYPE
+
+    def group_protocols(self):
+        """Returns list of preferred (protocols, metadata)"""
+        topics = self._subscription.subscription
+        metadata_list = []
+        for assignor in self._assignors:
+            metadata = assignor.metadata(topics)
+            group_protocol = (assignor.name, metadata)
+            metadata_list.append(group_protocol)
+        return metadata_list
+
+    def _handle_metadata_update(self, cluster):
+        # if we encounter any unauthorized topics, raise an exception
+        # TODO
+        #if self._cluster.unauthorized_topics:
+        #    raise Errors.TopicAuthorizationError(self._cluster.unauthorized_topics)
+
+        if self._subscription.subscribed_pattern:
+            topics = []
+            for topic in cluster.topics():
+                if self._subscription.subscribed_pattern.match(topic):
+                    topics.append(topic)
+
+            self._subscription.change_subscription(topics)
+            self._client.set_topics(self._subscription.group_subscription())
+
+        # check if there are any changes to the metadata which should trigger a rebalance
+        if self._subscription_metadata_changed():
+            self._subscription.mark_for_reassignment()
+
+    def _subscription_metadata_changed(self):
+        if not self._subscription.partitions_auto_assigned():
+            return False
+
+        old_partitions_per_topic = self._partitions_per_topic
+        self._partitions_per_topic = {}
+        for topic in self._subscription.group_subscription():
+            self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
+
+        if self._partitions_per_topic != old_partitions_per_topic:
+            return True
+        return False
+
+    def _lookup_assignor(self, name):
+        for assignor in self._assignors:
+            if assignor.name == name:
+                return assignor
+        return None
+
+    def _on_join_complete(self, generation, member_id, protocol,
+                          member_assignment_bytes):
+        assignor = self._lookup_assignor(protocol)
+        if not assignor:
+            raise Errors.IllegalStateError("Coordinator selected invalid"
+                                           " assignment protocol: %s"
+                                           % protocol)
+
+        assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
+
+        # set the flag to refresh last committed offsets
+        self._subscription.needs_fetch_committed_offsets = True
+
+        # update partition assignment
+        self._subscription.assign_from_subscribed(assignment.partitions())
+
+        # give the assignor a chance to update internal state
+        # based on the received assignment
+        assignor.on_assignment(assignment)
+
+        # restart the autocommit task if needed
+        if self._enable_auto_commit:
+            self._auto_commit_task.enable()
+
+        assigned = set(self._subscription.assigned_partitions())
+        log.debug("Set newly assigned partitions %s", assigned)
+
+        # execute the user's callback after rebalance
+        if self._subscription.listener:
+            try:
+                self._subscription.listener.on_partitions_assigned(assigned)
+            except Exception:
+                log.exception("User provided listener failed on partition"
+                              " assignment: %s", assigned)
+
+    def _perform_assignment(self, leader_id, assignment_strategy, members):
+        assignor = self._lookup_assignor(assignment_strategy)
+        if not assignor:
+            raise Errors.IllegalStateError("Coordinator selected invalid"
+                                           " assignment protocol: %s"
+                                           % assignment_strategy)
+        member_metadata = {}
+        all_subscribed_topics = set()
+        for member_id, metadata_bytes in members:
+            metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
+            member_metadata[member_id] = metadata
+            all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
+
+        # the leader will begin watching for changes to any of the topics
+        # the group is interested in, which ensures that all metadata changes
+        # will eventually be seen
+        self._subscription.group_subscribe(all_subscribed_topics)
+        future = self._client.set_topics(self._subscription.group_subscription())
+        self._client.poll(future=future)
+
+        log.debug("Performing %s assignment for subscriptions %s",
+                  assignor.name, member_metadata)
+
+        assignments = assignor.assign(self._cluster, member_metadata)
+
+        log.debug("Finished assignment: %s", assignments)
+
+        group_assignment = {}
+        for member_id, assignment in six.iteritems(assignments):
+            group_assignment[member_id] = assignment
+        return group_assignment
+
+    def _on_join_prepare(self, generation, member_id):
+        # commit offsets prior to rebalance if auto-commit enabled
+        self._maybe_auto_commit_offsets_sync()
+
+        # execute the user's callback before rebalance
+        log.debug("Revoking previously assigned partitions %s",
+                  self._subscription.assigned_partitions())
+        if self._subscription.listener:
+            try:
+                revoked = set(self._subscription.assigned_partitions())
+                self._subscription.listener.on_partitions_revoked(revoked)
+            except Exception:
+                log.exception("User provided subscription listener failed"
+                              " on_partitions_revoked")
+
+        self._subscription.mark_for_reassignment()
+
+    def need_rejoin(self):
+        """
+        Check whether the group should be rejoined (e.g. if metadata changes)
+        @return True if it should, False otherwise
+        """
+        return (self._subscription.partitions_auto_assigned() and
+                (super(ConsumerCoordinator, self).need_rejoin() or
+                 self._subscription.needs_partition_assignment))
+
+    def refresh_committed_offsets_if_needed(self):
+        """Fetch committed offsets for assigned partitions."""
+        if self._subscription.needs_fetch_committed_offsets:
+            offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions())
+            for partition, offset in six.iteritems(offsets):
+                # verify assignment is still active
+                if self._subscription.is_assigned(partition):
+                    self._subscription.assignment[partition].committed = offset.offset
+            self._subscription.needs_fetch_committed_offsets = False
+
+    def fetch_committed_offsets(self, partitions):
+        """
+        Fetch the current committed offsets from the coordinator for a set of
+        partitions.
+
+        @param partitions The partitions to fetch offsets for
+        @return dict of {TopicPartition: OffsetAndMetadata}
+        """
+        while True:
+            self.ensure_coordinator_known()
+
+            # contact coordinator to fetch committed offsets
+            future = self._send_offset_fetch_request(partitions)
+            self._client.poll(future=future)
+
+            if future.succeeded():
+                return future.value
+
+            if not future.retriable():
+                raise future.exception # pylint: disable-msg=raising-bad-type
+
+            time.sleep(self._retry_backoff_ms / 1000.0)
+
+    def ensure_partition_assignment(self):
+        """Ensure that we have a valid partition assignment from the coordinator."""
+        if self._subscription.partitions_auto_assigned():
+            self.ensure_active_group()
+
+    def close(self):
+        try:
+            self._maybe_auto_commit_offsets_sync()
+        finally:
+            super(ConsumerCoordinator, self).close()
+
+    def commit_offsets_async(self, offsets, callback=None):
+        """
+        @param offsets: dict of {TopicPartition: OffsetAndMetadata} to commit
+        @param callback: called as callback(offsets, response), with response
+                         as either an Exception or a OffsetCommitResponse
+                         struct. This callback can be used to trigger custom
+                         actions when a commit request completes.
+        @returns Future
+        """
+        self._subscription.needs_fetch_committed_offsets = True
+        future = self._send_offset_commit_request(offsets)
+        cb = callback if callback else self._default_offset_commit_callback
+        future.add_both(cb, offsets)
+
+    def commit_offsets_sync(self, offsets):
+        """
+        Commit offsets synchronously. This method will retry until the commit
+        completes successfully or an unrecoverable error is encountered.
+
+        @param offsets dict of {TopicPartition: OffsetAndMetadata} to commit
+        @raises TopicAuthorizationError if the consumer is not authorized to the
+                group or to any of the specified partitions
+        @raises CommitFailedError if an unrecoverable error occurs before the
+                commit can be completed
+        """
+        if not offsets:
+            return
+
+        while True:
+            self.ensure_coordinator_known()
+
+            future = self._send_offset_commit_request(offsets)
+            self._client.poll(future=future)
+
+            if future.succeeded():
+                return
+
+            if not future.retriable():
+                raise future.exception # pylint: disable-msg=raising-bad-type
+
+            time.sleep(self._retry_backoff_ms / 1000.0)
+
+    def _maybe_auto_commit_offsets_sync(self):
+        if self._enable_auto_commit:
+            # disable periodic commits prior to committing synchronously. note that they will
+            # be re-enabled after a rebalance completes
+            self._auto_commit_task.disable()
+
+            try:
+                self.commit_offsets_sync(self._subscription.all_consumed_offsets())
+            except Exception:
+                # consistent with async auto-commit failures, we do not propagate the exception
+                log.exception("Auto offset commit failed")
+
+    def _send_offset_commit_request(self, offsets):
+        """Commit offsets for the specified list of topics and partitions.
+
+        This is a non-blocking call which returns a request future that can be
+        polled in the case of a synchronous commit or ignored in the
+        asynchronous case.
+
+        @param offsets dict of {TopicPartition: OffsetAndMetadata} that should
+                       be committed
+        @return Future indicating whether the commit was successful or not
+        """
+        if self.coordinator_unknown():
+            return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+
+        if not offsets:
+            return Future().failure(None)
+
+        # create the offset commit request
+        offset_data = collections.defaultdict(dict)
+        for tp, offset in six.iteritems(offsets):
+            offset_data[tp.topic][tp.partition] = offset
+
+        request = OffsetCommitRequest_v2(
+            self.group_id,
+            self.generation,
+            self.member_id,
+            OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+            [(
+                topic, [(
+                    partition,
+                    offset.offset,
+                    offset.metadata
+                ) for partition, offset in six.iteritems(partitions)]
+            ) for topic, partitions in six.iteritems(offset_data)]
+        )
+
+        log.debug("Sending offset-commit request with %s to %s",
+                  offsets, self.coordinator_id)
+
+        future = Future()
+        _f = self._client.send(self.coordinator_id, request)
+        _f.add_callback(self._handle_offset_commit_response, offsets, future)
+        _f.add_errback(self._failed_request, future)
+        return future
+
+    def _handle_offset_commit_response(self, offsets, future, response):
+        #self.sensors.commit_latency.record(response.requestLatencyMs())
+        unauthorized_topics = set()
+
+        for topic, partitions in response.topics:
+            for partition, error_code in partitions:
+                tp = TopicPartition(topic, partition)
+                offset = offsets[tp]
+
+                error_type = Errors.for_code(error_code)
+                if error_type is Errors.NoError:
+                    log.debug("Committed offset %s for partition %s", offset, tp)
+                    if self._subscription.is_assigned(tp):
+                        self._subscription.assignment[tp].committed = offset.offset
+                elif error_type is Errors.GroupAuthorizationFailedError:
+                    log.error("Unauthorized to commit for group %s", self.group_id)
+                    future.failure(error_type(self.group_id))
+                    return
+                elif error_type is Errors.TopicAuthorizationFailedError:
+                    unauthorized_topics.add(topic)
+                elif error_type in (Errors.OffsetMetadataTooLargeError,
+                                    Errors.InvalidCommitOffsetSizeError):
+                    # raise the error to the user
+                    error = error_type()
+                    log.info("Offset commit for group %s failed on partition"
+                             " %s due to %s, will retry", self.group_id, tp, error)
+                    future.failure(error)
+                    return
+                elif error_type is Errors.GroupLoadInProgressError:
+                    # just retry
+                    error = error_type(self.group_id)
+                    log.info("Offset commit for group %s failed due to %s,"
+                             " will retry", self.group_id, error)
+                    future.failure(error)
+                    return
+                elif error_type in (Errors.GroupCoordinatorNotAvailableError,
+                                    Errors.NotCoordinatorForGroupError,
+                                    Errors.RequestTimedOutError):
+                    error = error_type(self.group_id)
+                    log.info("Offset commit for group %s failed due to %s,"
+                             " will find new coordinator and retry",
+                             self.group_id, error)
+                    self.coordinator_dead()
+                    future.failure(error)
+                    return
+                elif error_type in (Errors.UnknownMemberIdError,
+                                    Errors.IllegalGenerationError,
+                                    Errors.RebalanceInProgressError):
+                    # need to re-join group
+                    error = error_type(self.group_id)
+                    log.error("Error %s occurred while committing offsets for"
+                              " group %s", error, self.group_id)
+                    self._subscription.mark_for_reassignment()
+                    # Errors.CommitFailedError("Commit cannot be completed due to group rebalance"))
+                    future.failure(error)
+                    return
+                else:
+                    error = error_type()
+                    log.error("Unexpected error committing partition %s at"
+                              " offset %s: %s", tp, offset, error)
+                    future.failure(error)
+                    return
+
+        if unauthorized_topics:
+            log.error("Unauthorized to commit to topics %s", unauthorized_topics)
+            future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics))
+        else:
+            future.success(True)
+
+    def _send_offset_fetch_request(self, partitions):
+        """Fetch the committed offsets for a set of partitions.
+
+        This is a non-blocking call. The returned future can be polled to get
+        the actual offsets returned from the broker.
+
+        @param partitions list of TopicPartitions
+        @return Future of committed offsets dict: {TopicPartition: offset}
+        """
+        if self.coordinator_unknown():
+            return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+
+        log.debug("Fetching committed offsets for partitions: %s", partitions)
+        # construct the request
+        topic_partitions = collections.defaultdict(set)
+        for tp in partitions:
+            topic_partitions[tp.topic].add(tp.partition)
+        request = OffsetFetchRequest_v1(
+            self.group_id,
+            list(topic_partitions.items())
+        )
+
+        # send the request with a callback
+        future = Future()
+        _f = self._client.send(self.coordinator_id, request)
+        _f.add_callback(self._handle_offset_fetch_response, future)
+        _f.add_errback(self._failed_request, future)
+        return future
+
+    def _handle_offset_fetch_response(self, future, response):
+        offsets = {}
+        for topic, partitions in response.topics:
+            for partition, offset, metadata, error_code in partitions:
+                tp = TopicPartition(topic, partition)
+                error_type = Errors.for_code(error_code)
+                if error_type is not Errors.NoError:
+                    error = error_type()
+                    log.debug("Error fetching offset for %s: %s", tp, error_type())
+                    if error_type is Errors.GroupLoadInProgressError:
+                        # just retry
+                        future.failure(error)
+                    elif error_type is Errors.NotCoordinatorForGroupError:
+                        # re-discover the coordinator and retry
+                        self.coordinator_dead()
+                        future.failure(error)
+                    elif error_type in (Errors.UnknownMemberIdError,
+                                        Errors.IllegalGenerationError):
+                        # need to re-join group
+                        self._subscription.mark_for_reassignment()
+                        future.failure(error)
+                    else:
+                        log.error("Unknown error fetching offsets for %s: %s",
+                                  tp, error)
+                        future.failure(error)
+                    return
+                elif offset >= 0:
+                    # record the position with the offset (-1 indicates no committed offset to fetch)
+                    offsets[tp] = OffsetAndMetadata(offset, metadata)
+                else:
+                    log.debug("No committed offset for partition %s", tp)
+        future.success(offsets)
+
+
+class AutoCommitTask(object):
+    def __init__(self, coordinator, interval):
+        self._coordinator = coordinator
+        self._client = coordinator._client
+        self._interval = interval
+        self._enabled = False
+        self._request_in_flight = False
+
+    def enable(self):
+        if self._enabled:
+            log.warning("AutoCommitTask is already enabled")
+            return
+
+        self._enabled = True
+        if not self._request_in_flight:
+            self._client.schedule(self, time.time() + self._interval)
+
+    def disable(self):
+        self._enabled = False
+        try:
+            self._client.unschedule(self)
+        except KeyError:
+            log.warning("AutoCommitTask was not previously scheduled")
+
+    def _reschedule(self, at):
+        if self._enabled:
+            self._client.schedule(self, at)
+        else:
+            raise Errors.IllegalStateError('AutoCommitTask not enabled')
+
+    def __call__(self):
+        if not self._enabled:
+            return
+
+        if self._coordinator.coordinator_unknown():
+            log.debug("Cannot auto-commit offsets because the coordinator is"
+                      " unknown, will retry after backoff")
+            next_at = time.time() + self._coordinator._retry_backoff_ms / 1000.0
+            self._client.schedule(self, next_at)
+            return
+
+        self._request_in_flight = True
+        self._coordinator.commit_offsets_async(
+            self._coordinator._subscription.all_consumed_offsets(),
+            self._handle_commit_response)
+
+    def _handle_commit_response(self, offsets, result):
+        self._request_in_flight = False
+        if result is True:
+            log.debug("Successfully auto-committed offsets")
+            next_at = time.time() + self._interval
+        elif not isinstance(result, BaseException):
+            raise Errors.IllegalStateError(
+                'Unrecognized result in _handle_commit_response: %s'
+                % result)
+        elif hasattr(result, 'retriable') and result.retriable:
+            log.debug("Failed to auto-commit offsets: %s, will retry"
+                      " immediately", result)
+            next_at = time.time()
+        else:
+            log.warning("Auto offset commit failed: %s", result)
+            next_at = time.time() + self._interval
+
+        if not self._enabled:
+            log.warning("Skipping auto-commit reschedule -- it is disabled")
+            return
+        self._reschedule(next_at)
+
+
+# TODO
+"""
+class ConsumerCoordinatorMetrics(object):
+    def __init__(self, metrics, prefix, tags):
+        self.metrics = metrics
+        self.group_name = prefix + "-coordinator-metrics"
+
+        self.commit_latency = metrics.sensor("commit-latency")
+        self.commit_latency.add(metrics.MetricName(
+            "commit-latency-avg", self.group_name,
+            "The average time taken for a commit request",
+            tags), metrics.Avg())
+        self.commit_latency.add(metrics.MetricName(
+            "commit-latency-max", self.group_name,
+            "The max time taken for a commit request",
+            tags), metrics.Max())
+        self.commit_latency.add(metrics.MetricName(
+            "commit-rate", self.group_name,
+            "The number of commit calls per second",
+            tags), metrics.Rate(metrics.Count()))
+
+    '''
+    def _num_partitions(config, now):
+        new Measurable() {
+            public double measure(MetricConfig config, long now) {
+                return subscriptions.assignedPartitions().size();
+            }
+        };
+        metrics.addMetric(new MetricName("assigned-partitions",
+                          this.metricGrpName,
+                          "The number of partitions currently assigned to this consumer",
+                          tags),
+                          numParts);
+    '''
+"""
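
For reference, the two protocol structs defined at the top of consumer.py round-trip directly; a small sketch, assuming Struct exposes encode() alongside the decode() calls used in _on_join_complete and _perform_assignment:

from kafka.coordinator.consumer import (
    ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata)

# what a member advertises in its JoinGroupRequest metadata
metadata = ConsumerProtocolMemberMetadata(0, ['my-topic'], b'')
raw = metadata.encode()
print(ConsumerProtocolMemberMetadata.decode(raw).subscription)
# ['my-topic']

# what the leader hands back for one member: (topic, [partitions]) pairs,
# the same shape RoundRobinPartitionAssignor.assign constructs
assignment = ConsumerProtocolMemberAssignment(0, [('my-topic', [0, 1, 2])], b'')
print(assignment.partitions())
# [TopicPartition(topic='my-topic', partition=0),
#  TopicPartition(topic='my-topic', partition=1),
#  TopicPartition(topic='my-topic', partition=2)]
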