author     mrtheb <mrlabbe@gmail.com>  2013-10-03 22:52:04 -0400
committer  mrtheb <mrlabbe@gmail.com>  2013-10-03 22:52:04 -0400
commit     a03f0c86b8a504c0e3185cac1611131dba24f625 (patch)
tree       3797524d3411640968292c6eba0141fc4c1f3457 /kafka/protocol.py
parent     b0cacc948539d180e4a634a06a10232770deb187 (diff)
download   kafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/protocol.py')
-rw-r--r--  kafka/protocol.py  38
1 file changed, 19 insertions(+), 19 deletions(-)
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 421e19b..612acf6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -25,12 +25,12 @@ class KafkaProtocol(object):
This class does not have any state associated with it, it is purely
for organization.
"""
- PRODUCE_KEY = 0
- FETCH_KEY = 1
- OFFSET_KEY = 2
- METADATA_KEY = 3
+ PRODUCE_KEY = 0
+ FETCH_KEY = 1
+ OFFSET_KEY = 2
+ METADATA_KEY = 3
OFFSET_COMMIT_KEY = 6
- OFFSET_FETCH_KEY = 7
+ OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
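
The constants in the hunk above are the request API keys plus the codec bit mask applied to a message's attributes byte. A minimal sketch (not part of this commit) of how that mask is used; the CODEC_GZIP/CODEC_SNAPPY values (0x01/0x02) are assumed from the Kafka wire format and are not shown in this hunk:

# Sketch only: the gzip/snappy codec values below are assumed, not shown above.
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01    # assumed wire value
CODEC_SNAPPY = 0x02  # assumed wire value

def codec_of(attributes):
    # The low two bits of the attributes byte select the compression codec.
    return attributes & ATTRIBUTE_CODEC_MASK

assert codec_of(0x00) == CODEC_NONE
assert codec_of(0x01) == CODEC_GZIP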
@@ -120,8 +120,8 @@ class KafkaProtocol(object):
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
if read_message is False:
- # If we get a partial read of a message, but haven't yielded anyhting
- # there's a problem
+ # If we get a partial read of a message, but haven't
+ # yielded anything, there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
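
The hunk above only rewraps the comment; the logic it documents is: keep yielding complete messages, and treat a partial trailing read as an error only when nothing at all has been yielded, because then the fetch buffer cannot hold even one whole message. A stripped-down, hypothetical sketch of that pattern (not the library code, and using return where the 2013-era generator raises StopIteration):

import struct

class BufferUnderflowError(Exception):
    pass

class ConsumerFetchSizeTooSmall(Exception):
    pass

def iter_length_prefixed(data):
    # Hypothetical framing: each message is a 4-byte big-endian length + payload.
    cur = 0
    read_message = False
    while cur < len(data):
        try:
            if cur + 4 > len(data):
                raise BufferUnderflowError()
            (size,) = struct.unpack('>i', data[cur:cur + 4])
            if cur + 4 + size > len(data):
                raise BufferUnderflowError()
            yield data[cur + 4:cur + 4 + size]
            read_message = True
            cur += 4 + size
        except BufferUnderflowError:
            if not read_message:
                # Partial read and nothing yielded yet: the buffer cannot hold
                # even one message, so the fetch size is too small.
                raise ConsumerFetchSizeTooSmall()
            # Partial trailing message after at least one full one: just stop.
            return

buf = struct.pack('>i', 5) + b"hello" + struct.pack('>i', 99)[:2]
assert list(iter_length_prefixed(buf)) == [b"hello"]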
@@ -274,14 +274,14 @@ class KafkaProtocol(object):
for i in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = \
- relative_unpack('>ihq', data, cur)
+ relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(
- topic, partition, error,
- highwater_mark_offset,
- KafkaProtocol._decode_message_set_iter(message_set))
+ topic, partition, error,
+ highwater_mark_offset,
+ KafkaProtocol._decode_message_set_iter(message_set))
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=None):
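
relative_unpack comes from kafka.util and is not shown in this diff; the sketch below assumes it behaves like struct.unpack on a slice plus a cursor advance, which is enough to read the fetch-response fields above ('>ihq' = big-endian int32 partition, int16 error code, int64 highwater mark offset):

import struct

def relative_unpack(fmt, data, cur):
    # Hypothetical stand-in for kafka.util.relative_unpack: unpack fmt at
    # offset cur and return (values, new_cursor).
    size = struct.calcsize(fmt)
    if cur + size > len(data):
        raise ValueError("buffer underflow")  # the real helper raises BufferUnderflowError
    values = struct.unpack(fmt, data[cur:cur + size])
    return values, cur + size

# '>ihq': int32 partition, int16 error code, int64 highwater mark offset.
data = struct.pack('>ihq', 3, 0, 1042)
((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, 0)
assert (partition, error, highwater_mark_offset, cur) == (3, 0, 1042, 14)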
@@ -321,7 +321,7 @@ class KafkaProtocol(object):
for i in range(num_partitions):
((partition, error, num_offsets,), cur) = \
- relative_unpack('>ihi', data, cur)
+ relative_unpack('>ihi', data, cur)
offsets = []
for j in range(num_offsets):
@@ -383,17 +383,17 @@ class KafkaProtocol(object):
for j in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
- relative_unpack('>hiii', data, cur)
+ relative_unpack('>hiii', data, cur)
- (replicas, cur) = relative_unpack('>%di' % numReplicas,
- data, cur)
+ (replicas, cur) = relative_unpack(
+ '>%di' % numReplicas, data, cur)
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
partition_metadata[partition] = \
- PartitionMetadata(topic_name, partition, leader,
- replicas, isr)
+ PartitionMetadata(
+ topic_name, partition, leader, replicas, isr)
topic_metadata[topic_name] = partition_metadata
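
The '>%di' % numReplicas pattern above reads a count-prefixed array of int32 broker ids (the replica and ISR lists). A small, hypothetical helper showing the same idea:

import struct

def read_int_array(data, cur):
    # Hypothetical helper: a big-endian int32 count followed by that many int32s.
    (n,) = struct.unpack('>i', data[cur:cur + 4])
    cur += 4
    values = struct.unpack('>%di' % n, data[cur:cur + 4 * n])
    return list(values), cur + 4 * n

payload = struct.pack('>i3i', 3, 10, 11, 12)
replicas, cur = read_int_array(payload, 0)
assert replicas == [10, 11, 12] and cur == 16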
@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload) for payload in payloads])
+ [create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
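
A minimal usage sketch for the helper being reformatted above, assuming kafka-python is importable and that Message exposes an attributes field as elsewhere in this codebase; create_gzip_message wraps the encoded inner message set in a single gzip-compressed message whose attributes carry CODEC_GZIP:

from kafka.protocol import create_gzip_message, KafkaProtocol

msg = create_gzip_message([b"payload-1", b"payload-2"])
# The low codec bits of the attributes byte should now indicate gzip.
assert msg.attributes & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP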
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload) for payload in payloads])
+ [create_message(payload) for payload in payloads])
snapped = snappy_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
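
create_snappy_message mirrors the gzip helper; the practical differences are the codec bit and that snappy_encode needs the python-snappy package installed. A usage sketch under the same assumptions as above:

from kafka.protocol import create_snappy_message, KafkaProtocol

msg = create_snappy_message([b"payload-1", b"payload-2"])
assert msg.attributes & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY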