from collections import namedtuple ############### # Structs # ############### # Request payloads ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"]) OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"]) OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"]) OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"]) OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"]) # Other useful structs OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) class ErrorMapping(object): # Many of these are not actually used by the client UNKNOWN = -1 NO_ERROR = 0 OFFSET_OUT_OF_RANGE = 1 INVALID_MESSAGE = 2 UNKNOWN_TOPIC_OR_PARTITON = 3 INVALID_FETCH_SIZE = 4 LEADER_NOT_AVAILABLE = 5 NOT_LEADER_FOR_PARTITION = 6 REQUEST_TIMED_OUT = 7 BROKER_NOT_AVAILABLE = 8 REPLICA_NOT_AVAILABLE = 9 MESSAGE_SIZE_TO_LARGE = 10 STALE_CONTROLLER_EPOCH = 11 OFFSET_METADATA_TOO_LARGE = 12 ################# # Exceptions # ################# class BufferUnderflowError(Exception): pass class ChecksumError(Exception): pass class ConsumerFetchSizeTooSmall(Exception): pass class ConsumerNoMoreData(Exception): pass