summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
committermrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
commita03f0c86b8a504c0e3185cac1611131dba24f625 (patch)
tree3797524d3411640968292c6eba0141fc4c1f3457 /kafka/client.py
parentb0cacc948539d180e4a634a06a10232770deb187 (diff)
downloadkafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py41
1 files changed, 20 insertions, 21 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2fdb271..a31ea66 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,9 @@
-import base64
from collections import defaultdict
from functools import partial
-from itertools import count, cycle
import logging
-from operator import attrgetter
-import struct
import time
-import zlib
-from kafka.common import *
+from kafka.common import count, ErrorMapping, TopicAndPartition
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -212,8 +207,10 @@ class KafkaClient(object):
order of input payloads
"""
- encoder = partial(KafkaProtocol.encode_produce_request,
- acks=acks, timeout=timeout)
+ encoder = partial(
+ KafkaProtocol.encode_produce_request,
+ acks=acks,
+ timeout=timeout)
if acks == 0:
decoder = None
@@ -226,10 +223,10 @@ class KafkaClient(object):
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("ProduceRequest for %s failed with "
- "errorcode=%d" % (
- TopicAndPartition(resp.topic, resp.partition),
- resp.error))
+ raise Exception(
+ "ProduceRequest for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition),
+ resp.error))
# Run the callback
if callback is not None:
@@ -251,17 +248,18 @@ class KafkaClient(object):
max_wait_time=max_wait_time,
min_bytes=min_bytes)
- resps = self._send_broker_aware_request(payloads, encoder,
- KafkaProtocol.decode_fetch_response)
+ resps = self._send_broker_aware_request(
+ payloads, encoder,
+ KafkaProtocol.decode_fetch_response)
out = []
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("FetchRequest for %s failed with "
- "errorcode=%d" % (
- TopicAndPartition(resp.topic, resp.partition),
- resp.error))
+ raise Exception(
+ "FetchRequest for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition),
+ resp.error))
# Run the callback
if callback is not None:
@@ -272,9 +270,10 @@ class KafkaClient(object):
def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_offset_request,
- KafkaProtocol.decode_offset_response)
+ resps = self._send_broker_aware_request(
+ payloads,
+ KafkaProtocol.encode_offset_request,
+ KafkaProtocol.decode_offset_response)
out = []
for resp in resps: