diff options
author | David Arthur <mumrah@gmail.com> | 2013-10-04 04:54:12 -0700 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2013-10-04 04:54:12 -0700 |
commit | cfd9f86e60429d1f7af8bcac5849808354b8719e (patch) | |
tree | 82039e80d595b4ad611a831a597521cae7939571 /kafka/client.py | |
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff) | |
parent | 59af614d1d09db6f7e0115dcf39232bf4f0ece9a (diff) | |
download | kafka-python-cfd9f86e60429d1f7af8bcac5849808354b8719e.tar.gz |
Merge pull request #59 from mrtheb/master
flake8 (pep8 and pyflakes) clean-up
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 42 |
1 files changed, 21 insertions, 21 deletions
diff --git a/kafka/client.py b/kafka/client.py index 2fdb271..965cbc5 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,14 +1,10 @@ -import base64 from collections import defaultdict from functools import partial -from itertools import count, cycle +from itertools import count import logging -from operator import attrgetter -import struct import time -import zlib -from kafka.common import * +from kafka.common import ErrorMapping, TopicAndPartition from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -212,8 +208,10 @@ class KafkaClient(object): order of input payloads """ - encoder = partial(KafkaProtocol.encode_produce_request, - acks=acks, timeout=timeout) + encoder = partial( + KafkaProtocol.encode_produce_request, + acks=acks, + timeout=timeout) if acks == 0: decoder = None @@ -226,10 +224,10 @@ class KafkaClient(object): for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("ProduceRequest for %s failed with " - "errorcode=%d" % ( - TopicAndPartition(resp.topic, resp.partition), - resp.error)) + raise Exception( + "ProduceRequest for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), + resp.error)) # Run the callback if callback is not None: @@ -251,17 +249,18 @@ class KafkaClient(object): max_wait_time=max_wait_time, min_bytes=min_bytes) - resps = self._send_broker_aware_request(payloads, encoder, - KafkaProtocol.decode_fetch_response) + resps = self._send_broker_aware_request( + payloads, encoder, + KafkaProtocol.decode_fetch_response) out = [] for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("FetchRequest for %s failed with " - "errorcode=%d" % ( - TopicAndPartition(resp.topic, resp.partition), - resp.error)) + raise Exception( + "FetchRequest for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), + resp.error)) # Run the callback if callback is not None: @@ -272,9 +271,10 @@ class KafkaClient(object): def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_offset_request, - KafkaProtocol.decode_offset_response) + resps = self._send_broker_aware_request( + payloads, + KafkaProtocol.encode_offset_request, + KafkaProtocol.decode_offset_response) out = [] for resp in resps: |