diff options
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index 612acf6..74a0dce 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -121,7 +121,7 @@ class KafkaProtocol(object): except BufferUnderflowError: if read_message is False: # If we get a partial read of a message, but haven't - # yielded anyhting there's a problem + # yielded anything there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -171,7 +171,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of ProduceRequest acks: How "acky" you want the request to be 0: immediate response @@ -231,7 +231,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of FetchRequest max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before @@ -338,7 +338,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int topics: list of strings """ topics = [] if topics is None else topics @@ -376,12 +376,16 @@ class KafkaProtocol(object): topic_metadata = {} for i in range(num_topics): + # NOTE: topic_error is discarded. Should probably be returned with + # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) partition_metadata = {} for j in range(num_partitions): + # NOTE: partition_error_code is discarded. Should probably be + # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -408,7 +412,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ @@ -459,7 +463,7 @@ class KafkaProtocol(object): Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ |