summaryrefslogtreecommitdiff
path: root/kafka/common.py
diff options
context:
space:
mode:
authortpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
committertpalino <tpalino@linkedin.com>2015-07-01 22:29:56 -0700
commitc01963682e1ec16dbde9b55cda4ec95ad2346476 (patch)
treeb4de368ad49a6d0e23be9ff7f3af64d1f3c4f890 /kafka/common.py
parentadbd4ac052e4a5b40cfc2a3589b7adbcb656afe5 (diff)
downloadkafka-python-c01963682e1ec16dbde9b55cda4ec95ad2346476.tar.gz
Support consumer metadata requests
Support added for ConsumerMetadataRequest and ConsumerMetadataResponse Added consumer-aware request routine for supporting the consumer coordinator Added separate client method for fetching Kafka-committed offsets from the coordinator
Diffstat (limited to 'kafka/common.py')
-rw-r--r--kafka/common.py22
1 files changed, 22 insertions, 0 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 66987ff..a7d8164 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -13,6 +13,13 @@ MetadataRequest = namedtuple("MetadataRequest",
MetadataResponse = namedtuple("MetadataResponse",
["brokers", "topics"])
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
+ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
+ ["groups"])
+
+ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
+ ["error", "nodeId", "host", "port"])
+
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest",
["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
message = 'STALE_LEADER_EPOCH_CODE'
+class OffsetsLoadInProgressCode(BrokerResponseError):
+ errno = 14
+ message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+
+
+class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+ errno = 15
+ message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+
+
+class NotCoordinatorForConsumerCode(BrokerResponseError):
+ errno = 16
+ message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+
+
class KafkaUnavailableError(KafkaError):
pass