author | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400
committer | mrtheb <mrlabbe@gmail.com> | 2013-10-03 22:52:04 -0400
commit | a03f0c86b8a504c0e3185cac1611131dba24f625 (patch)
tree | 3797524d3411640968292c6eba0141fc4c1f3457 /kafka/consumer.py
parent | b0cacc948539d180e4a634a06a10232770deb187 (diff)
download | kafka-python-a03f0c86b8a504c0e3185cac1611131dba24f625.tar.gz
flake8 pass (pep8 and pyflakes)
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 53
1 file changed, 30 insertions, 23 deletions
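The commit message says the file now passes flake8, which combines the pep8 style checker with pyflakes (the latter is what flags unused imports such as the `OffsetFetchRequest` removed below). As a rough illustration only, since this page does not show the project's actual tooling or options, a check of that kind could be scripted like this:

```python
# Assumed invocation, not taken from this repository's tooling: run flake8
# (pep8 + pyflakes) against the touched file and exit with its status.
# Requires flake8 to be installed in the current environment.
import subprocess
import sys

result = subprocess.run(
    [sys.executable, "-m", "flake8", "kafka/consumer.py"],
    capture_output=True,
    text=True,
)
print(result.stdout or "flake8: no warnings")
sys.exit(result.returncode)
```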
diff --git a/kafka/consumer.py b/kafka/consumer.py
index a97e8c0..7d44f28 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -8,7 +8,7 @@
 from Queue import Empty
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
-    OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
+    OffsetRequest, OffsetCommitRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
@@ -223,11 +223,12 @@ class SimpleConsumer(Consumer):
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_started = defaultdict(bool)  # defaults to false
 
-        super(SimpleConsumer, self).__init__(client, group, topic,
-                                             partitions=partitions,
-                                             auto_commit=auto_commit,
-                                             auto_commit_every_n=auto_commit_every_n,
-                                             auto_commit_every_t=auto_commit_every_t)
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
     def provide_partition_info(self):
         """
@@ -275,8 +276,8 @@ class SimpleConsumer(Consumer):
             resps = self.client.send_offset_request(reqs)
 
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + \
-                    deltas[resp.partition]
+                self.offsets[resp.partition] = \
+                    resp.offsets[0] + deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
@@ -364,9 +365,10 @@ class SimpleConsumer(Consumer):
             req = FetchRequest(
                 self.topic, partition, offset, self.client.bufsize)
 
-            (resp,) = self.client.send_fetch_request([req],
-                                max_wait_time=self.fetch_max_wait_time,
-                                min_bytes=fetch_size)
+            (resp,) = self.client.send_fetch_request(
+                [req],
+                max_wait_time=self.fetch_max_wait_time,
+                min_bytes=fetch_size)
 
             assert resp.topic == self.topic
             assert resp.partition == partition
@@ -376,18 +378,22 @@ class SimpleConsumer(Consumer):
                 for message in resp.messages:
                     next_offset = message.offset
 
-                    # update the offset before the message is yielded. This is
-                    # so that the consumer state is not lost in certain cases.
-                    # For eg: the message is yielded and consumed by the caller,
-                    # but the caller does not come back into the generator again.
-                    # The message will be consumed but the status will not be
-                    # updated in the consumer
+                    # update the offset before the message is yielded. This
+                    # is so that the consumer state is not lost in certain
+                    # cases.
+                    #
+                    # For eg: the message is yielded and consumed by the
+                    # caller, but the caller does not come back into the
+                    # generator again. The message will be consumed but the
+                    # status will not be updated in the consumer
                     self.fetch_started[partition] = True
                     self.offsets[partition] = message.offset
                     yield message
             except ConsumerFetchSizeTooSmall, e:
-                log.warn("Fetch size is too small, increasing by 1.5x and retrying")
                 fetch_size *= 1.5
+                log.warn(
+                    "Fetch size too small, increasing to %d (1.5x) and retry",
+                    fetch_size)
                 continue
             except ConsumerNoMoreData, e:
                 log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ class MultiProcessConsumer(Consumer):
                  num_procs=1, partitions_per_proc=0):
 
         # Initiate the base consumer class
-        super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                    partitions=None,
-                                    auto_commit=auto_commit,
-                                    auto_commit_every_n=auto_commit_every_n,
-                                    auto_commit_every_t=auto_commit_every_t)
+        super(MultiProcessConsumer, self).__init__(
+            client, group, topic,
+            partitions=None,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master
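The two substantive hunks in `__iter_partition__` show a pattern worth noting: the partition offset is recorded before each `yield` so consumer state survives a caller that never re-enters the generator, and a `ConsumerFetchSizeTooSmall` error is handled by growing the fetch size 1.5x and retrying. The sketch below reproduces that control flow in isolation; `iter_partition`, `fetch_once`, `fake_fetch`, and `FetchTooSmall` are illustrative placeholders, not kafka-python APIs.

```python
# Simplified, self-contained sketch of the generator pattern touched above.
class FetchTooSmall(Exception):
    """Stand-in for ConsumerFetchSizeTooSmall: buffer too small for a message."""


def iter_partition(fetch_once, offset, fetch_size=1024):
    """Yield messages from one partition, mirroring SimpleConsumer's loop."""
    while True:
        try:
            messages = fetch_once(offset, fetch_size)  # may raise FetchTooSmall
        except FetchTooSmall:
            # Same recovery as the diff: grow the buffer 1.5x and retry.
            fetch_size = int(fetch_size * 1.5)
            continue
        if not messages:
            return  # nothing left to consume
        for msg_offset, message in messages:
            # Record progress *before* yielding, so state is not lost if the
            # caller abandons the generator (the concern in the rewrapped
            # comment above).
            offset = msg_offset + 1
            yield message


# Tiny driver with an in-memory "broker" to show the generator in use.
data = {0: "a", 1: "b", 2: "c"}


def fake_fetch(offset, fetch_size):
    """In-memory stand-in for a broker fetch (ignores fetch_size)."""
    return [(o, m) for o, m in data.items() if o >= offset]


print(list(iter_partition(fake_fetch, offset=0)))  # ['a', 'b', 'c']
```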