summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-07 18:51:14 -0800
committerDana Powers <dana.powers@rd.io>2016-01-07 18:51:14 -0800
commit828377377da43749af0d27ee256ef31bf714cf17 (patch)
treefbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/common.py
parent71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent9a8af1499ca425366d934487469d9977fae7fe5f (diff)
downloadkafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'
Conflicts: kafka/codec.py kafka/version.py test/test_producer.py test/test_producer_integration.py
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py255
1 files changed, 235 insertions, 20 deletions
diff --git a/kafka/common.py b/kafka/common.py
index a7d8164..84cf719 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -21,37 +21,37 @@ ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
["error", "nodeId", "host", "port"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-ProduceRequest = namedtuple("ProduceRequest",
+ProduceRequestPayload = namedtuple("ProduceRequestPayload",
["topic", "partition", "messages"])
-ProduceResponse = namedtuple("ProduceResponse",
+ProduceResponsePayload = namedtuple("ProduceResponsePayload",
["topic", "partition", "error", "offset"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
-FetchRequest = namedtuple("FetchRequest",
+FetchRequestPayload = namedtuple("FetchRequestPayload",
["topic", "partition", "offset", "max_bytes"])
-FetchResponse = namedtuple("FetchResponse",
+FetchResponsePayload = namedtuple("FetchResponsePayload",
["topic", "partition", "error", "highwaterMark", "messages"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-OffsetRequest = namedtuple("OffsetRequest",
+OffsetRequestPayload = namedtuple("OffsetRequestPayload",
["topic", "partition", "time", "max_offsets"])
-OffsetResponse = namedtuple("OffsetResponse",
+OffsetResponsePayload = namedtuple("OffsetResponsePayload",
["topic", "partition", "error", "offsets"])
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-OffsetCommitRequest = namedtuple("OffsetCommitRequest",
+OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload",
["topic", "partition", "offset", "metadata"])
-OffsetCommitResponse = namedtuple("OffsetCommitResponse",
+OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload",
["topic", "partition", "error"])
-OffsetFetchRequest = namedtuple("OffsetFetchRequest",
+OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload",
["topic", "partition"])
-OffsetFetchResponse = namedtuple("OffsetFetchResponse",
+OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload",
["topic", "partition", "offset", "metadata", "error"])
@@ -72,12 +72,15 @@ OffsetAndMessage = namedtuple("OffsetAndMessage",
Message = namedtuple("Message",
["magic", "attributes", "key", "value"])
-TopicAndPartition = namedtuple("TopicAndPartition",
+TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])
+OffsetAndMetadata = namedtuple("OffsetAndMetadata",
+ ["offset", "metadata"])
+
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
@@ -90,96 +93,303 @@ RetryOptions = namedtuple("RetryOptions",
class KafkaError(RuntimeError):
+ retriable = False
+ # whether metadata should be refreshed on error
+ invalid_metadata = False
+
+
+class IllegalStateError(KafkaError):
pass
-class BrokerResponseError(KafkaError):
+class IllegalArgumentError(KafkaError):
pass
+class NoBrokersAvailable(KafkaError):
+ retriable = True
+ invalid_metadata = True
+
+
+class NodeNotReadyError(KafkaError):
+ retriable = True
+
+
+class CorrelationIdError(KafkaError):
+ retriable = True
+
+
+class Cancelled(KafkaError):
+ retriable = True
+
+
+class TooManyInFlightRequests(KafkaError):
+ retriable = True
+
+
+class StaleMetadata(KafkaError):
+ retriable = True
+ invalid_metadata = True
+
+
+class BrokerResponseError(KafkaError):
+ errno = None
+ message = None
+ description = None
+
+ def __str__(self):
+ return '%s - %s - %s' % (self.__class__.__name__, self.errno, self.description)
+
+
+class NoError(BrokerResponseError):
+ errno = 0
+ message = 'NO_ERROR'
+ description = 'No error--it worked!'
+
+
class UnknownError(BrokerResponseError):
errno = -1
message = 'UNKNOWN'
+ description = 'An unexpected server error.'
class OffsetOutOfRangeError(BrokerResponseError):
errno = 1
message = 'OFFSET_OUT_OF_RANGE'
+ description = ('The requested offset is outside the range of offsets'
+ ' maintained by the server for the given topic/partition.')
class InvalidMessageError(BrokerResponseError):
errno = 2
message = 'INVALID_MESSAGE'
+ description = ('This indicates that a message contents does not match its'
+ ' CRC.')
class UnknownTopicOrPartitionError(BrokerResponseError):
errno = 3
message = 'UNKNOWN_TOPIC_OR_PARTITON'
+ description = ('This request is for a topic or partition that does not'
+ ' exist on this broker.')
+ invalid_metadata = True
class InvalidFetchRequestError(BrokerResponseError):
errno = 4
message = 'INVALID_FETCH_SIZE'
+ description = 'The message has a negative size.'
class LeaderNotAvailableError(BrokerResponseError):
errno = 5
message = 'LEADER_NOT_AVAILABLE'
+ description = ('This error is thrown if we are in the middle of a'
+ ' leadership election and there is currently no leader for'
+ ' this partition and hence it is unavailable for writes.')
+ retriable = True
+ invalid_metadata = True
class NotLeaderForPartitionError(BrokerResponseError):
errno = 6
message = 'NOT_LEADER_FOR_PARTITION'
+ description = ('This error is thrown if the client attempts to send'
+ ' messages to a replica that is not the leader for some'
+ ' partition. It indicates that the clients metadata is out'
+ ' of date.')
+ retriable = True
+ invalid_metadata = True
class RequestTimedOutError(BrokerResponseError):
errno = 7
message = 'REQUEST_TIMED_OUT'
+ description = ('This error is thrown if the request exceeds the'
+ ' user-specified time limit in the request.')
+ retriable = True
class BrokerNotAvailableError(BrokerResponseError):
errno = 8
message = 'BROKER_NOT_AVAILABLE'
-
+ description = ('This is not a client facing error and is used mostly by'
+ ' tools when a broker is not alive.')
class ReplicaNotAvailableError(BrokerResponseError):
errno = 9
message = 'REPLICA_NOT_AVAILABLE'
+ description = ('If replica is expected on a broker, but is not (this can be'
+ ' safely ignored).')
class MessageSizeTooLargeError(BrokerResponseError):
errno = 10
message = 'MESSAGE_SIZE_TOO_LARGE'
+ description = ('The server has a configurable maximum message size to avoid'
+ ' unbounded memory allocation. This error is thrown if the'
+ ' client attempt to produce a message larger than this'
+ ' maximum.')
class StaleControllerEpochError(BrokerResponseError):
errno = 11
message = 'STALE_CONTROLLER_EPOCH'
+ description = 'Internal error code for broker-to-broker communication.'
class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
+ description = ('If you specify a string larger than configured maximum for'
+ ' offset metadata.')
+# TODO is this deprecated? https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
class StaleLeaderEpochCodeError(BrokerResponseError):
errno = 13
message = 'STALE_LEADER_EPOCH_CODE'
-class OffsetsLoadInProgressCode(BrokerResponseError):
+class GroupLoadInProgressError(BrokerResponseError):
errno = 14
- message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+ message = 'OFFSETS_LOAD_IN_PROGRESS'
+ description = ('The broker returns this error code for an offset fetch'
+ ' request if it is still loading offsets (after a leader'
+ ' change for that offsets topic partition), or in response'
+ ' to group membership requests (such as heartbeats) when'
+ ' group metadata is being loaded by the coordinator.')
+ retriable = True
-class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+class GroupCoordinatorNotAvailableError(BrokerResponseError):
errno = 15
- message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+ message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
+ description = ('The broker returns this error code for group coordinator'
+ ' requests, offset commits, and most group management'
+ ' requests if the offsets topic has not yet been created, or'
+ ' if the group coordinator is not active.')
+ retriable = True
-class NotCoordinatorForConsumerCode(BrokerResponseError):
+class NotCoordinatorForGroupError(BrokerResponseError):
errno = 16
- message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+ message = 'NOT_COORDINATOR_FOR_CONSUMER'
+ description = ('The broker returns this error code if it receives an offset'
+ ' fetch or commit request for a group that it is not a'
+ ' coordinator for.')
+ retriable = True
+
+
+class InvalidTopicError(BrokerResponseError):
+ errno = 17
+ message = 'INVALID_TOPIC'
+ description = ('For a request which attempts to access an invalid topic'
+ ' (e.g. one which has an illegal name), or if an attempt'
+ ' is made to write to an internal topic (such as the'
+ ' consumer offsets topic).')
+
+
+class RecordListTooLargeError(BrokerResponseError):
+ errno = 18
+ message = 'RECORD_LIST_TOO_LARGE'
+ description = ('If a message batch in a produce request exceeds the maximum'
+ ' configured segment size.')
+
+
+class NotEnoughReplicasError(BrokerResponseError):
+ errno = 19
+ message = 'NOT_ENOUGH_REPLICAS'
+ description = ('Returned from a produce request when the number of in-sync'
+ ' replicas is lower than the configured minimum and'
+ ' requiredAcks is -1.')
+
+
+class NotEnoughReplicasAfterAppendError(BrokerResponseError):
+ errno = 20
+ message = 'NOT_ENOUGH_REPLICAS_AFTER_APPEND'
+ description = ('Returned from a produce request when the message was'
+ ' written to the log, but with fewer in-sync replicas than'
+ ' required.')
+
+
+class InvalidRequiredAcksError(BrokerResponseError):
+ errno = 21
+ message = 'INVALID_REQUIRED_ACKS'
+ description = ('Returned from a produce request if the requested'
+ ' requiredAcks is invalid (anything other than -1, 1, or 0).')
+
+
+class IllegalGenerationError(BrokerResponseError):
+ errno = 22
+ message = 'ILLEGAL_GENERATION'
+ description = ('Returned from group membership requests (such as heartbeats)'
+ ' when the generation id provided in the request is not the'
+ ' current generation.')
+
+
+class InconsistentGroupProtocolError(BrokerResponseError):
+ errno = 23
+ message = 'INCONSISTENT_GROUP_PROTOCOL'
+ description = ('Returned in join group when the member provides a protocol'
+ ' type or set of protocols which is not compatible with the current group.')
+
+
+class InvalidGroupIdError(BrokerResponseError):
+ errno = 24
+ message = 'INVALID_GROUP_ID'
+ description = 'Returned in join group when the groupId is empty or null.'
+
+
+class UnknownMemberIdError(BrokerResponseError):
+ errno = 25
+ message = 'UNKNOWN_MEMBER_ID'
+ description = ('Returned from group requests (offset commits/fetches,'
+ ' heartbeats, etc) when the memberId is not in the current'
+ ' generation.')
+
+
+class InvalidSessionTimeoutError(BrokerResponseError):
+ errno = 26
+ message = 'INVALID_SESSION_TIMEOUT'
+ description = ('Return in join group when the requested session timeout is'
+ ' outside of the allowed range on the broker')
+
+
+class RebalanceInProgressError(BrokerResponseError):
+ errno = 27
+ message = 'REBALANCE_IN_PROGRESS'
+ description = ('Returned in heartbeat requests when the coordinator has'
+ ' begun rebalancing the group. This indicates to the client'
+ ' that it should rejoin the group.')
+
+
+class InvalidCommitOffsetSizeError(BrokerResponseError):
+ errno = 28
+ message = 'INVALID_COMMIT_OFFSET_SIZE'
+ description = ('This error indicates that an offset commit was rejected'
+ ' because of oversize metadata.')
+
+
+class TopicAuthorizationFailedError(BrokerResponseError):
+ errno = 29
+ message = 'TOPIC_AUTHORIZATION_FAILED'
+ description = ('Returned by the broker when the client is not authorized to'
+ ' access the requested topic.')
+
+
+class GroupAuthorizationFailedError(BrokerResponseError):
+ errno = 30
+ message = 'GROUP_AUTHORIZATION_FAILED'
+ description = ('Returned by the broker when the client is not authorized to'
+ ' access a particular groupId.')
+
+
+class ClusterAuthorizationFailedError(BrokerResponseError):
+ errno = 31
+ message = 'CLUSTER_AUTHORIZATION_FAILED'
+ description = ('Returned by the broker when the client is not authorized to'
+ ' use an inter-broker or administrative API.')
class KafkaUnavailableError(KafkaError):
@@ -197,7 +407,8 @@ class FailedPayloadsError(KafkaError):
class ConnectionError(KafkaError):
- pass
+ retriable = True
+ invalid_metadata = True
class BufferUnderflowError(KafkaError):
@@ -247,6 +458,10 @@ def _iter_broker_errors():
kafka_errors = dict([(x.errno, x) for x in _iter_broker_errors()])
+def for_code(error_code):
+ return kafka_errors.get(error_code, UnknownError)
+
+
def check_error(response):
if isinstance(response, Exception):
raise response