diff options
author | David Arthur <mumrah@gmail.com> | 2015-02-03 19:44:03 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2015-02-03 19:45:22 -0500 |
commit | a4219999926b11d059382d17bd7f495b362c517c (patch) | |
tree | 7b191e0f083df578418ce0e1e3dea23a089db113 | |
parent | 93e625ddd4af6e5a1551990defb6a1b809bafed1 (diff) | |
download | kafka-python-a4219999926b11d059382d17bd7f495b362c517c.tar.gz |
Removing queue.py
It's just collecting dust and throwing off the coverage report.
I pushed a branch queue-fixup in case someone wants to take a stab at
refactoring it.
-rw-r--r-- | kafka/queue.py | 215 |
1 files changed, 0 insertions, 215 deletions
diff --git a/kafka/queue.py b/kafka/queue.py deleted file mode 100644 index 26cafad..0000000 --- a/kafka/queue.py +++ /dev/null @@ -1,215 +0,0 @@ -from __future__ import absolute_import - -from copy import copy -import logging -from multiprocessing import Process, Queue, Event -from Queue import Empty -import time - -from kafka.client import KafkaClient, FetchRequest, ProduceRequest - -log = logging.getLogger("kafka") - -raise NotImplementedError("Still need to refactor this class") - - -class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, - consumer_fetch_size=1024, consumer_sleep=200): - self.client = copy(client) - self.topic = topic - self.partition = partition - self.out_queue = out_queue - self.barrier = barrier - self.consumer_fetch_size = consumer_fetch_size - self.consumer_sleep = consumer_sleep / 1000. - log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaConsumerProcess: topic=%s, \ - partition=%s, sleep=%s]" % \ - (self.topic, self.partition, self.consumer_sleep) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, - offset=0, size=self.consumer_fetch_size) - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s" % self) - self.client.close() - break - - lastOffset = fetchRequest.offset - (messages, fetchRequest) = self.client.get_message_set(fetchRequest) - - if fetchRequest.offset == lastOffset: - log.debug("No more data for this partition, " - "sleeping a bit (200ms)") - time.sleep(self.consumer_sleep) - continue - - for message in messages: - self.out_queue.put(message) - - -class KafkaProducerProcess(Process): - def __init__(self, client, topic, in_queue, barrier, - producer_flush_buffer=500, - producer_flush_timeout=2000, - producer_timeout=100): - - self.client = copy(client) - self.topic = topic - self.in_queue = in_queue - self.barrier = barrier - self.producer_flush_buffer = producer_flush_buffer - self.producer_flush_timeout = producer_flush_timeout / 1000. - self.producer_timeout = producer_timeout / 1000. - log.info("Initializing %s" % self) - Process.__init__(self) - - def __str__(self): - return "[KafkaProducerProcess: topic=%s, \ - flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \ - (self.topic, - self.producer_flush_buffer, - self.producer_flush_timeout, - self.producer_timeout) - - def run(self): - self.barrier.wait() - log.info("Starting %s" % self) - messages = [] - last_produce = time.time() - - def flush(messages): - self.client.send_message_set(ProduceRequest(self.topic, -1, - messages)) - del messages[:] - - while True: - if self.barrier.is_set() is False: - log.info("Shutdown %s, flushing messages" % self) - flush(messages) - self.client.close() - break - - if len(messages) > self.producer_flush_buffer: - log.debug("Message count threshold reached. Flushing messages") - flush(messages) - last_produce = time.time() - - elif (time.time() - last_produce) > self.producer_flush_timeout: - log.debug("Producer timeout reached. Flushing messages") - flush(messages) - last_produce = time.time() - - try: - msg = KafkaClient.create_message( - self.in_queue.get(True, self.producer_timeout)) - messages.append(msg) - - except Empty: - continue - - -class KafkaQueue(object): - def __init__(self, client, topic, partitions, - producer_config=None, consumer_config=None): - """ - KafkaQueue a Queue-like object backed by a Kafka producer and some - number of consumers - - Messages are eagerly loaded by the consumer in batches of size - consumer_fetch_size. - Messages are buffered in the producer thread until - producer_flush_timeout or producer_flush_buffer is reached. - - Arguments: - client: KafkaClient object - topic: str, the topic name - partitions: list of ints, the partions to consume from - producer_config: dict, see below - consumer_config: dict, see below - - Consumer Config - =============== - consumer_fetch_size: int, number of bytes to fetch in one call - to Kafka. Default is 1024 - consumer_sleep: int, time in milliseconds a consumer should sleep - when it reaches the end of a partition. Default is 200 - - Producer Config - =============== - producer_timeout: int, time in milliseconds a producer should - wait for messages to enqueue for producing. - Default is 100 - producer_flush_timeout: int, time in milliseconds a producer - should allow messages to accumulate before - sending to Kafka. Default is 2000 - producer_flush_buffer: int, number of messages a producer should - allow to accumulate. Default is 500 - - """ - producer_config = {} if producer_config is None else producer_config - consumer_config = {} if consumer_config is None else consumer_config - - self.in_queue = Queue() - self.out_queue = Queue() - self.consumers = [] - self.barrier = Event() - - # Initialize and start consumer threads - for partition in partitions: - consumer = KafkaConsumerProcess(client, topic, partition, - self.in_queue, self.barrier, - **consumer_config) - consumer.start() - self.consumers.append(consumer) - - # Initialize and start producer thread - self.producer = KafkaProducerProcess(client, topic, self.out_queue, - self.barrier, **producer_config) - self.producer.start() - - # Trigger everything to start - self.barrier.set() - - def get(self, block=True, timeout=None): - """ - Consume a message from Kafka - - Arguments: - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - - Returns: - msg: str, the payload from Kafka - """ - return self.in_queue.get(block, timeout).payload - - def put(self, msg, block=True, timeout=None): - """ - Send a message to Kafka - - Arguments: - msg: std, the message to send - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None - """ - self.out_queue.put(msg, block, timeout) - - def close(self): - """ - Close the internal queues and Kafka consumers/producer - """ - self.in_queue.close() - self.out_queue.close() - self.barrier.clear() - self.producer.join() - for consumer in self.consumers: - consumer.join() |