diff options
author | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400 |
commit | a03f0c86b8a504c0e3185cac1611131dba24f625 (patch) | |
tree | 3797524d3411640968292c6eba0141fc4c1f3457 /kafka/client.py | |
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff) | |
download | kafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz |
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 41 |
1 files changed, 20 insertions, 21 deletions
diff --git a/kafka/client.py b/kafka/client.py index 2fdb271..a31ea66 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,14 +1,9 @@ -import base64 from collections import defaultdict from functools import partial -from itertools import count, cycle import logging -from operator import attrgetter -import struct import time -import zlib -from kafka.common import * +from kafka.common import count, ErrorMapping, TopicAndPartition from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -212,8 +207,10 @@ class KafkaClient(object): order of input payloads """ - encoder = partial(KafkaProtocol.encode_produce_request, - acks=acks, timeout=timeout) + encoder = partial( + KafkaProtocol.encode_produce_request, + acks=acks, + timeout=timeout) if acks == 0: decoder = None @@ -226,10 +223,10 @@ class KafkaClient(object): for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("ProduceRequest for %s failed with " - "errorcode=%d" % ( - TopicAndPartition(resp.topic, resp.partition), - resp.error)) + raise Exception( + "ProduceRequest for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), + resp.error)) # Run the callback if callback is not None: @@ -251,17 +248,18 @@ class KafkaClient(object): max_wait_time=max_wait_time, min_bytes=min_bytes) - resps = self._send_broker_aware_request(payloads, encoder, - KafkaProtocol.decode_fetch_response) + resps = self._send_broker_aware_request( + payloads, encoder, + KafkaProtocol.decode_fetch_response) out = [] for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("FetchRequest for %s failed with " - "errorcode=%d" % ( - TopicAndPartition(resp.topic, resp.partition), - resp.error)) + raise Exception( + "FetchRequest for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), + resp.error)) # Run the callback if callback is not None: @@ -272,9 +270,10 @@ class KafkaClient(object): def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_offset_request, - KafkaProtocol.decode_offset_response) + resps = self._send_broker_aware_request( + payloads, + KafkaProtocol.encode_offset_request, + KafkaProtocol.decode_offset_response) out = [] for resp in resps: |