author    David Arthur <mumrah@gmail.com>  2013-10-04 04:54:12 -0700
committer David Arthur <mumrah@gmail.com>  2013-10-04 04:54:12 -0700
commit    cfd9f86e60429d1f7af8bcac5849808354b8719e (patch)
tree      82039e80d595b4ad611a831a597521cae7939571
parent    b0cacc948539d180e4a634a06a10232770deb187 (diff)
parent    59af614d1d09db6f7e0115dcf39232bf4f0ece9a (diff)
Merge pull request #59 from mrtheb/master
flake8 (pep8 and pyflakes) clean-up
-rw-r--r--  kafka/client.py    | 42
-rw-r--r--  kafka/conn.py      | 20
-rw-r--r--  kafka/consumer.py  | 53
-rw-r--r--  kafka/protocol.py  | 38
-rw-r--r--  kafka/queue.py     | 19
-rw-r--r--  kafka/util.py      |  6
-rw-r--r--  test/test_unit.py  |  4
7 files changed, 100 insertions, 82 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2fdb271..965cbc5 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,14 +1,10 @@
-import base64
from collections import defaultdict
from functools import partial
-from itertools import count, cycle
+from itertools import count
import logging
-from operator import attrgetter
-import struct
import time
-import zlib
-from kafka.common import *
+from kafka.common import ErrorMapping, TopicAndPartition
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol
@@ -212,8 +208,10 @@ class KafkaClient(object):
order of input payloads
"""
- encoder = partial(KafkaProtocol.encode_produce_request,
- acks=acks, timeout=timeout)
+ encoder = partial(
+ KafkaProtocol.encode_produce_request,
+ acks=acks,
+ timeout=timeout)
if acks == 0:
decoder = None
@@ -226,10 +224,10 @@ class KafkaClient(object):
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("ProduceRequest for %s failed with "
- "errorcode=%d" % (
- TopicAndPartition(resp.topic, resp.partition),
- resp.error))
+ raise Exception(
+ "ProduceRequest for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition),
+ resp.error))
# Run the callback
if callback is not None:
@@ -251,17 +249,18 @@ class KafkaClient(object):
max_wait_time=max_wait_time,
min_bytes=min_bytes)
- resps = self._send_broker_aware_request(payloads, encoder,
- KafkaProtocol.decode_fetch_response)
+ resps = self._send_broker_aware_request(
+ payloads, encoder,
+ KafkaProtocol.decode_fetch_response)
out = []
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
- raise Exception("FetchRequest for %s failed with "
- "errorcode=%d" % (
- TopicAndPartition(resp.topic, resp.partition),
- resp.error))
+ raise Exception(
+ "FetchRequest for %s failed with errorcode=%d" %
+ (TopicAndPartition(resp.topic, resp.partition),
+ resp.error))
# Run the callback
if callback is not None:
@@ -272,9 +271,10 @@ class KafkaClient(object):
def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_offset_request,
- KafkaProtocol.decode_offset_response)
+ resps = self._send_broker_aware_request(
+ payloads,
+ KafkaProtocol.encode_offset_request,
+ KafkaProtocol.decode_offset_response)
out = []
for resp in resps:
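
The client.py hunks above mostly re-wrap calls to _send_broker_aware_request, which pairs a request encoder with a response decoder; functools.partial pre-binds the produce-specific keyword arguments onto the encoder. A minimal sketch of that pattern, using a hypothetical stand-in for KafkaProtocol.encode_produce_request rather than the real wire format:

from functools import partial

def encode_produce_request(client_id, correlation_id, payloads, acks=1, timeout=1000):
    # Hypothetical stand-in: describe the request instead of building bytes.
    return "produce(client=%s, corr=%d, %d payloads, acks=%d, timeout=%d)" % (
        client_id, correlation_id, len(payloads), acks, timeout)

# Bind the produce-specific options up front ...
encoder = partial(encode_produce_request, acks=0, timeout=500)

# ... so the broker-aware sender only supplies the per-request arguments.
print(encoder("my-client", 42, ["msg1", "msg2"]))
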
diff --git a/kafka/conn.py b/kafka/conn.py
index 29efbf1..e85fd11 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -3,6 +3,8 @@ import socket
import struct
from threading import local
+from kafka.common import BufferUnderflowError
+
log = logging.getLogger("kafka")
@@ -12,7 +14,7 @@ class KafkaConnection(local):
A socket connection to a single Kafka broker
This class is _not_ thread safe. Each call to `send` must be followed
- by a call to `recv` in order to get the correct response. Eventually,
+ by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
@@ -43,7 +45,7 @@ class KafkaConnection(local):
def _consume_response_iter(self):
"""
- This method handles the response header and error messages. It
+ This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ class KafkaConnection(local):
messagesize = size - 4
log.debug("About to read %d bytes from Kafka", messagesize)
- # Read the remainder of the response
+ # Read the remainder of the response
total = 0
while total < messagesize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
- raise BufferUnderflowError("Not enough data to read this response")
+ raise BufferUnderflowError(
+ "Not enough data to read this response")
+
total += len(resp)
yield resp
@@ -75,9 +79,13 @@ class KafkaConnection(local):
def send(self, request_id, payload):
"Send a request to Kafka"
- log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
+
+ log.debug(
+ "About to send %d bytes to Kafka, request %d" %
+ (len(payload), request_id))
+
sent = self._sock.sendall(payload)
- if sent != None:
+ if sent is not None:
raise RuntimeError("Kafka went away")
def recv(self, request_id):
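
The conn.py changes sit inside _consume_response_iter, which reads a size-prefixed response off the socket and raises BufferUnderflowError if the peer stops sending early. A rough sketch of that framing loop, assuming a blocking socket `sock` and simplifying away the correlation-id bookkeeping:

import struct

class BufferUnderflowError(Exception):
    pass

def read_response(sock, bufsize=4096):
    # The first 4 bytes are a big-endian int32 giving the response size.
    header = sock.recv(4)
    if len(header) < 4:
        raise BufferUnderflowError("Not enough data to read response size")
    (size,) = struct.unpack('>i', header)

    # Keep reading until the full payload has arrived.
    chunks, total = [], 0
    while total < size:
        chunk = sock.recv(min(bufsize, size - total))
        if not chunk:  # socket closed mid-response
            raise BufferUnderflowError("Not enough data to read this response")
        chunks.append(chunk)
        total += len(chunk)
    return b"".join(chunks)
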
diff --git a/kafka/consumer.py b/kafka/consumer.py
index a97e8c0..7d44f28 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,7 +8,7 @@ from Queue import Empty
from kafka.common import (
ErrorMapping, FetchRequest,
- OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+ OffsetRequest, OffsetCommitRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -223,11 +223,12 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
- super(SimpleConsumer, self).__init__(client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
def provide_partition_info(self):
"""
@@ -275,8 +276,8 @@ class SimpleConsumer(Consumer):
resps = self.client.send_offset_request(reqs)
for resp in resps:
- self.offsets[resp.partition] = resp.offsets[0] + \
- deltas[resp.partition]
+ self.offsets[resp.partition] = \
+ resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
@@ -364,9 +365,10 @@ class SimpleConsumer(Consumer):
req = FetchRequest(
self.topic, partition, offset, self.client.bufsize)
- (resp,) = self.client.send_fetch_request([req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
+ (resp,) = self.client.send_fetch_request(
+ [req],
+ max_wait_time=self.fetch_max_wait_time,
+ min_bytes=fetch_size)
assert resp.topic == self.topic
assert resp.partition == partition
@@ -376,18 +378,22 @@ class SimpleConsumer(Consumer):
for message in resp.messages:
next_offset = message.offset
- # update the offset before the message is yielded. This is
- # so that the consumer state is not lost in certain cases.
- # For eg: the message is yielded and consumed by the caller,
- # but the caller does not come back into the generator again.
- # The message will be consumed but the status will not be
- # updated in the consumer
+ # update the offset before the message is yielded. This
+ # is so that the consumer state is not lost in certain
+ # cases.
+ #
+ # For eg: the message is yielded and consumed by the
+ # caller, but the caller does not come back into the
+ # generator again. The message will be consumed but the
+ # status will not be updated in the consumer
self.fetch_started[partition] = True
self.offsets[partition] = message.offset
yield message
except ConsumerFetchSizeTooSmall, e:
- log.warn("Fetch size is too small, increasing by 1.5x and retrying")
fetch_size *= 1.5
+ log.warn(
+ "Fetch size too small, increasing to %d (1.5x) and retry",
+ fetch_size)
continue
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ class MultiProcessConsumer(Consumer):
num_procs=1, partitions_per_proc=0):
# Initiate the base consumer class
- super(MultiProcessConsumer, self).__init__(client, group, topic,
- partitions=None,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
+ super(MultiProcessConsumer, self).__init__(
+ client, group, topic,
+ partitions=None,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master
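
Two behaviours show up in the consumer.py hunks: the offset is recorded before each message is yielded, so the consumer's position survives if the caller abandons the generator, and ConsumerFetchSizeTooSmall triggers a retry with a 1.5x larger fetch size. A simplified sketch of both, with `fetch` standing in for the real send_fetch_request call:

class ConsumerFetchSizeTooSmall(Exception):
    pass

def iter_messages(fetch, start_offset, fetch_size=1024):
    offset = start_offset
    while True:
        try:
            for msg_offset, message in fetch(offset, int(fetch_size)):
                # Advance the stored offset *before* yielding, so state is
                # not lost if the caller never re-enters the generator.
                offset = msg_offset + 1
                yield message
            return
        except ConsumerFetchSizeTooSmall:
            # Buffer too small to hold even one message: grow and retry.
            fetch_size *= 1.5
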
diff --git a/kafka/protocol.py b/kafka/protocol.py
index 421e19b..612acf6 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -25,12 +25,12 @@ class KafkaProtocol(object):
This class does not have any state associated with it, it is purely
for organization.
"""
- PRODUCE_KEY = 0
- FETCH_KEY = 1
- OFFSET_KEY = 2
- METADATA_KEY = 3
+ PRODUCE_KEY = 0
+ FETCH_KEY = 1
+ OFFSET_KEY = 2
+ METADATA_KEY = 3
OFFSET_COMMIT_KEY = 6
- OFFSET_FETCH_KEY = 7
+ OFFSET_FETCH_KEY = 7
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
@@ -120,8 +120,8 @@ class KafkaProtocol(object):
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
if read_message is False:
- # If we get a partial read of a message, but haven't yielded anyhting
- # there's a problem
+ # If we get a partial read of a message, but haven't
+ # yielded anyhting there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@@ -274,14 +274,14 @@ class KafkaProtocol(object):
for i in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = \
- relative_unpack('>ihq', data, cur)
+ relative_unpack('>ihq', data, cur)
(message_set, cur) = read_int_string(data, cur)
yield FetchResponse(
- topic, partition, error,
- highwater_mark_offset,
- KafkaProtocol._decode_message_set_iter(message_set))
+ topic, partition, error,
+ highwater_mark_offset,
+ KafkaProtocol._decode_message_set_iter(message_set))
@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=None):
@@ -321,7 +321,7 @@ class KafkaProtocol(object):
for i in range(num_partitions):
((partition, error, num_offsets,), cur) = \
- relative_unpack('>ihi', data, cur)
+ relative_unpack('>ihi', data, cur)
offsets = []
for j in range(num_offsets):
@@ -383,17 +383,17 @@ class KafkaProtocol(object):
for j in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
- relative_unpack('>hiii', data, cur)
+ relative_unpack('>hiii', data, cur)
- (replicas, cur) = relative_unpack('>%di' % numReplicas,
- data, cur)
+ (replicas, cur) = relative_unpack(
+ '>%di' % numReplicas, data, cur)
((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)
partition_metadata[partition] = \
- PartitionMetadata(topic_name, partition, leader,
- replicas, isr)
+ PartitionMetadata(
+ topic_name, partition, leader, replicas, isr)
topic_metadata[topic_name] = partition_metadata
@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload) for payload in payloads])
+ [create_message(payload) for payload in payloads])
gzipped = gzip_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
- [create_message(payload) for payload in payloads])
+ [create_message(payload) for payload in payloads])
snapped = snappy_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
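
Several protocol.py hunks only re-indent calls to relative_unpack, the cursor-based helper used throughout the decoders. A plausible reading of it, assuming the same (fmt, data, cur) signature seen in the hunks, not the library's exact code:

import struct

class BufferUnderflowError(Exception):
    pass

def relative_unpack(fmt, data, cur):
    # Unpack `fmt` at offset `cur`; return (values, advanced_cursor).
    size = struct.calcsize(fmt)
    if len(data) < cur + size:
        raise BufferUnderflowError("Not enough data left")
    values = struct.unpack(fmt, data[cur:cur + size])
    return values, cur + size

# e.g. a fetch-response triple: partition (int32), error (int16), offset (int64)
((partition, error, highwater_mark_offset), cur) = \
    relative_unpack('>ihq', b'\x00' * 14, 0)
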
diff --git a/kafka/queue.py b/kafka/queue.py
index 41f1c31..a996369 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -25,8 +25,9 @@ class KafkaConsumerProcess(Process):
Process.__init__(self)
def __str__(self):
- return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
- (self.topic, self.partition, self.consumer_sleep)
+ return "[KafkaConsumerProcess: topic=%s, \
+ partition=%s, sleep=%s]" % \
+ (self.topic, self.partition, self.consumer_sleep)
def run(self):
self.barrier.wait()
@@ -70,10 +71,12 @@ class KafkaProducerProcess(Process):
Process.__init__(self)
def __str__(self):
- return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
- flush_timeout=%s, timeout=%s]" % (
- self.topic, self.producer_flush_buffer,
- self.producer_flush_timeout, self.producer_timeout)
+ return "[KafkaProducerProcess: topic=%s, \
+ flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
+ (self.topic,
+ self.producer_flush_buffer,
+ self.producer_flush_timeout,
+ self.producer_timeout)
def run(self):
self.barrier.wait()
@@ -104,8 +107,8 @@ class KafkaProducerProcess(Process):
last_produce = time.time()
try:
- msg = KafkaClient.create_message(self.in_queue.get(True,
- self.producer_timeout))
+ msg = KafkaClient.create_message(
+ self.in_queue.get(True, self.producer_timeout))
messages.append(msg)
except Empty:
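
The producer-process hunk wraps KafkaClient.create_message around a blocking Queue.get with a timeout, treating Empty as "nothing to batch right now". A minimal sketch of that drain-with-timeout pattern, assuming a standard-library queue and a hypothetical max_batch limit:

from queue import Empty, Queue   # `Queue.Empty` / `Queue.Queue` on Python 2

def drain(in_queue, timeout=0.1, max_batch=10):
    # Pull at most `max_batch` items, blocking up to `timeout` seconds for
    # each; stop early once the queue runs dry.
    messages = []
    while len(messages) < max_batch:
        try:
            messages.append(in_queue.get(True, timeout))
        except Empty:
            break
    return messages
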
diff --git a/kafka/util.py b/kafka/util.py
index 96b3745..54052fb 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,9 +1,8 @@
from collections import defaultdict
-from itertools import groupby
import struct
from threading import Thread, Event
-from common import *
+from kafka.common import BufferUnderflowError
def write_int_string(s):
@@ -39,7 +38,8 @@ def read_short_string(data, cur):
def read_int_string(data, cur):
if len(data) < cur + 4:
raise BufferUnderflowError(
- "Not enough data left to read string len (%d < %d)" % (len(data), cur + 4))
+ "Not enough data left to read string len (%d < %d)" %
+ (len(data), cur + 4))
(strlen,) = struct.unpack('>i', data[cur:cur + 4])
if strlen == -1:
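
util.py's read_int_string, touched above, is the inverse of write_int_string: a 4-byte big-endian length prefix followed by the payload, with -1 marking None. A hedged reconstruction of the pair, based only on what the hunk shows:

import struct

class BufferUnderflowError(Exception):
    pass

def write_int_string(s):
    if s is None:
        return struct.pack('>i', -1)
    return struct.pack('>i', len(s)) + s

def read_int_string(data, cur):
    if len(data) < cur + 4:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (len(data), cur + 4))
    (strlen,) = struct.unpack('>i', data[cur:cur + 4])
    if strlen == -1:
        return None, cur + 4
    return data[cur + 4:cur + 4 + strlen], cur + 4 + strlen
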
diff --git a/test/test_unit.py b/test/test_unit.py
index c796c94..3f3af66 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -3,7 +3,8 @@ import random
import struct
import unittest
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest
+from kafka.client import KafkaClient
+from kafka.common import ProduceRequest, FetchRequest
from kafka.codec import (
has_gzip, has_snappy,
gzip_encode, gzip_decode,
@@ -59,7 +60,6 @@ class TestMisc(unittest.TestCase):
def test_length_prefix(self):
for i in xrange(ITERATIONS):
s1 = random_string()
- s2 = length_prefix_message(s1)
self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1))