From 93b6579a0f5192b733e403acc38fc5ae952d4a08 Mon Sep 17 00:00:00 2001
From: Omar Ghishan
Date: Thu, 2 Jan 2014 18:10:57 -0800
Subject: Add and fix comments to protocol.py

---
 kafka/protocol.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..74a0dce 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -121,7 +121,7 @@ class KafkaProtocol(object):
             except BufferUnderflowError:
                 if read_message is False:
                     # If we get a partial read of a message, but haven't
-                    # yielded anyhting there's a problem
+                    # yielded anything there's a problem
                     raise ConsumerFetchSizeTooSmall()
                 else:
                     raise StopIteration()
@@ -171,7 +171,7 @@ class KafkaProtocol(object):
         Params
         ======
         client_id: string
-        correlation_id: string
+        correlation_id: int
         payloads: list of ProduceRequest
         acks: How "acky" you want the request to be
             0: immediate response
@@ -231,7 +231,7 @@ class KafkaProtocol(object):
         Params
         ======
         client_id: string
-        correlation_id: string
+        correlation_id: int
         payloads: list of FetchRequest
         max_wait_time: int, how long to block waiting on min_bytes of data
         min_bytes: int, the minimum number of bytes to accumulate before
@@ -338,7 +338,7 @@ class KafkaProtocol(object):
         Params
         ======
         client_id: string
-        correlation_id: string
+        correlation_id: int
         topics: list of strings
         """
         topics = [] if topics is None else topics
@@ -376,12 +376,16 @@ class KafkaProtocol(object):
         topic_metadata = {}

         for i in range(num_topics):
+            # NOTE: topic_error is discarded. Should probably be returned with
+            # the topic metadata.
             ((topic_error,), cur) = relative_unpack('>h', data, cur)
             (topic_name, cur) = read_short_string(data, cur)
             ((num_partitions,), cur) = relative_unpack('>i', data, cur)
             partition_metadata = {}

             for j in range(num_partitions):
+                # NOTE: partition_error_code is discarded. Should probably be
+                # returned with the partition metadata.
                 ((partition_error_code, partition, leader, numReplicas), cur) = \
                     relative_unpack('>hiii', data, cur)
@@ -408,7 +412,7 @@ class KafkaProtocol(object):
         Params
         ======
         client_id: string
-        correlation_id: string
+        correlation_id: int
         group: string, the consumer group you are committing offsets for
         payloads: list of OffsetCommitRequest
         """
@@ -459,7 +463,7 @@ class KafkaProtocol(object):
         Params
         ======
         client_id: string
-        correlation_id: string
+        correlation_id: int
         group: string, the consumer group you are fetching offsets for
         payloads: list of OffsetFetchRequest
         """

From 009ed92894b03a95a473359e64c5499665697b10 Mon Sep 17 00:00:00 2001
From: Omar Ghishan
Date: Thu, 2 Jan 2014 18:14:44 -0800
Subject: Add note about questionable error handling while decoding messages.

Will remove once any error handling issues are resolved.

---
 kafka/protocol.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/kafka/protocol.py b/kafka/protocol.py
index 74a0dce..54b8eee 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -119,6 +119,14 @@ class KafkaProtocol(object):
                     read_message = True
                     yield OffsetAndMessage(offset, message)
             except BufferUnderflowError:
+                # NOTE: Not sure this is correct error handling:
+                # Is it possible to get a BUE if the message set is somewhere
+                # in the middle of the fetch response? If so, we probably have
+                # an issue that's not fetch size too small.
+                # Aren't we ignoring errors if we fail to unpack data by
+                # raising StopIteration()?
+                # If _decode_message() raises a ChecksumError, couldn't that
+                # also be due to the fetch size being too small?
                 if read_message is False:
                     # If we get a partial read of a message, but haven't
                     # yielded anything there's a problem
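The note this patch adds is easier to weigh with the decode loop spelled out. Below is a minimal sketch of the pattern in question; the two exception names come from the diff, while message_set_iter and decode_one are illustrative stand-ins rather than the library's actual code. A broker may truncate the last message of a fetch response at the requested fetch size, so a BufferUnderflowError at the tail of the buffer is expected, and is fatal only when not even one complete message fit.

class BufferUnderflowError(Exception):
    """Fewer bytes remain in the buffer than a decode step needs."""

class ConsumerFetchSizeTooSmall(Exception):
    """Not even one complete message fit in the fetch response."""

def message_set_iter(data, decode_one):
    # decode_one(data, cur) -> (offset, message, new_cur); it raises
    # BufferUnderflowError when the buffer ends mid-message.
    cur = 0
    read_message = False
    while cur < len(data):
        try:
            (offset, message, cur) = decode_one(data, cur)
            read_message = True
            yield (offset, message)
        except BufferUnderflowError:
            if not read_message:
                # The first message is already larger than the fetch size,
                # so retrying the same fetch can never make progress.
                raise ConsumerFetchSizeTooSmall()
            # Drop the truncated tail; the consumer re-fetches from the last
            # yielded offset. As the note observes, an underflow in the
            # middle of the response is silently treated the same way, which
            # could mask real corruption rather than a too-small fetch size.
            return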
From 55816493c76b891fca1e719eff0fe48e3f43f27f Mon Sep 17 00:00:00 2001
From: Ashish Walia
Date: Tue, 7 Jan 2014 12:27:50 -0500
Subject: Syncing offset commit and fetch api keys with Kafka trunk code

---
 kafka/protocol.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka/protocol.py b/kafka/protocol.py
index 612acf6..eded41c 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -29,8 +29,8 @@ class KafkaProtocol(object):
     FETCH_KEY = 1
     OFFSET_KEY = 2
     METADATA_KEY = 3
-    OFFSET_COMMIT_KEY = 6
-    OFFSET_FETCH_KEY = 7
+    OFFSET_COMMIT_KEY = 8
+    OFFSET_FETCH_KEY = 9

     ATTRIBUTE_CODEC_MASK = 0x03
     CODEC_NONE = 0x00

From daabc9f28dd9283b45e5ccd854e8fd8800419d3c Mon Sep 17 00:00:00 2001
From: Ashish Walia
Date: Tue, 7 Jan 2014 12:30:52 -0500
Subject: Deleting client_id from offset commit and fetch response as per
 Kafka trunk code

---
 kafka/protocol.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/kafka/protocol.py b/kafka/protocol.py
index eded41c..ac1bafd 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -439,7 +439,6 @@ class KafkaProtocol(object):
         data: bytes to decode
         """
         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
-        (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)

         for i in xrange(num_topics):
@@ -490,7 +489,6 @@ class KafkaProtocol(object):
         """

         ((correlation_id,), cur) = relative_unpack('>i', data, 0)
-        (client_id, cur) = read_short_string(data, cur)
         ((num_topics,), cur) = relative_unpack('>i', data, cur)

         for i in range(num_topics):
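Taken together, the last two patches track the wire format as it stabilized in Kafka trunk: OffsetCommit and OffsetFetch use API keys 8 and 9, and a response header carries only the 32-bit correlation id, since the client id travels in request headers alone. That is why the read_short_string calls on the response side had to go, and it also reinforces the earlier docstring fix that correlation_id is an int. A rough sketch of the two header layouts, using plain struct calls as stand-ins for the library's relative_unpack and read_short_string helpers:

import struct

OFFSET_COMMIT_KEY = 8  # matches Kafka trunk, per the patch above
OFFSET_FETCH_KEY = 9

def encode_request_header(api_key, api_version, correlation_id, client_id):
    # Request header: int16 api_key, int16 api_version, int32
    # correlation_id (hence "correlation_id: int" in the docstrings),
    # then a length-prefixed client_id string.
    cid = client_id.encode('utf-8')
    return struct.pack('>hhih', api_key, api_version, correlation_id,
                       len(cid)) + cid

def decode_response_header(data):
    # Response header: only the int32 correlation_id. There is no
    # client_id field here, which is what the deleted read_short_string
    # calls were trying to read.
    (correlation_id,) = struct.unpack('>i', data[:4])
    return correlation_id, 4  # decoded value and new cursor position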