summaryrefslogtreecommitdiff
path: root/kafka/protocol.py
diff options
context:
space:
mode:
authorWill Daly <will@edx.org>2015-01-04 12:23:10 -0500
committerWill Daly <will.e.daly@gmail.com>2015-01-15 18:01:40 -0500
commit01f378328e5383d05d52428b815f992eb2c536cb (patch)
tree2d3366ed91b9744efd40d935a460040150c6d4d8 /kafka/protocol.py
parent02c2b469003e2ddcb051dbb4d95977137050c19f (diff)
downloadkafka-python-01f378328e5383d05d52428b815f992eb2c536cb.tar.gz
Add Sphinx API docs
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--kafka/protocol.py125
1 files changed, 58 insertions, 67 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py
index a85c7eb..2a39de6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -185,18 +185,18 @@ class KafkaProtocol(object):
"""
Encode some ProduceRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of ProduceRequest
- acks: How "acky" you want the request to be
- 0: immediate response
- 1: written to disk by the leader
- 2+: waits for this many number of replicas to sync
- -1: waits for all replicas to be in sync
- timeout: Maximum time the server will wait for acks from replicas.
- This is _not_ a socket timeout
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of ProduceRequest
+ acks: How "acky" you want the request to be
+ 0: immediate response
+ 1: written to disk by the leader
+ 2+: waits for this many number of replicas to sync
+ -1: waits for all replicas to be in sync
+ timeout: Maximum time the server will wait for acks from replicas.
+ This is _not_ a socket timeout
+
"""
payloads = [] if payloads is None else payloads
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -225,9 +225,9 @@ class KafkaProtocol(object):
"""
Decode bytes to a ProduceResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
+
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -248,14 +248,13 @@ class KafkaProtocol(object):
"""
Encodes some FetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- payloads: list of FetchRequest
- max_wait_time: int, how long to block waiting on min_bytes of data
- min_bytes: int, the minimum number of bytes to accumulate before
- returning the response
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: list of FetchRequest
+ max_wait_time: int, how long to block waiting on min_bytes of data
+ min_bytes: int, the minimum number of bytes to accumulate before
+ returning the response
"""
payloads = [] if payloads is None else payloads
@@ -284,9 +283,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a FetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -333,9 +331,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0)
@@ -360,11 +357,10 @@ class KafkaProtocol(object):
"""
Encode a MetadataRequest
- Params
- ======
- client_id: string
- correlation_id: int
- topics: list of strings
+ Arguments:
+ client_id: string
+ correlation_id: int
+ topics: list of strings
"""
if payloads is None:
topics = [] if topics is None else topics
@@ -388,9 +384,8 @@ class KafkaProtocol(object):
"""
Decode bytes to a MetadataResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0)
@@ -439,12 +434,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetCommitRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are committing offsets for
+ payloads: list of OffsetCommitRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -470,9 +464,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetCommitResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -491,12 +484,11 @@ class KafkaProtocol(object):
"""
Encode some OffsetFetchRequest structs
- Params
- ======
- client_id: string
- correlation_id: int
- group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequest
+ Arguments:
+ client_id: string
+ correlation_id: int
+ group: string, the consumer group you are fetching offsets for
+ payloads: list of OffsetFetchRequest
"""
grouped_payloads = group_by_topic_and_partition(payloads)
@@ -522,9 +514,8 @@ class KafkaProtocol(object):
"""
Decode bytes to an OffsetFetchResponse
- Params
- ======
- data: bytes to decode
+ Arguments:
+ data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
@@ -547,10 +538,10 @@ def create_message(payload, key=None):
"""
Construct a Message
- Params
- ======
- payload: bytes, the payload to send to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payload: bytes, the payload to send to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
return Message(0, 0, key, payload)
@@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payloads: list(bytes), a list of payload to send be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
@@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None):
The given payloads will be encoded, compressed, and sent as a single atomic
message to Kafka.
- Params
- ======
- payloads: list(bytes), a list of payload to send be sent to Kafka
- key: bytes, a key used for partition routing (optional)
+ Arguments:
+ payloads: list(bytes), a list of payload to send be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])