diff options
Diffstat (limited to 'kafka/queue.py')
-rw-r--r-- | kafka/queue.py | 96 |
1 files changed, 65 insertions, 31 deletions
diff --git a/kafka/queue.py b/kafka/queue.py index 6fe9eaa..3bd7dca 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -10,8 +10,10 @@ log = logging.getLogger("kafka") raise NotImplementedError("Still need to refactor this class") + class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200): + def __init__(self, client, topic, partition, out_queue, barrier, + consumer_fetch_size=1024, consumer_sleep=200): self.client = copy(client) self.topic = topic self.partition = partition @@ -23,29 +25,40 @@ class KafkaConsumerProcess(Process): Process.__init__(self) def __str__(self): - return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % ( - self.topic, self.partition, self.consumer_sleep) + return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \ + (self.topic, self.partition, self.consumer_sleep) def run(self): self.barrier.wait() log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size) + fetchRequest = FetchRequest(self.topic, self.partition, + offset=0, size=self.consumer_fetch_size) + while True: - if self.barrier.is_set() == False: + if self.barrier.is_set() is False: log.info("Shutdown %s" % self) self.client.close() break + lastOffset = fetchRequest.offset (messages, fetchRequest) = self.client.get_message_set(fetchRequest) + if fetchRequest.offset == lastOffset: - log.debug("No more data for this partition, sleeping a bit (200ms)") + log.debug("No more data for this partition, " + "sleeping a bit (200ms)") time.sleep(self.consumer_sleep) continue + for message in messages: self.out_queue.put(message) + class KafkaProducerProcess(Process): - def __init__(self, client, topic, in_queue, barrier, producer_flush_buffer=500, producer_flush_timeout=2000, producer_timeout=100): + def __init__(self, client, topic, in_queue, barrier, + producer_flush_buffer=500, + producer_flush_timeout=2000, + producer_timeout=100): + self.client = copy(client) self.topic = topic self.in_queue = in_queue @@ -57,8 +70,10 @@ class KafkaProducerProcess(Process): Process.__init__(self) def __str__(self): - return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, flush_timeout=%s, timeout=%s]" % ( - self.topic, self.producer_flush_buffer, self.producer_flush_timeout, self.producer_timeout) + return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \ + flush_timeout=%s, timeout=%s]" % ( + self.topic, self.producer_flush_buffer, + self.producer_flush_timeout, self.producer_timeout) def run(self): self.barrier.wait() @@ -67,36 +82,47 @@ class KafkaProducerProcess(Process): last_produce = time.time() def flush(messages): - self.client.send_message_set(ProduceRequest(self.topic, -1, messages)) + self.client.send_message_set(ProduceRequest(self.topic, -1, + messages)) del messages[:] while True: - if self.barrier.is_set() == False: + if self.barrier.is_set() is False: log.info("Shutdown %s, flushing messages" % self) flush(messages) self.client.close() break + if len(messages) > self.producer_flush_buffer: - log.debug("Message count threashold reached. Flushing messages") + log.debug("Message count threshold reached. Flushing messages") flush(messages) last_produce = time.time() + elif (time.time() - last_produce) > self.producer_flush_timeout: log.debug("Producer timeout reached. Flushing messages") flush(messages) last_produce = time.time() + try: - messages.append(KafkaClient.create_message(self.in_queue.get(True, self.producer_timeout))) + msg = KafkaClient.create_message(self.in_queue.get(True, + self.producer_timeout)) + messages.append(msg) + except Empty: continue + class KafkaQueue(object): - def __init__(self, client, topic, partitions, producer_config={}, consumer_config={}): + def __init__(self, client, topic, partitions, + producer_config=None, consumer_config=None): """ - KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers + KafkaQueue a Queue-like object backed by a Kafka producer and some + number of consumers - Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size. - Messages are buffered in the producer thread until producer_flush_timeout or - producer_flush_buffer is reached. + Messages are eagerly loaded by the consumer in batches of size + consumer_fetch_size. + Messages are buffered in the producer thread until + producer_flush_timeout or producer_flush_buffer is reached. Params ====== @@ -108,21 +134,26 @@ class KafkaQueue(object): Consumer Config =============== - consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default - is 1024 - consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches - the end of a partition. Default is 200 + consumer_fetch_size: int, number of bytes to fetch in one call + to Kafka. Default is 1024 + consumer_sleep: int, time in milliseconds a consumer should sleep + when it reaches the end of a partition. Default is 200 Producer Config =============== - producer_timeout: int, time in milliseconds a producer should wait for messages to - enqueue for producing. Default is 100 - producer_flush_timeout: int, time in milliseconds a producer should allow messages - to accumulate before sending to Kafka. Default is 2000 - producer_flush_buffer: int, number of messages a producer should allow to accumulate. - Default is 500 - + producer_timeout: int, time in milliseconds a producer should + wait for messages to enqueue for producing. + Default is 100 + producer_flush_timeout: int, time in milliseconds a producer + should allow messages to accumulate before + sending to Kafka. Default is 2000 + producer_flush_buffer: int, number of messages a producer should + allow to accumulate. Default is 500 + """ + producer_config = {} if producer_config is None else producer_config + consumer_config = {} if consumer_config is None else consumer_config + self.in_queue = Queue() self.out_queue = Queue() self.consumers = [] @@ -130,12 +161,15 @@ class KafkaQueue(object): # Initialize and start consumer threads for partition in partitions: - consumer = KafkaConsumerProcess(client, topic, partition, self.in_queue, self.barrier, **consumer_config) + consumer = KafkaConsumerProcess(client, topic, partition, + self.in_queue, self.barrier, + **consumer_config) consumer.start() self.consumers.append(consumer) # Initialize and start producer thread - self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier, **producer_config) + self.producer = KafkaProducerProcess(client, topic, self.out_queue, + self.barrier, **producer_config) self.producer.start() # Trigger everything to start |