summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authormrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
committermrtheb <mrlabbe@gmail.com>2013-10-03 22:52:04 -0400
commita03f0c86b8a504c0e3185cac1611131dba24f625 (patch)
tree3797524d3411640968292c6eba0141fc4c1f3457 /kafka/consumer.py
parentb0cacc948539d180e4a634a06a10232770deb187 (diff)
downloadkafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py53
1 files changed, 30 insertions, 23 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index a97e8c0..7d44f28 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,7 +8,7 @@ from Queue import Empty
from kafka.common import (
ErrorMapping, FetchRequest,
- OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+ OffsetRequest, OffsetCommitRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)
@@ -223,11 +223,12 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false
- super(SimpleConsumer, self).__init__(client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
def provide_partition_info(self):
"""
@@ -275,8 +276,8 @@ class SimpleConsumer(Consumer):
resps = self.client.send_offset_request(reqs)
for resp in resps:
- self.offsets[resp.partition] = resp.offsets[0] + \
- deltas[resp.partition]
+ self.offsets[resp.partition] = \
+ resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
@@ -364,9 +365,10 @@ class SimpleConsumer(Consumer):
req = FetchRequest(
self.topic, partition, offset, self.client.bufsize)
- (resp,) = self.client.send_fetch_request([req],
- max_wait_time=self.fetch_max_wait_time,
- min_bytes=fetch_size)
+ (resp,) = self.client.send_fetch_request(
+ [req],
+ max_wait_time=self.fetch_max_wait_time,
+ min_bytes=fetch_size)
assert resp.topic == self.topic
assert resp.partition == partition
@@ -376,18 +378,22 @@ class SimpleConsumer(Consumer):
for message in resp.messages:
next_offset = message.offset
- # update the offset before the message is yielded. This is
- # so that the consumer state is not lost in certain cases.
- # For eg: the message is yielded and consumed by the caller,
- # but the caller does not come back into the generator again.
- # The message will be consumed but the status will not be
- # updated in the consumer
+ # update the offset before the message is yielded. This
+ # is so that the consumer state is not lost in certain
+ # cases.
+ #
+ # For eg: the message is yielded and consumed by the
+ # caller, but the caller does not come back into the
+ # generator again. The message will be consumed but the
+ # status will not be updated in the consumer
self.fetch_started[partition] = True
self.offsets[partition] = message.offset
yield message
except ConsumerFetchSizeTooSmall, e:
- log.warn("Fetch size is too small, increasing by 1.5x and retrying")
fetch_size *= 1.5
+ log.warn(
+ "Fetch size too small, increasing to %d (1.5x) and retry",
+ fetch_size)
continue
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ class MultiProcessConsumer(Consumer):
num_procs=1, partitions_per_proc=0):
# Initiate the base consumer class
- super(MultiProcessConsumer, self).__init__(client, group, topic,
- partitions=None,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
+ super(MultiProcessConsumer, self).__init__(
+ client, group, topic,
+ partitions=None,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master