diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 08:59:02 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 09:35:44 -0700 |
commit | 221f56d8a05cdc2d37f85018e4af352b4b2a95c5 (patch) | |
tree | f3dcc17bddd6e0745f679d2d6925bf343798cf51 | |
parent | 679d7485b1fdf049879099404dcecc8496569d2b (diff) | |
download | kafka-python-221f56d8a05cdc2d37f85018e4af352b4b2a95c5.tar.gz |
Split kafka.common into kafka.structs and kafka.errors
-rw-r--r-- | kafka/common.py | 490 | ||||
-rw-r--r-- | kafka/errors.py | 399 | ||||
-rw-r--r-- | kafka/structs.py | 88 |
3 files changed, 489 insertions, 488 deletions
diff --git a/kafka/common.py b/kafka/common.py index 382867c..5761f72 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -1,488 +1,2 @@ -import inspect -import sys -from collections import namedtuple - - -# SimpleClient Payload Structs - Deprecated - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -MetadataRequest = namedtuple("MetadataRequest", - ["topics"]) - -MetadataResponse = namedtuple("MetadataResponse", - ["brokers", "topics"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest -ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", - ["groups"]) - -ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse", - ["error", "nodeId", "host", "port"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI -ProduceRequestPayload = namedtuple("ProduceRequestPayload", - ["topic", "partition", "messages"]) - -ProduceResponsePayload = namedtuple("ProduceResponsePayload", - ["topic", "partition", "error", "offset"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI -FetchRequestPayload = namedtuple("FetchRequestPayload", - ["topic", "partition", "offset", "max_bytes"]) - -FetchResponsePayload = namedtuple("FetchResponsePayload", - ["topic", "partition", "error", "highwaterMark", "messages"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI -OffsetRequestPayload = namedtuple("OffsetRequestPayload", - ["topic", "partition", "time", "max_offsets"]) - -OffsetResponsePayload = namedtuple("OffsetResponsePayload", - ["topic", "partition", "error", "offsets"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI -OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload", - ["topic", "partition", "offset", "metadata"]) - -OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload", - ["topic", "partition", "error"]) - -OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload", - ["topic", "partition"]) - -OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", - ["topic", "partition", "offset", "metadata", "error"]) - - - -# Other useful structs -TopicPartition = namedtuple("TopicPartition", - ["topic", "partition"]) - -BrokerMetadata = namedtuple("BrokerMetadata", - ["nodeId", "host", "port"]) - -PartitionMetadata = namedtuple("PartitionMetadata", - ["topic", "partition", "leader", "replicas", "isr", "error"]) - -OffsetAndMetadata = namedtuple("OffsetAndMetadata", - ["offset", "metadata"]) - - -# Deprecated structs -OffsetAndMessage = namedtuple("OffsetAndMessage", - ["offset", "message"]) - -Message = namedtuple("Message", - ["magic", "attributes", "key", "value"]) - -KafkaMessage = namedtuple("KafkaMessage", - ["topic", "partition", "offset", "key", "value"]) - - -# Define retry policy for async producer -# Limit value: int >= 0, 0 means no retries -RetryOptions = namedtuple("RetryOptions", - ["limit", "backoff_ms", "retry_on_timeouts"]) - - -################# -# Exceptions # -################# - - -class KafkaError(RuntimeError): - retriable = False - # whether metadata should be refreshed on error - invalid_metadata = False - - -class IllegalStateError(KafkaError): - pass - - -class IllegalArgumentError(KafkaError): - pass - - -class NoBrokersAvailable(KafkaError): - retriable = True - invalid_metadata = True - - -class NodeNotReadyError(KafkaError): - retriable = True - - -class CorrelationIdError(KafkaError): - retriable = True - - -class Cancelled(KafkaError): - retriable = True - - -class TooManyInFlightRequests(KafkaError): - retriable = True - - -class StaleMetadata(KafkaError): - retriable = True - invalid_metadata = True - - -class UnrecognizedBrokerVersion(KafkaError): - pass - - -class BrokerResponseError(KafkaError): - errno = None - message = None - description = None - - def __str__(self): - return '%s - %s - %s' % (self.__class__.__name__, self.errno, self.description) - - -class NoError(BrokerResponseError): - errno = 0 - message = 'NO_ERROR' - description = 'No error--it worked!' - - -class UnknownError(BrokerResponseError): - errno = -1 - message = 'UNKNOWN' - description = 'An unexpected server error.' - - -class OffsetOutOfRangeError(BrokerResponseError): - errno = 1 - message = 'OFFSET_OUT_OF_RANGE' - description = ('The requested offset is outside the range of offsets' - ' maintained by the server for the given topic/partition.') - - -class InvalidMessageError(BrokerResponseError): - errno = 2 - message = 'INVALID_MESSAGE' - description = ('This indicates that a message contents does not match its' - ' CRC.') - - -class UnknownTopicOrPartitionError(BrokerResponseError): - errno = 3 - message = 'UNKNOWN_TOPIC_OR_PARTITON' - description = ('This request is for a topic or partition that does not' - ' exist on this broker.') - invalid_metadata = True - - -class InvalidFetchRequestError(BrokerResponseError): - errno = 4 - message = 'INVALID_FETCH_SIZE' - description = 'The message has a negative size.' - - -class LeaderNotAvailableError(BrokerResponseError): - errno = 5 - message = 'LEADER_NOT_AVAILABLE' - description = ('This error is thrown if we are in the middle of a' - ' leadership election and there is currently no leader for' - ' this partition and hence it is unavailable for writes.') - retriable = True - invalid_metadata = True - - -class NotLeaderForPartitionError(BrokerResponseError): - errno = 6 - message = 'NOT_LEADER_FOR_PARTITION' - description = ('This error is thrown if the client attempts to send' - ' messages to a replica that is not the leader for some' - ' partition. It indicates that the clients metadata is out' - ' of date.') - retriable = True - invalid_metadata = True - - -class RequestTimedOutError(BrokerResponseError): - errno = 7 - message = 'REQUEST_TIMED_OUT' - description = ('This error is thrown if the request exceeds the' - ' user-specified time limit in the request.') - retriable = True - - -class BrokerNotAvailableError(BrokerResponseError): - errno = 8 - message = 'BROKER_NOT_AVAILABLE' - description = ('This is not a client facing error and is used mostly by' - ' tools when a broker is not alive.') - -class ReplicaNotAvailableError(BrokerResponseError): - errno = 9 - message = 'REPLICA_NOT_AVAILABLE' - description = ('If replica is expected on a broker, but is not (this can be' - ' safely ignored).') - - -class MessageSizeTooLargeError(BrokerResponseError): - errno = 10 - message = 'MESSAGE_SIZE_TOO_LARGE' - description = ('The server has a configurable maximum message size to avoid' - ' unbounded memory allocation. This error is thrown if the' - ' client attempt to produce a message larger than this' - ' maximum.') - - -class StaleControllerEpochError(BrokerResponseError): - errno = 11 - message = 'STALE_CONTROLLER_EPOCH' - description = 'Internal error code for broker-to-broker communication.' - - -class OffsetMetadataTooLargeError(BrokerResponseError): - errno = 12 - message = 'OFFSET_METADATA_TOO_LARGE' - description = ('If you specify a string larger than configured maximum for' - ' offset metadata.') - - -# TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes -class StaleLeaderEpochCodeError(BrokerResponseError): - errno = 13 - message = 'STALE_LEADER_EPOCH_CODE' - - -class GroupLoadInProgressError(BrokerResponseError): - errno = 14 - message = 'OFFSETS_LOAD_IN_PROGRESS' - description = ('The broker returns this error code for an offset fetch' - ' request if it is still loading offsets (after a leader' - ' change for that offsets topic partition), or in response' - ' to group membership requests (such as heartbeats) when' - ' group metadata is being loaded by the coordinator.') - retriable = True - - -class GroupCoordinatorNotAvailableError(BrokerResponseError): - errno = 15 - message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE' - description = ('The broker returns this error code for group coordinator' - ' requests, offset commits, and most group management' - ' requests if the offsets topic has not yet been created, or' - ' if the group coordinator is not active.') - retriable = True - - -class NotCoordinatorForGroupError(BrokerResponseError): - errno = 16 - message = 'NOT_COORDINATOR_FOR_CONSUMER' - description = ('The broker returns this error code if it receives an offset' - ' fetch or commit request for a group that it is not a' - ' coordinator for.') - retriable = True - - -class InvalidTopicError(BrokerResponseError): - errno = 17 - message = 'INVALID_TOPIC' - description = ('For a request which attempts to access an invalid topic' - ' (e.g. one which has an illegal name), or if an attempt' - ' is made to write to an internal topic (such as the' - ' consumer offsets topic).') - - -class RecordListTooLargeError(BrokerResponseError): - errno = 18 - message = 'RECORD_LIST_TOO_LARGE' - description = ('If a message batch in a produce request exceeds the maximum' - ' configured segment size.') - - -class NotEnoughReplicasError(BrokerResponseError): - errno = 19 - message = 'NOT_ENOUGH_REPLICAS' - description = ('Returned from a produce request when the number of in-sync' - ' replicas is lower than the configured minimum and' - ' requiredAcks is -1.') - - -class NotEnoughReplicasAfterAppendError(BrokerResponseError): - errno = 20 - message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND' - description = ('Returned from a produce request when the message was' - ' written to the log, but with fewer in-sync replicas than' - ' required.') - - -class InvalidRequiredAcksError(BrokerResponseError): - errno = 21 - message = 'INVALID_REQUIRED_ACKS' - description = ('Returned from a produce request if the requested' - ' requiredAcks is invalid (anything other than -1, 1, or 0).') - - -class IllegalGenerationError(BrokerResponseError): - errno = 22 - message = 'ILLEGAL_GENERATION' - description = ('Returned from group membership requests (such as heartbeats)' - ' when the generation id provided in the request is not the' - ' current generation.') - - -class InconsistentGroupProtocolError(BrokerResponseError): - errno = 23 - message = 'INCONSISTENT_GROUP_PROTOCOL' - description = ('Returned in join group when the member provides a protocol' - ' type or set of protocols which is not compatible with the current group.') - - -class InvalidGroupIdError(BrokerResponseError): - errno = 24 - message = 'INVALID_GROUP_ID' - description = 'Returned in join group when the groupId is empty or null.' - - -class UnknownMemberIdError(BrokerResponseError): - errno = 25 - message = 'UNKNOWN_MEMBER_ID' - description = ('Returned from group requests (offset commits/fetches,' - ' heartbeats, etc) when the memberId is not in the current' - ' generation.') - - -class InvalidSessionTimeoutError(BrokerResponseError): - errno = 26 - message = 'INVALID_SESSION_TIMEOUT' - description = ('Return in join group when the requested session timeout is' - ' outside of the allowed range on the broker') - - -class RebalanceInProgressError(BrokerResponseError): - errno = 27 - message = 'REBALANCE_IN_PROGRESS' - description = ('Returned in heartbeat requests when the coordinator has' - ' begun rebalancing the group. This indicates to the client' - ' that it should rejoin the group.') - - -class InvalidCommitOffsetSizeError(BrokerResponseError): - errno = 28 - message = 'INVALID_COMMIT_OFFSET_SIZE' - description = ('This error indicates that an offset commit was rejected' - ' because of oversize metadata.') - - -class TopicAuthorizationFailedError(BrokerResponseError): - errno = 29 - message = 'TOPIC_AUTHORIZATION_FAILED' - description = ('Returned by the broker when the client is not authorized to' - ' access the requested topic.') - - -class GroupAuthorizationFailedError(BrokerResponseError): - errno = 30 - message = 'GROUP_AUTHORIZATION_FAILED' - description = ('Returned by the broker when the client is not authorized to' - ' access a particular groupId.') - - -class ClusterAuthorizationFailedError(BrokerResponseError): - errno = 31 - message = 'CLUSTER_AUTHORIZATION_FAILED' - description = ('Returned by the broker when the client is not authorized to' - ' use an inter-broker or administrative API.') - - -class KafkaUnavailableError(KafkaError): - pass - - -class KafkaTimeoutError(KafkaError): - pass - - -class FailedPayloadsError(KafkaError): - def __init__(self, payload, *args): - super(FailedPayloadsError, self).__init__(*args) - self.payload = payload - - -class ConnectionError(KafkaError): - retriable = True - invalid_metadata = True - - -class BufferUnderflowError(KafkaError): - pass - - -class ChecksumError(KafkaError): - pass - - -class ConsumerFetchSizeTooSmall(KafkaError): - pass - - -class ConsumerNoMoreData(KafkaError): - pass - - -class ConsumerTimeout(KafkaError): - pass - - -class ProtocolError(KafkaError): - pass - - -class UnsupportedCodecError(KafkaError): - pass - - -class KafkaConfigurationError(KafkaError): - pass - - -class AsyncProducerQueueFull(KafkaError): - def __init__(self, failed_msgs, *args): - super(AsyncProducerQueueFull, self).__init__(*args) - self.failed_msgs = failed_msgs - - -def _iter_broker_errors(): - for name, obj in inspect.getmembers(sys.modules[__name__]): - if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: - yield obj - - -kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) - - -def for_code(error_code): - return kafka_errors.get(error_code, UnknownError) - - -def check_error(response): - if isinstance(response, Exception): - raise response - if response.error: - error_class = kafka_errors.get(response.error, UnknownError) - raise error_class(response) - - -RETRY_BACKOFF_ERROR_TYPES = ( - KafkaUnavailableError, LeaderNotAvailableError, - ConnectionError, FailedPayloadsError -) - - -RETRY_REFRESH_ERROR_TYPES = ( - NotLeaderForPartitionError, UnknownTopicOrPartitionError, - LeaderNotAvailableError, ConnectionError -) - - -RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES +from kafka.structs import * +from kafka.errors import * diff --git a/kafka/errors.py b/kafka/errors.py new file mode 100644 index 0000000..7b87352 --- /dev/null +++ b/kafka/errors.py @@ -0,0 +1,399 @@ +import inspect +import sys + + +class KafkaError(RuntimeError): + retriable = False + # whether metadata should be refreshed on error + invalid_metadata = False + + +class IllegalStateError(KafkaError): + pass + + +class IllegalArgumentError(KafkaError): + pass + + +class NoBrokersAvailable(KafkaError): + retriable = True + invalid_metadata = True + + +class NodeNotReadyError(KafkaError): + retriable = True + + +class CorrelationIdError(KafkaError): + retriable = True + + +class Cancelled(KafkaError): + retriable = True + + +class TooManyInFlightRequests(KafkaError): + retriable = True + + +class StaleMetadata(KafkaError): + retriable = True + invalid_metadata = True + + +class UnrecognizedBrokerVersion(KafkaError): + pass + + +class BrokerResponseError(KafkaError): + errno = None + message = None + description = None + + def __str__(self): + return '%s - %s - %s' % (self.__class__.__name__, self.errno, self.description) + + +class NoError(BrokerResponseError): + errno = 0 + message = 'NO_ERROR' + description = 'No error--it worked!' + + +class UnknownError(BrokerResponseError): + errno = -1 + message = 'UNKNOWN' + description = 'An unexpected server error.' + + +class OffsetOutOfRangeError(BrokerResponseError): + errno = 1 + message = 'OFFSET_OUT_OF_RANGE' + description = ('The requested offset is outside the range of offsets' + ' maintained by the server for the given topic/partition.') + + +class InvalidMessageError(BrokerResponseError): + errno = 2 + message = 'INVALID_MESSAGE' + description = ('This indicates that a message contents does not match its' + ' CRC.') + + +class UnknownTopicOrPartitionError(BrokerResponseError): + errno = 3 + message = 'UNKNOWN_TOPIC_OR_PARTITON' + description = ('This request is for a topic or partition that does not' + ' exist on this broker.') + invalid_metadata = True + + +class InvalidFetchRequestError(BrokerResponseError): + errno = 4 + message = 'INVALID_FETCH_SIZE' + description = 'The message has a negative size.' + + +class LeaderNotAvailableError(BrokerResponseError): + errno = 5 + message = 'LEADER_NOT_AVAILABLE' + description = ('This error is thrown if we are in the middle of a' + ' leadership election and there is currently no leader for' + ' this partition and hence it is unavailable for writes.') + retriable = True + invalid_metadata = True + + +class NotLeaderForPartitionError(BrokerResponseError): + errno = 6 + message = 'NOT_LEADER_FOR_PARTITION' + description = ('This error is thrown if the client attempts to send' + ' messages to a replica that is not the leader for some' + ' partition. It indicates that the clients metadata is out' + ' of date.') + retriable = True + invalid_metadata = True + + +class RequestTimedOutError(BrokerResponseError): + errno = 7 + message = 'REQUEST_TIMED_OUT' + description = ('This error is thrown if the request exceeds the' + ' user-specified time limit in the request.') + retriable = True + + +class BrokerNotAvailableError(BrokerResponseError): + errno = 8 + message = 'BROKER_NOT_AVAILABLE' + description = ('This is not a client facing error and is used mostly by' + ' tools when a broker is not alive.') + +class ReplicaNotAvailableError(BrokerResponseError): + errno = 9 + message = 'REPLICA_NOT_AVAILABLE' + description = ('If replica is expected on a broker, but is not (this can be' + ' safely ignored).') + + +class MessageSizeTooLargeError(BrokerResponseError): + errno = 10 + message = 'MESSAGE_SIZE_TOO_LARGE' + description = ('The server has a configurable maximum message size to avoid' + ' unbounded memory allocation. This error is thrown if the' + ' client attempt to produce a message larger than this' + ' maximum.') + + +class StaleControllerEpochError(BrokerResponseError): + errno = 11 + message = 'STALE_CONTROLLER_EPOCH' + description = 'Internal error code for broker-to-broker communication.' + + +class OffsetMetadataTooLargeError(BrokerResponseError): + errno = 12 + message = 'OFFSET_METADATA_TOO_LARGE' + description = ('If you specify a string larger than configured maximum for' + ' offset metadata.') + + +# TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes +class StaleLeaderEpochCodeError(BrokerResponseError): + errno = 13 + message = 'STALE_LEADER_EPOCH_CODE' + + +class GroupLoadInProgressError(BrokerResponseError): + errno = 14 + message = 'OFFSETS_LOAD_IN_PROGRESS' + description = ('The broker returns this error code for an offset fetch' + ' request if it is still loading offsets (after a leader' + ' change for that offsets topic partition), or in response' + ' to group membership requests (such as heartbeats) when' + ' group metadata is being loaded by the coordinator.') + retriable = True + + +class GroupCoordinatorNotAvailableError(BrokerResponseError): + errno = 15 + message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE' + description = ('The broker returns this error code for group coordinator' + ' requests, offset commits, and most group management' + ' requests if the offsets topic has not yet been created, or' + ' if the group coordinator is not active.') + retriable = True + + +class NotCoordinatorForGroupError(BrokerResponseError): + errno = 16 + message = 'NOT_COORDINATOR_FOR_CONSUMER' + description = ('The broker returns this error code if it receives an offset' + ' fetch or commit request for a group that it is not a' + ' coordinator for.') + retriable = True + + +class InvalidTopicError(BrokerResponseError): + errno = 17 + message = 'INVALID_TOPIC' + description = ('For a request which attempts to access an invalid topic' + ' (e.g. one which has an illegal name), or if an attempt' + ' is made to write to an internal topic (such as the' + ' consumer offsets topic).') + + +class RecordListTooLargeError(BrokerResponseError): + errno = 18 + message = 'RECORD_LIST_TOO_LARGE' + description = ('If a message batch in a produce request exceeds the maximum' + ' configured segment size.') + + +class NotEnoughReplicasError(BrokerResponseError): + errno = 19 + message = 'NOT_ENOUGH_REPLICAS' + description = ('Returned from a produce request when the number of in-sync' + ' replicas is lower than the configured minimum and' + ' requiredAcks is -1.') + + +class NotEnoughReplicasAfterAppendError(BrokerResponseError): + errno = 20 + message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND' + description = ('Returned from a produce request when the message was' + ' written to the log, but with fewer in-sync replicas than' + ' required.') + + +class InvalidRequiredAcksError(BrokerResponseError): + errno = 21 + message = 'INVALID_REQUIRED_ACKS' + description = ('Returned from a produce request if the requested' + ' requiredAcks is invalid (anything other than -1, 1, or 0).') + + +class IllegalGenerationError(BrokerResponseError): + errno = 22 + message = 'ILLEGAL_GENERATION' + description = ('Returned from group membership requests (such as heartbeats)' + ' when the generation id provided in the request is not the' + ' current generation.') + + +class InconsistentGroupProtocolError(BrokerResponseError): + errno = 23 + message = 'INCONSISTENT_GROUP_PROTOCOL' + description = ('Returned in join group when the member provides a protocol' + ' type or set of protocols which is not compatible with the current group.') + + +class InvalidGroupIdError(BrokerResponseError): + errno = 24 + message = 'INVALID_GROUP_ID' + description = 'Returned in join group when the groupId is empty or null.' + + +class UnknownMemberIdError(BrokerResponseError): + errno = 25 + message = 'UNKNOWN_MEMBER_ID' + description = ('Returned from group requests (offset commits/fetches,' + ' heartbeats, etc) when the memberId is not in the current' + ' generation.') + + +class InvalidSessionTimeoutError(BrokerResponseError): + errno = 26 + message = 'INVALID_SESSION_TIMEOUT' + description = ('Return in join group when the requested session timeout is' + ' outside of the allowed range on the broker') + + +class RebalanceInProgressError(BrokerResponseError): + errno = 27 + message = 'REBALANCE_IN_PROGRESS' + description = ('Returned in heartbeat requests when the coordinator has' + ' begun rebalancing the group. This indicates to the client' + ' that it should rejoin the group.') + + +class InvalidCommitOffsetSizeError(BrokerResponseError): + errno = 28 + message = 'INVALID_COMMIT_OFFSET_SIZE' + description = ('This error indicates that an offset commit was rejected' + ' because of oversize metadata.') + + +class TopicAuthorizationFailedError(BrokerResponseError): + errno = 29 + message = 'TOPIC_AUTHORIZATION_FAILED' + description = ('Returned by the broker when the client is not authorized to' + ' access the requested topic.') + + +class GroupAuthorizationFailedError(BrokerResponseError): + errno = 30 + message = 'GROUP_AUTHORIZATION_FAILED' + description = ('Returned by the broker when the client is not authorized to' + ' access a particular groupId.') + + +class ClusterAuthorizationFailedError(BrokerResponseError): + errno = 31 + message = 'CLUSTER_AUTHORIZATION_FAILED' + description = ('Returned by the broker when the client is not authorized to' + ' use an inter-broker or administrative API.') + + +class KafkaUnavailableError(KafkaError): + pass + + +class KafkaTimeoutError(KafkaError): + pass + + +class FailedPayloadsError(KafkaError): + def __init__(self, payload, *args): + super(FailedPayloadsError, self).__init__(*args) + self.payload = payload + + +class ConnectionError(KafkaError): + retriable = True + invalid_metadata = True + + +class BufferUnderflowError(KafkaError): + pass + + +class ChecksumError(KafkaError): + pass + + +class ConsumerFetchSizeTooSmall(KafkaError): + pass + + +class ConsumerNoMoreData(KafkaError): + pass + + +class ConsumerTimeout(KafkaError): + pass + + +class ProtocolError(KafkaError): + pass + + +class UnsupportedCodecError(KafkaError): + pass + + +class KafkaConfigurationError(KafkaError): + pass + + +class AsyncProducerQueueFull(KafkaError): + def __init__(self, failed_msgs, *args): + super(AsyncProducerQueueFull, self).__init__(*args) + self.failed_msgs = failed_msgs + + +def _iter_broker_errors(): + for name, obj in inspect.getmembers(sys.modules[__name__]): + if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: + yield obj + + +kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()]) + + +def for_code(error_code): + return kafka_errors.get(error_code, UnknownError) + + +def check_error(response): + if isinstance(response, Exception): + raise response + if response.error: + error_class = kafka_errors.get(response.error, UnknownError) + raise error_class(response) + + +RETRY_BACKOFF_ERROR_TYPES = ( + KafkaUnavailableError, LeaderNotAvailableError, + ConnectionError, FailedPayloadsError +) + + +RETRY_REFRESH_ERROR_TYPES = ( + NotLeaderForPartitionError, UnknownTopicOrPartitionError, + LeaderNotAvailableError, ConnectionError +) + + +RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES diff --git a/kafka/structs.py b/kafka/structs.py new file mode 100644 index 0000000..5902930 --- /dev/null +++ b/kafka/structs.py @@ -0,0 +1,88 @@ +from collections import namedtuple + + +# SimpleClient Payload Structs - Deprecated + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI +MetadataRequest = namedtuple("MetadataRequest", + ["topics"]) + +MetadataResponse = namedtuple("MetadataResponse", + ["brokers", "topics"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest +ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", + ["groups"]) + +ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse", + ["error", "nodeId", "host", "port"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI +ProduceRequestPayload = namedtuple("ProduceRequestPayload", + ["topic", "partition", "messages"]) + +ProduceResponsePayload = namedtuple("ProduceResponsePayload", + ["topic", "partition", "error", "offset"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI +FetchRequestPayload = namedtuple("FetchRequestPayload", + ["topic", "partition", "offset", "max_bytes"]) + +FetchResponsePayload = namedtuple("FetchResponsePayload", + ["topic", "partition", "error", "highwaterMark", "messages"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI +OffsetRequestPayload = namedtuple("OffsetRequestPayload", + ["topic", "partition", "time", "max_offsets"]) + +OffsetResponsePayload = namedtuple("OffsetResponsePayload", + ["topic", "partition", "error", "offsets"]) + +# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI +OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload", + ["topic", "partition", "offset", "metadata"]) + +OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload", + ["topic", "partition", "error"]) + +OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload", + ["topic", "partition"]) + +OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", + ["topic", "partition", "offset", "metadata", "error"]) + + + +# Other useful structs +TopicPartition = namedtuple("TopicPartition", + ["topic", "partition"]) + +BrokerMetadata = namedtuple("BrokerMetadata", + ["nodeId", "host", "port"]) + +PartitionMetadata = namedtuple("PartitionMetadata", + ["topic", "partition", "leader", "replicas", "isr", "error"]) + +OffsetAndMetadata = namedtuple("OffsetAndMetadata", + ["offset", "metadata"]) + + +# Deprecated structs +OffsetAndMessage = namedtuple("OffsetAndMessage", + ["offset", "message"]) + +Message = namedtuple("Message", + ["magic", "attributes", "key", "value"]) + +KafkaMessage = namedtuple("KafkaMessage", + ["topic", "partition", "offset", "key", "value"]) + + +# Define retry policy for async producer +# Limit value: int >= 0, 0 means no retries +RetryOptions = namedtuple("RetryOptions", + ["limit", "backoff_ms", "retry_on_timeouts"]) + + +# Support legacy imports from kafka.common +from kafka.errors import * |