diff options
author | Will Daly <will@edx.org> | 2015-01-04 12:23:10 -0500 |
---|---|---|
committer | Will Daly <will.e.daly@gmail.com> | 2015-01-15 18:01:40 -0500 |
commit | 01f378328e5383d05d52428b815f992eb2c536cb (patch) | |
tree | 2d3366ed91b9744efd40d935a460040150c6d4d8 /kafka/protocol.py | |
parent | 02c2b469003e2ddcb051dbb4d95977137050c19f (diff) | |
download | kafka-python-01f378328e5383d05d52428b815f992eb2c536cb.tar.gz |
Add Sphinx API docs
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 125 |
1 files changed, 58 insertions, 67 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index a85c7eb..2a39de6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -185,18 +185,18 @@ class KafkaProtocol(object): """ Encode some ProduceRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of ProduceRequest - acks: How "acky" you want the request to be - 0: immediate response - 1: written to disk by the leader - 2+: waits for this many number of replicas to sync - -1: waits for all replicas to be in sync - timeout: Maximum time the server will wait for acks from replicas. - This is _not_ a socket timeout + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. + This is _not_ a socket timeout + """ payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) @@ -225,9 +225,9 @@ class KafkaProtocol(object): """ Decode bytes to a ProduceResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode + """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -248,14 +248,13 @@ class KafkaProtocol(object): """ Encodes some FetchRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of FetchRequest - max_wait_time: int, how long to block waiting on min_bytes of data - min_bytes: int, the minimum number of bytes to accumulate before - returning the response + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response """ payloads = [] if payloads is None else payloads @@ -284,9 +283,8 @@ class KafkaProtocol(object): """ Decode bytes to a FetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -333,9 +331,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -360,11 +357,10 @@ class KafkaProtocol(object): """ Encode a MetadataRequest - Params - ====== - client_id: string - correlation_id: int - topics: list of strings + Arguments: + client_id: string + correlation_id: int + topics: list of strings """ if payloads is None: topics = [] if topics is None else topics @@ -388,9 +384,8 @@ class KafkaProtocol(object): """ Decode bytes to a MetadataResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) @@ -439,12 +434,11 @@ class KafkaProtocol(object): """ Encode some OffsetCommitRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -470,9 +464,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetCommitResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -491,12 +484,11 @@ class KafkaProtocol(object): """ Encode some OffsetFetchRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -522,9 +514,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetFetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) @@ -547,10 +538,10 @@ def create_message(payload, key=None): """ Construct a Message - Params - ====== - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ return Message(0, 0, key, payload) @@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) @@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) |