summaryrefslogtreecommitdiff
path: root/kafka/queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/queue.py')
-rw-r--r--kafka/queue.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/queue.py b/kafka/queue.py
index 41f1c31..a996369 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -25,8 +25,9 @@ class KafkaConsumerProcess(Process):
Process.__init__(self)
def __str__(self):
- return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
- (self.topic, self.partition, self.consumer_sleep)
+ return "[KafkaConsumerProcess: topic=%s, \
+ partition=%s, sleep=%s]" % \
+ (self.topic, self.partition, self.consumer_sleep)
def run(self):
self.barrier.wait()
@@ -70,10 +71,12 @@ class KafkaProducerProcess(Process):
Process.__init__(self)
def __str__(self):
- return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
- flush_timeout=%s, timeout=%s]" % (
- self.topic, self.producer_flush_buffer,
- self.producer_flush_timeout, self.producer_timeout)
+ return "[KafkaProducerProcess: topic=%s, \
+ flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
+ (self.topic,
+ self.producer_flush_buffer,
+ self.producer_flush_timeout,
+ self.producer_timeout)
def run(self):
self.barrier.wait()
@@ -104,8 +107,8 @@ class KafkaProducerProcess(Process):
last_produce = time.time()
try:
- msg = KafkaClient.create_message(self.in_queue.get(True,
- self.producer_timeout))
+ msg = KafkaClient.create_message(
+ self.in_queue.get(True, self.producer_timeout))
messages.append(msg)
except Empty: