author      mrtheb <mrlabbe@gmail.com>    2014-01-31 22:43:59 -0500
committer   mrtheb <mrlabbe@gmail.com>    2014-01-31 22:43:59 -0500
commit      84de472a4d5b583ff3ed6cc6d92250a7c9291ceb (patch)
tree        e3d03da4eeecf8eab2dc63cf113a4daf82addf72 /kafka/protocol.py
parent      0bdff4e833f73518a7219fca04dfbc3ed201b06e (diff)
parent      4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb (diff)
download    kafka-python-84de472a4d5b583ff3ed6cc6d92250a7c9291ceb.tar.gz
Merge branch 'master' into multihosts
Conflicts:
    kafka/client.py
    kafka/conn.py
    setup.py
    test/test_integration.py
    test/test_unit.py
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--    kafka/protocol.py    30
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..25be023 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -29,8 +29,8 @@ class KafkaProtocol(object):
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
- OFFSET_COMMIT_KEY = 6
- OFFSET_FETCH_KEY = 7
+ OFFSET_COMMIT_KEY = 8
+ OFFSET_FETCH_KEY = 9
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
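The corrected values match the published 0.8 protocol, where OffsetCommit is api_key 8 and OffsetFetch is api_key 9 (keys 6 and 7 belong to internal broker-to-broker APIs). A minimal sketch of how such a key constant enters the wire format, assuming the 0.8 request header layout (int16 api_key, int16 api_version, int32 correlation_id, length-prefixed client_id); pack_request_header is illustrative, not a function in this module:

    import struct

    def pack_request_header(api_key, correlation_id, client_id):
        # Hypothetical helper: 0.8 request header is int16 api_key,
        # int16 api_version, int32 correlation_id, then a
        # length-prefixed (int16) client_id string.
        return struct.pack('>hhih%ds' % len(client_id),
                           api_key, 0, correlation_id,
                           len(client_id), client_id)

    header = pack_request_header(8, 42, b'my-client')  # 8 == OFFSET_COMMIT_KEY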
@@ -119,9 +119,17 @@ class KafkaProtocol(object):
read_message = True
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
+ # NOTE: Not sure this is correct error handling:
+ # Is it possible to get a BUE if the message set is somewhere
+ # in the middle of the fetch response? If so, we probably have
+ # an issue that's not fetch size too small.
+ # Aren't we ignoring errors if we fail to unpack data by
+ # raising StopIteration()?
+ # If _decode_message() raises a ChecksumError, couldn't that
+ # also be due to the fetch size being too small?
if read_message is False:
# If we get a partial read of a message, but haven't
- # yielded anyhting there's a problem
+ # yielded anything there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
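The distinction the NOTE is probing: ConsumerFetchSizeTooSmall is raised only when the underflow happens before a single message was yielded, i.e. not even one whole message fit in the fetch buffer. A hedged sketch (not the library's consumer code) of how a caller can react by doubling the buffer and retrying; do_fetch stands in for whatever performs the fetch-and-decode:

    from kafka.common import ConsumerFetchSizeTooSmall  # import path as of this era

    def fetch_with_growing_buffer(do_fetch, fetch_size=4096, max_fetch_size=1 << 22):
        # do_fetch(size) is assumed to return an iterator of OffsetAndMessage
        while True:
            try:
                return list(do_fetch(fetch_size))
            except ConsumerFetchSizeTooSmall:
                fetch_size *= 2          # nothing decoded: buffer too small
                if fetch_size > max_fetch_size:
                    raise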
@@ -171,7 +179,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of ProduceRequest
acks: How "acky" you want the request to be
0: immediate response
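A hedged usage sketch for this encoder, assuming the signature in this file and the ProduceRequest namedtuple and create_message helper from this era of the codebase:

    from kafka.common import ProduceRequest
    from kafka.protocol import KafkaProtocol, create_message

    req = ProduceRequest('my-topic', 0, [create_message('hello, kafka')])
    # acks=1: respond once the partition leader has written the message
    wire = KafkaProtocol.encode_produce_request(
        'my-client', correlation_id=7, payloads=[req], acks=1, timeout=1000)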
@@ -231,7 +239,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
payloads: list of FetchRequest
max_wait_time: int, how long to block waiting on min_bytes of data
min_bytes: int, the minimum number of bytes to accumulate before
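In the same hedged spirit, a fetch-side usage sketch; max_wait_time and min_bytes together implement long polling, letting the broker hold the request until enough data has accumulated:

    from kafka.common import FetchRequest
    from kafka.protocol import KafkaProtocol

    # FetchRequest fields in this era: topic, partition, offset, max_bytes
    req = FetchRequest('my-topic', 0, 0, 64 * 1024)
    wire = KafkaProtocol.encode_fetch_request(
        'my-client', correlation_id=8, payloads=[req],
        max_wait_time=100, min_bytes=4096)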
@@ -338,7 +346,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
topics: list of strings
"""
topics = [] if topics is None else topics
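For completeness, a hedged sketch of the metadata encoder; per the default above, omitting topics asks the broker for metadata on every topic:

    from kafka.protocol import KafkaProtocol

    one_topic = KafkaProtocol.encode_metadata_request('my-client', 9, ['my-topic'])
    all_topics = KafkaProtocol.encode_metadata_request('my-client', 10)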
@@ -376,12 +384,16 @@ class KafkaProtocol(object):
topic_metadata = {}
for i in range(num_topics):
+ # NOTE: topic_error is discarded. Should probably be returned with
+ # the topic metadata.
((topic_error,), cur) = relative_unpack('>h', data, cur)
(topic_name, cur) = read_short_string(data, cur)
((num_partitions,), cur) = relative_unpack('>i', data, cur)
partition_metadata = {}
for j in range(num_partitions):
+ # NOTE: partition_error_code is discarded. Should probably be
+ # returned with the partition metadata.
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
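relative_unpack (from kafka.util) is the cursor-walking primitive all of these decoders lean on. A hedged reconstruction of its behavior, inferred from the call sites above rather than copied from the library:

    import struct

    class BufferUnderflowError(Exception):
        # stand-in for the library's exception of the same name
        pass

    def relative_unpack(fmt, data, cur):
        # Unpack fmt at offset cur; return the value tuple plus the advanced
        # offset, so callers can thread the cursor through the buffer.
        size = struct.calcsize(fmt)
        if len(data) < cur + size:
            raise BufferUnderflowError('not enough data left')
        return struct.unpack(fmt, data[cur:cur + size]), cur + size

    ((topic_error,), cur) = relative_unpack('>h', b'\x00\x00rest', 0)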
@@ -408,7 +420,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are committing offsets for
payloads: list of OffsetCommitRequest
"""
@@ -439,7 +451,6 @@ class KafkaProtocol(object):
data: bytes to decode
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in xrange(num_topics):
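The deleted read_short_string line matches the 0.8 wire format: a response header carries only a 4-byte correlation_id, and the broker never echoes client_id back, so reading one here mis-parsed the body (the offset fetch decoder below gets the same fix). A minimal sketch of the header decode:

    import struct

    def decode_response_header(data):
        # 0.8 response header: int32 correlation_id only; the body follows
        (correlation_id,) = struct.unpack('>i', data[:4])
        return correlation_id, 4  # value plus cursor into the body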
@@ -459,7 +470,7 @@ class KafkaProtocol(object):
Params
======
client_id: string
- correlation_id: string
+ correlation_id: int
group: string, the consumer group you are fetching offsets for
payloads: list of OffsetFetchRequest
"""
@@ -490,7 +501,6 @@ class KafkaProtocol(object):
"""
((correlation_id,), cur) = relative_unpack('>i', data, 0)
- (client_id, cur) = read_short_string(data, cur)
((num_topics,), cur) = relative_unpack('>i', data, cur)
for i in range(num_topics):