diff options
author | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
commit | a03f0c86b8a504c0e3185cac1611131dba24f625 (patch) | |
tree | 3797524d3411640968292c6eba0141fc4c1f3457 /kafka/protocol.py | |
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff) | |
download | kafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz |
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r-- | kafka/protocol.py | 38 |
1 files changed, 19 insertions, 19 deletions
diff --git a/kafka/protocol.py b/kafka/protocol.py index 421e19b..612acf6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -25,12 +25,12 @@ class KafkaProtocol(object): This class does not have any state associated with it, it is purely for organization. """ - PRODUCE_KEY = 0 - FETCH_KEY = 1 - OFFSET_KEY = 2 - METADATA_KEY = 3 + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 OFFSET_COMMIT_KEY = 6 - OFFSET_FETCH_KEY = 7 + OFFSET_FETCH_KEY = 7 ATTRIBUTE_CODEC_MASK = 0x03 CODEC_NONE = 0x00 @@ -120,8 +120,8 @@ class KafkaProtocol(object): yield OffsetAndMessage(offset, message) except BufferUnderflowError: if read_message is False: - # If we get a partial read of a message, but haven't yielded anyhting - # there's a problem + # If we get a partial read of a message, but haven't + # yielded anyhting there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -274,14 +274,14 @@ class KafkaProtocol(object): for i in range(num_partitions): ((partition, error, highwater_mark_offset), cur) = \ - relative_unpack('>ihq', data, cur) + relative_unpack('>ihq', data, cur) (message_set, cur) = read_int_string(data, cur) yield FetchResponse( - topic, partition, error, - highwater_mark_offset, - KafkaProtocol._decode_message_set_iter(message_set)) + topic, partition, error, + highwater_mark_offset, + KafkaProtocol._decode_message_set_iter(message_set)) @classmethod def encode_offset_request(cls, client_id, correlation_id, payloads=None): @@ -321,7 +321,7 @@ class KafkaProtocol(object): for i in range(num_partitions): ((partition, error, num_offsets,), cur) = \ - relative_unpack('>ihi', data, cur) + relative_unpack('>ihi', data, cur) offsets = [] for j in range(num_offsets): @@ -383,17 +383,17 @@ class KafkaProtocol(object): for j in range(num_partitions): ((partition_error_code, partition, leader, numReplicas), cur) = \ - relative_unpack('>hiii', data, cur) + relative_unpack('>hiii', data, cur) - (replicas, cur) = relative_unpack('>%di' % numReplicas, - data, cur) + (replicas, cur) = relative_unpack( + '>%di' % numReplicas, data, cur) ((num_isr,), cur) = relative_unpack('>i', data, cur) (isr, cur) = relative_unpack('>%di' % num_isr, data, cur) partition_metadata[partition] = \ - PartitionMetadata(topic_name, partition, leader, - replicas, isr) + PartitionMetadata( + topic_name, partition, leader, replicas, isr) topic_metadata[topic_name] = partition_metadata @@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None): key: bytes, a key used for partition routing (optional) """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload) for payload in payloads]) + [create_message(payload) for payload in payloads]) gzipped = gzip_encode(message_set) codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP @@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None): key: bytes, a key used for partition routing (optional) """ message_set = KafkaProtocol._encode_message_set( - [create_message(payload) for payload in payloads]) + [create_message(payload) for payload in payloads]) snapped = snappy_encode(message_set) codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY |